/*
 * Decompiled with CFR 0.152.
 */
package org.dromara.mendmix.amqp.adapter.kafka;

import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.dromara.mendmix.amqp.MQContext;
import org.dromara.mendmix.amqp.MQMessage;
import org.dromara.mendmix.amqp.MessageHandler;
import org.dromara.mendmix.amqp.adapter.AbstractConsumer;
import org.dromara.mendmix.amqp.adapter.kafka.OffsetAndMetadataStat;
import org.dromara.mendmix.common.GlobalContext;
import org.dromara.mendmix.common.util.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerAdapter
extends AbstractConsumer {
    private final Logger logger = LoggerFactory.getLogger((String)"org.dromara.mendmix.amqp.adapter");
    private KafkaConsumer<String, String> kafkaConsumer;
    private Duration timeoutDuration;
    private boolean offsetAutoCommit;
    private Map<TopicPartition, OffsetAndMetadataStat> uncommitOffsetStats = new ConcurrentHashMap<TopicPartition, OffsetAndMetadataStat>();

    public KafkaConsumerAdapter(MQContext context, Map<String, MessageHandler> messageHandlers) {
        super(context, messageHandlers);
        this.timeoutDuration = Duration.ofMillis(ResourceUtils.getLong((String)(context.getInstance() + ".amqp.fetch.timeout.ms"), (long)100L));
    }

    @Override
    public void start() throws Exception {
        this.logger.info("<startup-logging>  KafkaConsumer start Begin..");
        Properties configs = this.buildConfigs();
        this.offsetAutoCommit = Boolean.parseBoolean(configs.getProperty("enable.auto.commit"));
        this.kafkaConsumer = new KafkaConsumer(configs);
        Set topicNames = this.messageHandlers.keySet();
        if (this.offsetAutoCommit) {
            this.kafkaConsumer.subscribe(topicNames);
        } else {
            ConsumerRebalanceListener listener = new ConsumerRebalanceListener(){

                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    KafkaConsumerAdapter.this.kafkaConsumer.commitSync();
                }

                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    KafkaConsumerAdapter.this.uncommitOffsetStats.clear();
                    for (TopicPartition partition : partitions) {
                        KafkaConsumerAdapter.this.uncommitOffsetStats.put(partition, new OffsetAndMetadataStat(0L, 0L));
                    }
                }
            };
            this.kafkaConsumer.subscribe(topicNames, listener);
        }
        super.startWorker();
        this.logger.info("<startup-logging>  KafkaConsumer start End -> subscribeTopics:{}", topicNames);
    }

    @Override
    public List<MQMessage> fetchMessages() {
        this.trySubmitOffsets();
        ConsumerRecords records = this.kafkaConsumer.poll(this.timeoutDuration);
        Iterator iterator = records.iterator();
        ArrayList<MQMessage> result = new ArrayList<MQMessage>(records.count());
        while (iterator.hasNext()) {
            ConsumerRecord item = (ConsumerRecord)iterator.next();
            MQMessage message = new MQMessage(item.topic(), item.value());
            if (item.headers() != null) {
                item.headers().forEach(h -> message.addHeader(h.key(), new String(h.value())));
            }
            message.setOriginMessage(item);
            message.setPartition(item.partition());
            message.setOffset(item.offset());
            result.add(message);
        }
        return result;
    }

    @Override
    public String handleMessageConsumed(MQMessage message, boolean successed) {
        if (this.offsetAutoCommit || !successed) {
            return null;
        }
        ConsumerRecord originMessage = (ConsumerRecord)message.getOriginMessage(ConsumerRecord.class);
        TopicPartition partition = new TopicPartition(originMessage.topic(), originMessage.partition());
        if (this.context.isAsyncConsumeEnabled()) {
            this.uncommitOffsetStats.get(partition).updateOnConsumed(originMessage.offset());
        } else {
            HashMap<TopicPartition, OffsetAndMetadata> uncommitOffsets = new HashMap<TopicPartition, OffsetAndMetadata>(1);
            uncommitOffsets.put(partition, new OffsetAndMetadata(originMessage.offset() + 1L));
            this.submitOffsets(uncommitOffsets);
        }
        return null;
    }

    private void trySubmitOffsets() {
        if (this.offsetAutoCommit || !this.context.isAsyncConsumeEnabled()) {
            return;
        }
        HashMap<TopicPartition, OffsetAndMetadata> uncommitOffsets = new HashMap<TopicPartition, OffsetAndMetadata>(this.uncommitOffsetStats.size());
        this.uncommitOffsetStats.forEach((k, v) -> {
            if (!v.isCommited()) {
                uncommitOffsets.put((TopicPartition)k, new OffsetAndMetadata(v.getOffset() + 1L));
            }
        });
        this.submitOffsets(uncommitOffsets);
    }

    private synchronized void submitOffsets(final Map<TopicPartition, OffsetAndMetadata> uncommitOffsets) {
        if (uncommitOffsets.isEmpty()) {
            return;
        }
        this.kafkaConsumer.commitAsync(uncommitOffsets, new OffsetCommitCallback(){

            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception != null) {
                    KafkaConsumerAdapter.this.kafkaConsumer.commitSync(uncommitOffsets);
                } else if (KafkaConsumerAdapter.this.logger.isDebugEnabled()) {
                    KafkaConsumerAdapter.this.logger.debug("MQmessage_COMMIT_SUCCESS -> offsets:{}", offsets);
                }
                offsets.forEach((k, v) -> ((OffsetAndMetadataStat)KafkaConsumerAdapter.this.uncommitOffsetStats.get(k)).setCommited(true));
            }
        });
    }

    private void resetOffsets(TopicPartition topicPartition, long resetOffset) {
        try {
            OffsetAndMetadata metadata = this.kafkaConsumer.committed(topicPartition, this.timeoutDuration);
            Set assignment = this.kafkaConsumer.assignment();
            if (assignment.contains(topicPartition) && resetOffset > 0L && resetOffset < metadata.offset()) {
                this.kafkaConsumer.seek(topicPartition, resetOffset);
                this.logger.info("<framework-logging> seek topicPartition[{}] from {} to {}", new Object[]{topicPartition, metadata.offset(), resetOffset});
            }
        }
        catch (Exception e) {
            this.logger.warn("<framework-logging> try seek topicPartition[" + topicPartition + "] offsets error");
        }
        this.kafkaConsumer.resume((Collection)this.kafkaConsumer.assignment());
    }

    private Properties buildConfigs() {
        Field[] fields;
        Properties result = new Properties();
        result.setProperty("group.id", this.context.getGroupName());
        Class<ConsumerConfig> clazz = ConsumerConfig.class;
        for (Field field : fields = clazz.getDeclaredFields()) {
            String propName;
            if (!field.getName().endsWith("CONFIG") || field.getType() != String.class) continue;
            field.setAccessible(true);
            try {
                propName = field.get(clazz).toString();
            }
            catch (Exception e) {
                continue;
            }
            String propValue = this.context.getProfileProperties(propName);
            if (!StringUtils.isNotBlank((CharSequence)propValue)) continue;
            result.setProperty(propName, propValue);
        }
        if (!result.containsKey("bootstrap.servers")) {
            throw new NullPointerException("Kafka config[bootstrap.servers] is required");
        }
        if (!result.containsKey("key.deserializer")) {
            result.put("key.deserializer", StringDeserializer.class.getName());
        }
        if (!result.containsKey("value.deserializer")) {
            result.put("value.deserializer", StringDeserializer.class.getName());
        }
        if (!result.containsKey("client.id")) {
            result.put("client.id", this.context.getGroupName() + GlobalContext.getWorkerId());
        }
        if (!result.containsKey("max.poll.records")) {
            int maxPollRecords = this.context.getMaxProcessThreads() > 1 ? this.context.getMaxProcessThreads() / 2 : 1;
            result.put("max.poll.records", (Object)maxPollRecords);
        }
        if (!result.containsKey("enable.auto.commit")) {
            result.put("enable.auto.commit", "true");
        }
        if (!result.containsKey("auto.offset.reset")) {
            result.put("auto.offset.reset", "latest");
        }
        String kafkaSecurityProtocol = this.context.getProfileProperties("security.protocol");
        String kafkaSASLMechanism = this.context.getProfileProperties("sasl.mechanism");
        String config = this.context.getProfileProperties("sasl.jaas.config");
        if (!(StringUtils.isEmpty((CharSequence)kafkaSecurityProtocol) || StringUtils.isEmpty((CharSequence)kafkaSASLMechanism) || StringUtils.isEmpty((CharSequence)config))) {
            result.put("security.protocol", kafkaSecurityProtocol);
            result.put("sasl.mechanism", kafkaSASLMechanism);
            result.put("sasl.jaas.config", config);
        }
        return result;
    }

    @Override
    public void shutdown() {
        super.shutdown();
        this.kafkaConsumer.close();
    }
}

