/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.foundation.messaging.queue;

import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumerNotifications;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueException;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.OrderedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuePollingOptimizer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.TransactionalMode;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.ConsumeFromQueue;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetNextMessageReadyForDelivery;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWorkFactory;
import dk.cloudcreate.essentials.shared.Exceptions;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polling based {@link DurableQueueConsumer} that repeatedly asks a {@link DurableQueues} instance for the
 * next message ready for delivery on a single queue and hands it to the
 * {@code queueMessageHandler} configured in the {@link ConsumeFromQueue} operation.
 * <p>
 * Polling tasks are scheduled at a fixed rate on a {@link ScheduledExecutorService}; one task is scheduled
 * per configured parallel consumer. When the {@link DurableQueues} instance reports
 * {@link TransactionalMode#FullyTransactional}, each poll is executed inside a unit of work created by the
 * supplied {@link UnitOfWorkFactory}.
 * <p>
 * After a handler failure the message is either marked as a Dead Letter Message or registered for retry,
 * depending on the redelivery policy of the {@link ConsumeFromQueue} operation.
 * <p>
 * NOTE(review): {@link #start()} and {@link #stop()} only guard on a {@code volatile} flag and are not
 * synchronized - presumably callers are expected to serialize lifecycle calls; confirm.
 *
 * @param <DURABLE_QUEUES> the concrete {@link DurableQueues} type
 * @param <UOW>            the concrete {@link UnitOfWork} type
 * @param <UOW_FACTORY>    the concrete {@link UnitOfWorkFactory} type
 */
public abstract class DefaultDurableQueueConsumer<DURABLE_QUEUES extends DurableQueues, UOW extends UnitOfWork, UOW_FACTORY extends UnitOfWorkFactory<UOW>>
        implements DurableQueueConsumer, DurableQueueConsumerNotifications {
    public static final Logger LOG = LoggerFactory.getLogger(DurableQueueConsumer.class);
    // NOTE(review): DurableQueueConsumer is an interface, so Class#toString() yields "interface <fqcn>" and the
    // logger name therefore starts with "interface ". Left unchanged to avoid silently renaming a logger
    // category that existing logging configurations may reference - confirm whether getName() was intended.
    public static final Logger MESSAGE_HANDLING_FAILURE_LOG = LoggerFactory.getLogger(DurableQueueConsumer.class + ".MessageHandlingFailures");
    /**
     * Post-processing action that does nothing.
     */
    public static final Runnable NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE = () -> {};

    public final QueueName queueName;
    private final ConsumeFromQueue consumeFromQueue;
    private volatile boolean started;
    private final ScheduledExecutorService scheduler;
    private final DURABLE_QUEUES durableQueues;
    private final Consumer<DurableQueueConsumer> removeDurableQueueConsumer;
    /**
     * Only non-null when the durable queues are {@link TransactionalMode#FullyTransactional}.
     */
    private final UOW_FACTORY unitOfWorkFactory;
    private final QueuePollingOptimizer queuePollingOptimizer;
    private final long pollingIntervalMs;
    /**
     * The {@link OrderedMessage} each polling thread has most recently been handed and has not yet
     * acknowledged, dead-lettered or successfully registered for retry.
     */
    private final ConcurrentMap<Thread, OrderedMessage> orderedMessageDeliveryThreads = new ConcurrentHashMap<>();

    /**
     * Create a new consumer. The consumer does not poll until {@link #start()} is called.
     *
     * @param consumeFromQueue           the consume operation (queue name, handler, redelivery policy, parallelism); required and validated
     * @param unitOfWorkFactory          required when {@code durableQueues} is {@link TransactionalMode#FullyTransactional}, otherwise ignored
     * @param durableQueues              the queue implementation to poll; required
     * @param removeDurableQueueConsumer callback invoked with this consumer when it is stopped; required
     * @param pollingIntervalMs          initial delay and period, in milliseconds, of each scheduled polling task
     * @param queuePollingOptimizer      optional optimizer; {@link QueuePollingOptimizer#None()} is used when {@code null}
     */
    public DefaultDurableQueueConsumer(ConsumeFromQueue consumeFromQueue,
                                       UOW_FACTORY unitOfWorkFactory,
                                       DURABLE_QUEUES durableQueues,
                                       Consumer<DurableQueueConsumer> removeDurableQueueConsumer,
                                       long pollingIntervalMs,
                                       QueuePollingOptimizer queuePollingOptimizer) {
        this.consumeFromQueue = FailFast.requireNonNull(consumeFromQueue, "consumeFromQueue is missing");
        consumeFromQueue.validate();
        this.durableQueues = FailFast.requireNonNull(durableQueues, "durableQueues is missing");
        // A unit of work factory is only mandatory (and only retained) for fully transactional queues
        this.unitOfWorkFactory = durableQueues.getTransactionalMode() == TransactionalMode.FullyTransactional
                                 ? FailFast.requireNonNull(unitOfWorkFactory, "You must specify a unitOfWorkFactory")
                                 : null;
        this.removeDurableQueueConsumer = FailFast.requireNonNull(removeDurableQueueConsumer, "removeDurableQueueConsumer is missing");
        this.queueName = consumeFromQueue.queueName;
        this.pollingIntervalMs = pollingIntervalMs;
        this.queuePollingOptimizer = queuePollingOptimizer != null ? queuePollingOptimizer : QueuePollingOptimizer.None();
        // Prefer an executor supplied through the ConsumeFromQueue operation, otherwise create a daemon thread pool
        this.scheduler = consumeFromQueue.getConsumerExecutorService()
                                         .orElseGet(() -> Executors.newScheduledThreadPool(consumeFromQueue.getParallelConsumers(),
                                                                                           new ThreadFactoryBuilder()
                                                                                                   .nameFormat("Queue-" + this.queueName + "-Polling-%d")
                                                                                                   .daemon(true)
                                                                                                   .build()));
    }

    /**
     * Schedule one fixed-rate polling task per parallel consumer. Does nothing if already started.
     */
    @Override
    public void start() {
        if (started) {
            return;
        }
        LOG.info("[{}] {} - Starting {} DurableQueueConsumer threads with polling interval {} ms",
                 queueName,
                 consumeFromQueue.consumerName,
                 consumeFromQueue.getParallelConsumers(),
                 pollingIntervalMs);
        for (int i = 0; i < consumeFromQueue.getParallelConsumers(); ++i) {
            if (i > 0) {
                // Stagger the polling tasks by 10 ms so they don't all fire at the same instant
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            scheduler.scheduleAtFixedRate(this::pollQueue, pollingIntervalMs, pollingIntervalMs, TimeUnit.MILLISECONDS);
        }
        started = true;
    }

    /**
     * Stop polling: shuts the scheduler down immediately and notifies the {@code removeDurableQueueConsumer}
     * callback (even if the shutdown throws). Does nothing if not started.
     * <p>
     * Note that the scheduler is shut down regardless of whether it was created by this consumer or supplied
     * through the {@link ConsumeFromQueue} operation.
     */
    @Override
    public void stop() {
        if (!started) {
            return;
        }
        LOG.info("[{}] {} - Stopping DurableQueueConsumer", queueName, consumeFromQueue.consumerName);
        started = false;
        try {
            scheduler.shutdownNow();
        } finally {
            removeDurableQueueConsumer.accept(this);
            LOG.info("[{}] {} - DurableQueueConsumer stopped", queueName, consumeFromQueue.consumerName);
        }
    }

    @Override
    public boolean isStarted() {
        return started;
    }

    @Override
    public QueueName queueName() {
        return queueName;
    }

    /**
     * Same as {@link #stop()}.
     */
    @Override
    public void cancel() {
        stop();
    }

    /**
     * The scheduled polling task. Never lets a {@link Throwable} escape, since an exception thrown from a
     * task scheduled with {@code scheduleAtFixedRate} suppresses all subsequent executions of that task.
     */
    private void pollQueue() {
        if (!started) {
            LOG.trace("[{}] {} - Skipping Polling Queue as the consumer is not started", queueName, consumeFromQueue.consumerName);
            return;
        }
        try {
            if (queuePollingOptimizer.shouldSkipPolling()) {
                LOG.trace("[{}] {} - Skipping polling", queueName, consumeFromQueue.consumerName);
                return;
            }
            LOG.trace("[{}] {} - Polling Queue for the next message ready for delivery. Transactional mode: {}",
                      queueName,
                      consumeFromQueue.consumerName,
                      durableQueues.getTransactionalMode());

            Runnable postTransactionalSideEffect = null;
            if (durableQueues.getTransactionalMode() == TransactionalMode.FullyTransactional) {
                // A left-over unit of work on this thread is reported through the outer catch below
                if (unitOfWorkFactory.getCurrentUnitOfWork().isPresent()) {
                    throw new DurableQueueException(MessageFormatter.msg("[{}] {} - Previous UnitOfWork isn't completed/removed: {}",
                                                                         queueName,
                                                                         consumeFromQueue.consumerName,
                                                                         unitOfWorkFactory.getCurrentUnitOfWork().get()),
                                                    queueName);
                }
                try {
                    postTransactionalSideEffect = unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> processNextMessageReadyForDelivery());
                } catch (Exception e) {
                    handleProcessNextMessageReadyForDeliveryException(e);
                }
            } else {
                try {
                    postTransactionalSideEffect = processNextMessageReadyForDelivery();
                } catch (Exception e) {
                    handleProcessNextMessageReadyForDeliveryException(e);
                }
            }

            // The polling optimizer is only notified after the (optional) unit of work has completed
            if (postTransactionalSideEffect != null) {
                postTransactionalSideEffect.run();
            }
        } catch (Throwable e) {
            // Argument order matches the "[queueName] consumerName" convention used by every other log statement
            LOG.error(MessageFormatter.msg("[{}] {} - Failed to poll queue", queueName, consumeFromQueue.consumerName), e);
        }
    }

    /**
     * Log an exception raised while processing the next message: connection related problems are logged at
     * trace level, everything else at error level.
     *
     * @param e the exception that was caught
     */
    private void handleProcessNextMessageReadyForDeliveryException(Exception e) {
        if (isConnectionIssue(e)) {
            LOG.trace(MessageFormatter.msg("[{}] {} - Experienced a Connection issue, will retry later", queueName, consumeFromQueue.consumerName), e);
        } else {
            LOG.error(MessageFormatter.msg("[{}] {} - Experienced an error, will retry later", queueName, consumeFromQueue.consumerName), e);
        }
    }

    /**
     * Heuristic that classifies an exception as a connection problem based on its message, or on the type /
     * simple class name of its root cause.
     */
    private static boolean isConnectionIssue(Exception e) {
        if (messageContains(e, "has been closed") || messageContains(e, "Connection is closed")) {
            return true;
        }
        Throwable rootCause = Exceptions.getRootCause(e);
        if (rootCause instanceof IOException) {
            return true;
        }
        String rootCauseTypeName = rootCause.getClass().getSimpleName();
        return rootCauseTypeName.equals("ConnectionException") || rootCauseTypeName.equals("MongoSocketReadException");
    }

    /**
     * Null-safe check of whether the throwable's message contains the given fragment
     * ({@link Throwable#getMessage()} may return {@code null}).
     */
    private static boolean messageContains(Throwable throwable, String fragment) {
        String message = throwable.getMessage();
        return message != null && message.contains(fragment);
    }

    /**
     * Fetch the next message ready for delivery (if the consumer is started) and handle it.
     *
     * @return the action to run after the poll has completed; never {@code null} unless the underlying
     * {@link DurableQueues} call itself yields {@code null}. Any {@link Throwable} is logged and results in
     * {@link #NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE}
     */
    private Runnable processNextMessageReadyForDelivery() {
        try {
            if (!started) {
                return NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE;
            }
            List<String> excludeOrderedMessagesWithTheseKeys = resolveMessageKeysToExclude();
            return durableQueues.getNextMessageReadyForDelivery(new GetNextMessageReadyForDelivery(queueName, excludeOrderedMessagesWithTheseKeys))
                                .map(this::handleMessage)
                                .orElseGet(() -> queuePollingOptimizer::queuePollingReturnedNoMessages);
        } catch (Throwable e) {
            LOG.error(MessageFormatter.msg("[{}] Error Polling Queue", queueName), e);
            return NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE;
        }
    }

    /**
     * Collect the keys of the ordered messages currently registered by the polling threads, leaving out the
     * message registered by the calling thread itself (so the calling thread is not prevented from
     * receiving a message with that key again).
     *
     * @return the keys to exclude from the next fetch; possibly empty
     */
    private List<String> resolveMessageKeysToExclude() {
        OrderedMessage lastHandledByThisThread = orderedMessageDeliveryThreads.get(Thread.currentThread());
        HashSet<OrderedMessage> registeredOrderedMessages = new HashSet<>(orderedMessageDeliveryThreads.values());
        // remove(null) is a harmless no-op when this thread has nothing registered
        registeredOrderedMessages.remove(lastHandledByThisThread);
        return registeredOrderedMessages.stream()
                                        .map(OrderedMessage::getKey)
                                        .collect(Collectors.toList());
    }

    /**
     * Deliver the message to the configured handler and acknowledge it. On failure the message is either
     * marked as a Dead Letter Message (permanent error or delivery attempts exhausted) or registered for retry.
     *
     * @param queuedMessage the message to deliver
     * @return the action to run after the poll has completed
     */
    private Runnable handleMessage(QueuedMessage queuedMessage) {
        Object payload = queuedMessage.getMessage();
        boolean isOrderedMessage = payload instanceof OrderedMessage;
        LOG.debug("[{}:{}] {} - Delivering {}message{}. Total attempts: {}, Redelivery Attempts: {}",
                  queueName,
                  queuedMessage.getId(),
                  consumeFromQueue.consumerName,
                  isOrderedMessage ? "Ordered " : "",
                  isOrderedMessage ? MessageFormatter.msg(" {}:{}", ((OrderedMessage) payload).getKey(), ((OrderedMessage) payload).getOrder()) : "",
                  queuedMessage.getTotalDeliveryAttempts(),
                  queuedMessage.getRedeliveryAttempts());
        if (isOrderedMessage) {
            // Register the ordered message so other polling threads exclude its key while it is being handled
            orderedMessageDeliveryThreads.put(Thread.currentThread(), (OrderedMessage) payload);
        }
        try {
            consumeFromQueue.queueMessageHandler.handle(queuedMessage);
            LOG.debug("[{}:{}] {} - Message handled successfully. Deleting the message in the Queue Store message. Total attempts: {}, Redelivery Attempts: {}",
                      queueName,
                      queuedMessage.getId(),
                      consumeFromQueue.consumerName,
                      queuedMessage.getTotalDeliveryAttempts(),
                      queuedMessage.getRedeliveryAttempts());
            durableQueues.acknowledgeMessageAsHandled(queuedMessage.getId());
            orderedMessageDeliveryThreads.remove(Thread.currentThread());
            return () -> queuePollingOptimizer.queuePollingReturnedMessage(queuedMessage);
        } catch (Throwable e) {
            MESSAGE_HANDLING_FAILURE_LOG.debug(MessageFormatter.msg("[{}:{}] {} - QueueMessageHandler for failed to handle message: {}",
                                                                    queueName,
                                                                    queuedMessage.getId(),
                                                                    consumeFromQueue.consumerName,
                                                                    queuedMessage),
                                               e);
            boolean isPermanentError = consumeFromQueue.getRedeliveryPolicy().isPermanentError(queuedMessage, e);
            // The first delivery is not a redelivery, hence the + 1
            boolean deliveryAttemptsExhausted = queuedMessage.getTotalDeliveryAttempts() >= consumeFromQueue.getRedeliveryPolicy().maximumNumberOfRedeliveries + 1;
            if (isPermanentError || deliveryAttemptsExhausted) {
                return markAsDeadLetterMessage(queuedMessage, e, isPermanentError);
            }
            return registerForRetry(queuedMessage, e);
        }
    }

    /**
     * Mark the message as a Dead Letter Message. The ordered-message registration of the calling thread is
     * only cleared when the marking succeeds.
     *
     * @param queuedMessage    the message that could not be handled
     * @param handlingError    the error raised while handling the message
     * @param isPermanentError whether the redelivery policy classified the error as permanent (used for logging)
     * @return the action to run after the poll has completed
     */
    private Runnable markAsDeadLetterMessage(QueuedMessage queuedMessage, Throwable handlingError, boolean isPermanentError) {
        MESSAGE_HANDLING_FAILURE_LOG.debug("[{}:{}] {} - Marking Message as Dead Letter. Is Permanent Error: {}. Message: {}",
                                           queueName,
                                           queuedMessage.getId(),
                                           consumeFromQueue.consumerName,
                                           isPermanentError,
                                           queuedMessage);
        try {
            durableQueues.markAsDeadLetterMessage(queuedMessage.getId(), handlingError);
            orderedMessageDeliveryThreads.remove(Thread.currentThread());
            return () -> queuePollingOptimizer.queuePollingReturnedMessage(queuedMessage);
        } catch (Throwable ex) {
            MESSAGE_HANDLING_FAILURE_LOG.error(MessageFormatter.msg("[{}:{}] {} - Failed to mark the Message as a Dead Letter Message. Details: Is Permanent Error: {}. Message: {}",
                                                                    queueName,
                                                                    queuedMessage.getId(),
                                                                    consumeFromQueue.consumerName,
                                                                    isPermanentError,
                                                                    queuedMessage),
                                               ex);
            return NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE;
        }
    }

    /**
     * Register the message for redelivery after the delay calculated by the redelivery policy. The
     * ordered-message registration of the calling thread is only cleared when the registration succeeds.
     *
     * @param queuedMessage the message that could not be handled
     * @param handlingError the error raised while handling the message
     * @return always {@link #NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE}
     */
    private Runnable registerForRetry(QueuedMessage queuedMessage, Throwable handlingError) {
        Duration redeliveryDelay = consumeFromQueue.getRedeliveryPolicy().calculateNextRedeliveryDelay(queuedMessage.getRedeliveryAttempts());
        MESSAGE_HANDLING_FAILURE_LOG.debug(MessageFormatter.msg("[{}:{}] {} - Using redeliveryDelay '{}' for QueueEntryId '{}' due to: {}",
                                                                queueName,
                                                                queuedMessage.getId(),
                                                                consumeFromQueue.consumerName,
                                                                redeliveryDelay,
                                                                queuedMessage.getId(),
                                                                handlingError.getMessage()));
        try {
            durableQueues.retryMessage(queuedMessage.getId(), handlingError, redeliveryDelay);
            orderedMessageDeliveryThreads.remove(Thread.currentThread());
        } catch (Throwable ex) {
            String failureDescription = MessageFormatter.msg("[{}:{}] {} - Failed to register the message for retry.",
                                                             queueName,
                                                             queuedMessage.getId(),
                                                             consumeFromQueue.consumerName);
            // An interrupted lock wait is only logged at debug level; the message check is null-safe
            if (messageContains(ex, "Interrupted waiting for lock")) {
                MESSAGE_HANDLING_FAILURE_LOG.debug(failureDescription, ex);
            } else {
                MESSAGE_HANDLING_FAILURE_LOG.error(failureDescription, ex);
            }
        }
        return NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE;
    }

    /**
     * Forward the notification to the {@link QueuePollingOptimizer}.
     *
     * @param queuedMessage the message that was added
     */
    @Override
    public void messageAdded(QueuedMessage queuedMessage) {
        queuePollingOptimizer.messageAdded(queuedMessage);
    }

    @Override
    public String toString() {
        return "DurableQueueConsumer{started=" + started + ", consumeFromQueue=" + consumeFromQueue + "}";
    }
}

