package com.github.houbb.logstash4j.plugins.input;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.logstash4j.plugins.api.input.AbstractLogstashInput;
import com.github.houbb.logstash4j.plugins.input.constant.InputKafkaConfigEnum;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:com/github/houbb/logstash4j/plugins/input/Kafka.class */
public class Kafka extends AbstractLogstashInput {
    private static final Log log = LogFactory.getLog(Kafka.class);

    public void emit() {
        try {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", getConfigVal(InputKafkaConfigEnum.BOOTSTRAP_SERVERS));
            properties.put("key.deserializer", getConfigVal(InputKafkaConfigEnum.KEY_DESERIALIZER_CLASS));
            properties.put("value.deserializer", getConfigVal(InputKafkaConfigEnum.VALUE_DESERIALIZER_CLASS));
            properties.put("group.id", getConfigVal(InputKafkaConfigEnum.GROUP_ID));
            properties.put("max.poll.records", getConfigVal(InputKafkaConfigEnum.MAX_POLL_RECORDS));
            KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
            kafkaConsumer.subscribe((List) getConfigVal(InputKafkaConfigEnum.TOPIC));
            long longValue = ((Long) getConfigVal(InputKafkaConfigEnum.MAX_POLL_WAIT_TIMEOUT_MILLS)).longValue();
            while (true) {
                try {
                    kafkaConsumer.poll(Duration.ofMillis(longValue)).forEach(consumerRecord -> {
                        process((String) consumerRecord.value());
                    });
                } catch (Exception e) {
                    log.error("consumer message meet ex", e);
                }
            }
        } catch (Exception e2) {
            log.error("Kafka emit failed", e2);
            throw new RuntimeException(e2);
        }
    }
}
