/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.manager.monitoring.runtime;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import org.streampipes.commons.exceptions.NoMatchingFormatException;
import org.streampipes.commons.exceptions.NoMatchingProtocolException;
import org.streampipes.commons.exceptions.NoMatchingSchemaException;
import org.streampipes.config.backend.BackendConfig;
import org.streampipes.manager.matching.PipelineVerificationHandler;
import org.streampipes.manager.operations.Operations;
import org.streampipes.model.SpDataSet;
import org.streampipes.model.SpDataStream;
import org.streampipes.model.client.pipeline.Pipeline;
import org.streampipes.model.client.pipeline.PipelineModification;
import org.streampipes.model.client.pipeline.PipelineModificationMessage;
import org.streampipes.model.graph.DataProcessorDescription;
import org.streampipes.model.graph.DataProcessorInvocation;
import org.streampipes.model.graph.DataSinkDescription;
import org.streampipes.model.graph.DataSinkInvocation;
import org.streampipes.model.graph.DataSourceDescription;
import org.streampipes.model.staticproperty.DomainStaticProperty;
import org.streampipes.model.staticproperty.FreeTextStaticProperty;
import org.streampipes.model.staticproperty.StaticProperty;
import org.streampipes.model.staticproperty.SupportedProperty;
import org.streampipes.storage.management.StorageManager;

public class SepStoppedMonitoringPipelineBuilder {
    private final String RATE_SEPA_URI = "http://ipe-koi05.perimeter.fzi.de:8090/sepa/streamStopped";
    private final String KAFKA_SEC_URI = "http://ipe-koi04.perimeter.fzi.de:8091/kafka";
    private final String OUTPUT_TOPIC = "internal.streamepipes.sec.stopped";
    private SpDataStream stream;
    private final String outputTopic;
    private DataSourceDescription dataSourceDescription;
    private DataProcessorDescription streamStoppedDataProcessorDescription;
    private DataSinkDescription kafkaDataSinkDescription;
    private String streamUri;

    public SepStoppedMonitoringPipelineBuilder(String sepUri, String streamUri) throws URISyntaxException {
        this.outputTopic = "internal.streamepipes.sec.stopped";
        this.streamUri = streamUri;
        DataSourceDescription desc = StorageManager.INSTANCE.getStorageAPI().getSEPById(sepUri);
        this.stream = StorageManager.INSTANCE.getStorageAPI().getEventStreamById(streamUri);
        this.dataSourceDescription = desc;
        this.streamStoppedDataProcessorDescription = this.getStreamStoppedEpa();
        this.kafkaDataSinkDescription = this.getKafkaPublisherEc();
    }

    public Pipeline buildPipeline() throws NoMatchingFormatException, NoMatchingSchemaException, NoMatchingProtocolException, Exception {
        DataProcessorInvocation rateSepaClient = new DataProcessorInvocation(this.streamStoppedDataProcessorDescription);
        SpDataStream streamClient = this.stream instanceof SpDataStream ? new SpDataStream(this.stream) : new SpDataSet((SpDataSet)this.stream);
        DataSinkInvocation kafkaActionClient = new DataSinkInvocation(this.kafkaDataSinkDescription);
        ArrayList<SpDataStream> elements = new ArrayList<SpDataStream>();
        elements.add(streamClient);
        rateSepaClient.setConnectedTo(Arrays.asList("stream"));
        streamClient.setDOM("stream");
        rateSepaClient.setDOM("rate");
        kafkaActionClient.setDOM("kafka");
        Pipeline pipeline = new Pipeline();
        pipeline.setStreams(Arrays.asList(streamClient));
        pipeline.setSepas(Arrays.asList(rateSepaClient));
        PipelineModificationMessage message = new PipelineVerificationHandler(pipeline).validateConnection().computeMappingProperties().getPipelineModificationMessage();
        DataProcessorInvocation updatedSepa = this.updateStreamStoppedSepa(rateSepaClient, message);
        pipeline.setSepas(Arrays.asList(updatedSepa));
        kafkaActionClient.setConnectedTo(Arrays.asList("rate"));
        pipeline.setActions(Arrays.asList(kafkaActionClient));
        message = new PipelineVerificationHandler(pipeline).validateConnection().computeMappingProperties().getPipelineModificationMessage();
        pipeline.setActions(Arrays.asList(this.updateKafkaSec(kafkaActionClient, message)));
        pipeline.setPipelineId(UUID.randomUUID().toString());
        pipeline.setName("Monitoring - " + this.stream.getName());
        return pipeline;
    }

    private DataSinkDescription getKafkaPublisherEc() throws URISyntaxException {
        return StorageManager.INSTANCE.getStorageAPI().getSECById("http://ipe-koi04.perimeter.fzi.de:8091/kafka");
    }

    private DataProcessorDescription getStreamStoppedEpa() throws URISyntaxException {
        return StorageManager.INSTANCE.getStorageAPI().getSEPAById("http://ipe-koi05.perimeter.fzi.de:8090/sepa/streamStopped");
    }

    private DataSinkInvocation updateKafkaSec(DataSinkInvocation actionClient, PipelineModificationMessage message) {
        List properties = ((PipelineModification)message.getPipelineModifications().get(0)).getStaticProperties();
        ArrayList<StaticProperty> newStaticProperties = new ArrayList<StaticProperty>();
        for (StaticProperty p : properties) {
            if (p instanceof FreeTextStaticProperty || p instanceof DomainStaticProperty) {
                if (p instanceof FreeTextStaticProperty) {
                    if (p.getInternalName().equals("topic")) {
                        ((FreeTextStaticProperty)p).setValue(this.outputTopic);
                    }
                } else if (p instanceof DomainStaticProperty) {
                    for (SupportedProperty sp : ((DomainStaticProperty)p).getSupportedProperties()) {
                        if (sp.getPropertyId().equals("http://schema.org/kafkaHost")) {
                            sp.setValue(String.valueOf(BackendConfig.INSTANCE.getKafkaHost()));
                            continue;
                        }
                        if (!sp.getPropertyId().equals("http://schema.org/kafkaPort")) continue;
                        sp.setValue(String.valueOf(BackendConfig.INSTANCE.getKafkaPort()));
                    }
                }
            }
            newStaticProperties.add(p);
        }
        actionClient.setStaticProperties(newStaticProperties);
        return actionClient;
    }

    private DataProcessorInvocation updateStreamStoppedSepa(DataProcessorInvocation newSEPA, PipelineModificationMessage message) {
        List properties = ((PipelineModification)message.getPipelineModifications().get(0)).getStaticProperties();
        ArrayList<StaticProperty> newStaticProperties = new ArrayList<StaticProperty>();
        for (StaticProperty p : properties) {
            if (p instanceof FreeTextStaticProperty && p.getInternalName().equals("topic")) {
                ((FreeTextStaticProperty)p).setValue(String.valueOf(this.streamUri));
            }
            newStaticProperties.add(p);
        }
        newSEPA.setStaticProperties(newStaticProperties);
        return newSEPA;
    }

    public static void main(String[] args) throws URISyntaxException {
        String SEP_URI = "http://frosch.fzi.de:8089//source-wunderbar";
        String STREAM_URI = "http://frosch.fzi.de:8089//source-wunderbar/accelerometer";
        SepStoppedMonitoringPipelineBuilder pc = new SepStoppedMonitoringPipelineBuilder(SEP_URI, STREAM_URI);
        try {
            Pipeline pipeline = pc.buildPipeline();
            Operations.startPipeline(pipeline, false, false, false);
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            String s = br.readLine();
            Operations.stopPipeline(pipeline, false, false, false);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}

