/*
 * Decompiled with CFR 0.152.
 */
package com.efficient.elasticsearch.flink;

import cn.hutool.core.util.StrUtil;
import com.efficient.elasticsearch.properties.KafkaProducerProperties;
import java.io.Serializable;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Builds and runs a Flink streaming job that reads string records from Kafka
 * topics and forwards every record to a caller-supplied sink.
 *
 * <p>The job is assembled from a {@link KafkaProducerProperties} instance, which
 * supplies the Kafka connection settings as well as the Flink parallelism,
 * checkpoint interval and operator names.
 */
public class FlinkKafKaListener {
    private static final Logger log = LoggerFactory.getLogger(FlinkKafKaListener.class);

    /** Value handed to {@code CheckpointConfig#setMinPauseBetweenCheckpoints}. */
    private static final long MIN_PAUSE_BETWEEN_CHECKPOINTS = 500L;

    /** Value handed to {@code CheckpointConfig#setCheckpointTimeout}. */
    private static final long CHECKPOINT_TIMEOUT = 60000L;

    /** Value handed to {@code CheckpointConfig#setMaxConcurrentCheckpoints}. */
    private static final int MAX_CONCURRENT_CHECKPOINTS = 1;

    /**
     * Assembles a Kafka-to-sink pipeline and executes it under {@code jobName}.
     *
     * <p>Pipeline: {@code KafkaSource<String>} (value-only, {@link SimpleStringSchema})
     * -> pass-through map that optionally logs each record -> {@code sinkFunction}.
     * Checkpointing is enabled in {@link CheckpointingMode#EXACTLY_ONCE} mode.
     *
     * <p>Any exception raised while building or executing the Flink job is logged
     * and <em>not</em> rethrown. Exceptions raised while collecting the Kafka
     * properties (before the job is built) propagate to the caller.
     *
     * @param kafkaProducerProperties connection, tuning and naming settings for the job
     * @param sinkFunction            sink that receives every record read from Kafka
     * @param jobName                 name passed to {@code StreamExecutionEnvironment#execute}
     * @param topics                  Kafka topics to subscribe to
     */
    public static void run(KafkaProducerProperties kafkaProducerProperties, SinkFunction<String> sinkFunction, String jobName, String... topics) {
        log.info("flink  kafKa listener  {}", jobName);
        Properties kafkaProps = buildKafkaProperties(kafkaProducerProperties);
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(kafkaProducerProperties.getParallelism().intValue());

            env.enableCheckpointing((long) kafkaProducerProperties.getCheckpointing().intValue());
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(MIN_PAUSE_BETWEEN_CHECKPOINTS);
            env.getCheckpointConfig().setCheckpointTimeout(CHECKPOINT_TIMEOUT);
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(MAX_CONCURRENT_CHECKPOINTS);

            // Explicit <String> keeps the whole pipeline type-checked instead of raw.
            KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                    .setProperties(kafkaProps)
                    .setTopics(topics)
                    .setGroupId(kafkaProducerProperties.getGroupId())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            DataStreamSource<String> dataStream = env.fromSource(
                    kafkaSource,
                    WatermarkStrategy.<String>noWatermarks(),
                    kafkaProducerProperties.getSourceName());

            // Read the flag once so the lambda captures a plain boolean rather
            // than the properties object.
            boolean printLog = kafkaProducerProperties.isPrintLog();
            MapFunction<String, String> logIfEnabled = value -> {
                if (printLog) {
                    log.info("accept message: {}", value);
                }
                return value;
            };

            dataStream.map(logIfEnabled)
                    .addSink(sinkFunction)
                    .name(kafkaProducerProperties.getSinkName());

            env.execute(jobName);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The exception is swallowed below, so restore the interrupt
                // flag to let the calling thread still observe the interruption.
                Thread.currentThread().interrupt();
            }
            // Trailing Throwable is logged with its stack trace by SLF4J.
            log.error("{} fail {}", jobName, e.getMessage(), e);
        }
    }

    /**
     * Copies the Kafka client settings out of {@code source} into a fresh
     * {@link Properties} object for the Kafka source builder.
     */
    private static Properties buildKafkaProperties(KafkaProducerProperties source) {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", source.getBootstrapServers());
        kafkaProps.setProperty("group.id", source.getGroupId());
        // NOTE(review): "key.serializer"/"value.serializer" are producer-side Kafka
        // settings, while these properties feed a KafkaSource (consumer). Kept for
        // backward compatibility - confirm whether they are still needed.
        if (StrUtil.isNotBlank(source.getKeySerializer())) {
            kafkaProps.put("key.serializer", source.getKeySerializer());
        }
        if (StrUtil.isNotBlank(source.getValueSerializer())) {
            kafkaProps.put("value.serializer", source.getValueSerializer());
        }
        kafkaProps.setProperty("max.poll.records", String.valueOf(source.getMaxPollRecords()));
        kafkaProps.setProperty("fetch.min.bytes", String.valueOf(source.getFetchMinBytes()));
        kafkaProps.setProperty("fetch.max.wait.ms", String.valueOf(source.getFetchMaxWaitMs()));
        return kafkaProps;
    }
}

