/*
 * Decompiled with CFR 0.152.
 */
package org.streamingpool.core.service.streamfactory;

import io.reactivex.Flowable;
import io.reactivex.flowables.ConnectableFlowable;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.streamingpool.core.service.DiscoveryService;
import org.streamingpool.core.service.StreamFactory;
import org.streamingpool.core.service.StreamId;
import org.streamingpool.core.service.streamid.BufferSpecification;
import org.streamingpool.core.service.streamid.OverlapBufferStreamId;

public class OverlapBufferStreamFactory
implements StreamFactory {
    @Override
    public <T> Optional<Publisher<T>> create(StreamId<T> id, DiscoveryService discoveryService) {
        if (!(id instanceof OverlapBufferStreamId)) {
            return Optional.empty();
        }
        OverlapBufferStreamId analysisId = (OverlapBufferStreamId)id;
        BufferSpecification bufferSpecification = analysisId.bufferSpecification();
        StreamId<?> startId = bufferSpecification.startId();
        StreamId sourceId = analysisId.sourceId();
        Duration timeout = bufferSpecification.timeout();
        ConnectableFlowable startStream = Flowable.fromPublisher(discoveryService.discover(startId)).publish();
        ConnectableFlowable sourceStream = Flowable.fromPublisher(discoveryService.discover(sourceId)).publish();
        Set<BufferSpecification.EndStreamMatcher<?, ?>> matchers = bufferSpecification.endStreamMatchers();
        Map<BufferSpecification.EndStreamMatcher, ConnectableFlowable> endStreams = matchers.stream().collect(Collectors.toMap(m -> m, m -> Flowable.fromPublisher(discoveryService.discover(m.endStreamId())).publish()));
        Flowable bufferStream = sourceStream.buffer((Flowable)startStream, opening -> this.closingStreamFor(opening, endStreams, timeout));
        sourceStream.connect();
        for (ConnectableFlowable stream : endStreams.values()) {
            stream.connect();
        }
        startStream.connect();
        return Optional.of(bufferStream);
    }

    private Flowable<?> closingStreamFor(Object opening, Map<BufferSpecification.EndStreamMatcher<Object, Object>, ConnectableFlowable<?>> endStreams, Duration timeout) {
        Flowable<?> timeoutStream = this.timeoutStreamOf(timeout);
        Set matchingEndStreams = endStreams.entrySet().stream().map(e -> ((ConnectableFlowable)e.getValue()).filter(v -> ((BufferSpecification.EndStreamMatcher)e.getKey()).matching().test(opening, v))).collect(Collectors.toSet());
        matchingEndStreams.add(timeoutStream);
        return Flowable.merge(matchingEndStreams).take(1L);
    }

    private Flowable<?> timeoutStreamOf(Duration timeout) {
        if (timeout.isNegative()) {
            return Flowable.never();
        }
        return Flowable.timer((long)timeout.toMillis(), (TimeUnit)TimeUnit.MILLISECONDS);
    }
}

