/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.manager.matching;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang.RandomStringUtils;
import org.streampipes.config.backend.BackendConfig;
import org.streampipes.manager.data.PipelineGraph;
import org.streampipes.manager.data.PipelineGraphHelpers;
import org.streampipes.manager.matching.GroundingBuilder;
import org.streampipes.manager.matching.output.OutputSchemaFactory;
import org.streampipes.manager.matching.output.OutputSchemaGenerator;
import org.streampipes.model.SpDataStream;
import org.streampipes.model.base.InvocableStreamPipesEntity;
import org.streampipes.model.base.NamedStreamPipesEntity;
import org.streampipes.model.graph.DataProcessorInvocation;
import org.streampipes.model.grounding.EventGrounding;
import org.streampipes.model.monitoring.ElementStatusInfoSettings;
import org.streampipes.model.schema.EventSchema;
import org.streampipes.sdk.helpers.Tuple2;

/**
 * Walks a {@link PipelineGraph} starting at its data streams and configures every
 * invocable element reachable from them: input grounding and schema, element id,
 * owning pipeline and status-info settings. Data processors additionally get an
 * output stream and output strategy assigned.
 *
 * <p>The configured elements are accumulated in an internal list that
 * {@link #buildGraphs()} returns.
 */
public class InvocationGraphBuilder {
    private final PipelineGraph pipelineGraph;
    private final String pipelineId;
    private final List<InvocableStreamPipesEntity> graphs = new ArrayList<>();

    /**
     * @param pipelineGraph graph whose streams and connected elements are traversed
     * @param pipelineId    id stored on every configured element and used as the
     *                      prefix of generated element identifiers
     */
    public InvocationGraphBuilder(PipelineGraph pipelineGraph, String pipelineId) {
        this.pipelineGraph = pipelineGraph;
        this.pipelineId = pipelineId;
    }

    /**
     * Configures all elements reachable from the streams of the pipeline graph.
     *
     * @return the builder's internal list of configured invocable elements
     *         (the list itself, not a copy)
     */
    public List<InvocableStreamPipesEntity> buildGraphs() {
        List<SpDataStream> streams = PipelineGraphHelpers.findStreams(this.pipelineGraph);
        for (SpDataStream stream : streams) {
            Set<InvocableStreamPipesEntity> connectedElements = this.getConnections((NamedStreamPipesEntity) stream);
            this.configure((NamedStreamPipesEntity) stream, connectedElements);
        }
        return this.graphs;
    }

    /**
     * Configures {@code source} (if it is an invocable element) and then each of its
     * {@code targets}, recursing into the targets' own connections.
     *
     * @param source  element whose output feeds the targets
     * @param targets elements directly connected to {@code source}
     */
    private void configure(NamedStreamPipesEntity source, Set<InvocableStreamPipesEntity> targets) {
        EventGrounding inputGrounding = new GroundingBuilder(source, targets).getEventGrounding();
        if (source instanceof InvocableStreamPipesEntity) {
            if (source instanceof DataProcessorInvocation) {
                this.configureOutput((DataProcessorInvocation) source, inputGrounding);
            }
            if (!this.graphExists(source.getDOM())) {
                this.graphs.add((InvocableStreamPipesEntity) source);
            }
        }
        for (InvocableStreamPipesEntity target : targets) {
            this.connect(source, target, inputGrounding);
            this.configure((NamedStreamPipesEntity) target, this.getConnections((NamedStreamPipesEntity) target));
        }
    }

    /**
     * Assigns the output stream (grounding + schema) and the output strategy of a
     * data processor.
     *
     * <ul>
     *   <li>Exactly one input stream: schema is built from that stream.</li>
     *   <li>Otherwise, if an element with the same DOM id is already registered:
     *       schema is built from the registered element's first input stream and
     *       this processor's second input stream; the registered element is removed
     *       from the list so the caller can register {@code processor} in its place.</li>
     *   <li>Otherwise: an empty schema and the processor's first output strategy
     *       are used.</li>
     * </ul>
     *
     * @param processor       processor to configure
     * @param outputGrounding grounding set on the processor's new output stream
     */
    private void configureOutput(DataProcessorInvocation processor, EventGrounding outputGrounding) {
        // NOTE(review): raw Tuple2 kept from the decompiled source; parameterising it
        // needs the output-strategy type, which is not imported in this file - TODO confirm.
        Tuple2 outputSettings;
        OutputSchemaGenerator<?> schemaGenerator = new OutputSchemaFactory(processor).getOuputSchemaGenerator();
        if (processor.getInputStreams().size() == 1) {
            outputSettings = schemaGenerator.buildFromOneStream((SpDataStream) processor.getInputStreams().get(0));
        } else if (this.graphExists(processor.getDOM())) {
            DataProcessorInvocation existingInvocation = (DataProcessorInvocation) this.find(processor.getDOM());
            outputSettings = schemaGenerator.buildFromTwoStreams(
                    (SpDataStream) existingInvocation.getInputStreams().get(0),
                    (SpDataStream) processor.getInputStreams().get(1));
            this.graphs.remove(existingInvocation);
        } else {
            outputSettings = new Tuple2((Object) new EventSchema(), processor.getOutputStrategies().get(0));
        }
        SpDataStream outputStream = new SpDataStream();
        outputStream.setEventGrounding(outputGrounding);
        processor.setOutputStrategies(Collections.singletonList(outputSettings.b));
        outputStream.setEventSchema((EventSchema) outputSettings.a);
        processor.setOutputStream(outputStream);
    }

    /**
     * Wires {@code target} to {@code source}: sets grounding and schema on the
     * matching input stream of the target and assigns the target's element id,
     * pipeline id and status-info settings.
     *
     * @throws IllegalStateException if the target's connected-to list does not
     *                               contain the DOM id of {@code source}
     */
    private void connect(NamedStreamPipesEntity source, InvocableStreamPipesEntity target, EventGrounding inputGrounding) {
        // Resolve the input slot once; a negative index would otherwise surface as an
        // uninformative IndexOutOfBoundsException from List.get.
        int inputIndex = this.getIndex(source.getDOM(), target);
        if (inputIndex < 0) {
            throw new IllegalStateException("Element '" + target.getName()
                    + "' is not connected to source with DOM id '" + source.getDOM() + "'");
        }
        SpDataStream inputStream = (SpDataStream) target.getInputStreams().get(inputIndex);
        inputStream.setEventGrounding(inputGrounding);
        inputStream.setEventSchema(this.getInputSchema(source));

        String topic = inputGrounding.getTransportProtocol().getTopicDefinition().getActualTopicName();
        String elementIdentifier = this.makeElementIdentifier(this.pipelineId, topic, target.getName());
        target.setElementId(target.getBelongsTo() + "/" + elementIdentifier);
        target.setCorrespondingPipeline(this.pipelineId);
        target.setStatusInfoSettings(this.makeStatusInfoSettings(elementIdentifier));
    }

    /**
     * Creates status-info settings using the Kafka host/port from the backend
     * configuration and error/stats topics derived from the element identifier.
     */
    private ElementStatusInfoSettings makeStatusInfoSettings(String elementIdentifier) {
        ElementStatusInfoSettings statusSettings = new ElementStatusInfoSettings();
        statusSettings.setKafkaHost(BackendConfig.INSTANCE.getKafkaHost());
        statusSettings.setKafkaPort(BackendConfig.INSTANCE.getKafkaPort());
        statusSettings.setErrorTopic(elementIdentifier + ".error");
        statusSettings.setStatsTopic(elementIdentifier + ".stats");
        statusSettings.setElementIdentifier(elementIdentifier);
        return statusSettings;
    }

    /**
     * Builds {@code <pipelineId>-<topic>-<name>-<random>} where {@code name} is the
     * element name with spaces removed and lower-cased, and {@code random} is a
     * five-letter random alphabetic suffix.
     */
    private String makeElementIdentifier(String pipelineId, String topic, String elementName) {
        // Locale.ROOT keeps the identifier independent of the JVM's default locale.
        String normalizedName = elementName.replace(" ", "").toLowerCase(Locale.ROOT);
        return pipelineId + "-" + topic + "-" + normalizedName + "-" + RandomStringUtils.randomAlphabetic(5);
    }

    /**
     * Returns the schema that {@code source} emits: the stream's own schema for a
     * data stream, or the output stream's schema for a data processor.
     *
     * @throws IllegalArgumentException if {@code source} is neither of the two
     */
    private EventSchema getInputSchema(NamedStreamPipesEntity source) {
        if (source instanceof SpDataStream) {
            return ((SpDataStream) source).getEventSchema();
        }
        if (source instanceof DataProcessorInvocation) {
            return ((DataProcessorInvocation) source).getOutputStream().getEventSchema();
        }
        throw new IllegalArgumentException("Cannot derive an input schema from element of type "
                + (source == null ? "null" : source.getClass().getName()));
    }

    /**
     * Collects the targets of all outgoing edges of {@code source} in the pipeline graph.
     */
    private Set<InvocableStreamPipesEntity> getConnections(NamedStreamPipesEntity source) {
        // Chained directly on the graph call so the stream keeps the element type
        // declared by PipelineGraph instead of being erased through a raw Set local.
        return this.pipelineGraph.outgoingEdgesOf(source)
                .stream()
                .map(edge -> (NamedStreamPipesEntity) this.pipelineGraph.getEdgeTarget(edge))
                .map(node -> (InvocableStreamPipesEntity) node)
                .collect(Collectors.toSet());
    }

    /**
     * Position of {@code sourceDomId} in the target's connected-to list, or -1 if absent.
     */
    private int getIndex(String sourceDomId, InvocableStreamPipesEntity targetElement) {
        return targetElement.getConnectedTo().indexOf(sourceDomId);
    }

    /** Whether an element with the given DOM id has already been registered. */
    private boolean graphExists(String domId) {
        return this.graphs.stream().anyMatch(g -> g.getDOM().equals(domId));
    }

    /**
     * Returns the first registered element with the given DOM id.
     *
     * @throws IllegalStateException if no such element is registered
     */
    private InvocableStreamPipesEntity find(String domId) {
        return this.graphs.stream()
                .filter(g -> g.getDOM().equals(domId))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException(
                        "No invocation graph registered for DOM id '" + domId + "'"));
    }
}

