/*
 * Decompiled with CFR 0.152.
 */
package icu.wwj.camel.component.rocketmq;

import icu.wwj.camel.component.rocketmq.AclUtils;
import icu.wwj.camel.component.rocketmq.RocketMQEndpoint;
import icu.wwj.camel.component.rocketmq.SendFailedException;
import icu.wwj.camel.component.rocketmq.reply.ReplyManager;
import icu.wwj.camel.component.rocketmq.reply.RocketMQReplyManagerSupport;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocketMQProducer
extends DefaultAsyncProducer {
    public static final String GENERATE_MESSAGE_KEY_PREFIX = "camel-rocketmq-";
    private final Logger logger = LoggerFactory.getLogger(RocketMQProducer.class);
    private final AtomicBoolean started = new AtomicBoolean(false);
    private DefaultMQProducer mqProducer;
    private ReplyManager replyManager;

    public RocketMQProducer(RocketMQEndpoint endpoint) {
        super((Endpoint)endpoint);
    }

    public RocketMQEndpoint getEndpoint() {
        return (RocketMQEndpoint)super.getEndpoint();
    }

    public boolean process(Exchange exchange, AsyncCallback callback) {
        if (!this.isRunAllowed()) {
            if (exchange.getException() == null) {
                exchange.setException((Throwable)new RejectedExecutionException());
            }
            callback.done(true);
            return true;
        }
        try {
            this.logger.trace("Exchange Pattern {}", (Object)exchange.getPattern());
            if (exchange.getPattern().isOutCapable()) {
                return this.processInOut(exchange, callback);
            }
            return this.processInOnly(exchange, callback);
        }
        catch (Throwable e) {
            exchange.setException(e);
            callback.done(true);
            return true;
        }
    }

    protected boolean processInOut(final Exchange exchange, final AsyncCallback callback) throws RemotingException, MQClientException, InterruptedException, NoTypeConversionAvailableException {
        org.apache.camel.Message in = exchange.getIn();
        Message message = new Message();
        message.setTopic((String)in.getHeader("rocketmq.OVERRIDE_TOPIC_NAME", () -> this.getEndpoint().getTopicName(), String.class));
        message.setTags((String)in.getHeader("rocketmq.OVERRIDE_TAG", () -> this.getEndpoint().getSendTag(), String.class));
        message.setBody((byte[])exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, exchange, in.getBody()));
        message.setKeys((String)in.getHeader("rocketmq.OVERRIDE_MESSAGE_KEY", (Object)"", String.class));
        this.initReplyManager();
        final String generateKey = GENERATE_MESSAGE_KEY_PREFIX + this.getEndpoint().getCamelContext().getUuidGenerator().generateUuid();
        message.setKeys(Arrays.asList(Optional.ofNullable(message.getKeys()).orElse(""), generateKey));
        this.logger.debug("RocketMQ Producer sending {}", (Object)message);
        this.mqProducer.send(message, new SendCallback(){

            public void onSuccess(SendResult sendResult) {
                if (!SendStatus.SEND_OK.equals((Object)sendResult.getSendStatus())) {
                    exchange.setException((Throwable)new SendFailedException(sendResult.toString()));
                    callback.done(false);
                }
                if (RocketMQProducer.this.replyManager == null) {
                    RocketMQProducer.this.logger.warn("replyToTopic not set! Will not wait for reply.");
                    callback.done(false);
                    return;
                }
                RocketMQProducer.this.replyManager.registerReply(RocketMQProducer.this.replyManager, exchange, callback, generateKey, RocketMQProducer.this.getEndpoint().getRequestTimeout());
            }

            public void onException(Throwable e) {
                RocketMQProducer.this.replyManager.cancelMessageKey(generateKey);
                exchange.setException(e);
                callback.done(false);
            }
        });
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void initReplyManager() {
        if (!this.started.get()) {
            RocketMQProducer rocketMQProducer = this;
            synchronized (rocketMQProducer) {
                if (this.started.get()) {
                    return;
                }
                this.logger.debug("Starting reply manager");
                ClassLoader current = Thread.currentThread().getContextClassLoader();
                ClassLoader ac = this.getEndpoint().getCamelContext().getApplicationContextClassLoader();
                try {
                    if (ac != null) {
                        Thread.currentThread().setContextClassLoader(ac);
                    }
                    if (this.getEndpoint().getReplyToTopic() != null) {
                        this.replyManager = this.createReplyManager();
                        this.logger.debug("Using RocketMQReplyManager: {} to process replies from topic {}", (Object)this.replyManager, (Object)this.getEndpoint().getReplyToTopic());
                    }
                }
                catch (Exception e) {
                    throw new FailedToCreateProducerException((Endpoint)this.getEndpoint(), (Throwable)e);
                }
                finally {
                    if (ac != null) {
                        Thread.currentThread().setContextClassLoader(current);
                    }
                }
                this.started.set(true);
            }
        }
    }

    protected void unInitReplyManager() {
        try {
            if (this.replyManager != null) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Stopping RocketMQReplyManager: {} from processing replies from : {}", (Object)this.replyManager, (Object)this.getEndpoint().getReplyToTopic());
                }
                ServiceHelper.stopService((Object)this.replyManager);
            }
        }
        catch (Exception e) {
            throw RuntimeCamelException.wrapRuntimeCamelException((Throwable)e);
        }
        finally {
            this.started.set(false);
        }
    }

    private ReplyManager createReplyManager() {
        RocketMQReplyManagerSupport replyManager = new RocketMQReplyManagerSupport(this.getEndpoint().getCamelContext());
        replyManager.setEndpoint(this.getEndpoint());
        String name = "RocketMQReplyManagerTimeoutChecker[" + this.getEndpoint().getTopicName() + "]";
        ScheduledExecutorService scheduledExecutorService = this.getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor((Object)name, name);
        replyManager.setScheduledExecutorService(scheduledExecutorService);
        this.logger.debug("Starting ReplyManager: {}", (Object)name);
        ServiceHelper.startService((Object)replyManager);
        return replyManager;
    }

    protected boolean processInOnly(final Exchange exchange, final AsyncCallback callback) throws NoTypeConversionAvailableException, InterruptedException, RemotingException, MQClientException {
        org.apache.camel.Message in = exchange.getIn();
        Message message = new Message();
        message.setTopic((String)in.getHeader("rocketmq.OVERRIDE_TOPIC_NAME", () -> this.getEndpoint().getTopicName(), String.class));
        message.setTags((String)in.getHeader("rocketmq.OVERRIDE_TAG", () -> this.getEndpoint().getSendTag(), String.class));
        message.setBody((byte[])exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, exchange, in.getBody()));
        message.setKeys((String)in.getHeader("rocketmq.OVERRIDE_MESSAGE_KEY", (Object)"", String.class));
        this.logger.debug("RocketMQ Producer sending {}", (Object)message);
        final boolean waitForSendResult = this.getEndpoint().getWaitForSendResult();
        this.mqProducer.send(message, new SendCallback(){

            public void onSuccess(SendResult sendResult) {
                if (!SendStatus.SEND_OK.equals((Object)sendResult.getSendStatus())) {
                    exchange.setException((Throwable)new SendFailedException(sendResult.toString()));
                }
                callback.done(!waitForSendResult);
            }

            public void onException(Throwable e) {
                exchange.setException(e);
                callback.done(!waitForSendResult);
            }
        });
        return !waitForSendResult;
    }

    protected void doStart() throws Exception {
        this.mqProducer = new DefaultMQProducer(null, this.getEndpoint().getProducerGroup(), AclUtils.getAclRPCHook(this.getEndpoint().getAccessKey(), this.getEndpoint().getSecretKey()));
        this.mqProducer.setNamesrvAddr(this.getEndpoint().getNamesrvAddr());
        this.mqProducer.start();
    }

    protected void doStop() {
        this.unInitReplyManager();
        this.mqProducer.shutdown();
        this.mqProducer = null;
    }
}

