/*
 * Decompiled with CFR 0.152.
 */
package cern.streaming.pool.core.incubation.akka;

import akka.stream.Attributes;
import akka.stream.Outlet;
import akka.stream.Shape;
import akka.stream.SourceShape;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import cern.streaming.pool.core.incubation.akka.StreamDiscovery;
import cern.streaming.pool.core.service.DiscoveryService;
import cern.streaming.pool.core.service.StreamId;
import java.util.Optional;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class IdBasedSource<T>
extends GraphStage<SourceShape<T>> {
    private final StreamId<T> streamId;
    private final Outlet<T> out;

    public IdBasedSource(StreamId<T> streamId) {
        this.streamId = streamId;
        this.out = Outlet.create((String)("IdBasedSource_" + streamId.toString() + ".out"));
    }

    public SourceShape<T> shape() {
        return new SourceShape(this.out);
    }

    public GraphStageLogic createLogic(Attributes inheritedAttributes) {
        final SourceSubscriber subscriber = this.getSourceSubscriber(inheritedAttributes);
        return new GraphStageLogic((Shape)this.shape()){
            {
                super(x0);
                this.setHandler(IdBasedSource.this.out, (OutHandler)new AbstractOutHandler(){

                    public void onPull() throws Exception {
                        this.push(IdBasedSource.this.out, subscriber.latestValue);
                        if (subscriber.live) {
                            subscriber.subscription.request(1L);
                        } else {
                            this.completeStage();
                        }
                    }

                    public void onDownstreamFinish() throws Exception {
                        super.onDownstreamFinish();
                        subscriber.subscription.cancel();
                    }
                });
            }
        };
    }

    private SourceSubscriber getSourceSubscriber(Attributes inheritedAttributes) {
        DiscoveryService service = this.getDiscoveryService(inheritedAttributes);
        Publisher<T> publisher = service.discover(this.streamId);
        SourceSubscriber subscriber = new SourceSubscriber();
        publisher.subscribe((Subscriber)subscriber);
        return subscriber;
    }

    protected DiscoveryService getDiscoveryService(Attributes inheritedAttributes) {
        Optional discovery = inheritedAttributes.getAttribute(StreamDiscovery.class);
        return ((StreamDiscovery)discovery.get()).service();
    }

    private class SourceSubscriber
    implements Subscriber<T> {
        private Subscription subscription;
        private T latestValue;
        private boolean live = false;

        private SourceSubscriber() {
        }

        public void onSubscribe(Subscription newSubscription) {
            this.subscription = newSubscription;
            newSubscription.request(1L);
            this.live = true;
        }

        public void onNext(T value) {
            this.latestValue = value;
        }

        public void onError(Throwable error) {
            this.live = false;
        }

        public void onComplete() {
            this.live = false;
        }
    }
}

