/*
 * Decompiled with CFR 0.152.
 */
package org.streamingpool.core.service.impl;

import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.Flowable;
import io.reactivex.functions.Action;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streamingpool.core.conf.PoolConfiguration;
import org.streamingpool.core.domain.ErrorStreamPair;
import org.streamingpool.core.domain.backpressure.BackpressureAware;
import org.streamingpool.core.domain.backpressure.BackpressureBufferStrategy;
import org.streamingpool.core.domain.backpressure.BackpressureDropStrategy;
import org.streamingpool.core.domain.backpressure.BackpressureLatestStrategy;
import org.streamingpool.core.domain.backpressure.BackpressureNoneStrategy;
import org.streamingpool.core.domain.backpressure.BackpressureStrategy;
import org.streamingpool.core.service.CycleInStreamDiscoveryDetectedException;
import org.streamingpool.core.service.DiscoveryService;
import org.streamingpool.core.service.StreamFactory;
import org.streamingpool.core.service.StreamId;
import org.streamingpool.core.service.impl.PoolContent;

public class TrackKeepingDiscoveryService
implements DiscoveryService {
    private static final Logger LOGGER = LoggerFactory.getLogger(TrackKeepingDiscoveryService.class);
    private final Action NOOP = () -> {};
    private final Set<StreamId<?>> idsOfStreamsUnderCreation;
    private final Set<StreamId<?>> idsDiscovered = new HashSet();
    private final List<StreamFactory> factories;
    private final PoolContent content;
    private final Thread contextOfExecution;
    private final PoolConfiguration poolConfiguration;

    public TrackKeepingDiscoveryService(List<StreamFactory> factories, PoolContent content, PoolConfiguration poolConfiguration) {
        this(factories, content, new HashSet(), Thread.currentThread(), poolConfiguration);
    }

    private TrackKeepingDiscoveryService(List<StreamFactory> factories, PoolContent content, Set<StreamId<?>> idsOfStreamsUnderCreation, Thread contextOfExecution, PoolConfiguration poolConfiguration) {
        this.factories = Objects.requireNonNull(factories, "factories must not be null");
        this.content = Objects.requireNonNull(content, "activeStreams must not be null");
        this.idsOfStreamsUnderCreation = Collections.unmodifiableSet(idsOfStreamsUnderCreation);
        this.contextOfExecution = Objects.requireNonNull(contextOfExecution, "contextOfExecution must not be null");
        this.poolConfiguration = poolConfiguration;
    }

    @Override
    public <T> Publisher<T> discover(StreamId<T> id) {
        this.checkSameContexOfExecution();
        this.checkForRecursiveCycles(id);
        this.content.synchronousPutIfAbsent(id, () -> {
            for (StreamFactory factory : this.factories) {
                TrackKeepingDiscoveryService innerDiscoveryService;
                ErrorStreamPair factoryResult = factory.create(id, innerDiscoveryService = this.cloneDiscoveryServiceIncluding(id));
                if (factoryResult == null) {
                    throw new IllegalStateException(String.format("Factory %s returned null instead of a valid stream object for the id %s", factory, id));
                }
                if (!factoryResult.isPresent()) continue;
                LOGGER.info("Stream from id '{}' was successfully created by factory '{}'", (Object)id, (Object)factory);
                innerDiscoveryService.idsDiscovered.forEach(parent -> this.content.addDependency(id, (StreamId<?>)parent));
                return ErrorStreamPair.ofDataError(factoryResult.data(), factoryResult.error());
            }
            return ErrorStreamPair.empty();
        });
        this.idsDiscovered.add(id);
        Publisher<T> publisher = this.getStreamWithIdOrElseThrow(id);
        Flowable<T> stream = this.observeOnThreadPool(publisher);
        if (id instanceof BackpressureAware) {
            stream = this.applyBackpressureStrategy(stream, ((BackpressureAware)((Object)id)).backpressureStrategy());
        }
        return stream;
    }

    private <T> Flowable<T> applyBackpressureStrategy(Flowable<T> source, BackpressureStrategy backpressureStrategy) {
        if (backpressureStrategy == null) {
            return source;
        }
        if (backpressureStrategy instanceof BackpressureLatestStrategy) {
            return source.onBackpressureLatest();
        }
        if (backpressureStrategy instanceof BackpressureDropStrategy) {
            return source.onBackpressureDrop(i -> {});
        }
        if (backpressureStrategy instanceof BackpressureBufferStrategy) {
            BackpressureBufferStrategy bufferStrategy = (BackpressureBufferStrategy)backpressureStrategy;
            if (bufferStrategy.overflowStrategy() == BackpressureBufferStrategy.BackpressureBufferOverflowStrategy.DROP_LATEST) {
                return source.onBackpressureBuffer((long)bufferStrategy.bufferSize(), this.NOOP, BackpressureOverflowStrategy.DROP_LATEST);
            }
            if (bufferStrategy.overflowStrategy() == BackpressureBufferStrategy.BackpressureBufferOverflowStrategy.DROP_OLDEST) {
                return source.onBackpressureBuffer((long)bufferStrategy.bufferSize(), this.NOOP, BackpressureOverflowStrategy.DROP_OLDEST);
            }
            throw new IllegalArgumentException("Cannot determine the specified buffer overflow strategy: " + bufferStrategy);
        }
        if (backpressureStrategy instanceof BackpressureNoneStrategy) {
            return source;
        }
        throw new IllegalArgumentException("Cannot determine the specified backpressure strategy: " + backpressureStrategy);
    }

    private <T> Flowable<T> observeOnThreadPool(Publisher<T> publisher) {
        return Flowable.fromPublisher(publisher).observeOn(this.poolConfiguration.getScheduler(), false, this.poolConfiguration.getObserveOnCapacity());
    }

    private <T> Publisher<T> getStreamWithIdOrElseThrow(StreamId<T> id) {
        Publisher<T> activeStream = this.content.get(id);
        if (activeStream == null) {
            throw new IllegalArgumentException("The stream for id '" + id + "' is neither present nor can it be created by any factory.");
        }
        return activeStream;
    }

    private <T> void checkForRecursiveCycles(StreamId<T> id) {
        if (this.idsOfStreamsUnderCreation.contains(id)) {
            throw new CycleInStreamDiscoveryDetectedException(String.format("Cycle detected when looking up streams. (At least) the following id was queried twice: %s. Number of queried ids without revolving: %s", id, this.idsOfStreamsUnderCreation.size()));
        }
    }

    private void checkSameContexOfExecution() {
        if (!Thread.currentThread().equals(this.contextOfExecution)) {
            throw new IllegalStateException(String.format("Invalid context of execution. It is not allowed to recursively discover streams from different threads. The allowed thread is [%s] while the current one is [%s]", this.contextOfExecution.getName(), Thread.currentThread().getName()));
        }
    }

    private <T> TrackKeepingDiscoveryService cloneDiscoveryServiceIncluding(StreamId<T> newId) {
        HashSet newSet = new HashSet(this.idsOfStreamsUnderCreation);
        newSet.add(newId);
        return new TrackKeepingDiscoveryService(this.factories, this.content, newSet, this.contextOfExecution, this.poolConfiguration);
    }
}

