package com.github.houbb.logstash4j.plugins.output;

import com.github.houbb.jsons.support.json.impl.Jsons;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.logstash4j.plugins.api.config.ILogstashConfig;
import com.github.houbb.logstash4j.plugins.api.output.AbstractLogstashOutput;
import com.github.houbb.logstash4j.plugins.api.support.LogstashEventDataContext;
import com.github.houbb.logstash4j.plugins.output.constant.OutputKafkaConfigEnum;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:com/github/houbb/logstash4j/plugins/output/Kafka.class */
/**
 * Logstash output plugin that publishes events to a Kafka topic.
 *
 * <p>{@link #register} builds a {@link KafkaProducer} from the plugin configuration,
 * {@link #doProcess} sends each event's map serialized as JSON to the configured topic,
 * and {@link #stop} closes the producer.
 */
public class Kafka extends AbstractLogstashOutput {
    private static final Log log = LogFactory.getLog(Kafka.class);

    /** Producer created by {@link #register}; {@code null} until registration succeeds. */
    private Producer<String, String> producer;

    /**
     * Registers the plugin configuration and creates the Kafka producer from it.
     *
     * <p>Configuration entries that resolve to {@code null} are left out of the producer
     * properties instead of being passed to {@link Properties#put}, which rejects null values.
     *
     * @param iLogstashConfig plugin configuration, forwarded to the superclass
     */
    public void register(ILogstashConfig iLogstashConfig) {
        super.register(iLogstashConfig);

        Properties properties = new Properties();
        putIfPresent(properties, "bootstrap.servers", getConfigVal(OutputKafkaConfigEnum.BOOTSTRAP_SERVERS));
        putIfPresent(properties, "key.serializer", getConfigVal(OutputKafkaConfigEnum.KEY_SERIALIZER_CLASS));
        putIfPresent(properties, "value.serializer", getConfigVal(OutputKafkaConfigEnum.VALUE_SERIALIZER));
        putIfPresent(properties, "request.timeout.ms", getConfigVal(OutputKafkaConfigEnum.REQUEST_TIMEOUT_MS));
        putIfPresent(properties, "acks", getConfigVal(OutputKafkaConfigEnum.ACKS));
        putIfPresent(properties, "compression.type", getConfigVal(OutputKafkaConfigEnum.COMPRESSION_TYPE));

        this.producer = new KafkaProducer<String, String>(properties);
    }

    /**
     * Serializes the event map to JSON and sends it to the configured topic.
     *
     * <p>The send is asynchronous: the outcome is only logged from the completion callback
     * (debug on success, error on failure) and is not reported back to the caller.
     * {@link #register} must have been called first so that the producer exists.
     *
     * @param logstashEventDataContext context holding the event map to publish
     */
    public void doProcess(LogstashEventDataContext logstashEventDataContext) {
        // The topic is read from the configuration on every call, as before.
        String topic = (String) getConfigVal(OutputKafkaConfigEnum.TOPIC_ID);
        String payload = Jsons.fastJson().toJson(logstashEventDataContext.getEventMap());

        this.producer.send(new ProducerRecord<String, String>(topic, payload), (metadata, exception) -> {
            if (exception != null) {
                log.error("Error sending message: ", exception);
                return;
            }
            log.debug("Message sent successfully! Topic: {}, Partition: {}, Offset: {}",
                    new Object[]{metadata.topic(), metadata.partition(), metadata.offset()});
        });
    }

    /**
     * Closes the producer if one was created.
     *
     * <p>Does nothing when {@link #register} was never called or failed before the producer
     * was assigned, so shutdown does not fail with a {@link NullPointerException}.
     */
    public void stop() {
        if (this.producer != null) {
            this.producer.close();
        }
    }

    /** Stores {@code value} under {@code key} unless it is {@code null}. */
    private static void putIfPresent(Properties properties, String key, Object value) {
        if (value != null) {
            properties.put(key, value);
        }
    }
}
