/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.foundation.messaging.queue;

import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import dk.cloudcreate.essentials.components.foundation.IOExceptionUtil;
import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumerNotifications;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueDeserializationException;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueException;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueuesInterceptor;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.OrderedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuePollingOptimizer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.TransactionalMode;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.ConsumeFromQueue;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetNextMessageReadyForDelivery;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.HandleQueuedMessage;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWorkFactory;
import dk.cloudcreate.essentials.shared.Exceptions;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.concurrent.ThreadFactoryBuilder;
import dk.cloudcreate.essentials.shared.interceptor.InterceptorChain;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public abstract class DefaultDurableQueueConsumer<DURABLE_QUEUES extends DurableQueues, UOW extends UnitOfWork, UOW_FACTORY extends UnitOfWorkFactory<UOW>>
implements DurableQueueConsumer,
DurableQueueConsumerNotifications {
    public static final Logger LOG = LoggerFactory.getLogger(DurableQueueConsumer.class);
    public static final Logger MESSAGE_HANDLING_FAILURE_LOG = LoggerFactory.getLogger((String)(DurableQueueConsumer.class.getName() + ".MessageHandlingFailures"));
    public static final Runnable NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE = () -> {};
    public final QueueName queueName;
    private final ConsumeFromQueue consumeFromQueue;
    private final List<DurableQueuesInterceptor> interceptors;
    private volatile boolean started;
    private final ScheduledExecutorService scheduler;
    private final DURABLE_QUEUES durableQueues;
    private final Consumer<DurableQueueConsumer> removeDurableQueueConsumer;
    private final UOW_FACTORY unitOfWorkFactory;
    private final QueuePollingOptimizer queuePollingOptimizer;
    private final long pollingIntervalMs;
    private final ConcurrentMap<Thread, OrderedMessage> orderedMessageDeliveryThreads = new ConcurrentHashMap<Thread, OrderedMessage>();

    public DefaultDurableQueueConsumer(ConsumeFromQueue consumeFromQueue, UOW_FACTORY unitOfWorkFactory, DURABLE_QUEUES durableQueues, Consumer<DurableQueueConsumer> removeDurableQueueConsumer, long pollingIntervalMs, QueuePollingOptimizer queuePollingOptimizer, List<DurableQueuesInterceptor> interceptors) {
        this.consumeFromQueue = (ConsumeFromQueue)FailFast.requireNonNull((Object)consumeFromQueue, (String)"consumeFromQueue is missing");
        this.interceptors = new CopyOnWriteArrayList<DurableQueuesInterceptor>((Collection)FailFast.requireNonNull(interceptors, (String)"interceptors is missing"));
        consumeFromQueue.validate();
        this.durableQueues = (DurableQueues)FailFast.requireNonNull(durableQueues, (String)"durableQueues is missing");
        this.unitOfWorkFactory = durableQueues.getTransactionalMode() == TransactionalMode.FullyTransactional ? (UnitOfWorkFactory)FailFast.requireNonNull(unitOfWorkFactory, (String)"You must specify a unitOfWorkFactory") : null;
        this.removeDurableQueueConsumer = (Consumer)FailFast.requireNonNull(removeDurableQueueConsumer, (String)"removeDurableQueueConsumer is missing");
        this.queueName = consumeFromQueue.queueName;
        this.pollingIntervalMs = pollingIntervalMs;
        this.queuePollingOptimizer = queuePollingOptimizer != null ? queuePollingOptimizer : QueuePollingOptimizer.None();
        this.scheduler = consumeFromQueue.getConsumerExecutorService().orElseGet(() -> Executors.newScheduledThreadPool(consumeFromQueue.getParallelConsumers(), new ThreadFactoryBuilder().nameFormat("Queue-" + String.valueOf((Object)this.queueName) + "-Polling-%d").daemon(true).uncaughtExceptionHandler((thread, throwable) -> LOG.error(MessageFormatter.msg((String)"[{}] {} - Unexpected error", (Object[])new Object[]{this.queueName, consumeFromQueue.consumerName}), throwable)).build()));
        LOG.info("[{}] '{}' - Created '{}' with '{}', {} thread(s) and polling interval {} ms", new Object[]{this.queueName, consumeFromQueue.consumerName, this.getClass().getSimpleName(), this.queuePollingOptimizer, consumeFromQueue.getParallelConsumers(), pollingIntervalMs});
    }

    public void start() {
        if (!this.started) {
            LOG.info("[{}] {} - Starting {} DurableQueueConsumer threads with polling interval {} ms", new Object[]{this.queueName, this.consumeFromQueue.consumerName, this.consumeFromQueue.getParallelConsumers(), this.pollingIntervalMs});
            for (int i = 0; i < this.consumeFromQueue.getParallelConsumers(); ++i) {
                if (i > 0) {
                    try {
                        Thread.sleep(10L);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                this.scheduler.scheduleAtFixedRate(this::pollQueue, this.pollingIntervalMs, this.pollingIntervalMs, TimeUnit.MILLISECONDS);
            }
            this.started = true;
        }
    }

    public void stop() {
        if (this.started) {
            LOG.info("[{}] {} - Stopping DurableQueueConsumer", (Object)this.queueName, (Object)this.consumeFromQueue.consumerName);
            this.started = false;
            try {
                this.scheduler.shutdownNow();
            }
            finally {
                this.removeDurableQueueConsumer.accept(this);
                LOG.info("[{}] {} - DurableQueueConsumer stopped", (Object)this.queueName, (Object)this.consumeFromQueue.consumerName);
            }
        }
    }

    public boolean isStarted() {
        return this.started;
    }

    @Override
    public QueueName queueName() {
        return this.queueName;
    }

    @Override
    public String consumerName() {
        return this.consumeFromQueue.consumerName;
    }

    @Override
    public void cancel() {
        this.stop();
    }

    @Override
    public RedeliveryPolicy getRedeliveryPolicy() {
        return this.consumeFromQueue.getRedeliveryPolicy();
    }

    private void pollQueue() {
        if (!this.started) {
            LOG.trace("[{}] {} - Skipping Polling Queue as the consumer is not started", (Object)this.queueName, (Object)this.consumeFromQueue.consumerName);
            return;
        }
        try {
            LOG.trace("[{}] {} - Entered pollQueue", (Object)this.queueName, (Object)this.consumeFromQueue.consumerName);
            if (this.queuePollingOptimizer.shouldSkipPolling()) {
                LOG.trace("[{}] {} - Skipping queue polling", (Object)this.queueName, (Object)this.consumeFromQueue.consumerName);
                return;
            }
            LOG.trace("[{}] {} - Polling Queue for the next message ready for delivery. Transactional mode: {}", new Object[]{this.queueName, this.consumeFromQueue.consumerName, this.durableQueues.getTransactionalMode()});
            Runnable postTransactionalSideEffect = null;
            if (this.durableQueues.getTransactionalMode() == TransactionalMode.FullyTransactional) {
                if (this.unitOfWorkFactory.getCurrentUnitOfWork().isPresent()) {
                    throw new DurableQueueException(MessageFormatter.msg((String)"[{}] {} - Previous UnitOfWork isn't completed/removed: {}", (Object[])new Object[]{this.queueName, this.consumeFromQueue.consumerName, this.unitOfWorkFactory.getCurrentUnitOfWork().get()}), this.queueName);
                }
                try {
                    postTransactionalSideEffect = (Runnable)this.unitOfWorkFactory.withUnitOfWork(handleAwareUnitOfWork -> this.processNextMessageReadyForDelivery());
                }
                catch (Exception e) {
                    this.handleProcessNextMessageReadyForDeliveryException(e);
                }
            } else {
                try {
                    postTransactionalSideEffect = this.processNextMessageReadyForDelivery();
                }
                catch (Exception e) {
                    this.handleProcessNextMessageReadyForDeliveryException(e);
                }
            }
            if (postTransactionalSideEffect != null) {
                postTransactionalSideEffect.run();
            }
            LOG.trace("[{}] {} - Completed pollQueue", (Object)this.queueName, (Object)this.consumeFromQueue.consumerName);
        }
        catch (Throwable e) {
            Exceptions.rethrowIfCriticalError((Throwable)e);
            if (IOExceptionUtil.isIOException(e)) {
                LOG.debug(MessageFormatter.msg((String)"[{}] {} - Experienced a Connection issue while polling queue", (Object[])new Object[]{this.queueName, this.consumeFromQueue.consumerName}), e);
            }
            LOG.error(MessageFormatter.msg((String)"[{}] {} - Failed to poll queue", (Object[])new Object[]{this.queueName, this.consumeFromQueue.consumerName}), e);
        }
    }

    private void handleProcessNextMessageReadyForDeliveryException(Exception e) {
        if (IOExceptionUtil.isIOException(e)) {
            LOG.debug(MessageFormatter.msg((String)"[{}] {} - Experienced a Connection issue, will retry later", (Object[])new Object[]{this.queueName, this.consumeFromQueue.consumerName}), (Throwable)e);
        } else {
            LOG.error(MessageFormatter.msg((String)"[{}] {} - Experienced an error, will retry later", (Object[])new Object[]{this.queueName, this.consumeFromQueue.consumerName}), (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Runnable processNextMessageReadyForDelivery() {
        Runnable excludeOrderedMessagesWithTheseKeys2;
        block7: {
            if (!this.started) break block7;
            LOG.trace("[{}] {} - Entered {}.processNextMessageReadyForDelivery", new Object[]{this.queueName, this.consumeFromQueue.consumerName, DefaultDurableQueueConsumer.class.getSimpleName()});
            List<String> excludeOrderedMessagesWithTheseKeys2 = this.resolveMessageKeysToExclude();
            Runnable[] resultRunnable = new Runnable[1];
            Mono messageMono = Mono.fromCallable(() -> {
                LOG.debug("[{}] {} - Calling {}.getNextMessageReadyForDelivery", new Object[]{this.queueName, this.consumeFromQueue.consumerName, this.durableQueues.getClass().getSimpleName()});
                Optional<QueuedMessage> nextMessageReadyForDelivery = this.durableQueues.getNextMessageReadyForDelivery(new GetNextMessageReadyForDelivery(this.queueName, excludeOrderedMessagesWithTheseKeys2));
                LOG.debug("[{}] {} - {}.getNextMessageReadyForDelivery returned {}", new Object[]{this.queueName, this.consumeFromQueue.consumerName, this.durableQueues.getClass().getSimpleName(), nextMessageReadyForDelivery});
                return nextMessageReadyForDelivery;
            }).onErrorResume(throwable -> {
                if (Exceptions.isCriticalError((Throwable)throwable)) {
                    return Mono.error((Throwable)throwable);
                }
                if (IOExceptionUtil.isIOException(throwable)) {
                    LOG.debug(MessageFormatter.msg((String)"[{}] {} - Error occurred during {}.getNextMessageReadyForDelivery queue processing", (Object[])new Object[]{this.queueName, this.consumeFromQueue.consumerName, this.durableQueues.getClass().getSimpleName()}), throwable);
                } else {
                    LOG.error(MessageFormatter.msg((String)"[{}] {} - Error occurred during {}.getNextMessageReadyForDelivery queue processing", (Object[])new Object[]{this.queueName, this.consumeFromQueue.consumerName, this.durableQueues.getClass().getSimpleName()}), throwable);
                }
                return Mono.just(Optional.empty());
            });
            messageMono.subscribe(optionalQueuedMessage -> {
                if (optionalQueuedMessage.isPresent()) {
                    LOG.debug("[{}] {} - {}.getNextMessageReadyForDelivery return a Message", new Object[]{this.queueName, this.consumeFromQueue.consumerName, this.durableQueues.getClass().getSimpleName()});
                    QueuedMessage queuedMessage = (QueuedMessage)optionalQueuedMessage.get();
                    LOG.trace("[{}] {} - Handling Next Message ReadyForDelivery with id '{}'", new Object[]{this.queueName, this.consumeFromQueue.consumerName, queuedMessage.getId()});
                    resultRunnable[0] = this.handleMessage(queuedMessage);
                } else {
                    LOG.debug("[{}] {} - {}.getNextMessageReadyForDelivery did NOT return a Message", new Object[]{this.queueName, this.consumeFromQueue.consumerName, this.durableQueues.getClass().getSimpleName()});
                    this.queuePollingOptimizer.queuePollingReturnedNoMessages();
                }
            });
            Runnable runnable = resultRunnable[0] != null ? resultRunnable[0] : NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE;
            LOG.trace("[{}] {} - Completed {}.getNextMessageReadyForDelivery", new Object[]{this.queueName, this.consumeFromQueue.consumerName, DefaultDurableQueueConsumer.class.getSimpleName()});
            return runnable;
        }
        try {
            excludeOrderedMessagesWithTheseKeys2 = NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE;
        }
        catch (Throwable e) {
            Runnable runnable;
            try {
                Exceptions.rethrowIfCriticalError((Throwable)e);
                if (IOExceptionUtil.isIOException(e)) {
                    LOG.debug(MessageFormatter.msg((String)"[{}] {} Can't Poll Queue - Connection seems to be broken or closed, this can happen during JVM or application shutdown", (Object[])new Object[]{this.queueName, this.consumeFromQueue.consumerName}));
                } else {
                    LOG.error(MessageFormatter.msg((String)"[{}] {} Error Polling Queue", (Object[])new Object[]{this.queueName, this.consumeFromQueue.consumerName}), e);
                }
                runnable = NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE;
            }
            catch (Throwable throwable2) {
                LOG.trace("[{}] {} - Completed {}.getNextMessageReadyForDelivery", new Object[]{this.queueName, this.consumeFromQueue.consumerName, DefaultDurableQueueConsumer.class.getSimpleName()});
                throw throwable2;
            }
            LOG.trace("[{}] {} - Completed {}.getNextMessageReadyForDelivery", new Object[]{this.queueName, this.consumeFromQueue.consumerName, DefaultDurableQueueConsumer.class.getSimpleName()});
            return runnable;
        }
        LOG.trace("[{}] {} - Completed {}.getNextMessageReadyForDelivery", new Object[]{this.queueName, this.consumeFromQueue.consumerName, DefaultDurableQueueConsumer.class.getSimpleName()});
        return excludeOrderedMessagesWithTheseKeys2;
    }

    private List<String> resolveMessageKeysToExclude() {
        OrderedMessage orderedMessageLastHandled = (OrderedMessage)((Object)this.orderedMessageDeliveryThreads.get(Thread.currentThread()));
        HashSet allOrderedMessages = new HashSet(this.orderedMessageDeliveryThreads.values());
        allOrderedMessages.remove((Object)orderedMessageLastHandled);
        List<String> excludeOrderedMessagesWithTheseKeys = allOrderedMessages.stream().map(OrderedMessage::getKey).collect(Collectors.toList());
        return excludeOrderedMessagesWithTheseKeys;
    }

    private Runnable handleMessage(QueuedMessage queuedMessage) {
        boolean isOrderedMessage = queuedMessage.getMessage() instanceof OrderedMessage;
        LOG.debug("[{}:{}] {} - Delivering {}message{}. Total attempts: {}, Redelivery Attempts: {}", new Object[]{this.queueName, queuedMessage.getId(), this.consumeFromQueue.consumerName, isOrderedMessage ? "Ordered " : "", isOrderedMessage ? MessageFormatter.msg((String)" {}:{}", (Object[])new Object[]{((OrderedMessage)queuedMessage.getMessage()).getKey(), ((OrderedMessage)queuedMessage.getMessage()).getOrder()}) : "", queuedMessage.getTotalDeliveryAttempts(), queuedMessage.getRedeliveryAttempts()});
        if (isOrderedMessage) {
            this.orderedMessageDeliveryThreads.put(Thread.currentThread(), (OrderedMessage)queuedMessage.getMessage());
        }
        try {
            HandleQueuedMessage operation = new HandleQueuedMessage(queuedMessage, this.consumeFromQueue.queueMessageHandler);
            InterceptorChain.newInterceptorChainForOperation((Object)operation, this.interceptors, (interceptor, interceptorChain) -> interceptor.intercept(operation, (InterceptorChain<HandleQueuedMessage, Void, DurableQueuesInterceptor>)interceptorChain), () -> {
                this.consumeFromQueue.queueMessageHandler.handle(queuedMessage);
                return null;
            }).proceed();
            if (queuedMessage.isManuallyMarkedForRedelivery()) {
                LOG.debug("[{}:{}] {} - Message handler manually requested redelivery of the message", new Object[]{this.queueName, queuedMessage.getId(), this.consumeFromQueue.consumerName});
                return this.retryMessage(queuedMessage, null, queuedMessage.getRedeliveryDelay());
            }
            LOG.debug("[{}:{}] {} - Message handled successfully. Deleting the message in the Queue Store message. Total attempts: {}, Redelivery Attempts: {}", new Object[]{this.queueName, queuedMessage.getId(), this.consumeFromQueue.consumerName, queuedMessage.getTotalDeliveryAttempts(), queuedMessage.getRedeliveryAttempts()});
            this.durableQueues.acknowledgeMessageAsHandled(queuedMessage.getId());
            this.orderedMessageDeliveryThreads.remove(Thread.currentThread());
            return () -> this.queuePollingOptimizer.queuePollingReturnedMessage(queuedMessage);
        }
        catch (Throwable e) {
            Exceptions.rethrowIfCriticalError((Throwable)e);
            boolean isPermanentError = this.isPermanentError(queuedMessage, e);
            if (isPermanentError || queuedMessage.getTotalDeliveryAttempts() >= this.consumeFromQueue.getRedeliveryPolicy().maximumNumberOfRedeliveries + 1) {
                if (isPermanentError) {
                    MESSAGE_HANDLING_FAILURE_LOG.error(MessageFormatter.msg((String)"[{}:{}] {} - Marking Message as Dead Letter. Is Permanent Error: {}. Message: {}", (Object[])new Object[]{this.queueName, queuedMessage.getId(), this.consumeFromQueue.consumerName, isPermanentError, queuedMessage}), e);
                } else {
                    MESSAGE_HANDLING_FAILURE_LOG.warn(MessageFormatter.msg((String)"[{}:{}] {} - Too many deliveries, marking Message as Dead Letter. Is Permanent Error: {}. Message: {}", (Object[])new Object[]{this.queueName, queuedMessage.getId(), this.consumeFromQueue.consumerName, isPermanentError, queuedMessage}), e);
                }
                try {
                    this.durableQueues.markAsDeadLetterMessage(queuedMessage.getId(), e);
                    this.orderedMessageDeliveryThreads.remove(Thread.currentThread());
                    return () -> this.queuePollingOptimizer.queuePollingReturnedMessage(queuedMessage);
                }
                catch (Throwable ex) {
                    Exceptions.rethrowIfCriticalError((Throwable)e);
                    String msg = MessageFormatter.msg((String)"[{}:{}] {} - Failed to mark the Message as a Dead Letter Message. Details: Is Permanent Error: {}. Message: {}", (Object[])new Object[]{this.queueName, queuedMessage.getId(), this.consumeFromQueue.consumerName, isPermanentError, queuedMessage});
                    MESSAGE_HANDLING_FAILURE_LOG.error(msg, ex);
                    if (this.durableQueues.getTransactionalMode() == TransactionalMode.FullyTransactional) {
                        throw new DurableQueueException(msg, ex, this.queueName);
                    }
                    return NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE;
                }
            }
            if (MESSAGE_HANDLING_FAILURE_LOG.isTraceEnabled()) {
                MESSAGE_HANDLING_FAILURE_LOG.trace(MessageFormatter.msg((String)"[{}:{}] {} - QueueMessageHandler for failed to handle message: {}", (Object[])new Object[]{this.queueName, queuedMessage.getId(), this.consumeFromQueue.consumerName, queuedMessage}), e);
            }
            Duration redeliveryDelay = this.consumeFromQueue.getRedeliveryPolicy().calculateNextRedeliveryDelay(queuedMessage.getRedeliveryAttempts());
            return this.retryMessage(queuedMessage, e, redeliveryDelay);
        }
    }

    private Runnable retryMessage(QueuedMessage queuedMessage, Throwable e, Duration redeliveryDelay) {
        if (MESSAGE_HANDLING_FAILURE_LOG.isDebugEnabled()) {
            MESSAGE_HANDLING_FAILURE_LOG.debug(MessageFormatter.msg((String)"[{}:{}] {} - Will redeliver {} message with redeliveryDelay '{}'. Number of Redelivery-Attempts so far: {}", (Object[])new Object[]{this.queueName, queuedMessage.getId(), this.consumeFromQueue.consumerName, e != null ? "Failed" : "", redeliveryDelay, queuedMessage.getRedeliveryAttempts()}), e);
        }
        try {
            this.durableQueues.retryMessage(queuedMessage.getId(), e, redeliveryDelay);
            this.orderedMessageDeliveryThreads.remove(Thread.currentThread());
            return NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE;
        }
        catch (Throwable ex) {
            Exceptions.rethrowIfCriticalError((Throwable)e);
            if (ex.getMessage().contains("Interrupted waiting for lock")) {
                MESSAGE_HANDLING_FAILURE_LOG.debug(MessageFormatter.msg((String)"[{}:{}] {} - Failed to register the message for delayed retry. This can typically happen during JVM or Application shutdown", (Object[])new Object[]{this.queueName, queuedMessage.getId(), this.consumeFromQueue.consumerName}), ex);
            } else {
                String msg = MessageFormatter.msg((String)"[{}:{}] {} - Failed to register the message for delayed retry.", (Object[])new Object[]{this.queueName, queuedMessage.getId(), this.consumeFromQueue.consumerName});
                MESSAGE_HANDLING_FAILURE_LOG.error(msg, ex);
                if (this.durableQueues.getTransactionalMode() == TransactionalMode.FullyTransactional) {
                    throw new DurableQueueException(msg, ex, this.queueName);
                }
            }
            return NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE;
        }
    }

    protected boolean isPermanentError(QueuedMessage queuedMessage, Throwable e) {
        Throwable rootCause = Exceptions.getRootCause((Throwable)e);
        return this.consumeFromQueue.getRedeliveryPolicy().isPermanentError(queuedMessage, e) || e instanceof DurableQueueDeserializationException || e instanceof ClassCastException || rootCause instanceof ClassCastException || e instanceof NoClassDefFoundError || rootCause instanceof NoClassDefFoundError || rootCause instanceof MismatchedInputException || e instanceof IllegalArgumentException || rootCause instanceof IllegalArgumentException;
    }

    @Override
    public void messageAdded(QueuedMessage queuedMessage) {
        this.queuePollingOptimizer.messageAdded(queuedMessage);
    }

    public String toString() {
        return this.getClass().getSimpleName() + "{, started=" + this.started + this.consumeFromQueue.toString() + "}";
    }
}

