/*
 * Decompiled with CFR 0.152.
 */
package org.dromara.mendmix.amqp.adapter.kafka;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;
import org.dromara.mendmix.amqp.MQContext;
import org.dromara.mendmix.amqp.MQMessage;
import org.dromara.mendmix.amqp.adapter.AbstractProducer;
import org.dromara.mendmix.common.GlobalContext;
import org.dromara.mendmix.common.util.HashUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProducerAdapter
extends AbstractProducer {
    private final Logger logger = LoggerFactory.getLogger((String)"org.dromara.mendmix.amqp.adapter");
    private KafkaProducer<String, Object> kafkaProducer;

    public KafkaProducerAdapter(MQContext context) {
        super(context);
    }

    @Override
    public void start() throws Exception {
        Properties configs = this.buildConfigs();
        this.kafkaProducer = new KafkaProducer(configs);
    }

    @Override
    public String sendMessage(final MQMessage message, boolean async) {
        this.prepareHandle(message);
        final String topic = message.getTopic();
        Integer partition = message.getPartition();
        String key = message.getBizKey();
        String value = message.toMessageValue(true);
        if (partition == null && StringUtils.isNotBlank((CharSequence)message.getPartitionKey())) {
            int partitionNums = this.kafkaProducer.partitionsFor(message.getTopic()).size();
            partition = (int)(HashUtils.hash((String)message.getPartitionKey()) % (long)partitionNums);
        }
        ArrayList<RecordHeader> headers = new ArrayList<RecordHeader>(4);
        if (message.getHeaders() != null) {
            for (String name : message.getHeaders().keySet()) {
                String headerValue = message.getHeaders().get(name);
                if (headerValue == null) continue;
                headers.add(new RecordHeader(name, headerValue.getBytes()));
            }
        }
        ProducerRecord producerRecord = new ProducerRecord(topic, partition, (Object)key, (Object)value, headers);
        if (async) {
            this.kafkaProducer.send(producerRecord, new Callback(){

                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    message.onProducerFinished(null, recordMetadata.partition(), recordMetadata.offset());
                    if (e == null) {
                        KafkaProducerAdapter.this.handleSuccess(message);
                        KafkaProducerAdapter.this.logger.debug("\u53d1\u9001\u6210\u529f, topic:{}, partition:{}, offset:{}", new Object[]{topic, recordMetadata.partition(), recordMetadata.offset()});
                    } else {
                        KafkaProducerAdapter.this.handleError(message, e);
                        KafkaProducerAdapter.this.logger.warn("\u53d1\u9001\u5931\u8d25, topic:{}, partition:{}, offset:{}, exception:{}", new Object[]{topic, recordMetadata.partition(), recordMetadata.offset(), e});
                    }
                }
            });
        } else {
            try {
                Future future = this.kafkaProducer.send(producerRecord);
                RecordMetadata recordMetadata = (RecordMetadata)future.get();
                message.onProducerFinished(null, recordMetadata.partition(), recordMetadata.offset());
                this.handleSuccess(message);
            }
            catch (Exception e) {
                this.handleError(message, e);
                throw new RuntimeException(e);
            }
        }
        return null;
    }

    @Override
    public void shutdown() {
        super.shutdown();
        this.kafkaProducer.close();
    }

    private Properties buildConfigs() {
        Field[] fields;
        Properties result = new Properties();
        Class<ProducerConfig> clazz = ProducerConfig.class;
        for (Field field : fields = clazz.getDeclaredFields()) {
            String propName;
            if (!field.getName().endsWith("CONFIG") || field.getType() != String.class) continue;
            field.setAccessible(true);
            try {
                propName = field.get(clazz).toString();
            }
            catch (Exception e) {
                continue;
            }
            String propValue = this.context.getProfileProperties(propName);
            if (!StringUtils.isNotBlank((CharSequence)propValue)) continue;
            result.setProperty(propName, propValue);
        }
        if (!result.containsKey("bootstrap.servers")) {
            throw new NullPointerException("Kafka config[bootstrap.servers] is required");
        }
        if (!result.containsKey("key.serializer")) {
            result.put("key.serializer", StringSerializer.class.getName());
        }
        if (!result.containsKey("value.serializer")) {
            result.put("value.serializer", StringSerializer.class.getName());
        }
        if (!result.containsKey("client.id")) {
            result.put("client.id", this.context.getGroupName() + GlobalContext.getWorkerId());
        }
        if (!result.containsKey("retries")) {
            result.put("retries", "1");
        }
        if (!result.containsKey("compression.type")) {
            result.put("compression.type", "snappy");
        }
        String kafkaSecurityProtocol = this.context.getProfileProperties("security.protocol");
        String kafkaSASLMechanism = this.context.getProfileProperties("sasl.mechanism");
        String config = this.context.getProfileProperties("sasl.jaas.config");
        if (!(StringUtils.isEmpty((CharSequence)kafkaSecurityProtocol) || StringUtils.isEmpty((CharSequence)kafkaSASLMechanism) || StringUtils.isEmpty((CharSequence)config))) {
            result.put("security.protocol", kafkaSecurityProtocol);
            result.put("sasl.mechanism", kafkaSASLMechanism);
            result.put("sasl.jaas.config", config);
        }
        return result;
    }
}

