/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.wrapper.flink;

import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.streampipes.model.SpDataStream;
import org.streampipes.model.graph.DataProcessorInvocation;
import org.streampipes.model.grounding.KafkaTransportProtocol;
import org.streampipes.wrapper.flink.FlinkDeploymentConfig;
import org.streampipes.wrapper.flink.FlinkRuntime;
import org.streampipes.wrapper.flink.serializer.SimpleJmsSerializer;
import org.streampipes.wrapper.flink.serializer.SimpleKafkaSerializer;
import org.streampipes.wrapper.flink.sink.FlinkJmsProducer;
import org.streampipes.wrapper.params.binding.EventProcessorBindingParams;

/**
 * Flink runtime base class for data processors.
 *
 * <p>Subclasses supply the actual stream transformation through
 * {@link #getApplicationLogic(DataStream[])}; this class attaches a sink to
 * the resulting stream that publishes to the output stream of the
 * {@link DataProcessorInvocation}, using either Kafka or JMS.
 *
 * @param <B> binding parameter type of the concrete processor
 */
public abstract class FlinkDataProcessorRuntime<B extends EventProcessorBindingParams>
        extends FlinkRuntime<B, DataProcessorInvocation> {
    private static final long serialVersionUID = 1L;

    /**
     * Creates a runtime from binding parameters only.
     *
     * @param params binding parameters, forwarded to the superclass
     * @deprecated marked deprecated in the original code; presumably
     *     {@link #FlinkDataProcessorRuntime(EventProcessorBindingParams, boolean)} or
     *     {@link #FlinkDataProcessorRuntime(EventProcessorBindingParams, FlinkDeploymentConfig)}
     *     should be used instead (TODO confirm against {@link FlinkRuntime}).
     */
    @Deprecated
    public FlinkDataProcessorRuntime(B params) {
        super(params);
    }

    /**
     * Creates a runtime from binding parameters and a debug flag.
     *
     * @param params binding parameters, forwarded to the superclass
     * @param debug  debug flag, forwarded to the superclass
     */
    public FlinkDataProcessorRuntime(B params, boolean debug) {
        super(params, debug);
    }

    /**
     * Creates a runtime from binding parameters and a deployment configuration.
     *
     * @param params binding parameters, forwarded to the superclass
     * @param config deployment configuration, forwarded to the superclass
     */
    public FlinkDataProcessorRuntime(B params, FlinkDeploymentConfig config) {
        super(params, config);
    }

    /**
     * Applies the subclass-defined application logic to the input streams and
     * attaches the output sink.
     *
     * <p>A Kafka sink is used when the inherited {@code isKafkaProtocol} check
     * accepts the output stream; otherwise a JMS sink is used.
     *
     * @param convertedStream input streams handed to {@link #getApplicationLogic(DataStream[])}
     */
    @Override
    public void appendExecutionConfig(DataStream<Map<String, Object>> ... convertedStream) {
        DataStream<Map<String, Object>> applicationLogic = this.getApplicationLogic(convertedStream);
        // Resolve the output stream once instead of re-reading it for every argument.
        SpDataStream outputStream = this.getOutputStream();

        if (this.isKafkaProtocol(outputStream)) {
            KafkaTransportProtocol kafkaProtocol =
                    (KafkaTransportProtocol) outputStream.getEventGrounding().getTransportProtocol();
            // Only the serializer needed by the selected branch is instantiated.
            SerializationSchema<Map<String, Object>> kafkaSerializer = new SimpleKafkaSerializer();
            applicationLogic.addSink(new FlinkKafkaProducer010<>(
                    this.getTopic(outputStream), kafkaSerializer, this.getProperties(kafkaProtocol)));
        } else {
            SimpleJmsSerializer jmsSerializer = new SimpleJmsSerializer();
            applicationLogic.addSink(new FlinkJmsProducer<Map<String, Object>>(
                    this.getJmsProtocol(outputStream), jmsSerializer));
        }
    }

    /**
     * Returns the output stream of the processor invocation backing this runtime.
     */
    private SpDataStream getOutputStream() {
        return ((DataProcessorInvocation)this.getGraph()).getOutputStream();
    }

    /**
     * Builds the processor's output stream from its input streams.
     *
     * @param var1 input streams passed to {@link #appendExecutionConfig(DataStream[])}
     * @return the stream to which the output sink is attached
     */
    protected abstract DataStream<Map<String, Object>> getApplicationLogic(DataStream<Map<String, Object>> ... var1);

    /**
     * Builds the Kafka producer properties for the given transport protocol.
     *
     * <p>Sets {@code client.id} to a freshly generated random UUID and
     * {@code bootstrap.servers} to {@code <brokerHostname>:<kafkaPort>}. A
     * {@code null} hostname or port is rendered as the text {@code "null"} by
     * the string concatenation; no validation is performed here.
     *
     * @param protocol Kafka transport protocol providing broker host and port
     * @return a new {@link Properties} instance holding the two entries
     */
    protected Properties getProperties(KafkaTransportProtocol protocol) {
        String kafkaHost = protocol.getBrokerHostname();
        Integer kafkaPort = protocol.getKafkaPort();

        Properties props = new Properties();
        // setProperty keeps the table String-only, as java.util.Properties recommends over put.
        props.setProperty("client.id", UUID.randomUUID().toString());
        props.setProperty("bootstrap.servers", kafkaHost + ":" + kafkaPort);
        return props;
    }
}

