/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.wrapper.flink;

import java.util.Properties;
import java.util.UUID;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.streampipes.dataformat.SpDataFormatDefinition;
import org.streampipes.model.SpDataStream;
import org.streampipes.model.graph.DataProcessorInvocation;
import org.streampipes.model.grounding.EventGrounding;
import org.streampipes.model.grounding.KafkaTransportProtocol;
import org.streampipes.model.grounding.TransportFormat;
import org.streampipes.model.runtime.Event;
import org.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.streampipes.wrapper.flink.FlinkDeploymentConfig;
import org.streampipes.wrapper.flink.FlinkRuntime;
import org.streampipes.wrapper.flink.converter.EventToMapConverter;
import org.streampipes.wrapper.flink.serializer.ByteArraySerializer;
import org.streampipes.wrapper.flink.sink.FlinkJmsProducer;
import org.streampipes.wrapper.params.binding.EventProcessorBindingParams;
import org.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams;

public abstract class FlinkDataProcessorRuntime<B extends EventProcessorBindingParams>
extends FlinkRuntime<EventProcessorRuntimeParams<B>, B, DataProcessorInvocation, EventProcessorRuntimeContext> {
    private static final long serialVersionUID = 1L;

    @Deprecated
    public FlinkDataProcessorRuntime(B params) {
        super(params);
    }

    public FlinkDataProcessorRuntime(B params, boolean debug) {
        super(params, debug);
    }

    public FlinkDataProcessorRuntime(B params, FlinkDeploymentConfig config) {
        super(params, config);
    }

    @Override
    public void appendExecutionConfig(DataStream<Event> ... convertedStream) {
        SingleOutputStreamOperator applicationLogic = this.getApplicationLogic(convertedStream).flatMap((FlatMapFunction)new EventToMapConverter());
        EventGrounding outputGrounding = this.getOutputStream().getEventGrounding();
        SpDataFormatDefinition outputDataFormatDefinition = this.getDataFormatDefinition((TransportFormat)outputGrounding.getTransportFormats().get(0));
        ByteArraySerializer serializer = new ByteArraySerializer(outputDataFormatDefinition);
        if (this.isKafkaProtocol(this.getOutputStream())) {
            applicationLogic.addSink((SinkFunction)new FlinkKafkaProducer(this.getTopic(this.getOutputStream()), (SerializationSchema)serializer, this.getProducerProperties((KafkaTransportProtocol)outputGrounding.getTransportProtocol())));
        } else {
            applicationLogic.addSink((SinkFunction)new FlinkJmsProducer(this.getJmsProtocol(this.getOutputStream()), serializer));
        }
    }

    private SpDataStream getOutputStream() {
        return ((DataProcessorInvocation)this.getGraph()).getOutputStream();
    }

    protected abstract DataStream<Event> getApplicationLogic(DataStream<Event> ... var1);

    protected Properties getProperties(KafkaTransportProtocol protocol) {
        Properties props = new Properties();
        String kafkaHost = protocol.getBrokerHostname();
        Integer kafkaPort = protocol.getKafkaPort();
        props.put("client.id", UUID.randomUUID().toString());
        props.put("bootstrap.servers", kafkaHost + ":" + kafkaPort);
        return props;
    }

    protected EventProcessorRuntimeParams<B> makeRuntimeParams() {
        return new EventProcessorRuntimeParams((EventProcessorBindingParams)this.bindingParams, Boolean.valueOf(false));
    }
}

