/*
 * Decompiled with CFR 0.152.
 */
package cool.doudou.doudada.mq.core.processor;

import cool.doudou.doudada.mq.annotation.MqConsumer;
import cool.doudou.doudada.mq.core.enums.MsgTypeEnum;
import cool.doudou.doudada.mq.properties.PulsarProperties;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;

public class ConsumerBeanPostProcessor
implements BeanPostProcessor {
    private static final Logger log = LoggerFactory.getLogger(ConsumerBeanPostProcessor.class);
    private PulsarClient pulsarClient;
    private PulsarProperties pulsarProperties;

    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        Arrays.stream(bean.getClass().getDeclaredMethods()).filter(method -> method.isAnnotationPresent(MqConsumer.class)).forEach(method -> {
            MqConsumer mqConsumer = method.getAnnotation(MqConsumer.class);
            this.initConsumer(mqConsumer.topics(), mqConsumer.msgType(), bean, (Method)method);
        });
        return bean;
    }

    private void initConsumer(String[] topics, MsgTypeEnum msgType, Object bean, Method method) {
        if (topics == null || topics.length <= 0) {
            log.error("initConsumer error: @MqConsumer.topics must be specified");
            return;
        }
        try {
            this.pulsarClient.newConsumer(msgType.get()).topic(topics).subscriptionName(this.pulsarProperties.getSubscriptionName()).subscriptionType(SubscriptionType.valueOf((String)this.pulsarProperties.getSubscriptionType())).subscriptionInitialPosition(SubscriptionInitialPosition.valueOf((String)this.pulsarProperties.getSubscriptionInitialPosition())).negativeAckRedeliveryDelay((long)this.pulsarProperties.getNegativeAckRedeliveryDelay().intValue(), TimeUnit.SECONDS).messageListener((MessageListener & Serializable)(consumer, msg) -> {
                try {
                    method.setAccessible(true);
                    method.invoke(bean, consumer.getTopic(), msg.getValue());
                    consumer.acknowledge(msg);
                }
                catch (Exception e) {
                    consumer.negativeAcknowledge(msg);
                    throw new RuntimeException("bean[" + bean + "].method[" + method + "]invoke exception: ", e);
                }
            }).subscribe();
        }
        catch (PulsarClientException e) {
            log.error("initConsumer[{}] exception: ", (Object)topics, (Object)e);
        }
    }

    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    public ConsumerBeanPostProcessor(PulsarClient pulsarClient, PulsarProperties pulsarProperties) {
        this.pulsarClient = pulsarClient;
        this.pulsarProperties = pulsarProperties;
    }
}

