/*
 * Decompiled with CFR 0.152.
 */
package me.insidezhou.southernquiet.job.driver;

import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import me.insidezhou.southernquiet.amqp.rabbit.AbstractAmqpJobArranger;
import me.insidezhou.southernquiet.amqp.rabbit.AbstractAmqpNotificationPublisher;
import me.insidezhou.southernquiet.amqp.rabbit.AmqpAutoConfiguration;
import me.insidezhou.southernquiet.amqp.rabbit.AmqpMessageRecover;
import me.insidezhou.southernquiet.amqp.rabbit.DelayedMessage;
import me.insidezhou.southernquiet.amqp.rabbit.DirectRabbitListenerContainerFactoryConfigurer;
import me.insidezhou.southernquiet.job.AmqpJobAutoConfiguration;
import me.insidezhou.southernquiet.job.JobProcessor;
import me.insidezhou.southernquiet.job.driver.AbstractJobProcessorManager;
import me.insidezhou.southernquiet.job.driver.AmqpJobArranger;
import me.insidezhou.southernquiet.logging.SouthernQuietLogger;
import me.insidezhou.southernquiet.logging.SouthernQuietLoggerFactory;
import me.insidezhou.southernquiet.util.Amplifier;
import me.insidezhou.southernquiet.util.Tuple;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerEndpoint;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SmartMessageConverter;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.Lifecycle;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
 * Wires {@link JobProcessor} methods to RabbitMQ.
 *
 * <p>For every processor handed to {@link #initProcessor} this class declares the
 * broker objects it needs (a fanout exchange, a work queue bound to it, a
 * {@code DEAD.} queue, a {@code RETRY.} queue and a delay queue) and builds a
 * {@link SimpleRabbitListenerEndpoint} that converts incoming messages to the job
 * type and reflectively invokes the processor method. The collected endpoints are
 * registered in {@link #configureRabbitListeners}, each with its own
 * {@link DirectRabbitListenerContainerFactory}.
 *
 * <p>The {@link Lifecycle} methods simply delegate to the internally created
 * {@link RabbitTemplate}.
 */
public class AmqpJobProcessorManager extends AbstractJobProcessorManager implements Lifecycle, RabbitListenerConfigurer {
    /** Prefix placed before the job source when naming a processor's dead queue. */
    public static final String DeadMark = "DEAD.";
    /** Prefix placed before the job source when naming a processor's retry queue. */
    public static final String RetryMark = "RETRY.";

    private static final SouthernQuietLogger log = SouthernQuietLoggerFactory.getLogger(AmqpJobProcessorManager.class);

    private final SmartMessageConverter messageConverter;
    private final ConnectionFactory connectionFactory;
    /** One entry per initialised processor: (endpoint, processor annotation, processor name). */
    private final List<Tuple<RabbitListenerEndpoint, JobProcessor, String>> listenerEndpoints = new ArrayList<>();
    private final AmqpAutoConfiguration.Properties amqpProperties;
    private final AmqpJobAutoConfiguration.Properties amqpJobProperties;
    /** Default amplifier, used when a processor does not name its own amplifier bean. */
    private final Amplifier amplifier;
    private final RabbitProperties rabbitProperties;
    private final AmqpAdmin amqpAdmin;
    private final RabbitTemplate rabbitTemplate;

    /**
     * Creates the manager.
     *
     * @param amqpAdmin          used to declare exchanges, queues and bindings
     * @param jobArranger        supplies the message converter used to decode jobs
     * @param amplifier          default amplifier passed to the message recoverer
     * @param amqpJobProperties  job settings; its name prefix is used for all routing names
     * @param amqpProperties     shared AMQP settings (acknowledge mode, recoverer settings)
     * @param transactionManager supplies the connection factory for listeners and the template
     * @param rabbitProperties   Spring Boot RabbitMQ properties handed to the container factory configurer
     * @param applicationContext passed to the superclass; also used to look up per-processor amplifier beans
     */
    public AmqpJobProcessorManager(AmqpAdmin amqpAdmin, AmqpJobArranger<?> jobArranger, Amplifier amplifier, AmqpJobAutoConfiguration.Properties amqpJobProperties, AmqpAutoConfiguration.Properties amqpProperties, RabbitTransactionManager transactionManager, RabbitProperties rabbitProperties, ApplicationContext applicationContext) {
        super(applicationContext);
        this.amplifier = amplifier;
        this.amqpAdmin = amqpAdmin;
        this.amqpJobProperties = amqpJobProperties;
        this.amqpProperties = amqpProperties;
        this.rabbitProperties = rabbitProperties;
        this.messageConverter = jobArranger.getMessageConverter();
        this.connectionFactory = transactionManager.getConnectionFactory();
        this.rabbitTemplate = new RabbitTemplate(this.connectionFactory);
    }

    /**
     * Registers every endpoint collected by {@link #initProcessor} with the registrar.
     *
     * <p>Each endpoint gets a dedicated container factory whose message recoverer
     * targets that processor's own dead and retry queues.
     *
     * @param registrar the registrar that receives the endpoints
     */
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        String namePrefix = this.amqpJobProperties.getNamePrefix();

        for (Tuple<RabbitListenerEndpoint, JobProcessor, String> registered : this.listenerEndpoints) {
            RabbitListenerEndpoint endpoint = registered.getFirst();
            JobProcessor processor = registered.getSecond();
            String listenerName = registered.getThird();

            // A processor may name its own amplifier bean; otherwise the default one is used.
            Amplifier effectiveAmplifier = this.amplifier;
            if (StringUtils.hasLength(processor.amplifierBeanName())) {
                effectiveAmplifier = this.applicationContext.getBean(processor.amplifierBeanName(), Amplifier.class);
            }

            // The empty exchange names make the recoverer publish through the broker's
            // default exchange, addressing the dead/retry queues directly by name.
            MessageRecoverer recoverer = new AmqpMessageRecover(
                this.rabbitTemplate,
                effectiveAmplifier,
                "",
                getDeadRouting(namePrefix, processor, listenerName),
                "",
                getRetryRouting(namePrefix, processor, listenerName),
                this.amqpProperties);
            DirectRabbitListenerContainerFactoryConfigurer containerFactoryConfigurer =
                new DirectRabbitListenerContainerFactoryConfigurer(this.rabbitProperties, recoverer, this.amqpProperties);

            DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
            factory.setMessageConverter(this.messageConverter);
            factory.setAcknowledgeMode(this.amqpProperties.getAcknowledgeMode());
            containerFactoryConfigurer.configure(factory, this.connectionFactory);

            registrar.registerEndpoint(endpoint, factory);
        }
    }

    /**
     * Declares the broker objects for one processor method and records a listener
     * endpoint that will invoke it.
     *
     * <p>NOTE(review): presumably called by the superclass once per discovered
     * processor method; confirm against {@code AbstractJobProcessorManager}.
     *
     * @param jobProcessor the processor annotation describing the job type and name
     * @param bean         the object on which {@code method} is invoked
     * @param method       the processor method
     */
    protected void initProcessor(JobProcessor jobProcessor, Object bean, Method method) {
        String processorName = getProcessorName(jobProcessor, method);
        String listenerRouting = getProcessorRouting(jobProcessor, processorName);
        Class<?> jobClass = jobProcessor.job();
        DelayedMessage delayedAnnotation = AnnotatedElementUtils.findMergedAnnotation(jobClass, DelayedMessage.class);

        boolean duplicated = this.listenerEndpoints.stream()
            .anyMatch(registered -> jobClass == registered.getSecond().job() && processorName.equals(registered.getThird()));
        if (duplicated) {
            // Message text: "duplicate job processor". The log builder only emits once a
            // level method is called, so the terminal warn() is required.
            log.message("\u4efb\u52a1\u5904\u7406\u5668\u91cd\u590d").context(context -> {
                context.put("queue", listenerRouting);
                context.put("listener", bean.getClass().getName());
                context.put("listenerName", processorName);
                context.put("job", jobClass.getSimpleName());
            }).warn();
        }

        SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
        endpoint.setId(UUID.randomUUID().toString());
        endpoint.setQueueNames(new String[]{listenerRouting});
        endpoint.setAdmin(this.amqpAdmin);

        declareExchangeAndQueue(jobProcessor, processorName);

        // Conversion hint telling the converter which type to decode the payload into.
        ParameterizedTypeReference<Object> typeReference = ParameterizedTypeReference.forType(jobClass);
        endpoint.setMessageListener(message -> {
            Object job = this.messageConverter.fromMessage(message, typeReference);

            // Message text: "job received".
            log.message("\u63a5\u5230\u4efb\u52a1").context(context -> {
                context.put("queue", endpoint.getQueueNames());
                context.put("listener", bean.getClass().getName());
                context.put("listenerName", processorName);
                context.put("listenerId", endpoint.getId());
                context.put("job", job.getClass().getSimpleName());
                context.put("message", message);
            }).debug();

            Object[] arguments = resolveArguments(method, job, jobProcessor, delayedAnnotation, jobClass);
            invokeProcessor(bean, method, arguments);
        });

        this.listenerEndpoints.add(new Tuple<>(endpoint, jobProcessor, processorName));
    }

    /**
     * Builds the argument array for a processor method, one value per declared parameter.
     */
    private static Object[] resolveArguments(Method method, Object job, JobProcessor jobProcessor, DelayedMessage delayedAnnotation, Class<?> jobClass) {
        Class<?>[] parameterTypes = method.getParameterTypes();
        Object[] arguments = new Object[parameterTypes.length];
        for (int i = 0; i < parameterTypes.length; i++) {
            arguments[i] = resolveArgument(parameterTypes[i], job, jobProcessor, delayedAnnotation, jobClass);
        }
        return arguments;
    }

    /**
     * Picks the value for a single parameter: the job, the processor annotation or the
     * job's {@link DelayedMessage} annotation (may be {@code null}). Any other type is
     * reported and filled with a default-constructed instance, or {@code null} when
     * that is not possible.
     */
    private static Object resolveArgument(Class<?> parameterClass, Object job, JobProcessor jobProcessor, DelayedMessage delayedAnnotation, Class<?> jobClass) {
        if (parameterClass.isInstance(job)) {
            return job;
        }
        if (parameterClass.isInstance(jobProcessor)) {
            return jobProcessor;
        }
        if (parameterClass.equals(DelayedMessage.class)) {
            return delayedAnnotation;
        }

        // Message text: "parameters of this type are not supported in notification listeners".
        // The declared parameter type is logged so the offending parameter can be identified.
        log.message("\u4e0d\u652f\u6301\u5728\u901a\u77e5\u76d1\u542c\u5668\u4e2d\u4f7f\u7528\u6b64\u7c7b\u578b\u7684\u53c2\u6570").context("parameter", parameterClass).context("job", jobClass).warn();

        try {
            return parameterClass.getDeclaredConstructor().newInstance();
        }
        catch (Exception ignored) {
            // Best effort only: no usable no-arg constructor (or a primitive/abstract type),
            // so the processor is called with null for this parameter.
            return null;
        }
    }

    /**
     * Invokes the processor method, logging failures and rethrowing them as unchecked
     * exceptions. An exception thrown by the processor itself is unwrapped from the
     * reflective wrapper before being logged and rethrown.
     */
    private static void invokeProcessor(Object bean, Method method, Object[] arguments) {
        try {
            method.invoke(bean, arguments);
        }
        catch (RuntimeException e) {
            // Message text: "job processor threw an exception".
            log.message("\u4efb\u52a1\u5904\u7406\u5668\u629b\u51fa\u5f02\u5e38").exception(e).error();
            throw e;
        }
        catch (InvocationTargetException e) {
            Throwable target = e.getTargetException();
            log.message("\u4efb\u52a1\u5904\u7406\u5668\u629b\u51fa\u5f02\u5e38").exception(target).error();
            if (target instanceof RuntimeException) {
                throw (RuntimeException) target;
            }
            throw new RuntimeException(target);
        }
        catch (IllegalAccessException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the name of the dead queue for a processor.
     *
     * @param prefix        name prefix applied to all routing names
     * @param processor     the processor annotation; its job type determines the source name
     * @param processorName the processor's name, appended via {@link #suffix}
     * @return the dead queue routing name
     */
    public static String getDeadRouting(String prefix, JobProcessor processor, String processorName) {
        String source = DeadMark + AbstractAmqpNotificationPublisher.getNotificationSource(processor.job());
        return AbstractAmqpNotificationPublisher.getRouting(prefix, suffix(source, processorName));
    }

    /**
     * Returns the name of the retry queue for a processor.
     *
     * @param prefix        name prefix applied to all routing names
     * @param processor     the processor annotation; its job type determines the source name
     * @param processorName the processor's name, appended via {@link #suffix}
     * @return the retry queue routing name
     */
    public static String getRetryRouting(String prefix, JobProcessor processor, String processorName) {
        String source = RetryMark + AbstractAmqpNotificationPublisher.getNotificationSource(processor.job());
        return AbstractAmqpNotificationPublisher.getRouting(prefix, suffix(source, processorName));
    }

    /** Returns the name of the work queue a processor listens on. */
    private String getProcessorRouting(JobProcessor processor, String processorName) {
        String jobRouting = AbstractAmqpNotificationPublisher.getRouting(this.amqpJobProperties.getNamePrefix(), processor.job());
        return suffix(jobRouting, processorName);
    }

    /**
     * Returns the processor's declared name, falling back to the method name when the
     * annotation leaves it empty.
     *
     * @throws IllegalArgumentException if the resulting name has no text
     */
    private String getProcessorName(JobProcessor listener, Method method) {
        String listenerName = listener.name();
        if (!StringUtils.hasLength(listenerName)) {
            listenerName = method.getName();
        }
        // Message text: "processor name must not be empty".
        Assert.hasText(listenerName, "\u5904\u7406\u5668\u7684\u540d\u79f0\u4e0d\u80fd\u4e3a\u7a7a");
        return listenerName;
    }

    /**
     * Appends a listener name to a routing name, separated by {@code #}.
     *
     * @param routing      the base routing name
     * @param listenerName the listener name to append
     * @return {@code routing + "#" + listenerName}
     */
    public static String suffix(String routing, String listenerName) {
        return routing + "#" + listenerName;
    }

    /**
     * Declares the exchange and all queues one processor needs: the job's fanout
     * exchange, the processor's work queue bound to it, and the dead, retry and delay
     * queues with their dead-letter settings.
     */
    private void declareExchangeAndQueue(JobProcessor processor, String processorName) {
        String namePrefix = this.amqpJobProperties.getNamePrefix();
        String routing = AbstractAmqpJobArranger.getRouting(namePrefix, processor.job());
        String delayRouting = AbstractAmqpJobArranger.getDelayedRouting(namePrefix, processor.job());
        String listenerRouting = getProcessorRouting(processor, processorName);

        // Typed as Exchange on purpose: BindingBuilder's to(Exchange) overload is the one
        // that allows with(routingKey).noargs().
        Exchange exchange = new FanoutExchange(AbstractAmqpJobArranger.getExchange(namePrefix, processor.job()));
        Queue queue = new Queue(listenerRouting);

        this.amqpAdmin.declareExchange(exchange);
        this.amqpAdmin.declareQueue(queue);
        this.amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(listenerRouting).noargs());

        // Dead and retry queues dead-letter back to the work queue via the default exchange ("").
        declareDeadLetteringQueue(getDeadRouting(namePrefix, processor, processorName), "", queue.getName());
        declareDeadLetteringQueue(getRetryRouting(namePrefix, processor, processorName), "", queue.getName());

        // The delay queue dead-letters into the job's exchange.
        declareDeadLetteringQueue(delayRouting, exchange.getName(), routing);
    }

    /**
     * Declares a durable, non-exclusive, non-auto-delete queue whose dead letters are
     * sent to the given exchange with the given routing key.
     */
    private void declareDeadLetteringQueue(String queueName, String deadLetterExchange, String deadLetterRoutingKey) {
        HashMap<String, Object> queueArgs = new HashMap<>();
        queueArgs.put("x-dead-letter-exchange", deadLetterExchange);
        queueArgs.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        this.amqpAdmin.declareQueue(new Queue(queueName, true, false, false, queueArgs));
    }

    /** Starts the internal {@link RabbitTemplate}. */
    public void start() {
        this.rabbitTemplate.start();
    }

    /** Stops the internal {@link RabbitTemplate}. */
    public void stop() {
        this.rabbitTemplate.stop();
    }

    /** Reports whether the internal {@link RabbitTemplate} is running. */
    public boolean isRunning() {
        return this.rabbitTemplate.isRunning();
    }

    /** Returns the {@link RabbitTemplate} created by this manager. */
    public RabbitTemplate getRabbitTemplate() {
        return this.rabbitTemplate;
    }
}

