/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.manager.monitoring.runtime;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.streampipes.commons.exceptions.NoMatchingFormatException;
import org.streampipes.commons.exceptions.NoMatchingProtocolException;
import org.streampipes.commons.exceptions.NoMatchingSchemaException;
import org.streampipes.config.backend.BackendConfig;
import org.streampipes.manager.monitoring.runtime.EpRuntimeMonitoring;
import org.streampipes.manager.monitoring.runtime.PipelineObserver;
import org.streampipes.manager.monitoring.runtime.SepStoppedMonitoringPipelineBuilder;
import org.streampipes.manager.operations.Operations;
import org.streampipes.messaging.InternalEventProcessor;
import org.streampipes.messaging.kafka.SpKafkaConsumer;
import org.streampipes.model.SpDataStream;
import org.streampipes.model.client.pipeline.Pipeline;
import org.streampipes.model.graph.DataSourceDescription;
import org.streampipes.storage.couchdb.impl.PipelineStorageImpl;

public class SepStoppedMonitoring
implements EpRuntimeMonitoring<DataSourceDescription>,
Runnable {
    private Map<String, List<PipelineObserver>> streamToObserver;
    private Map<String, Pipeline> streamToStoppedMonitoringPipeline;
    private SpKafkaConsumer kafkaConsumerGroup;

    @Override
    public boolean register(PipelineObserver observer) {
        try {
            Pipeline pipeline = new PipelineStorageImpl().getPipeline(observer.getPipelineId());
            ArrayList allStreams = new ArrayList();
            pipeline.getStreams().forEach(s -> allStreams.add(s));
            for (SpDataStream s2 : allStreams) {
                if (this.streamToObserver.get(s2.getElementId()) == null) {
                    ArrayList<PipelineObserver> po = new ArrayList<PipelineObserver>();
                    po.add(observer);
                    String streamId = s2.getElementId();
                    String sourceId = streamId.substring(0, streamId.lastIndexOf("/"));
                    this.streamToObserver.put(streamId, po);
                    Pipeline p = new SepStoppedMonitoringPipelineBuilder(sourceId, streamId).buildPipeline();
                    Operations.startPipeline(p, false, false, false);
                    this.streamToStoppedMonitoringPipeline.put(streamId, p);
                    continue;
                }
                this.streamToObserver.get(s2.getElementId()).add(observer);
            }
        }
        catch (URISyntaxException e) {
            e.printStackTrace();
        }
        catch (NoMatchingFormatException e) {
            e.printStackTrace();
        }
        catch (NoMatchingSchemaException e) {
            e.printStackTrace();
        }
        catch (NoMatchingProtocolException e) {
            e.printStackTrace();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public boolean remove(PipelineObserver observer) {
        Pipeline pipeline = new PipelineStorageImpl().getPipeline(observer.getPipelineId());
        List streams = pipeline.getStreams();
        for (SpDataStream sc : streams) {
            String streamId = sc.getElementId();
            List<PipelineObserver> po = this.streamToObserver.get(streamId);
            if (po.size() == 1) {
                this.streamToObserver.remove(streamId);
                Operations.stopPipeline(this.streamToStoppedMonitoringPipeline.get(streamId), false, false, false);
                this.streamToStoppedMonitoringPipeline.remove(streamId);
                continue;
            }
            po.remove(observer);
        }
        return false;
    }

    @Override
    public void run() {
        this.streamToObserver = new HashMap<String, List<PipelineObserver>>();
        this.streamToStoppedMonitoringPipeline = new HashMap<String, Pipeline>();
        String topic = "internal.streamepipes.sec.stopped";
        this.kafkaConsumerGroup = new SpKafkaConsumer(BackendConfig.INSTANCE.getKafkaUrl(), topic, (InternalEventProcessor)new KafkaCallback());
        Thread thread = new Thread((Runnable)this.kafkaConsumerGroup);
        thread.start();
    }

    public static void main(String[] args) throws IOException {
        SepStoppedMonitoring monitoring = new SepStoppedMonitoring();
        monitoring.run();
        String id = "baaaf5b2-5412-4ac1-a7eb-04aeaf0e12b8";
        PipelineObserver observer1 = new PipelineObserver(id);
        id = "ef915142-2a08-4166-8bea-8d946ae31cd6";
        PipelineObserver observer2 = new PipelineObserver(id);
        id = "b3c0b6ad-05df-4670-a078-83775eeb550b";
        PipelineObserver observer3 = new PipelineObserver(id);
        monitoring.register(observer1);
        monitoring.register(observer2);
        monitoring.register(observer3);
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        String s = br.readLine();
        monitoring.remove(observer1);
        monitoring.remove(observer2);
        monitoring.remove(observer3);
        System.out.println("laalal");
    }

    private class KafkaCallback
    implements InternalEventProcessor<byte[]> {
        private KafkaCallback() {
        }

        public void onEvent(byte[] payload) {
            String str = new String(payload, StandardCharsets.UTF_8);
            JsonObject jo = new JsonParser().parse(str).getAsJsonObject();
            List listPos = (List)SepStoppedMonitoring.this.streamToObserver.get(jo.get("topic").getAsString());
            for (PipelineObserver po : listPos) {
                po.update();
            }
        }
    }
}

