/*
 * Decompiled with CFR 0.152.
 */
package org.dromara.mendmix.amqp.adapter.eventbus;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.dromara.mendmix.amqp.MQContext;
import org.dromara.mendmix.amqp.MQMessage;
import org.dromara.mendmix.amqp.MessageHandler;
import org.dromara.mendmix.amqp.adapter.AbstractProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-process producer adapter that publishes {@link MQMessage}s onto an LMAX
 * Disruptor ring buffer. A single event handler running on a Disruptor-managed
 * daemon thread looks up the {@link MessageHandler} registered for the message
 * topic and invokes it.
 *
 * <p>The topic-to-handler registry is static, i.e. shared by every adapter
 * instance; it is installed through {@link #setMessageHandlers(Map)}.
 */
public class EventbusProducerAdapter extends AbstractProducer {
    private static final Logger logger = LoggerFactory.getLogger("org.dromara.mendmix");

    /** Number of pre-allocated ring buffer slots (2^20). Must be a power of two. */
    private static final int RING_BUFFER_SIZE = 0x100000;

    private static final Translator TRANSLATOR = new Translator();
    private static final EventHandler<MQMessageEvent> eventHandler = new MQMessageEventHandler();

    /**
     * Topic -> handler registry. Volatile because it is replaced by
     * {@link #setMessageHandlers(Map)} on caller threads and read on the
     * Disruptor consumer thread.
     */
    private static volatile Map<String, MessageHandler> messageHandlers = new HashMap<>();

    /** Created by {@link #start()}; {@code null} until then. */
    private Disruptor<MQMessageEvent> disruptor;

    public EventbusProducerAdapter(MQContext context) {
        super(context);
    }

    /**
     * Builds and starts the Disruptor with one consumer thread named
     * {@code mq-eventbus-N}.
     *
     * @throws Exception declared by the overridden method
     */
    @Override
    @SuppressWarnings("unchecked") // generic varargs array created for handleEventsWith
    public void start() throws Exception {
        ThreadFactory factory = new BasicThreadFactory.Builder()
                .namingPattern("mq-eventbus-%d")
                .daemon(true)
                .priority(5)
                .build();
        // MULTI: sendMessage() is a public method that is not confined to one
        // thread, and a SINGLE-producer ring buffer is not safe for concurrent
        // publishers.
        this.disruptor = new Disruptor<>(
                new MQMessageEventFactory(),
                RING_BUFFER_SIZE,
                factory,
                ProducerType.MULTI,
                new BlockingWaitStrategy());
        this.disruptor.handleEventsWith(eventHandler);
        this.disruptor.start();
    }

    /**
     * Publishes the message to the ring buffer and reports success as soon as
     * it has been enqueued; the handler runs later on the consumer thread.
     *
     * @param message the message to publish
     * @param async   not used by this adapter; publishing is always hand-off
     *                to the consumer thread
     * @return the id of the published message
     */
    @Override
    public String sendMessage(MQMessage message, boolean async) {
        this.prepareHandle(message);
        RingBuffer<MQMessageEvent> ringBuffer = this.disruptor.getRingBuffer();
        ringBuffer.publishEvent(TRANSLATOR, message);
        this.handleSuccess(message);
        return message.getMsgId();
    }

    /**
     * Shuts down the parent producer and then the Disruptor. Safe to call
     * even if {@link #start()} was never invoked.
     */
    @Override
    public void shutdown() {
        super.shutdown();
        if (this.disruptor != null) {
            this.disruptor.shutdown();
        }
    }

    /**
     * Replaces the shared topic-to-handler registry.
     *
     * @param messageHandlers handlers keyed by topic; {@code null} clears the
     *                        registry instead of causing a later
     *                        {@link NullPointerException} on the consumer thread
     */
    public static void setMessageHandlers(Map<String, MessageHandler> messageHandlers) {
        EventbusProducerAdapter.messageHandlers =
                messageHandlers != null ? messageHandlers : new HashMap<String, MessageHandler>();
    }

    /** Copies the published {@link MQMessage} into the claimed ring buffer slot. */
    static class Translator implements EventTranslatorOneArg<MQMessageEvent, MQMessage> {
        Translator() {
        }

        @Override
        public void translateTo(MQMessageEvent event, long sequence, MQMessage data) {
            event.set(data);
        }
    }

    /** Dispatches each ring buffer event to the handler registered for its topic. */
    static class MQMessageEventHandler implements EventHandler<MQMessageEvent> {
        MQMessageEventHandler() {
        }

        @Override
        public void onEvent(MQMessageEvent event, long sequence, boolean endOfBatch) {
            try {
                MQMessage message = event.get();
                message.setUserContextOnConsume();
                // Look the handler up once so that a concurrent registry swap
                // cannot make the null check and the call disagree.
                MessageHandler handler = messageHandlers.get(message.getTopic());
                if (handler == null) {
                    // No handler registered for this topic: the message is dropped.
                    return;
                }
                try {
                    handler.process(message);
                    if (logger.isDebugEnabled()) {
                        logger.debug("MENDMIX-TRACE-LOGGGING-->> MQ_MESSAGE_CONSUME_SUCCESS ->topic:{},message:{}", message.getTopic(), sequence);
                    }
                } catch (Exception e) {
                    // Handler failures are logged and swallowed so that one bad
                    // message does not stop the consumer thread.
                    logger.error(String.format("MENDMIX-TRACE-LOGGGING-->> MQ_MESSAGE_CONSUME_ERROR ->topic:%s,msgId:%s", message.getTopic(), message.getMsgId()), e);
                }
            } finally {
                // Release the payload; otherwise the slot keeps the message
                // reachable until the ring buffer wraps around to it again.
                event.set(null);
            }
        }
    }

    /** Pre-allocates the empty events that populate the ring buffer. */
    static class MQMessageEventFactory implements EventFactory<MQMessageEvent> {
        MQMessageEventFactory() {
        }

        @Override
        public MQMessageEvent newInstance() {
            return new MQMessageEvent();
        }
    }

    /** Mutable ring buffer slot carrying one {@link MQMessage}. */
    static class MQMessageEvent {
        private MQMessage value;

        MQMessageEvent() {
        }

        public void set(MQMessage value) {
            this.value = value;
        }

        public MQMessage get() {
            return this.value;
        }
    }
}

