/*
 * Decompiled with CFR 0.152.
 */
package org.streamingpool.core.service.akka;

import akka.stream.Graph;
import akka.stream.Materializer;
import akka.stream.javadsl.AsPublisher;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.reactivestreams.Publisher;
import org.streamingpool.core.domain.ErrorStreamPair;
import org.streamingpool.core.service.DiscoveryService;
import org.streamingpool.core.service.StreamFactory;
import org.streamingpool.core.service.StreamId;
import org.streamingpool.core.service.akka.AkkaSourceProvidingService;

/**
 * A {@link StreamFactory} that creates streams from Akka {@link Source}s which were
 * registered beforehand through {@link #provide(StreamId, Source)}.
 * <p>
 * Registered sources are kept in a {@link ConcurrentMap} keyed by their {@link StreamId}.
 * Each id can be registered at most once.
 */
public class AkkaStreamFactory implements AkkaSourceProvidingService, StreamFactory {

    /** Materializer used to run a registered source whenever a stream is created from it. */
    private final Materializer materializer;

    /** Registered sources by id; the element type of each source is that of its id (see provide). */
    private final ConcurrentMap<StreamId<?>, Source<?, ?>> suppliers = new ConcurrentHashMap<>();

    /**
     * Creates a factory that runs its registered sources with the given materializer.
     *
     * @param materializer the materializer used to run the sources
     * @throws NullPointerException if {@code materializer} is {@code null}
     */
    public AkkaStreamFactory(Materializer materializer) {
        this.materializer = Objects.requireNonNull(materializer, "materializer must not be null.");
    }

    /**
     * Looks up the source registered for the given id and, if there is one, runs it into a
     * publisher sink created with {@link AsPublisher#WITH_FANOUT}. The source is run anew on
     * every call.
     *
     * @param newId the id of the stream to create
     * @param discoveryService not used by this factory
     * @return a pair carrying the resulting publisher as its data stream, or
     *         {@link ErrorStreamPair#empty()} if no source is registered for {@code newId}
     * @throws NullPointerException if {@code newId} is {@code null}
     */
    @Override
    public <T> ErrorStreamPair<T> create(StreamId<T> newId, DiscoveryService discoveryService) {
        Objects.requireNonNull(newId, "newId must not be null.");

        // Safe: provide() is the only writer and it only ever stores a Source<T, ?> under a StreamId<T>.
        @SuppressWarnings("unchecked")
        Source<T, ?> source = (Source<T, ?>) this.suppliers.get(newId);
        if (source == null) {
            return ErrorStreamPair.empty();
        }

        Sink<T, Publisher<T>> akkaSink = Sink.asPublisher(AsPublisher.WITH_FANOUT);
        Publisher<T> publisher = source.runWith(akkaSink, this.materializer);
        return ErrorStreamPair.ofData(publisher);
    }

    /**
     * Registers the given source under the given id so that {@link #create} can later
     * produce a stream from it.
     *
     * @param id the id under which the source is registered
     * @param akkaSource the source to register
     * @throws NullPointerException if {@code id} or {@code akkaSource} is {@code null}
     * @throws IllegalArgumentException if a source is already registered for {@code id}
     */
    @Override
    public <T> void provide(StreamId<T> id, Source<T, ?> akkaSource) {
        Objects.requireNonNull(id, "id must not be null!");
        Objects.requireNonNull(akkaSource, "akkaSource must not be null!");

        // putIfAbsent makes check-and-insert atomic; a non-null result means the id was already taken.
        Source<?, ?> existingSource = this.suppliers.putIfAbsent(id, akkaSource);
        if (existingSource != null) {
            throw new IllegalArgumentException("Id " + id + " already registered! Cannot register twice.");
        }
    }
}

