package com.github.dapeng.message.consumer.kafka;

import com.github.dapeng.core.definition.SoaFunctionDefinition;
import com.github.dapeng.core.helper.SoaSystemEnvProperties;
import com.github.dapeng.message.consumer.api.context.ConsumerContext;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/dapeng/message/consumer/kafka/KafkaConsumer.class */
/**
 * Thread that subscribes to a single Kafka topic and hands the value of every
 * polled record to each registered {@link ConsumerContext}.
 *
 * <p>The underlying Kafka client is created in {@link #init()}, which the
 * constructor invokes. The poll loop in {@link #run()} executes until an
 * exception escapes it, at which point the client is closed.
 */
public class KafkaConsumer extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    /** Timeout, in milliseconds, passed to each {@code poll} call. */
    private static final long POLL_TIMEOUT_MS = 100L;

    /** Interface name used to detect a proxied handler without a compile-time dependency on it. */
    private static final String ADVISED_INTERFACE_NAME = "org.springframework.aop.framework.Advised";

    private final String groupId;
    private final String topic;
    protected org.apache.kafka.clients.consumer.KafkaConsumer<ByteBuffer, ByteBuffer> consumer;
    private List<ConsumerContext> customers = new ArrayList<>();
    private final String kafkaConnect = SoaSystemEnvProperties.SOA_KAFKA_HOST;

    /**
     * Creates the consumer thread and immediately builds the Kafka client via {@link #init()}.
     *
     * @param groupId value used for the {@code group.id} client property
     * @param topic   topic subscribed to when the thread runs
     */
    public KafkaConsumer(String groupId, String topic) {
        this.groupId = groupId;
        this.topic = topic;
        // NOTE(review): init() is public and overridable, and is called before a subclass
        // constructor body has run; kept because callers may rely on the existing behavior.
        init();
    }

    /**
     * Builds the Kafka client with auto-commit enabled (1000 ms interval) and
     * {@link ByteBufferDeserializer} for both keys and values, and stores it in {@link #consumer}.
     *
     * <p>The broker list comes from {@link SoaSystemEnvProperties#SOA_KAFKA_HOST} rather than a
     * literal address, so the logged value and the connected value are always the same.
     */
    public void init() {
        logger.info("[KafkaConsumer] [init] kafkaConnect({}) groupId({}) topic({})",
                this.kafkaConnect, this.groupId, this.topic);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.kafkaConnect);
        properties.put("group.id", this.groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("key.deserializer", ByteBufferDeserializer.class);
        properties.put("value.deserializer", ByteBufferDeserializer.class);
        this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(properties);
    }

    /**
     * Subscribes to the topic and polls forever, dispatching each record value to
     * {@link #receive(ByteBuffer)}. Any exception that escapes the loop is logged, the loop
     * ends, and the Kafka client is closed.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            logger.info("[KafkaConsumer][{}][run] ", this.groupId + ":" + this.topic);
            this.consumer.subscribe(Arrays.asList(this.topic));
            while (true) {
                for (ConsumerRecord<ByteBuffer, ByteBuffer> record : this.consumer.poll(POLL_TIMEOUT_MS)) {
                    receive(record.value());
                }
            }
        } catch (Exception e) {
            // The message is passed as an argument so braces inside it cannot corrupt the format.
            logger.error("[KafkaConsumer][{}][run] {}", this.groupId + ":" + this.topic, e.getMessage(), e);
        } finally {
            // The loop only ends on failure; release the client instead of leaking it.
            try {
                this.consumer.close();
            } catch (Exception closeError) {
                logger.error(closeError.getMessage(), closeError);
            }
        }
    }

    /**
     * Passes one message payload to every registered consumer context.
     *
     * @param byteBuffer record value as returned by the Kafka client; may be {@code null}
     */
    private void receive(ByteBuffer byteBuffer) {
        logger.info("KafkaConsumer groupId({}) topic({}) 收到消息", this.groupId, this.topic);
        for (ConsumerContext customer : this.customers) {
            // Each handler gets its own view (independent position/limit over the same bytes),
            // so one handler reading the buffer cannot leave it exhausted for the next.
            ByteBuffer payload = byteBuffer == null ? null : byteBuffer.duplicate();
            dealMessage(customer, payload);
        }
    }

    /**
     * Registers a handler that will receive every message polled by this thread.
     *
     * @param consumerContext handler to append to the current list
     */
    public void addCustomer(ConsumerContext consumerContext) {
        this.customers.add(consumerContext);
    }

    /**
     * @return the live list of registered handlers (not a copy)
     */
    public List<ConsumerContext> getCustomers() {
        return this.customers;
    }

    /**
     * Replaces the list of registered handlers.
     *
     * @param list new handler list; stored by reference
     */
    public void setCustomers(List<ConsumerContext> list) {
        this.customers = list;
    }

    /**
     * Invokes one handler with the payload: instantiates the handler's argument holder,
     * stores the payload in the holder's first declared field, and calls
     * {@code apply(iface, args)} on the function definition. Failures are logged and do not
     * propagate, so one bad handler or message cannot end the poll loop.
     *
     * @param consumerContext handler description (target object plus function definition)
     * @param byteBuffer      message payload to inject
     */
    private void dealMessage(ConsumerContext consumerContext, ByteBuffer byteBuffer) {
        SoaFunctionDefinition.Sync soaFunctionDefinition = consumerContext.getSoaFunctionDefinition();
        Object iface = consumerContext.getIface();
        Class<?> targetClass = resolveTargetClass(iface);

        // NOTE(review): getDeclaredMethods() has no guaranteed order; this presumably relies on
        // the definition class declaring a single relevant method — confirm.
        Method[] declaredMethods = soaFunctionDefinition.getClass().getDeclaredMethods();
        if (declaredMethods.length == 0) {
            logger.error("{} declares no methods, message dropped", soaFunctionDefinition.getClass().getName());
            return;
        }
        Method method = declaredMethods[0];

        Object args = newArgsInstance(method);
        if (args == null) {
            // Previously this fell through to a NullPointerException on args.getClass().
            logger.error("no args instance available for method: {}, message dropped", method.getName());
            return;
        }

        Field[] argFields = args.getClass().getDeclaredFields();
        if (argFields.length == 0) {
            logger.error("{} declares no fields, message dropped", args.getClass().getName());
            return;
        }

        try {
            Field payloadField = argFields[0];
            payloadField.setAccessible(true);
            payloadField.set(args, byteBuffer);
            logger.info("{}收到kafka消息，执行{}方法", targetClass.getName(), soaFunctionDefinition.methodName);
            soaFunctionDefinition.apply(iface, args);
            logger.info("{}收到kafka消息，执行{}方法完成", targetClass.getName(), soaFunctionDefinition.methodName);
        } catch (Exception e) {
            logger.error("{}收到kafka消息，执行{}方法异常", targetClass.getName(), soaFunctionDefinition.methodName);
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Returns the class to name in log output: the result of a reflective
     * {@code getTargetClass()} call when the object implements the Advised interface,
     * otherwise (or on any failure or null result) the object's runtime class.
     */
    private static Class<?> resolveTargetClass(Object iface) {
        try {
            boolean advised = Arrays.stream(iface.getClass().getInterfaces())
                    .anyMatch(candidate -> ADVISED_INTERFACE_NAME.equals(candidate.getName()));
            if (advised) {
                Object target = iface.getClass().getMethod("getTargetClass").invoke(iface);
                if (target instanceof Class) {
                    return (Class<?>) target;
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return iface.getClass();
    }

    /**
     * Instantiates the parameter type of {@code method} whose class name contains "args".
     * When several parameters match, the last one that instantiates successfully wins.
     *
     * @return the new instance, or {@code null} if no parameter matched or none could be created
     */
    private static Object newArgsInstance(Method method) {
        Object args = null;
        for (Parameter parameter : method.getParameters()) {
            Class<?> parameterType = parameter.getType();
            if (!parameterType.getName().contains("args")) {
                continue;
            }
            try {
                args = parameterType.getDeclaredConstructor().newInstance();
            } catch (Exception e) {
                logger.error(" failed to instance method: {}", method.getName(), e);
            }
        }
        return args;
    }
}
