/*
 * Decompiled with CFR 0.152.
 */
package org.dromara.mendmix.amqp.adapter;

import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.dromara.mendmix.amqp.MQConsumer;
import org.dromara.mendmix.amqp.MQContext;
import org.dromara.mendmix.amqp.MQMessage;
import org.dromara.mendmix.amqp.MessageHandler;
import org.dromara.mendmix.common.GlobalContext;
import org.dromara.mendmix.common.ThreadLocalContext;
import org.dromara.mendmix.common.async.DelayRetryExecutor;
import org.dromara.mendmix.common.async.ICaller;
import org.dromara.mendmix.common.async.StandardThreadExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractConsumer
implements MQConsumer {
    protected static Logger logger = LoggerFactory.getLogger((String)"org.dromara.mendmix.amqp.adapter");
    protected Map<String, MessageHandler> messageHandlers;
    private AtomicBoolean closed = new AtomicBoolean(false);
    protected StandardThreadExecutor fetchExecutor;
    protected StandardThreadExecutor asyncProcessExecutor;
    protected DelayRetryExecutor retryExecutor;
    protected Semaphore semaphore;
    protected MQContext context;
    private long lastFetchErrLoggingTime;

    public AbstractConsumer(MQContext context, Map<String, MessageHandler> messageHandlers) {
        this.context = context;
        this.messageHandlers = messageHandlers;
    }

    protected void startWorker() {
        int fetchCoreThreads;
        int fetchMaxThreads = fetchCoreThreads = 1;
        if (this.context.isAsyncConsumeEnabled()) {
            int maxThread = this.context.getMaxProcessThreads();
            this.semaphore = new Semaphore(maxThread);
            this.asyncProcessExecutor = new StandardThreadExecutor(1, maxThread, 60L, TimeUnit.SECONDS, maxThread, (ThreadFactory)new StandardThreadExecutor.StandardThreadFactory("messageAsyncProcessor"));
            fetchMaxThreads = maxThread;
            logger.info("<startup-logging>  init asyncProcessExecutor finish -> maxThread:{}", (Object)maxThread);
        }
        this.fetchExecutor = new StandardThreadExecutor(fetchCoreThreads, fetchMaxThreads, 0L, TimeUnit.SECONDS, fetchMaxThreads * 10, (ThreadFactory)new StandardThreadExecutor.StandardThreadFactory("messageFetcher"));
        this.fetchExecutor.execute((Runnable)new Worker());
        this.retryExecutor = new DelayRetryExecutor(1, 5000, 1000, 3);
        logger.info("<startup-logging>  init fetchExecutor finish -> fetchMaxThreads:{}", (Object)fetchMaxThreads);
    }

    public abstract List<MQMessage> fetchMessages();

    public abstract String handleMessageConsumed(MQMessage var1, boolean var2);

    protected void handleFetchError(Exception e) {
        long currentTime = System.currentTimeMillis();
        if (currentTime - this.lastFetchErrLoggingTime > 30000L) {
            logger.error("fetch_message_error", (Throwable)e);
            this.lastFetchErrLoggingTime = currentTime;
        }
        try {
            Thread.sleep(100L);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private void onMessageProcessComplated(MQMessage message, Exception ex) {
        this.handleMessageConsumed(message, ex == null);
        MQContext.processMessageLog(this.context, message, MQContext.ActionType.sub, ex);
    }

    private void asyncConsumeMessage(final MQMessage message) throws InterruptedException {
        if (this.context.getConsumeMaxRetryTimes() > 0 && message.getConsumeTimes() > this.context.getConsumeMaxRetryTimes()) {
            return;
        }
        this.semaphore.acquire();
        this.asyncProcessExecutor.execute(new Runnable(){

            @Override
            public void run() {
                AbstractConsumer.this.consumeMessage(message);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void consumeMessage(final MQMessage message) {
        final MessageHandler messageHandler = this.messageHandlers.get(message.getTopic());
        try {
            if (messageHandler == null) {
                logger.warn("not messageHandler found for:{}", (Object)message.getTopic());
                return;
            }
            final String stateCheckUrl = this.context.getStateCheckUrl();
            if (!message.originStatusCompleted(stateCheckUrl)) {
                this.retryExecutor.submit("message:" + message.getMsgId(), (ICaller)new ICaller<Void>(){

                    public Void call() throws Exception {
                        if (message.originStatusCompleted(stateCheckUrl)) {
                            messageHandler.process(message);
                        }
                        return null;
                    }
                });
                logger.info("MQmessage_CONSUME_ABORT_ADD_RETRY -> message:{}", (Object)message.logString());
                return;
            }
            message.setUserContextOnConsume();
            messageHandler.process(message);
            this.onMessageProcessComplated(message, null);
            if (logger.isDebugEnabled()) {
                logger.debug("MQmessage_CONSUME_SUCCESS -> message:{}", (Object)message.logString());
            }
        }
        catch (Exception e) {
            logger.error(String.format("MQmessage_CONSUME_ERROR -> [%s]", message.logString()), (Throwable)e);
            if (messageHandler.retrieable()) {
                this.retryExecutor.submit("message:" + message.getMsgId(), (ICaller)new ICaller<Void>(){

                    public Void call() throws Exception {
                        messageHandler.process(message);
                        AbstractConsumer.this.onMessageProcessComplated(message, null);
                        return null;
                    }
                });
            } else {
                this.onMessageProcessComplated(message, e);
            }
        }
        finally {
            ThreadLocalContext.unset();
            if (this.semaphore != null) {
                this.semaphore.release();
            }
        }
    }

    @Override
    public void shutdown() {
        this.closed.set(true);
        if (this.fetchExecutor != null) {
            this.fetchExecutor.shutdown();
        }
        if (this.asyncProcessExecutor != null) {
            this.asyncProcessExecutor.shutdown();
        }
        if (this.retryExecutor != null) {
            this.retryExecutor.close();
        }
    }

    private class Worker
    implements Runnable {
        private Worker() {
        }

        @Override
        public void run() {
            while (!AbstractConsumer.this.closed.get()) {
                try {
                    if (GlobalContext.isStarting()) {
                        Thread.sleep(10L);
                        continue;
                    }
                    if (AbstractConsumer.this.asyncProcessExecutor != null && AbstractConsumer.this.asyncProcessExecutor.getSubmittedTasksCount() >= AbstractConsumer.this.context.getMaxProcessThreads()) {
                        Thread.sleep(1L);
                        continue;
                    }
                    List<MQMessage> messages = AbstractConsumer.this.fetchMessages();
                    if (messages == null || messages.isEmpty()) {
                        Thread.sleep(10L);
                        continue;
                    }
                    for (MQMessage message : messages) {
                        if (!AbstractConsumer.this.context.matchedOnFilter(message)) continue;
                        if (AbstractConsumer.this.asyncProcessExecutor == null) {
                            AbstractConsumer.this.consumeMessage(message);
                            continue;
                        }
                        AbstractConsumer.this.asyncConsumeMessage(message);
                    }
                }
                catch (Exception e) {
                    AbstractConsumer.this.handleFetchError(e);
                }
            }
        }
    }
}

