/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.wrapper.flink;

import java.io.Serializable;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.streampipes.commons.exceptions.SpRuntimeException;
import org.streampipes.model.SpDataStream;
import org.streampipes.model.base.InvocableStreamPipesEntity;
import org.streampipes.model.grounding.KafkaTransportProtocol;
import org.streampipes.model.grounding.SimpleTopicDefinition;
import org.streampipes.wrapper.distributed.runtime.DistributedRuntime;
import org.streampipes.wrapper.flink.FlinkDeploymentConfig;
import org.streampipes.wrapper.flink.FlinkJobController;
import org.streampipes.wrapper.flink.converter.JsonToMapFormat;
import org.streampipes.wrapper.flink.logger.StatisticLogger;
import org.streampipes.wrapper.params.binding.BindingParams;

public abstract class FlinkRuntime<B extends BindingParams<I>, I extends InvocableStreamPipesEntity>
extends DistributedRuntime<B, I>
implements Runnable,
Serializable {
    private static final long serialVersionUID = 1L;
    protected TimeCharacteristic streamTimeCharacteristic;
    protected FlinkDeploymentConfig config;
    private boolean debug;
    private StreamExecutionEnvironment env;

    public FlinkRuntime(B bindingParams) {
        this(bindingParams, new FlinkDeploymentConfig("", "localhost", 6123), true);
    }

    public FlinkRuntime(B bindingParams, FlinkDeploymentConfig config) {
        this(bindingParams, config, false);
    }

    private FlinkRuntime(B bindingParams, FlinkDeploymentConfig config, boolean debug) {
        super(bindingParams);
        this.config = config;
        this.debug = debug;
    }

    protected abstract void appendExecutionConfig(DataStream<Map<String, Object>> ... var1);

    @Override
    public void run() {
        try {
            this.env.execute(this.bindingParams.getGraph().getElementId());
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void setStreamTimeCharacteristic(TimeCharacteristic streamTimeCharacteristic) {
        this.streamTimeCharacteristic = streamTimeCharacteristic;
    }

    private SourceFunction<String> getStream1Source() {
        return this.getStreamSource(0);
    }

    private SourceFunction<String> getStream2Source() {
        return this.getStreamSource(1);
    }

    private SourceFunction<String> getStreamSource(int i) {
        if (this.bindingParams.getGraph().getInputStreams().size() - 1 >= i) {
            SpDataStream stream = (SpDataStream)this.bindingParams.getGraph().getInputStreams().get(i);
            if (stream != null) {
                KafkaTransportProtocol protocol = (KafkaTransportProtocol)stream.getEventGrounding().getTransportProtocol();
                if (protocol.getTopicDefinition() instanceof SimpleTopicDefinition) {
                    return new FlinkKafkaConsumer010(protocol.getTopicDefinition().getActualTopicName(), (DeserializationSchema)new SimpleStringSchema(), this.getProperties(protocol));
                }
                String patternTopic = this.replaceWildcardWithPatternFormat(protocol.getTopicDefinition().getActualTopicName());
                return new FlinkKafkaConsumer010(Pattern.compile(patternTopic), (DeserializationSchema)new SimpleStringSchema(), this.getProperties(protocol));
            }
            return null;
        }
        return null;
    }

    public void prepareRuntime() throws SpRuntimeException {
        this.env = this.debug ? StreamExecutionEnvironment.createLocalEnvironment() : StreamExecutionEnvironment.createRemoteEnvironment((String)this.config.getHost(), (int)this.config.getPort(), (String[])new String[]{this.config.getJarFile()});
        this.appendEnvironmentConfig(this.env);
        SourceFunction<String> source1 = this.getStream1Source();
        if (source1 == null) {
            throw new SpRuntimeException("At least one source must be defined for a flink sepa");
        }
        SingleOutputStreamOperator messageStream1 = this.env.addSource(source1).flatMap((FlatMapFunction)new JsonToMapFormat()).flatMap((FlatMapFunction)new StatisticLogger(this.getGraph()));
        SourceFunction<String> source2 = this.getStream2Source();
        if (source2 != null) {
            SingleOutputStreamOperator messageStream2 = this.env.addSource(source2).flatMap((FlatMapFunction)new JsonToMapFormat()).flatMap((FlatMapFunction)new StatisticLogger(this.getGraph()));
            this.appendExecutionConfig(new DataStream[]{messageStream1, messageStream2});
        } else {
            this.appendExecutionConfig(new DataStream[]{messageStream1});
        }
    }

    public void postDiscard() throws SpRuntimeException {
    }

    public void bindRuntime() throws SpRuntimeException {
        try {
            this.prepareRuntime();
            Thread thread = new Thread(this);
            thread.start();
            if (!this.debug) {
                FlinkJobController ctrl = new FlinkJobController(this.config.getHost(), this.config.getPort());
                boolean isDeployed = false;
                int count = 0;
                do {
                    try {
                        ++count;
                        Thread.sleep(1000L);
                        ctrl.findJobId(ctrl.getJobManagerGateway(), this.bindingParams.getGraph().getElementId());
                        isDeployed = true;
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                } while (!isDeployed && count < 60);
                if (count == 60) {
                    throw new SpRuntimeException("Error: Timeout reached when trying to connect to Flink Job Controller");
                }
            }
        }
        catch (Exception e) {
            throw new SpRuntimeException(e.getMessage());
        }
    }

    public void discardRuntime() throws SpRuntimeException {
        FlinkJobController ctrl = new FlinkJobController(this.config.getHost(), this.config.getPort());
        try {
            if (!ctrl.deleteJob(ctrl.findJobId(ctrl.getJobManagerGateway(), this.bindingParams.getGraph().getElementId()))) {
                throw new SpRuntimeException("Could not stop Flink Job");
            }
        }
        catch (Exception e) {
            throw new SpRuntimeException("Could not find Flink Job Manager, is it running?");
        }
    }

    public void appendEnvironmentConfig(StreamExecutionEnvironment env) {
        if (this.streamTimeCharacteristic != null) {
            env.setStreamTimeCharacteristic(this.streamTimeCharacteristic);
        }
    }
}

