/*
 * Decompiled with CFR 0.152.
 */
package org.streamingpool.core.service.impl;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streamingpool.core.domain.ErrorStreamPair;
import org.streamingpool.core.service.CycleInStreamDiscoveryDetectedException;
import org.streamingpool.core.service.DiscoveryService;
import org.streamingpool.core.service.StreamFactory;
import org.streamingpool.core.service.StreamId;
import org.streamingpool.core.service.impl.PoolContent;

public class TrackKeepingDiscoveryService
implements DiscoveryService {
    private static final Logger LOGGER = LoggerFactory.getLogger(TrackKeepingDiscoveryService.class);
    private final Set<StreamId<?>> idsOfStreamsUnderCreation;
    private final List<StreamFactory> factories;
    private final PoolContent content;
    private final Thread contextOfExecution;

    public TrackKeepingDiscoveryService(List<StreamFactory> factories, PoolContent content) {
        this(factories, content, new HashSet(), Thread.currentThread());
    }

    public TrackKeepingDiscoveryService(List<StreamFactory> factories, PoolContent content, Set<StreamId<?>> idsOfStreamsUnderCreation, Thread contextOfExecution) {
        this.factories = Objects.requireNonNull(factories, "factories must not be null");
        this.content = Objects.requireNonNull(content, "activeStreams must not be null");
        this.idsOfStreamsUnderCreation = Collections.unmodifiableSet(idsOfStreamsUnderCreation);
        this.contextOfExecution = Objects.requireNonNull(contextOfExecution, "contextOfExecution must not be null");
    }

    @Override
    public <T> Publisher<T> discover(StreamId<T> id) {
        this.checkSameContexOfExecution();
        this.checkForRecursiveCycles(id);
        this.content.synchronousPutIfAbsent(id, () -> this.createFromFactories(id));
        return this.getStreamWithIdOrElseThrow(id);
    }

    private <T> Publisher<T> getStreamWithIdOrElseThrow(StreamId<T> id) {
        Publisher<T> activeStream = this.content.get(id);
        if (activeStream == null) {
            throw new IllegalArgumentException("The stream for id '" + id + "' is neither present nor can it be created by any factory.");
        }
        return activeStream;
    }

    private <T> void checkForRecursiveCycles(StreamId<T> id) {
        if (this.idsOfStreamsUnderCreation.contains(id)) {
            throw new CycleInStreamDiscoveryDetectedException(String.format("Cycle detected when looking up streams. (At least) the following id was queried twice: %s. Number of queried ids without revolving: %s", id, this.idsOfStreamsUnderCreation.size()));
        }
    }

    private void checkSameContexOfExecution() {
        if (!Thread.currentThread().equals(this.contextOfExecution)) {
            throw new IllegalStateException(String.format("Invalid context of execution. It is not allowed to recursively discover streams from different threads. The allowed thread is [%s] while the current one is [%s]", this.contextOfExecution.getName(), Thread.currentThread().getName()));
        }
    }

    private <T> TrackKeepingDiscoveryService cloneDiscoveryServiceIncluding(StreamId<T> newId) {
        HashSet newSet = new HashSet(this.idsOfStreamsUnderCreation);
        newSet.add(newId);
        return new TrackKeepingDiscoveryService(this.factories, this.content, newSet, this.contextOfExecution);
    }

    private <T> ErrorStreamPair<T> createFromFactories(StreamId<T> newId) {
        for (StreamFactory factory : this.factories) {
            ErrorStreamPair<T> factoryResult = factory.create(newId, this.cloneDiscoveryServiceIncluding(newId));
            if (factoryResult == null) {
                throw new IllegalStateException(String.format("Factory %s returned null instead of a valid stream object for the id %s", factory, newId));
            }
            if (!factoryResult.isPresent()) continue;
            LOGGER.info(String.format("Stream from id '%s' was successfully created by factory '%s'", newId, factory));
            return factoryResult;
        }
        return ErrorStreamPair.empty();
    }
}

