/*
 * Decompiled with CFR 0.152.
 */
package no.sysco.middleware.alpakka.zeromq.javadsl.internal;

import akka.stream.Attributes;
import akka.stream.Outlet;
import akka.stream.Shape;
import akka.stream.SourceShape;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import no.sysco.middleware.alpakka.zeromq.javadsl.internal.ZmqStageLogic;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

/**
 * Source stage that emits {@link ZMsg} messages read from a ZeroMQ subscriber socket.
 *
 * <p>Only client mode is implemented: {@link #createLogic(Attributes)} rejects stages
 * constructed with {@code isServer == true}.
 */
public class ZmqSubscribeStage
extends GraphStage<SourceShape<ZMsg>> {
    private final boolean isServer;
    private final String addresses;
    /** Subscription filter; {@code null} means "subscribe to everything". */
    private final String subscription;
    private final Outlet<ZMsg> outlet = Outlet.create("ZmqSubscribe.out");
    private final SourceShape<ZMsg> shape = new SourceShape<>(this.outlet);

    /**
     * Creates a stage without a subscription filter; at start-up the socket subscribes with
     * {@link ZMQ#SUBSCRIPTION_ALL}.
     *
     * @param isServer  whether a server-side socket is requested (not supported, see
     *                  {@link #createLogic(Attributes)})
     * @param addresses address specification handed unchanged to the client stage logic
     */
    public ZmqSubscribeStage(boolean isServer, String addresses) {
        this(isServer, addresses, null);
    }

    /**
     * Creates a stage with an explicit subscription filter.
     *
     * @param isServer     whether a server-side socket is requested (not supported, see
     *                     {@link #createLogic(Attributes)})
     * @param addresses    address specification handed unchanged to the client stage logic
     * @param subscription filter passed to the socket's {@code subscribe} call, or
     *                     {@code null} to subscribe with {@link ZMQ#SUBSCRIPTION_ALL}
     */
    public ZmqSubscribeStage(boolean isServer, String addresses, String subscription) {
        this.isServer = isServer;
        this.addresses = addresses;
        this.subscription = subscription;
    }

    /**
     * Returns the single-outlet shape of this source.
     *
     * @return the shape wrapping the {@code "ZmqSubscribe.out"} outlet
     */
    @Override
    public SourceShape<ZMsg> shape() {
        return this.shape;
    }

    /**
     * Builds the client stage logic: it registers the pull handler on the outlet and, in
     * {@code preStart}, applies the subscription to the socket.
     *
     * @param inheritedAttributes stream attributes (unused by this stage)
     * @return the logic driving this stage
     * @throws UnsupportedOperationException if the stage was created with {@code isServer == true}
     */
    @Override
    public GraphStageLogic createLogic(Attributes inheritedAttributes) throws Exception {
        if (this.isServer) {
            throw new UnsupportedOperationException("Server SUB Socket type is not supported yet.");
        }
        return new ZmqStageLogic.ClientStageLogic(this.shape, this.addresses, ZMQ.SUB) {
            {
                setHandler(ZmqSubscribeStage.this.shape.out(), new AbstractOutHandler() {
                    @Override
                    public void onPull() throws Exception {
                        // socket() and push() belong to the enclosing stage logic, not to this
                        // handler, so they must be called unqualified (no "this.").
                        ZMsg elem = ZMsg.recvMsg(socket(), true);
                        // NOTE(review): when recvMsg returns null nothing is pushed and this
                        // handler takes no other action, so the pending demand is left
                        // unanswered - confirm whether that is intended.
                        if (elem != null) {
                            push(ZmqSubscribeStage.this.shape.out(), elem);
                        }
                    }
                });
            }

            @Override
            public void preStart() throws Exception {
                super.preStart();
                if (ZmqSubscribeStage.this.subscription == null) {
                    socket().subscribe(ZMQ.SUBSCRIPTION_ALL);
                } else {
                    socket().subscribe(ZmqSubscribeStage.this.subscription);
                }
            }
        };
    }
}

