/*
 * Decompiled with CFR 0.152.
 */
package icu.wwj.camel.component.rocketmq.reply;

import icu.wwj.camel.component.rocketmq.RocketMQEndpoint;
import icu.wwj.camel.component.rocketmq.RocketMQMessageConverter;
import icu.wwj.camel.component.rocketmq.reply.ReplyHandler;
import icu.wwj.camel.component.rocketmq.reply.ReplyHolder;
import icu.wwj.camel.component.rocketmq.reply.ReplyManager;
import icu.wwj.camel.component.rocketmq.reply.ReplyTimeoutMap;
import icu.wwj.camel.component.rocketmq.reply.RocketMQReplyHandler;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocketMQReplyManagerSupport
extends ServiceSupport
implements ReplyManager {
    private static final int CLOSE_TIMEOUT = 30000;
    protected final Logger log = LoggerFactory.getLogger(RocketMQReplyManagerSupport.class);
    protected final CamelContext camelContext;
    protected final CountDownLatch replyToLatch = new CountDownLatch(1);
    protected final long replyToTimeout = 1000L;
    private final RocketMQMessageConverter messageConverter = new RocketMQMessageConverter();
    protected ScheduledExecutorService executorService;
    protected RocketMQEndpoint endpoint;
    protected String replyToTopic;
    protected DefaultMQPushConsumer mqPushConsumer;
    protected ReplyTimeoutMap timeoutMap;

    public RocketMQReplyManagerSupport(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    protected void doStart() throws Exception {
        ObjectHelper.notNull((Object)this.executorService, (String)"executorService", (Object)this);
        ObjectHelper.notNull((Object)((Object)this.endpoint), (String)"endpoint", (Object)this);
        this.log.debug("Using timeout checker interval with {} millis", (Object)this.endpoint.getRequestTimeoutCheckerInterval());
        this.timeoutMap = new ReplyTimeoutMap(this.executorService, this.endpoint.getRequestTimeoutCheckerInterval());
        ServiceHelper.startService((Object)((Object)this.timeoutMap));
        this.mqPushConsumer = this.createConsumer();
        this.mqPushConsumer.start();
        this.log.debug("Using executor {}", (Object)this.executorService);
    }

    protected DefaultMQPushConsumer createConsumer() throws MQClientException {
        this.setReplyToTopic(this.endpoint.getReplyToTopic());
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.setConsumerGroup(this.endpoint.getReplyToConsumerGroup());
        consumer.setNamesrvAddr(this.endpoint.getNamesrvAddr());
        consumer.subscribe(this.replyToTopic, "*");
        consumer.registerMessageListener((msgs, context) -> {
            MessageExt messageExt = (MessageExt)msgs.get(0);
            this.onMessage(messageExt);
            this.log.trace("Consume message {}", (Object)messageExt);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        return consumer;
    }

    public void onMessage(MessageExt messageExt) {
        String messageKey = Arrays.stream(messageExt.getKeys().split(" ")).filter(s -> s.startsWith("camel-rocketmq-")).findFirst().orElse(null);
        if (messageKey == null) {
            this.log.warn("Ignoreing message with no messageKey: {}", (Object)messageExt);
            return;
        }
        this.log.debug("Received reply message with messageKey [{}] -> {}", (Object)messageKey, (Object)messageExt);
        this.handleReplyMessage(messageKey, messageExt);
    }

    protected void doStop() throws Exception {
        ServiceHelper.stopService((Object)((Object)this.timeoutMap));
        if (this.mqPushConsumer != null) {
            this.log.debug("Closing connection: {} with timeout: {} ms.", (Object)this.mqPushConsumer, (Object)30000);
            this.mqPushConsumer.shutdown();
            this.mqPushConsumer = null;
        }
        if (this.executorService != null) {
            this.camelContext.getExecutorServiceManager().shutdownGraceful((ExecutorService)this.executorService);
            this.executorService = null;
        }
    }

    @Override
    public void setEndpoint(RocketMQEndpoint endpoint) {
        this.endpoint = endpoint;
    }

    @Override
    public String getReplyToTopic() {
        if (this.replyToTopic != null) {
            return this.replyToTopic;
        }
        try {
            this.log.trace("Waiting for replyToTopic to be set");
            boolean done = this.replyToLatch.await(1000L, TimeUnit.MILLISECONDS);
            if (!done) {
                this.log.warn("ReplyToTopic was not set and timeout occurred");
            } else {
                this.log.trace("Waiting for replyToTopic to be set done");
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        return this.replyToTopic;
    }

    @Override
    public void setReplyToTopic(String replyToTopic) {
        this.log.debug("ReplyToTopic: {}", (Object)replyToTopic);
        this.replyToTopic = replyToTopic;
        this.replyToLatch.countDown();
    }

    @Override
    public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String messageKey, long requestTimeout) {
        RocketMQReplyHandler handler = new RocketMQReplyHandler(replyManager, exchange, callback, messageKey, requestTimeout);
        ReplyHandler result = this.timeoutMap.putIfAbsent(messageKey, handler, requestTimeout);
        if (result != null) {
            String logMessage = String.format("The messageKey [%s] is not unique.", messageKey);
            throw new IllegalArgumentException(logMessage);
        }
        return messageKey;
    }

    @Override
    public void setScheduledExecutorService(ScheduledExecutorService executorService) {
        this.executorService = executorService;
    }

    @Override
    public void updateMessageKey(String messageKey, String newMessageKey, long requestTimeout) {
        this.log.trace("Updated messageKey [{}] to [{}]", (Object)messageKey, (Object)newMessageKey);
        Optional.ofNullable((ReplyHandler)this.timeoutMap.remove(messageKey)).ifPresent(handler -> this.timeoutMap.put(newMessageKey, (ReplyHandler)handler, requestTimeout));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void processReply(ReplyHolder holder) {
        if (holder != null && this.isRunAllowed()) {
            try {
                Exchange exchange = holder.getExchange();
                boolean timeout = holder.isTimeout();
                if (timeout) {
                    if (this.log.isWarnEnabled()) {
                        this.log.warn("Timeout occurred after {} millis waiting for reply message with messageKey [{}] on topic {}. Setting ExchangeTimedOutException on {} and continue routing.", new Object[]{holder.getTimeout(), holder.getMessageKey(), this.replyToTopic, ExchangeHelper.logIds((Exchange)exchange)});
                    }
                    String msg = "reply message with messageKey: " + holder.getMessageKey() + " not received on topic: " + this.replyToTopic;
                    exchange.setException((Throwable)new ExchangeTimedOutException(exchange, holder.getTimeout(), msg));
                } else {
                    this.messageConverter.populateRocketExchange(exchange, holder.getMessageExt(), true);
                }
            }
            finally {
                AsyncCallback callback = holder.getCallback();
                callback.done(false);
            }
        }
    }

    @Override
    public void cancelMessageKey(String messageKey) {
        Optional.ofNullable((ReplyHandler)this.timeoutMap.get(messageKey)).ifPresent(replyHandler -> {
            this.log.warn("Cancelling messageKey: {}", (Object)messageKey);
            this.timeoutMap.remove(messageKey);
        });
    }

    protected ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String messageKey, long requestTimeout) {
        return new RocketMQReplyHandler(replyManager, exchange, callback, messageKey, requestTimeout);
    }

    protected void handleReplyMessage(String messageKey, MessageExt messageExt) {
        ReplyHandler handler = (ReplyHandler)this.timeoutMap.get(messageKey);
        if (handler != null) {
            this.timeoutMap.remove(messageKey);
            handler.onReply(messageKey, messageExt);
        } else {
            this.log.warn("Reply received for unknown messageKey [{}]. The message will be ignored: {}", (Object)messageKey, (Object)messageExt);
        }
    }
}

