/*
 * Decompiled with CFR 0.152.
 */
package me.insidezhou.southernquiet.job.driver;

import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import me.insidezhou.southernquiet.amqp.rabbit.AbstractAmqpJobArranger;
import me.insidezhou.southernquiet.amqp.rabbit.AmqpAutoConfiguration;
import me.insidezhou.southernquiet.amqp.rabbit.AmqpMessageRecover;
import me.insidezhou.southernquiet.amqp.rabbit.DelayedMessage;
import me.insidezhou.southernquiet.amqp.rabbit.DirectRabbitListenerContainerFactoryConfigurer;
import me.insidezhou.southernquiet.job.AmqpJobAutoConfiguration;
import me.insidezhou.southernquiet.job.JobProcessor;
import me.insidezhou.southernquiet.job.driver.AbstractJobProcessorManager;
import me.insidezhou.southernquiet.job.driver.AmqpJobArranger;
import me.insidezhou.southernquiet.logging.SouthernQuietLogger;
import me.insidezhou.southernquiet.logging.SouthernQuietLoggerFactory;
import me.insidezhou.southernquiet.util.Amplifier;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerEndpoint;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SmartMessageConverter;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.Lifecycle;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public class AmqpJobProcessorManager
extends AbstractJobProcessorManager
implements Lifecycle,
RabbitListenerConfigurer {
    private static final SouthernQuietLogger log = SouthernQuietLoggerFactory.getLogger(AmqpJobProcessorManager.class);
    private final SmartMessageConverter messageConverter;
    private final ConnectionFactory connectionFactory;
    private final List<ProcessorEndpoint> processorEndpoints = new ArrayList<ProcessorEndpoint>();
    private final AmqpAutoConfiguration.Properties amqpProperties;
    private final AmqpJobAutoConfiguration.Properties amqpJobProperties;
    private final Amplifier amplifier;
    private final RabbitProperties rabbitProperties;
    private final AmqpAdmin amqpAdmin;
    private final RabbitTemplate rabbitTemplate;

    public AmqpJobProcessorManager(AmqpAdmin amqpAdmin, AmqpJobArranger<?> jobArranger, Amplifier amplifier, AmqpJobAutoConfiguration.Properties amqpJobProperties, AmqpAutoConfiguration.Properties amqpProperties, RabbitTransactionManager transactionManager, RabbitProperties rabbitProperties, ApplicationContext applicationContext) {
        super(applicationContext);
        this.amplifier = amplifier;
        this.amqpAdmin = amqpAdmin;
        this.amqpJobProperties = amqpJobProperties;
        this.amqpProperties = amqpProperties;
        this.rabbitProperties = rabbitProperties;
        this.messageConverter = jobArranger.getMessageConverter();
        this.connectionFactory = transactionManager.getConnectionFactory();
        this.rabbitTemplate = new RabbitTemplate(this.connectionFactory);
    }

    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        this.processorEndpoints.stream().collect(Collectors.groupingBy(ProcessorEndpoint::getRouting)).forEach((routing, group) -> {
            ProcessorEndpoint endpoint = (ProcessorEndpoint)group.get(0);
            String processorName = endpoint.getProcessorName();
            JobProcessor processorAnnotation = endpoint.getProcessorAnnotation();
            Amplifier amplifier = this.amplifier;
            if (!StringUtils.isEmpty((Object)processorAnnotation.amplifierBeanName())) {
                amplifier = (Amplifier)this.applicationContext.getBean(processorAnnotation.amplifierBeanName(), Amplifier.class);
            }
            DirectRabbitListenerContainerFactoryConfigurer containerFactoryConfigurer = new DirectRabbitListenerContainerFactoryConfigurer(this.rabbitProperties, (MessageRecoverer)new AmqpMessageRecover((AmqpTemplate)this.rabbitTemplate, amplifier, "", AmqpJobProcessorManager.getDeadRouting(this.amqpJobProperties.getNamePrefix(), processorAnnotation, processorName), AbstractAmqpJobArranger.getDelayRouting((String)this.amqpJobProperties.getNamePrefix(), (Class)processorAnnotation.job()), AmqpJobProcessorManager.getRetryRouting(this.amqpJobProperties.getNamePrefix(), processorAnnotation, processorName), this.amqpProperties), this.amqpProperties);
            DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
            factory.setMessageConverter((MessageConverter)this.messageConverter);
            factory.setAcknowledgeMode(this.amqpProperties.getAcknowledgeMode());
            factory.setConsumersPerQueue(Integer.valueOf(processorAnnotation.concurrency()));
            containerFactoryConfigurer.configure(factory, this.connectionFactory);
            SimpleRabbitListenerEndpoint rabbitListenerEndpoint = new SimpleRabbitListenerEndpoint();
            rabbitListenerEndpoint.setId(UUID.randomUUID().toString());
            rabbitListenerEndpoint.setQueueNames(new String[]{endpoint.getRouting()});
            rabbitListenerEndpoint.setAdmin(this.amqpAdmin);
            rabbitListenerEndpoint.setMessageListener(endpoint.getMessageListener());
            registrar.registerEndpoint((RabbitListenerEndpoint)rabbitListenerEndpoint, (RabbitListenerContainerFactory)factory);
        });
    }

    protected void initProcessor(JobProcessor processor, Object bean, Method method) {
        String processorName = this.getProcessorName(processor, method);
        String processorRouting = this.getProcessorRouting(processor, processorName);
        this.processorEndpoints.stream().filter(processorEndpoint -> processor.job() == processorEndpoint.getProcessorAnnotation().job() && processorName.equals(processorEndpoint.getProcessorName())).findAny().ifPresent(processorEndpoint -> log.message("\u4efb\u52a1\u5904\u7406\u5668\u91cd\u590d").context(context -> {
            context.put("queue", processorRouting);
            context.put("processor", bean.getClass().getName());
            context.put("processorName", processorName);
            context.put("job", processor.job().getSimpleName());
        }));
        this.declareExchangeAndQueue(processor, processorName);
        DelayedMessage delayedAnnotation = (DelayedMessage)AnnotatedElementUtils.findMergedAnnotation((AnnotatedElement)processor.job(), DelayedMessage.class);
        ProcessorEndpoint processorEndpoint2 = new ProcessorEndpoint();
        processorEndpoint2.setProcessorName(processorName);
        processorEndpoint2.setProcessorAnnotation(processor);
        processorEndpoint2.setRouting(processorRouting);
        processorEndpoint2.setMessageListener(this.generateMessageListener(ParameterizedTypeReference.forType((Type)processor.job()), processorRouting, processor, bean, method, processorName, delayedAnnotation));
        this.processorEndpoints.add(processorEndpoint2);
    }

    protected MessageListener generateMessageListener(ParameterizedTypeReference<?> typeReference, String routing, JobProcessor processor, Object bean, Method method, String processorName, DelayedMessage delayedAnnotation) {
        return message -> {
            Object job = this.messageConverter.fromMessage(message, (Object)typeReference);
            this.onMessageReceived(routing, bean, processorName, job, message);
            Object[] parameters = Arrays.stream(method.getParameters()).map(parameter -> {
                Class<?> parameterClass = parameter.getType();
                if (parameterClass.isInstance(job)) {
                    return job;
                }
                if (parameterClass.isInstance(processor)) {
                    return processor;
                }
                if (parameterClass.equals(DelayedMessage.class)) {
                    return delayedAnnotation;
                }
                log.message("\u4e0d\u652f\u6301\u5728\u4efb\u52a1\u5904\u7406\u5668\u4e2d\u4f7f\u7528\u6b64\u7c7b\u578b\u7684\u53c2\u6570").context("parameter", parameter.getClass()).context("job", (Object)processor.job()).warn();
                try {
                    return parameterClass.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
                }
                catch (Exception e) {
                    return null;
                }
            }).toArray();
            try {
                method.invoke(bean, parameters);
            }
            catch (RuntimeException e) {
                log.message("\u4efb\u52a1\u5904\u7406\u5668\u629b\u51fa\u5f02\u5e38").exception((Throwable)e).error();
                throw e;
            }
            catch (InvocationTargetException e) {
                Throwable target = e.getTargetException();
                log.message("\u4efb\u52a1\u5904\u7406\u5668\u629b\u51fa\u5f02\u5e38").exception(target).error();
                if (target instanceof RuntimeException) {
                    throw (RuntimeException)target;
                }
                throw new RuntimeException(target);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    protected void onMessageReceived(String routing, Object bean, String listenerName, Object job, Message message) {
        log.message("\u63a5\u5230\u4efb\u52a1").context(context -> {
            context.put("queue", routing);
            context.put("processor", bean.getClass().getName());
            context.put("processorName", listenerName);
            context.put("job", job.getClass().getSimpleName());
            context.put("message", message);
        }).debug();
    }

    public static String getDeadRouting(String prefix, JobProcessor processor, String processorName) {
        return AbstractAmqpJobArranger.getRouting((String)prefix, (String)AmqpJobProcessorManager.suffix("DEAD." + AbstractAmqpJobArranger.getQueueSource((Class)processor.job()), processorName));
    }

    public static String getRetryRouting(String prefix, JobProcessor processor, String processorName) {
        return AbstractAmqpJobArranger.getRouting((String)prefix, (String)AmqpJobProcessorManager.suffix("RETRY." + AbstractAmqpJobArranger.getQueueSource((Class)processor.job()), processorName));
    }

    private String getProcessorRouting(JobProcessor processor, String processorName) {
        return AmqpJobProcessorManager.suffix(AbstractAmqpJobArranger.getRouting((String)this.amqpJobProperties.getNamePrefix(), (Class)processor.job()), processorName);
    }

    private String getProcessorName(JobProcessor processor, Method method) {
        String processorName = processor.name();
        if (StringUtils.isEmpty((Object)processorName)) {
            processorName = method.getName();
        }
        Assert.hasText((String)processorName, (String)"\u5904\u7406\u5668\u7684\u540d\u79f0\u4e0d\u80fd\u4e3a\u7a7a");
        return processorName;
    }

    public static String suffix(String routing, String processorName) {
        return routing + "#" + processorName;
    }

    private void declareExchangeAndQueue(JobProcessor processor, String processorName) {
        String routing = AbstractAmqpJobArranger.getRouting((String)this.amqpJobProperties.getNamePrefix(), (Class)processor.job());
        String delayRouting = AbstractAmqpJobArranger.getDelayRouting((String)this.amqpJobProperties.getNamePrefix(), (Class)processor.job());
        String processorRouting = this.getProcessorRouting(processor, processorName);
        FanoutExchange exchange = new FanoutExchange(routing, true, false);
        Queue queue = new Queue(processorRouting);
        this.amqpAdmin.declareExchange((Exchange)exchange);
        this.amqpAdmin.declareQueue(queue);
        this.amqpAdmin.declareBinding(BindingBuilder.bind((Queue)queue).to((Exchange)exchange).with(processorRouting).noargs());
        HashMap<String, String> deadQueueArgs = new HashMap<String, String>();
        deadQueueArgs.put("x-dead-letter-exchange", "");
        deadQueueArgs.put("x-dead-letter-routing-key", queue.getName());
        Queue deadRouting = new Queue(AmqpJobProcessorManager.getDeadRouting(this.amqpJobProperties.getNamePrefix(), processor, processorName), true, false, false, deadQueueArgs);
        this.amqpAdmin.declareQueue(deadRouting);
        HashMap<String, String> exchangeArguments = new HashMap<String, String>();
        exchangeArguments.put("x-delayed-type", "direct");
        CustomExchange delayExchange = new CustomExchange(delayRouting, "x-delayed-message", true, false, exchangeArguments);
        this.amqpAdmin.declareExchange((Exchange)delayExchange);
        HashMap<String, Object> retryQueueArgs = new HashMap<String, Object>();
        retryQueueArgs.put("x-dead-letter-exchange", "");
        retryQueueArgs.put("x-dead-letter-routing-key", queue.getName());
        retryQueueArgs.put("x-message-ttl", 0);
        Queue retryQueue = new Queue(AmqpJobProcessorManager.getRetryRouting(this.amqpJobProperties.getNamePrefix(), processor, processorName), true, false, false, retryQueueArgs);
        this.amqpAdmin.declareQueue(retryQueue);
        this.amqpAdmin.declareBinding(BindingBuilder.bind((Queue)retryQueue).to((Exchange)delayExchange).with(retryQueue.getName()).noargs());
        HashMap<String, Object> delayQueueArgs = new HashMap<String, Object>();
        delayQueueArgs.put("x-dead-letter-exchange", routing);
        delayQueueArgs.put("x-dead-letter-routing-key", routing);
        delayQueueArgs.put("x-message-ttl", 0);
        Queue delayQueue = new Queue(AbstractAmqpJobArranger.getDelayRouting((String)this.amqpJobProperties.getNamePrefix(), (Class)processor.job()), true, false, false, delayQueueArgs);
        this.amqpAdmin.declareQueue(delayQueue);
        this.amqpAdmin.declareBinding(BindingBuilder.bind((Queue)delayQueue).to((Exchange)delayExchange).with(delayQueue.getName()).noargs());
    }

    public void start() {
        this.rabbitTemplate.start();
    }

    public void stop() {
        this.rabbitTemplate.stop();
    }

    public boolean isRunning() {
        return this.rabbitTemplate.isRunning();
    }

    static class ProcessorEndpoint {
        private JobProcessor processorAnnotation;
        private String processorName;
        private String routing;
        private MessageListener messageListener;

        ProcessorEndpoint() {
        }

        public JobProcessor getProcessorAnnotation() {
            return this.processorAnnotation;
        }

        public void setProcessorAnnotation(JobProcessor processorAnnotation) {
            this.processorAnnotation = processorAnnotation;
        }

        public String getProcessorName() {
            return this.processorName;
        }

        public void setProcessorName(String processorName) {
            this.processorName = processorName;
        }

        public String getRouting() {
            return this.routing;
        }

        public void setRouting(String routing) {
            this.routing = routing;
        }

        public MessageListener getMessageListener() {
            return this.messageListener;
        }

        public void setMessageListener(MessageListener messageListener) {
            this.messageListener = messageListener;
        }
    }
}

