/*
 * Decompiled with CFR 0.152.
 */
package me.insidezhou.southernquiet.job.driver;

import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import me.insidezhou.southernquiet.amqp.rabbit.AmqpAutoConfiguration;
import me.insidezhou.southernquiet.amqp.rabbit.AmqpMessageRecover;
import me.insidezhou.southernquiet.amqp.rabbit.DirectRabbitListenerContainerFactoryConfigurer;
import me.insidezhou.southernquiet.job.AmqpJobAutoConfiguration;
import me.insidezhou.southernquiet.job.JobProcessor;
import me.insidezhou.southernquiet.job.driver.AbstractJobProcessorManager;
import me.insidezhou.southernquiet.job.driver.AmqpJobArranger;
import me.insidezhou.southernquiet.logging.SouthernQuietLogger;
import me.insidezhou.southernquiet.logging.SouthernQuietLoggerFactory;
import me.insidezhou.southernquiet.util.Amplifier;
import me.insidezhou.southernquiet.util.Tuple;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerEndpoint;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SmartMessageConverter;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.Lifecycle;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public class AmqpJobProcessorManager
extends AbstractJobProcessorManager
implements Lifecycle,
RabbitListenerConfigurer {
    private static final SouthernQuietLogger log = SouthernQuietLoggerFactory.getLogger(AmqpJobProcessorManager.class);
    private final SmartMessageConverter messageConverter;
    private final ConnectionFactory connectionFactory;
    private final List<Tuple<RabbitListenerEndpoint, JobProcessor, String>> listenerEndpoints = new ArrayList<Tuple<RabbitListenerEndpoint, JobProcessor, String>>();
    private final AmqpAutoConfiguration.Properties amqpProperties;
    private final AmqpJobAutoConfiguration.Properties amqpJobProperties;
    private final Amplifier amplifier;
    private final RabbitProperties rabbitProperties;
    private final RabbitAdmin rabbitAdmin;
    private final RabbitTemplate rabbitTemplate;

    public AmqpJobProcessorManager(RabbitAdmin rabbitAdmin, AmqpJobArranger<?> jobArranger, Amplifier amplifier, AmqpJobAutoConfiguration.Properties amqpJobProperties, AmqpAutoConfiguration.Properties amqpProperties, RabbitTransactionManager transactionManager, RabbitProperties rabbitProperties, ApplicationContext applicationContext) {
        super(applicationContext);
        this.amplifier = amplifier;
        this.rabbitAdmin = rabbitAdmin;
        this.amqpJobProperties = amqpJobProperties;
        this.amqpProperties = amqpProperties;
        this.rabbitProperties = rabbitProperties;
        this.messageConverter = jobArranger.getMessageConverter();
        this.connectionFactory = transactionManager.getConnectionFactory();
        this.rabbitTemplate = new RabbitTemplate(this.connectionFactory);
    }

    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        this.listenerEndpoints.forEach(tuple -> {
            RabbitListenerEndpoint endpoint = (RabbitListenerEndpoint)tuple.getFirst();
            JobProcessor processor = (JobProcessor)tuple.getSecond();
            String listenerName = (String)tuple.getThird();
            Amplifier amplifier = this.amplifier;
            if (!StringUtils.isEmpty((Object)processor.amplifierBeanName())) {
                amplifier = (Amplifier)this.applicationContext.getBean(processor.amplifierBeanName(), Amplifier.class);
            }
            DirectRabbitListenerContainerFactoryConfigurer containerFactoryConfigurer = new DirectRabbitListenerContainerFactoryConfigurer(this.rabbitProperties, (MessageRecoverer)new AmqpMessageRecover((AmqpTemplate)this.rabbitTemplate, amplifier, "", this.getDeadRouting(processor, listenerName), this.amqpProperties), this.amqpProperties);
            DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
            factory.setMessageConverter((MessageConverter)this.messageConverter);
            factory.setAcknowledgeMode(this.amqpProperties.getAcknowledgeMode());
            containerFactoryConfigurer.configure(factory, this.connectionFactory);
            registrar.registerEndpoint(endpoint, (RabbitListenerContainerFactory)factory);
        });
    }

    protected void initProcessor(JobProcessor jobProcessor, Object bean, Method method) {
        String listenerDefaultName = method.getName();
        String listenerName = this.getListenerName(jobProcessor, listenerDefaultName);
        String listenerRouting = this.getListenerRouting(jobProcessor, listenerDefaultName);
        this.listenerEndpoints.stream().filter(listenerEndpoint -> jobProcessor.job() == ((JobProcessor)listenerEndpoint.getSecond()).job() && listenerName.equals(listenerEndpoint.getThird())).findAny().ifPresent(listenerEndpoint -> log.message("\u76d1\u542c\u5668\u91cd\u590d").context(context -> {
            context.put("queue", listenerRouting);
            context.put("listener", bean.getClass().getName());
            context.put("listenerName", listenerName);
            context.put("job", jobProcessor.job().getSimpleName());
        }));
        SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
        endpoint.setId(UUID.randomUUID().toString());
        endpoint.setQueueNames(new String[]{listenerRouting});
        endpoint.setAdmin((AmqpAdmin)this.rabbitAdmin);
        this.declareExchangeAndQueue(jobProcessor, listenerDefaultName);
        Class jobClass = jobProcessor.job();
        ParameterizedTypeReference typeReference = ParameterizedTypeReference.forType((Type)jobClass);
        endpoint.setMessageListener(message -> {
            Object job = this.messageConverter.fromMessage(message, (Object)typeReference);
            log.message("\u76d1\u542c\u5668\u6536\u5230\u901a\u77e5").context(context -> {
                context.put("queue", endpoint.getQueueNames());
                context.put("listener", bean.getClass().getName());
                context.put("listenerName", listenerName);
                context.put("listenerId", endpoint.getId());
                context.put("job", job.getClass().getSimpleName());
                context.put("message", message);
            }).debug();
            Object[] parameters = Arrays.stream(method.getParameters()).map(parameter -> {
                Class<?> parameterClass = parameter.getType();
                if (parameterClass.isInstance(job)) {
                    return job;
                }
                if (parameterClass.isInstance(jobProcessor)) {
                    return jobProcessor;
                }
                log.message("\u4e0d\u652f\u6301\u5728\u901a\u77e5\u76d1\u542c\u5668\u4e2d\u4f7f\u7528\u6b64\u7c7b\u578b\u7684\u53c2\u6570").context("parameter", parameter.getClass()).context("job", (Object)jobClass).warn();
                try {
                    return parameterClass.newInstance();
                }
                catch (Exception e) {
                    return null;
                }
            }).toArray();
            try {
                method.invoke(bean, parameters);
            }
            catch (RuntimeException e) {
                log.message("\u901a\u77e5\u5904\u7406\u5668\u629b\u51fa\u5f02\u5e38").exception((Throwable)e).error();
                throw e;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        this.listenerEndpoints.add((Tuple<RabbitListenerEndpoint, JobProcessor, String>)new Tuple((Object)endpoint, (Object)jobProcessor, (Object)listenerName));
    }

    private String getDeadSource(JobProcessor listener, String listenerDefaultName) {
        return this.suffix("DEAD." + AmqpJobArranger.getQueueSource(listener.job()), listener, listenerDefaultName);
    }

    private String getDeadRouting(JobProcessor listener, String listenerDefaultName) {
        return AmqpJobArranger.getRouting(this.amqpJobProperties.getNamePrefix(), this.getDeadSource(listener, listenerDefaultName));
    }

    private String getListenerRouting(JobProcessor listener, String listenerDefaultName) {
        return this.suffix(AmqpJobArranger.getRouting(this.amqpJobProperties.getNamePrefix(), listener.job()), listener, listenerDefaultName);
    }

    private String getListenerName(JobProcessor listener, String listenerDefaultName) {
        String listenerName = listener.name();
        if (StringUtils.isEmpty((Object)listenerName)) {
            listenerName = listenerDefaultName;
        }
        Assert.hasText((String)listenerName, (String)"\u5904\u7406\u5668\u7684\u540d\u79f0\u4e0d\u80fd\u4e3a\u7a7a");
        return listenerName;
    }

    private String suffix(String routing, JobProcessor listener, String listenerDefaultName) {
        return routing + "#" + this.getListenerName(listener, listenerDefaultName);
    }

    private void declareExchangeAndQueue(JobProcessor listener, String listenerDefaultName) {
        String routing = this.getListenerRouting(listener, listenerDefaultName);
        FanoutExchange exchange = new FanoutExchange(AmqpJobArranger.getExchange(this.amqpJobProperties.getNamePrefix(), listener.job()));
        Queue queue = new Queue(routing);
        this.rabbitAdmin.declareExchange((Exchange)exchange);
        this.rabbitAdmin.declareQueue(queue);
        this.rabbitAdmin.declareBinding(BindingBuilder.bind((Queue)queue).to((Exchange)exchange).with(routing).noargs());
        HashMap<String, String> deadQueueArgs = new HashMap<String, String>();
        deadQueueArgs.put("x-dead-letter-exchange", "");
        deadQueueArgs.put("x-dead-letter-routing-key", queue.getName());
        Queue deadRouting = new Queue(this.getDeadRouting(listener, listenerDefaultName), true, false, false, deadQueueArgs);
        this.rabbitAdmin.declareQueue(deadRouting);
    }

    public void start() {
        this.rabbitTemplate.start();
    }

    public void stop() {
        this.rabbitTemplate.stop();
    }

    public boolean isRunning() {
        return this.rabbitTemplate.isRunning();
    }

    public RabbitTemplate getRabbitTemplate() {
        return this.rabbitTemplate;
    }
}

