/*
 * Decompiled with CFR 0.152.
 */
package cern.streaming.pool.ext.tensorics.streamfactory;

import cern.streaming.pool.core.service.DiscoveryService;
import cern.streaming.pool.core.service.StreamFactory;
import cern.streaming.pool.core.service.StreamId;
import cern.streaming.pool.core.service.streamid.OverlapBufferStreamId;
import cern.streaming.pool.ext.tensorics.evaluation.BufferedEvaluation;
import cern.streaming.pool.ext.tensorics.evaluation.ContinuousEvaluation;
import cern.streaming.pool.ext.tensorics.evaluation.EvaluationStrategy;
import cern.streaming.pool.ext.tensorics.evaluation.TriggeredEvaluation;
import cern.streaming.pool.ext.tensorics.exception.NoBufferedStreamSpecifiedException;
import cern.streaming.pool.ext.tensorics.expression.UnresolvedStreamIdBasedExpression;
import cern.streaming.pool.ext.tensorics.streamid.DetailedExpressionStreamId;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.reactivex.Flowable;
import io.reactivex.functions.Function;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.tensorics.core.resolve.domain.DetailedExpressionResult;
import org.tensorics.core.resolve.engine.ResolvedContextDidNotGrowException;
import org.tensorics.core.resolve.engine.ResolvingEngine;
import org.tensorics.core.resolve.options.HandleWithFirstCapableAncestorStrategy;
import org.tensorics.core.resolve.options.ResolvingOption;
import org.tensorics.core.tree.domain.Contexts;
import org.tensorics.core.tree.domain.EditableResolvingContext;
import org.tensorics.core.tree.domain.Expression;
import org.tensorics.core.tree.domain.ResolvingContext;
import org.tensorics.core.tree.walking.Trees;

public class DetailedTensoricsExpressionStreamFactory
implements StreamFactory {
    private static final HandleWithFirstCapableAncestorStrategy EXCEPTION_HANDLING_STRATEGY = new HandleWithFirstCapableAncestorStrategy();
    private static final Function<Object[], Boolean> TRIGGER_CONTEXT_COMBINER = entriesToCombine -> true;
    private static final Function<Object[], ResolvingContext> CONTEXT_COMBINER = entriesToCombine -> {
        EditableResolvingContext context = Contexts.newResolvingContext();
        for (Object entry : entriesToCombine) {
            if (!(entry instanceof ExpToValue)) continue;
            ExpToValue castedEntry = (ExpToValue)entry;
            context.put(castedEntry.node, castedEntry.value);
        }
        return context;
    };
    private final ResolvingEngine engine;

    public DetailedTensoricsExpressionStreamFactory(ResolvingEngine engine) {
        this.engine = engine;
    }

    public <T> Optional<Publisher<T>> create(StreamId<T> id, DiscoveryService discoveryService) {
        if (!(id instanceof DetailedExpressionStreamId)) {
            return Optional.empty();
        }
        return Optional.of(this.resolvedStream((DetailedExpressionStreamId)id, discoveryService));
    }

    private <T, E extends Expression<T>> Flowable<DetailedExpressionResult<T, E>> resolvedStream(DetailedExpressionStreamId<T, E> id, DiscoveryService discoveryService) {
        Object expression = id.expression();
        ResolvingContext initialCtx = id.initialCtx();
        Map<Expression<Object>, StreamId<Object>> streamIds = this.streamIdsFrom(id);
        HashMap<StreamId<Object>, Flowable> observableEntries = new HashMap<StreamId<Object>, Flowable>();
        for (Map.Entry<Expression<Object>, StreamId<Object>> entry : streamIds.entrySet()) {
            Flowable plainObservable = Flowable.fromPublisher((Publisher)discoveryService.discover(entry.getValue()));
            Flowable mappedObservable = plainObservable.map(obj -> new ExpToValue((Expression<Object>)((Expression)entry.getKey()), obj));
            observableEntries.put(entry.getValue(), mappedObservable);
        }
        return DetailedTensoricsExpressionStreamFactory.triggerObservable(observableEntries, id.evaluationStrategy(), discoveryService).withLatestFrom((Publisher[])observableEntries.values().toArray(new Flowable[0]), CONTEXT_COMBINER).map(ctx -> {
            EditableResolvingContext fullContext = Contexts.newResolvingContext();
            fullContext.putAllNew(ctx);
            fullContext.putAllNew(initialCtx);
            return this.engine.resolveDetailed(expression, (ResolvingContext)fullContext, new ResolvingOption[]{EXCEPTION_HANDLING_STRATEGY});
        });
    }

    @VisibleForTesting
    <T extends Expression<?>> Map<Expression<Object>, StreamId<Object>> streamIdsFrom(DetailedExpressionStreamId<?, T> id) {
        T rootExpression = id.expression();
        ResolvingContext initialCtx = id.initialCtx();
        Set unresolvedStreamIdExpressions = Trees.findNodesOfClass(rootExpression, UnresolvedStreamIdBasedExpression.class);
        ImmutableMap.Builder mapBuilder = ImmutableMap.builder();
        for (UnresolvedStreamIdBasedExpression unresolvedStreamIdExpression : unresolvedStreamIdExpressions) {
            try {
                Expression streamIdExpression = unresolvedStreamIdExpression.streamIdExpression();
                StreamId streamId = (StreamId)this.engine.resolve(streamIdExpression, initialCtx, new ResolvingOption[0]);
                mapBuilder.put((Object)unresolvedStreamIdExpression, (Object)streamId);
            }
            catch (ResolvedContextDidNotGrowException ex) {
                throw new RuntimeException(String.format("Context did not grow while resolving the StreamId of expression. This is most probably because the initial context (%s) did not contain the value of the current UnresolvedStreamIdBasedExpression (%s).", new Object[]{initialCtx, unresolvedStreamIdExpression}), ex);
            }
        }
        return mapBuilder.build();
    }

    private static final Flowable<?> triggerObservable(Map<StreamId<?>, ? extends Flowable<?>> flowables, EvaluationStrategy strategy, DiscoveryService discoveryService) {
        if (strategy instanceof ContinuousEvaluation) {
            return Flowable.combineLatest(flowables.values(), TRIGGER_CONTEXT_COMBINER);
        }
        if (strategy instanceof BufferedEvaluation) {
            List triggeringObservables = flowables.entrySet().stream().filter(e -> e.getKey() instanceof OverlapBufferStreamId).map(Map.Entry::getValue).collect(Collectors.toList());
            if (triggeringObservables.isEmpty()) {
                throw new NoBufferedStreamSpecifiedException();
            }
            return Flowable.zip(triggeringObservables, ImmutableSet::of);
        }
        if (strategy instanceof TriggeredEvaluation) {
            return Flowable.fromPublisher((Publisher)discoveryService.discover(((TriggeredEvaluation)strategy).triggeringStreamId()));
        }
        throw new IllegalArgumentException("Unknown evaluationStrategy '" + strategy + "'. Cannot create trigger Observable.");
    }

    private static final class ExpToValue {
        private final Expression<Object> node;
        private final Object value;

        public ExpToValue(Expression<Object> node, Object value) {
            this.node = node;
            this.value = value;
        }
    }
}

