/*
 * Decompiled with CFR 0.152.
 */
package org.streamingpool.core.service.streamfactory;

import io.reactivex.Flowable;
import io.reactivex.flowables.ConnectableFlowable;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.streamingpool.core.domain.ErrorStreamPair;
import org.streamingpool.core.service.DiscoveryService;
import org.streamingpool.core.service.StreamFactory;
import org.streamingpool.core.service.StreamId;
import org.streamingpool.core.service.streamid.BufferSpecification;
import org.streamingpool.core.service.streamid.OverlapBufferStreamId;
import org.streamingpool.core.service.util.DoAfterFirstSubscribe;

/**
 * {@link StreamFactory} that builds the stream for an {@link OverlapBufferStreamId}.
 *
 * <p>The resulting stream buffers the items of the source stream. A new buffer is opened for every
 * item emitted by the start stream of the {@link BufferSpecification}. A buffer is closed by the
 * first item emitted either by one of the end streams, provided the corresponding matcher accepts
 * the pair (opening item, end-stream item), or by the timeout stream. Because every opening item
 * gets its own buffer, buffers may overlap.
 *
 * <p>All upstream streams are published as {@link ConnectableFlowable}s and connected explicitly.
 * This class relies on {@link DoAfterFirstSubscribe} to trigger those connections; presumably it
 * runs the given callback once after the first subscription (NOTE(review): confirm against the
 * implementation of {@code DoAfterFirstSubscribe}).
 */
public class OverlapBufferStreamFactory implements StreamFactory {

    /**
     * Creates the buffered stream for the given id.
     *
     * @param id the id of the stream to create
     * @param discoveryService service used to look up the start, source and end streams
     * @return the data stream of buffers, or {@link ErrorStreamPair#empty()} if {@code id} is not
     *         an {@link OverlapBufferStreamId}
     */
    @Override
    public <T> ErrorStreamPair<T> create(StreamId<T> id, DiscoveryService discoveryService) {
        if (!(id instanceof OverlapBufferStreamId)) {
            return ErrorStreamPair.empty();
        }
        OverlapBufferStreamId<?> analysisId = (OverlapBufferStreamId<?>) id;
        BufferSpecification bufferSpecification = analysisId.bufferSpecification();
        StreamId<?> startId = bufferSpecification.startId();
        StreamId<?> sourceId = analysisId.sourceId();
        Flowable<?> timeout = bufferSpecification.timeout();

        ConnectableFlowable<?> startStream = discoverPublished(discoveryService, startId);
        ConnectableFlowable<?> sourceStream = discoverPublished(discoveryService, sourceId);

        Set<BufferSpecification.EndStreamMatcher<?, ?>> matchers = bufferSpecification.endStreamMatchers();
        // The static item types of the matchers are unknown here. The matchers are only ever fed
        // the items of their own end stream and the opening items, so treating them as
        // <Object, Object> is safe as long as the specification was built consistently.
        @SuppressWarnings("unchecked")
        Map<BufferSpecification.EndStreamMatcher<Object, Object>, ConnectableFlowable<?>> endStreams = matchers.stream()
                .collect(Collectors.toMap(
                        m -> (BufferSpecification.EndStreamMatcher<Object, Object>) m,
                        m -> discoverPublished(discoveryService, m.endStreamId())));

        StreamConnector sourceStreamConnector = new StreamConnector(sourceStream);
        Flowable<?> bufferStream = sourceStream
                .compose(new DoAfterFirstSubscribe<>(() -> {
                    // End streams are connected before the start stream, so that they are already
                    // connected when the first opening item can arrive.
                    endStreams.values().forEach(ConnectableFlowable::connect);
                    startStream.connect();
                }))
                .buffer(startStream,
                        opening -> closingStreamFor(opening, endStreams, timeout, sourceStreamConnector));

        // The caller asked for StreamId<T>; the instanceof check above is the only evidence that
        // T is the buffer type, hence the unchecked cast.
        @SuppressWarnings("unchecked")
        Flowable<T> typedBufferStream = (Flowable<T>) bufferStream;
        return ErrorStreamPair.ofData(typedBufferStream);
    }

    /**
     * Discovers the stream for the given id and publishes it, so that it only starts emitting once
     * {@link ConnectableFlowable#connect()} is called.
     */
    private static ConnectableFlowable<?> discoverPublished(DiscoveryService discoveryService, StreamId<?> id) {
        return Flowable.fromPublisher(discoveryService.discover(id)).publish();
    }

    /**
     * Builds the stream whose first emission closes the buffer opened by {@code opening}.
     *
     * @param opening the start-stream item that opened the buffer
     * @param endStreams the end streams, keyed by the matcher that decides whether an end-stream
     *        item closes the buffer opened by {@code opening}
     * @param timeout stream whose emission closes the buffer regardless of the end streams
     * @param sourceStreamConnector connects the source stream; handed to
     *        {@link DoAfterFirstSubscribe} so the source gets connected from the closing stream
     * @return a stream emitting at most one item: the first matching end-stream item or the first
     *         timeout item
     */
    private Flowable<?> closingStreamFor(Object opening, Map<BufferSpecification.EndStreamMatcher<Object, Object>, ConnectableFlowable<?>> endStreams, Flowable<?> timeout, StreamConnector sourceStreamConnector) {
        // Explicit HashSet: the set is extended below, and Collectors.toSet() gives no guarantee
        // that its result is mutable.
        Set<Flowable<?>> closingCandidates = new HashSet<>();
        for (Map.Entry<BufferSpecification.EndStreamMatcher<Object, Object>, ConnectableFlowable<?>> entry : endStreams.entrySet()) {
            BufferSpecification.EndStreamMatcher<Object, Object> matcher = entry.getKey();
            closingCandidates.add(entry.getValue().filter(candidate -> matcher.matching().test(opening, candidate)));
        }
        closingCandidates.add(timeout);
        return Flowable.merge(closingCandidates)
                .compose(new DoAfterFirstSubscribe<>(sourceStreamConnector::connect))
                .take(1L);
    }

    /**
     * Connects a {@link ConnectableFlowable} at most once, no matter how often {@link #connect()}
     * is called.
     */
    private static class StreamConnector {
        private final ConnectableFlowable<?> stream;
        private final AtomicBoolean streamConnected = new AtomicBoolean(false);

        private StreamConnector(ConnectableFlowable<?> stream) {
            this.stream = stream;
        }

        /** Connects the wrapped stream on the first call; later calls do nothing. */
        public void connect() {
            if (this.streamConnected.compareAndSet(false, true)) {
                this.stream.connect();
            }
        }
    }
}

