/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.foundation.messaging.queue;

import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import dk.cloudcreate.essentials.components.foundation.IOExceptionUtil;
import dk.cloudcreate.essentials.components.foundation.Lifecycle;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.BatchMessageFetchingCapableDurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueDeserializationException;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueuesInterceptor;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.Message;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.OrderedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessageHandler;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetNextMessageReadyForDelivery;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.HandleQueuedMessage;
import dk.cloudcreate.essentials.shared.Exceptions;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.interceptor.InterceptorChain;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CentralizedMessageFetcher
implements Lifecycle {
    private static final Logger log = LoggerFactory.getLogger(CentralizedMessageFetcher.class);
    private final DurableQueues durableQueues;
    private final ScheduledExecutorService scheduler;
    private final ConcurrentMap<QueueName, DurableQueueConsumerRegistration> consumerRegistrations;
    private final AtomicBoolean started;
    private final long pollingIntervalMs;
    private final List<DurableQueuesInterceptor> interceptors;
    private final ConcurrentMap<QueueName, Set<String>> inProcessOrderedKeys;

    public CentralizedMessageFetcher(DurableQueues durableQueues, long pollingIntervalMs, List<DurableQueuesInterceptor> interceptors) {
        this.durableQueues = (DurableQueues)FailFast.requireNonNull((Object)durableQueues, (String)"No durableQueues provided");
        this.pollingIntervalMs = pollingIntervalMs;
        this.interceptors = new CopyOnWriteArrayList<DurableQueuesInterceptor>((Collection)FailFast.requireNonNull(interceptors, (String)"interceptors is missing"));
        this.consumerRegistrations = new ConcurrentHashMap<QueueName, DurableQueueConsumerRegistration>();
        this.inProcessOrderedKeys = new ConcurrentHashMap<QueueName, Set<String>>();
        this.scheduler = Executors.newScheduledThreadPool(1, r -> {
            Thread thread = new Thread(r);
            thread.setName("DurableQueues-CentralizedMessageFetcher");
            thread.setDaemon(true);
            return thread;
        });
        this.started = new AtomicBoolean(false);
    }

    public void registerConsumer(QueueName queueName, DurableQueueConsumer consumer, QueuedMessageHandler messageHandler, ExecutorService workerPool, int maxParallelConsumers) {
        FailFast.requireNonNull((Object)((Object)queueName), (String)"No queueName provided");
        FailFast.requireNonNull((Object)consumer, (String)"No consumer provided");
        FailFast.requireNonNull((Object)messageHandler, (String)"No messageHandler provided");
        FailFast.requireNonNull((Object)workerPool, (String)"No workerPool provided");
        FailFast.requireTrue((maxParallelConsumers > 0 ? 1 : 0) != 0, (String)"maxParallelConsumers must be > 0");
        log.debug("[{}] Registering consumer with max {} parallel consumers", (Object)queueName, (Object)maxParallelConsumers);
        this.consumerRegistrations.putIfAbsent(queueName, new DurableQueueConsumerRegistration(queueName, consumer, messageHandler, workerPool, maxParallelConsumers));
        this.inProcessOrderedKeys.putIfAbsent(queueName, ConcurrentHashMap.newKeySet());
    }

    public void unregisterConsumer(QueueName queueName) {
        FailFast.requireNonNull((Object)((Object)queueName), (String)"No queueName provided");
        log.debug("[{}] Unregistering consumer", (Object)queueName);
        this.consumerRegistrations.remove((Object)queueName);
        this.inProcessOrderedKeys.remove((Object)queueName);
    }

    public void start() {
        if (this.started.compareAndSet(false, true)) {
            log.info("Starting CentralizedMessageFetcher with polling interval {} ms", (Object)this.pollingIntervalMs);
            this.scheduler.scheduleAtFixedRate(() -> {
                if (!this.started.get()) {
                    return;
                }
                try {
                    this.fetchAndDistributeMessages();
                }
                catch (Throwable e) {
                    Exceptions.rethrowIfCriticalError((Throwable)e);
                    if (IOExceptionUtil.isIOException(e)) {
                        log.debug("I/O issue while polling queues", e);
                    }
                    log.error("Error during centralized message fetching", e);
                }
            }, 0L, this.pollingIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    public void stop() {
        if (this.started.compareAndSet(true, false)) {
            log.info("Stopping CentralizedMessageFetcher");
            this.scheduler.shutdownNow();
        }
    }

    public boolean isStarted() {
        return this.started.get();
    }

    private void fetchAndDistributeMessages() {
        if (this.consumerRegistrations.isEmpty()) {
            log.trace("No consumers registered, skipping fetch and distribute");
            return;
        }
        Map<QueueName, Integer> availableWorkerSlotsPerQueue = this.calculateAvailableWorkerSlotsPerQueue();
        if (availableWorkerSlotsPerQueue.values().stream().mapToInt(Integer::intValue).sum() == 0) {
            log.trace("No worker capacity available, skipping fetch and distribute");
            return;
        }
        HashMap<QueueName, Set<String>> excludeKeysPerQueue = new HashMap<QueueName, Set<String>>();
        availableWorkerSlotsPerQueue.forEach((queueName, slots) -> {
            if (slots > 0) {
                excludeKeysPerQueue.put((QueueName)((Object)queueName), new HashSet(this.inProcessOrderedKeys.getOrDefault(queueName, ConcurrentHashMap.newKeySet())));
            }
        });
        try {
            DurableQueues durableQueues = this.durableQueues;
            if (durableQueues instanceof BatchMessageFetchingCapableDurableQueues) {
                BatchMessageFetchingCapableDurableQueues batchCapableDurableQueues = (BatchMessageFetchingCapableDurableQueues)durableQueues;
                List<QueuedMessage> messages = batchCapableDurableQueues.fetchNextBatchOfMessages(availableWorkerSlotsPerQueue.keySet(), excludeKeysPerQueue, availableWorkerSlotsPerQueue);
                if (log.isDebugEnabled()) {
                    Map<QueueName, Long> messageCountsByQueue = messages.stream().collect(Collectors.groupingBy(QueuedMessage::getQueueName, Collectors.counting()));
                    log.debug("Batch fetched {} messages across {} queues: {}", new Object[]{messages.size(), availableWorkerSlotsPerQueue.size(), messageCountsByQueue});
                }
                for (QueuedMessage message : messages) {
                    QueueName queueName2 = message.getQueueName();
                    try {
                        this.processMessage(queueName2, message);
                    }
                    catch (Exception e) {
                        log.error("[{}:{}] Error processing message in batch: {}", new Object[]{queueName2, message.getId(), e.getMessage(), e});
                    }
                }
            } else {
                availableWorkerSlotsPerQueue.forEach((queueName, availableSlots) -> {
                    if (availableSlots <= 0) {
                        return;
                    }
                    Set keysBeingProcessed = this.inProcessOrderedKeys.getOrDefault(queueName, Collections.emptySet());
                    try {
                        for (int i = 0; i < availableSlots; ++i) {
                            Optional<QueuedMessage> messageOpt = this.durableQueues.getNextMessageReadyForDelivery(new GetNextMessageReadyForDelivery((QueueName)((Object)queueName), (Collection<String>)new ArrayList<String>(keysBeingProcessed)));
                            if (messageOpt.isEmpty()) {
                                log.trace("[{}] No more messages to process for this queue. Slot: {} (1-based)", (Object)queueName, (Object)(i + 1));
                                break;
                            }
                            this.processMessage((QueueName)((Object)queueName), messageOpt.get());
                        }
                    }
                    catch (Exception e) {
                        log.error("[{}] Error fetching messages: {}", new Object[]{queueName, e.getMessage(), e});
                    }
                });
            }
        }
        catch (Exception e) {
            log.error("Error during batch message fetching: {}", (Object)e.getMessage(), (Object)e);
        }
    }

    private void processMessage(QueueName queueName, QueuedMessage message) {
        DurableQueueConsumerRegistration registration;
        Set sharedKeysBeingProcessed = this.inProcessOrderedKeys.computeIfAbsent(queueName, _queueName -> ConcurrentHashMap.newKeySet());
        Message message2 = message.getMessage();
        if (message2 instanceof OrderedMessage) {
            OrderedMessage orderedMessage = (OrderedMessage)message2;
            sharedKeysBeingProcessed.add(orderedMessage.getKey());
        }
        if ((registration = (DurableQueueConsumerRegistration)this.consumerRegistrations.get((Object)queueName)) == null) {
            log.warn("[{}] Received message for unregistered consumer - will retry the message", (Object)queueName);
            this.durableQueues.retryMessage(message.getId(), null, registration.consumer.getRedeliveryPolicy().initialRedeliveryDelay);
            return;
        }
        registration.activeWorkers.incrementAndGet();
        log.debug("[{}] Submitting message {} to worker pool", (Object)queueName, (Object)message.getId());
        registration.workerPool.submit(() -> {
            block16: {
                try {
                    HandleQueuedMessage operation = new HandleQueuedMessage(message, registration.messageHandler);
                    InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, (InterceptorChain<HandleQueuedMessage, Void, DurableQueuesInterceptor>)interceptorChain), () -> {
                        registration.messageHandler.handle(message);
                        return null;
                    }).proceed();
                    if (message.isManuallyMarkedForRedelivery()) {
                        log.debug("[{}:{}] Message handler manually requested redelivery", (Object)queueName, (Object)message.getId());
                        try {
                            this.durableQueues.retryMessage(message.getId(), null, message.getRedeliveryDelay());
                        }
                        catch (Exception ex) {
                            log.warn("[{}:{}] Could not manually mark message for redelivery: {}", new Object[]{queueName, message.getId(), ex.getMessage()});
                        }
                        break block16;
                    }
                    log.debug("[{}:{}] Message handled successfully, acknowledging", (Object)queueName, (Object)message.getId());
                    try {
                        boolean acknowledged = this.durableQueues.acknowledgeMessageAsHandled(message.getId());
                        if (!acknowledged) {
                            log.debug("[{}:{}] Message acknowledgment reported message already handled or deleted", (Object)queueName, (Object)message.getId());
                        }
                    }
                    catch (Exception ex) {
                        log.warn("[{}:{}] Failed to acknowledge message: {}", new Object[]{queueName, message.getId(), ex.getMessage()});
                    }
                }
                catch (Throwable e) {
                    Exceptions.rethrowIfCriticalError((Throwable)e);
                    try {
                        boolean isPermanentError = this.isPermanentError(message, e);
                        if (isPermanentError || message.getTotalDeliveryAttempts() >= registration.consumer.getRedeliveryPolicy().getMaximumNumberOfRedeliveries() + 1) {
                            log.error("[{}:{}] Marking message as dead letter due to error: {}", new Object[]{queueName, message.getId(), e.getMessage(), e});
                            this.durableQueues.markAsDeadLetterMessage(message.getId(), e);
                            break block16;
                        }
                        Duration redeliveryDelay = registration.consumer.getRedeliveryPolicy().calculateNextRedeliveryDelay(message.getRedeliveryAttempts());
                        log.debug("[{}:{}] Will retry message with delay {}: {}", new Object[]{queueName, message.getId(), redeliveryDelay, e.getMessage()});
                        this.durableQueues.retryMessage(message.getId(), e, redeliveryDelay);
                    }
                    catch (Exception retryEx) {
                        log.error("[{}:{}] Error handling message failure: {}", new Object[]{queueName, message.getId(), retryEx.getMessage(), retryEx});
                    }
                }
                finally {
                    Message patt17321$temp = message.getMessage();
                    if (patt17321$temp instanceof OrderedMessage) {
                        OrderedMessage orderedMessage = (OrderedMessage)patt17321$temp;
                        String key = orderedMessage.getKey();
                        Set currentKeysBeingProcessed = (Set)this.inProcessOrderedKeys.get((Object)queueName);
                        if (currentKeysBeingProcessed != null) {
                            currentKeysBeingProcessed.remove(key);
                        }
                    }
                    registration.activeWorkers.decrementAndGet();
                }
            }
        });
    }

    private Map<QueueName, Integer> calculateAvailableWorkerSlotsPerQueue() {
        HashMap<QueueName, Integer> result = new HashMap<QueueName, Integer>();
        for (Map.Entry entry : this.consumerRegistrations.entrySet()) {
            QueueName queueName = (QueueName)((Object)entry.getKey());
            DurableQueueConsumerRegistration reg = (DurableQueueConsumerRegistration)entry.getValue();
            if (reg == null) {
                log.warn("Registration for queue '{}' is null, skipping", (Object)queueName);
                continue;
            }
            int activeWorkers = reg.activeWorkers.get();
            int availableSlots = Math.max(1, reg.maxParallelConsumers - activeWorkers);
            if (activeWorkers > 0 && (double)activeWorkers >= (double)reg.maxParallelConsumers * 0.8) {
                availableSlots = Math.min(availableSlots, 2);
            }
            result.put(queueName, availableSlots);
        }
        return result;
    }

    private boolean isPermanentError(QueuedMessage queuedMessage, Throwable e) {
        DurableQueueConsumerRegistration registration = (DurableQueueConsumerRegistration)this.consumerRegistrations.get((Object)queuedMessage.getQueueName());
        if (registration == null) {
            return true;
        }
        Throwable rootCause = Exceptions.getRootCause((Throwable)e);
        return registration.consumer.getRedeliveryPolicy().isPermanentError(queuedMessage, e) || e instanceof DurableQueueDeserializationException || e instanceof ClassCastException || rootCause instanceof ClassCastException || e instanceof NoClassDefFoundError || rootCause instanceof NoClassDefFoundError || rootCause instanceof MismatchedInputException || e instanceof IllegalArgumentException || rootCause instanceof IllegalArgumentException;
    }

    public boolean containsConsumerFor(QueueName queueName) {
        FailFast.requireNonNull((Object)((Object)queueName), (String)"No queueName provided");
        return this.consumerRegistrations.containsKey((Object)queueName);
    }

    private static class DurableQueueConsumerRegistration {
        private final QueueName queueName;
        private final DurableQueueConsumer consumer;
        private final QueuedMessageHandler messageHandler;
        private final ExecutorService workerPool;
        private final int maxParallelConsumers;
        private final AtomicInteger activeWorkers = new AtomicInteger(0);

        private DurableQueueConsumerRegistration(QueueName queueName, DurableQueueConsumer consumer, QueuedMessageHandler messageHandler, ExecutorService workerPool, int maxParallelConsumers) {
            this.queueName = (QueueName)((Object)FailFast.requireNonNull((Object)((Object)queueName), (String)"No queueName provided"));
            this.consumer = (DurableQueueConsumer)FailFast.requireNonNull((Object)consumer, (String)"No consumer provided");
            this.messageHandler = (QueuedMessageHandler)FailFast.requireNonNull((Object)messageHandler, (String)"No messageHandler provided");
            this.workerPool = (ExecutorService)FailFast.requireNonNull((Object)workerPool, (String)"No workerPool provided");
            this.maxParallelConsumers = maxParallelConsumers;
        }
    }
}

