/*
 * Decompiled with CFR 0.152.
 */
package cern.streaming.pool.core.support;

import akka.NotUsed;
import akka.japi.Pair;
import akka.stream.Attributes;
import akka.stream.Materializer;
import akka.stream.javadsl.AsPublisher;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import cern.streaming.pool.core.service.ProvidingService;
import cern.streaming.pool.core.service.StreamId;
import cern.streaming.pool.core.service.akka.AkkaSourceProvidingService;
import cern.streaming.pool.core.support.StreamSupport;
import org.reactivestreams.Publisher;

public interface AkkaStreamSupport
extends StreamSupport {
    /**
     * Returns the Akka {@link Materializer} used by this support.
     * <p>
     * The default methods of this interface use it to run Akka sources (see
     * {@link #publisherFrom(Source)}) and pass it on to the builder returned by
     * {@link #provide(Source)}.
     *
     * @return the materializer supplied by the implementor
     */
    public Materializer materializer();

    /**
     * Returns the service that accepts Akka sources directly, without running them first.
     * <p>
     * It is passed to the builder created by {@link #provide(Source)}; that builder calls
     * {@code provide(id, source)} on it when a source is provided in unmaterialized form.
     *
     * @return the Akka source providing service supplied by the implementor
     */
    public AkkaSourceProvidingService sourceProvidingService();

    /**
     * Turns the given Akka source into a reactive-streams {@link Publisher}.
     * <p>
     * This is an alias for {@link #publisherFrom(Source)}: the source is run with the
     * materializer of this support and the resulting publisher is returned.
     *
     * @param akkaSource the Akka source to run
     * @param <Out> type of the elements emitted by the source
     * @param <Mat> type of the source's materialized value (not returned by this method)
     * @return the publisher obtained by running the source
     */
    public default <Out, Mat> Publisher<Out> streamFrom(Source<Out, Mat> akkaSource) {
        final Publisher<Out> publisher = publisherFrom(akkaSource);
        return publisher;
    }

    /**
     * Starts a fluent "provide" expression for the given Akka source.
     * <p>
     * The returned builder is handed both providing services and the materializer of this
     * support, so that the caller can decide afterwards whether the source shall be provided
     * in materialized or unmaterialized form.
     *
     * @param akkaSource the Akka source to provide
     * @param <Out> type of the elements emitted by the source
     * @param <Mat> type of the source's materialized value
     * @return a builder on which the id of the stream can be specified
     */
    public default <Out, Mat> OngoingAkkaSourceProviding<Out, Mat> provide(Source<Out, Mat> akkaSource) {
        final AkkaSourceProvidingService unmaterializedService = sourceProvidingService();
        final ProvidingService publisherService = providingService();
        final Materializer akkaMaterializer = materializer();
        return new OngoingAkkaSourceProviding<>(unmaterializedService, publisherService, akkaSource, akkaMaterializer);
    }

    /**
     * Runs the given Akka source and exposes its elements as a reactive-streams
     * {@link Publisher}.
     * <p>
     * The source is connected to the sink returned by {@link #defaultPublisherSink()} and run
     * with the materializer of this support. Every call runs the source anew.
     *
     * @param source the Akka source to run
     * @param <T> type of the elements emitted by the source
     * @param <U> type of the source's materialized value (not returned by this method)
     * @return the publisher materialized by the sink
     */
    default public <T, U> Publisher<T> publisherFrom(Source<T, U> source) {
        Sink<T, Publisher<T>> akkaSink = AkkaStreamSupport.defaultPublisherSink();
        // runWith already returns the sink's materialized value (Publisher<T>), so no cast is needed.
        return source.runWith(akkaSink, this.materializer());
    }

    /**
     * Looks up the stream with the given id and wraps it into an Akka {@link Source}.
     *
     * @param id the id of the stream to discover
     * @param <T> type of the elements of the stream
     * @return an Akka source backed by the discovered publisher
     */
    public default <T> Source<T, NotUsed> sourceFrom(StreamId<T> id) {
        final Publisher<T> discovered = discover(id);
        return Source.fromPublisher(discovered);
    }

    /**
     * Creates the sink used throughout this support to turn Akka streams into reactive-streams
     * publishers.
     * <p>
     * The sink materializes into a {@link Publisher} created with
     * {@link AsPublisher#WITH_FANOUT} and has its input buffer configured with an initial and a
     * maximum size of 1.
     *
     * @param <T> type of the elements accepted by the sink
     * @return a new sink materializing into a publisher
     */
    public static <T> Sink<T, Publisher<T>> defaultPublisherSink() {
        // The explicit <T> is required: as the receiver of a chained call there is no target
        // type, so T would otherwise be inferred as Object.
        return Sink.<T>asPublisher(AsPublisher.WITH_FANOUT).withAttributes(Attributes.inputBuffer(1, 1));
    }

    /**
     * Final step of a fluent "provide" expression for an Akka source which shall be materialized
     * (run) at the moment it is provided.
     *
     * @param <Out> type of the elements emitted by the source
     * @param <Mat> type of the source's materialized value
     */
    public static class OngoingMaterializedAkkaSourceProviding<Out, Mat> {
        private final ProvidingService providingService;
        private final Source<Out, Mat> akkaSource;
        private final Materializer materializer;

        /**
         * @param providingService the service in which the resulting publisher is registered
         * @param akkaSource the Akka source to run and provide
         * @param materializer the materializer used to run the source
         */
        public OngoingMaterializedAkkaSourceProviding(ProvidingService providingService, Source<Out, Mat> akkaSource, Materializer materializer) {
            this.providingService = providingService;
            this.akkaSource = akkaSource;
            this.materializer = materializer;
        }

        /**
         * Runs the source, registers the resulting publisher under the given id and returns the
         * materialized value of the source.
         *
         * @param id the id under which the stream is provided
         * @return the materialized value of the source
         */
        public Mat as(StreamId<Out> id) {
            // Keep.both() retains the materialized values of both the source (first) and the
            // publisher sink (second).
            RunnableGraph<Pair<Mat, Publisher<Out>>> graph = this.akkaSource
                    .toMat(AkkaStreamSupport.<Out>defaultPublisherSink(), Keep.<Mat, Publisher<Out>>both());
            Pair<Mat, Publisher<Out>> materializedPair = graph.run(this.materializer);
            this.providingService.provide(id, materializedPair.second());
            return materializedPair.first();
        }
    }

    /**
     * Final step of a fluent "provide" expression for an Akka source which is handed over to the
     * {@link AkkaSourceProvidingService} as is, i.e. without being run by this class.
     *
     * @param <Out> type of the elements emitted by the source
     */
    public static class OngoingUnmaterializedAkkaSourceProviding<Out> {
        private final AkkaSourceProvidingService sourceProvidingService;
        private final Source<Out, ?> akkaSource;

        /**
         * @param sourceProvidingService the service which receives the source
         * @param akkaSource the Akka source to provide
         */
        public OngoingUnmaterializedAkkaSourceProviding(AkkaSourceProvidingService sourceProvidingService, Source<Out, ?> akkaSource) {
            this.akkaSource = akkaSource;
            this.sourceProvidingService = sourceProvidingService;
        }

        /**
         * Passes the source to the source providing service under the given id.
         *
         * @param id the id under which the source is provided
         */
        public void as(StreamId<Out> id) {
            sourceProvidingService.provide(id, akkaSource);
        }
    }

    /**
     * Intermediate step of a fluent "provide" expression for an Akka source. From here the caller
     * either chooses explicitly between {@link #materialized()} and {@link #unmaterialized()}, or
     * calls {@link #as(StreamId)} directly, which uses the unmaterialized variant.
     *
     * @param <Out> type of the elements emitted by the source
     * @param <Mat> type of the source's materialized value
     */
    public static class OngoingAkkaSourceProviding<Out, Mat> {
        private final AkkaSourceProvidingService sourceProvidingService;
        private final ProvidingService providingService;
        private final Source<Out, Mat> akkaSource;
        private final Materializer materializer;

        /**
         * @param sourceProvidingService the service used when providing the source unmaterialized
         * @param providingService the service used when providing the source materialized
         * @param akkaSource the Akka source to provide
         * @param materializer the materializer used when providing the source materialized
         */
        public OngoingAkkaSourceProviding(AkkaSourceProvidingService sourceProvidingService, ProvidingService providingService, Source<Out, Mat> akkaSource, Materializer materializer) {
            this.akkaSource = akkaSource;
            this.materializer = materializer;
            this.providingService = providingService;
            this.sourceProvidingService = sourceProvidingService;
        }

        /**
         * @return the next step for providing the source without running it
         */
        public OngoingUnmaterializedAkkaSourceProviding<Out> unmaterialized() {
            return new OngoingUnmaterializedAkkaSourceProviding<>(sourceProvidingService, akkaSource);
        }

        /**
         * @return the next step for running the source and providing the resulting publisher
         */
        public OngoingMaterializedAkkaSourceProviding<Out, Mat> materialized() {
            return new OngoingMaterializedAkkaSourceProviding<>(providingService, akkaSource, materializer);
        }

        /**
         * Provides the source under the given id in unmaterialized form; shorthand for
         * {@code unmaterialized().as(id)}.
         *
         * @param id the id under which the source is provided
         */
        public void as(StreamId<Out> id) {
            final OngoingUnmaterializedAkkaSourceProviding<Out> unmaterializedStep = unmaterialized();
            unmaterializedStep.as(id);
        }
    }
}

