/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward;

import dk.cloudcreate.essentials.components.foundation.fencedlock.FencedLock;
import dk.cloudcreate.essentials.components.foundation.fencedlock.FencedLockManager;
import dk.cloudcreate.essentials.components.foundation.fencedlock.LockCallback;
import dk.cloudcreate.essentials.components.foundation.fencedlock.LockName;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.Inbox;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.InboxConfig;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.InboxName;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.MessageConsumptionMode;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.Message;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.MessageMetaData;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.TransactionalMode;
import dk.cloudcreate.essentials.reactive.command.CommandBus;
import dk.cloudcreate.essentials.shared.FailFast;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface Inboxes {
    /**
     * Looks up the {@link Inbox} for the given configuration, creating it when none exists yet.
     * <p>
     * In the {@code DurableQueueBasedInboxes} implementation in this file, inboxes are keyed by
     * {@code InboxConfig#getInboxName()} and an inbox created through this overload has no
     * message consumer attached.
     *
     * @param var1 the inbox configuration (parameter name is a decompiler placeholder)
     * @return the existing or newly created inbox
     */
    public Inbox getOrCreateInbox(InboxConfig var1);

    /**
     * Looks up the {@link Inbox} for the given configuration, creating it with the supplied
     * message consumer when none exists yet.
     * <p>
     * In the {@code DurableQueueBasedInboxes} implementation in this file, the consumer is only
     * used when a new inbox is created; an already registered inbox with the same name is
     * returned unchanged.
     *
     * @param var1 the inbox configuration (parameter name is a decompiler placeholder)
     * @param var2 the consumer that receives the inbox messages (decompiler placeholder name)
     * @return the existing or newly created inbox
     */
    public Inbox getOrCreateInbox(InboxConfig var1, Consumer<Message> var2);

    /**
     * Convenience overload that forwards the payload of every consumed message to a
     * {@link CommandBus} by delegating to {@link #getOrCreateInbox(InboxConfig, Consumer)}.
     *
     * @param inboxConfig the inbox configuration
     * @param forwardTo   the command bus each message payload is sent to; must not be null
     * @return the existing or newly created inbox
     */
    default public Inbox getOrCreateInbox(InboxConfig inboxConfig, CommandBus forwardTo) {
        FailFast.requireNonNull(forwardTo, "No forwardTo command bus provided");
        // A typed local keeps the overload choice unambiguous without an explicitly typed lambda parameter
        Consumer<Message> payloadForwarder = message -> forwardTo.send(message.getPayload());
        return getOrCreateInbox(inboxConfig, payloadForwarder);
    }

    /**
     * Returns the inboxes managed by this instance.
     * <p>
     * For the {@code DurableQueueBasedInboxes} implementation in this file these are the inboxes
     * that were created through one of the {@code getOrCreateInbox} methods.
     *
     * @return the managed inboxes
     */
    public Collection<Inbox> getInboxes();

    /**
     * Factory for an {@link Inboxes} instance backed by {@link DurableQueues}.
     *
     * @param durableQueues     the durable queues the inboxes store their messages in
     * @param fencedLockManager the lock manager handed to the created {@link DurableQueueBasedInboxes}
     * @return a new {@link DurableQueueBasedInboxes}
     */
    public static Inboxes durableQueueBasedInboxes(DurableQueues durableQueues, FencedLockManager fencedLockManager) {
        Inboxes queueBackedInboxes = new DurableQueueBasedInboxes(durableQueues, fencedLockManager);
        return queueBackedInboxes;
    }

    public static class DurableQueueBasedInboxes
    implements Inboxes {
        /** Queue infrastructure that stores and delivers the inbox messages. */
        private final DurableQueues durableQueues;
        /** Lock manager used by inboxes running in {@code SingleGlobalConsumer} mode. */
        private final FencedLockManager fencedLockManager;
        /**
         * Registry of created inboxes keyed by inbox name. The reference is never reassigned,
         * so it is declared final like the other fields.
         */
        private final ConcurrentMap<InboxName, Inbox> inboxes = new ConcurrentHashMap<>();

        /**
         * Creates the inbox registry.
         *
         * @param durableQueues     the durable queues backing every inbox; must not be null
         * @param fencedLockManager the lock manager used for exclusive consumers; must not be null
         */
        public DurableQueueBasedInboxes(DurableQueues durableQueues, FencedLockManager fencedLockManager) {
            this.durableQueues = FailFast.requireNonNull(durableQueues, "No durableQueues instance provided");
            this.fencedLockManager = FailFast.requireNonNull(fencedLockManager, "No fencedLockManager instance provided");
        }

        /**
         * Returns the inbox registered under the config's inbox name, or creates one that
         * consumes with the given consumer when no such inbox exists.
         * <p>
         * If an inbox with that name already exists it is returned unchanged and the supplied
         * consumer is not applied to it.
         *
         * @param inboxConfig     the inbox configuration; must not be null
         * @param messageConsumer the consumer for a newly created inbox; must not be null
         * @return the existing or newly created inbox
         */
        @Override
        public Inbox getOrCreateInbox(InboxConfig inboxConfig, Consumer<Message> messageConsumer) {
            FailFast.requireNonNull(inboxConfig, "No inboxConfig provided");
            // Validate eagerly: otherwise a null consumer is only rejected when the inbox is absent
            // and silently accepted when an inbox with the same name already exists.
            FailFast.requireNonNull(messageConsumer, "No messageConsumer provided");
            return this.inboxes.computeIfAbsent(inboxConfig.getInboxName(),
                                                inboxName_ -> new DurableQueueBasedInbox(inboxConfig, messageConsumer));
        }

        /**
         * Returns the inbox registered under the config's inbox name, or creates one without a
         * message consumer when no such inbox exists.
         *
         * @param inboxConfig the inbox configuration; must not be null
         * @return the existing or newly created inbox
         */
        @Override
        public Inbox getOrCreateInbox(InboxConfig inboxConfig) {
            FailFast.requireNonNull(inboxConfig, "No inboxConfig provided");
            InboxName requestedName = inboxConfig.getInboxName();
            return inboxes.computeIfAbsent(requestedName, ignoredName -> new DurableQueueBasedInbox(inboxConfig));
        }

        /**
         * Returns a read-only view of the inboxes created through this instance.
         * <p>
         * The view is backed by the internal registry, so inboxes created later show up in it,
         * but callers cannot remove entries from the registry through it.
         *
         * @return an unmodifiable view of the registered inboxes
         */
        @Override
        public Collection<Inbox> getInboxes() {
            return Collections.unmodifiableCollection(this.inboxes.values());
        }

        public class DurableQueueBasedInbox
        implements Inbox {
            private static final Logger log = LoggerFactory.getLogger(DurableQueueBasedInbox.class);
            /**
             * Consumer that receives the inbox messages; null until one is set and again after
             * {@code stopConsuming()}. Volatile because it is read while handling queued messages
             * and written by the start/stop methods.
             * NOTE(review): assumes message handling runs on threads owned by DurableQueues - confirm.
             */
            private volatile Consumer<Message> messageConsumer;
            /** Name of the durable queue backing this inbox, derived from the inbox name. */
            public final QueueName inboxQueueName;
            /** Configuration this inbox was created with. */
            public final InboxConfig config;
            /**
             * Active queue consumer, or null when none has been started. Volatile because it is
             * assigned from the FencedLock callbacks and read by the status/stop methods.
             * NOTE(review): assumes the lock callbacks run on a lock-manager thread - confirm.
             */
            private volatile DurableQueueConsumer durableQueueConsumer;

            /**
             * Creates the inbox and immediately passes the consumer to {@link #consume(Consumer)},
             * which registers it and starts consumption.
             *
             * @param config          the inbox configuration; must not be null
             * @param messageConsumer the consumer that receives the inbox messages
             */
            public DurableQueueBasedInbox(InboxConfig config, Consumer<Message> messageConsumer) {
                this(config);
                consume(messageConsumer);
            }

            public DurableQueueBasedInbox(InboxConfig config) {
                this.config = (InboxConfig)FailFast.requireNonNull((Object)config, (String)"No inbox config provided");
                this.inboxQueueName = config.inboxName.asQueueName();
            }

            /**
             * Registers the message consumer and starts consuming in one step.
             *
             * @param messageConsumer the consumer that receives the inbox messages; must not be null
             * @return this inbox
             * @throws IllegalStateException if a message consumer is already registered
             */
            @Override
            public Inbox consume(Consumer<Message> messageConsumer) {
                boolean alreadyHasConsumer = this.messageConsumer != null;
                if (alreadyHasConsumer) {
                    throw new IllegalStateException("Inbox already has a message consumer");
                }
                setMessageConsumer(messageConsumer);
                startConsuming();
                return this;
            }

            /**
             * Stores the message consumer without starting consumption.
             *
             * @param messageConsumer the consumer that receives the inbox messages; must not be null
             * @return this inbox
             */
            @Override
            public Inbox setMessageConsumer(Consumer<Message> messageConsumer) {
                this.messageConsumer = FailFast.requireNonNull(messageConsumer, "No messageConsumer provided");
                return this;
            }

            /**
             * Starts consuming messages from the backing durable queue according to the configured
             * message consumption mode.
             * <ul>
             *   <li>{@code SingleGlobalConsumer}: asynchronously requests a FencedLock named after the
             *       inbox. The queue consumer is created when the lock is acquired, and cancelled and
             *       cleared when the lock is released.</li>
             *   <li>{@code GlobalCompetingConsumers}: creates the queue consumer right away, without a lock.</li>
             * </ul>
             *
             * @return this inbox
             * @throws IllegalStateException if no message consumer has been set, or the consumption mode is unknown
             */
            @Override
            public Inbox startConsuming() {
                if (this.messageConsumer == null) {
                    throw new IllegalStateException("No message consumer specified. Please call #setMessageConsumer");
                }
                log.info("Starting Consuming from Inbox '{}'", this.config.inboxName);
                switch (this.config.messageConsumptionMode) {
                    case SingleGlobalConsumer: {
                        LockName lockName = this.config.inboxName.asLockName();
                        log.info("Creating FencedLock '{}' for Consumer for Inbox '{}'", lockName, this.config.inboxName);
                        DurableQueueBasedInboxes.this.fencedLockManager.acquireLockAsync(lockName, LockCallback.builder().onLockAcquired(lock -> {
                            log.info("FencedLock '{}' for Inbox '{}' was ACQUIRED - will start Exclusive DurableQueueConsumer", lockName, this.config.inboxName);
                            this.durableQueueConsumer = this.consumeFromDurableQueue((FencedLock)lock);
                            log.info("Exclusive DurableQueueConsumer for Inbox '{}': {}", this.config.inboxName, this.durableQueueConsumer);
                        }).onLockReleased(lock -> {
                            DurableQueueConsumer consumerToStop = this.durableQueueConsumer;
                            if (consumerToStop != null) {
                                log.info("FencedLock '{}' for Inbox '{}' was RELEASED - will stop Exclusive DurableQueueConsumer: {}", lockName, this.config.inboxName, consumerToStop);
                                consumerToStop.cancel();
                                // Clear the reference (as stopConsuming does for competing consumers) so that
                                // isConsumingMessages() no longer reports true once the lock has been lost.
                                this.durableQueueConsumer = null;
                                log.info("Stopped Exclusive DurableQueueConsumer for Inbox '{}': {}", this.config.inboxName, consumerToStop);
                            } else {
                                log.warn("FencedLock '{}' for Inbox '{}' was RELEASED - didn't find an Exclusive DurableQueueConsumer!", lockName, this.config.inboxName);
                            }
                        }).build());
                        break;
                    }
                    case GlobalCompetingConsumers: {
                        log.info("Starting Non-Exclusive DurableQueueConsumer for Inbox '{}'", this.config.inboxName);
                        // No lock is involved in this mode, hence the null lock argument
                        this.durableQueueConsumer = this.consumeFromDurableQueue(null);
                        log.info("Non-Exclusive DurableQueueConsumer for Inbox '{}': {}", this.config.inboxName, this.durableQueueConsumer);
                        break;
                    }
                    default: {
                        throw new IllegalStateException("Unexpected messageConsumptionMode: " + this.config.messageConsumptionMode);
                    }
                }
                return this;
            }

            /**
             * Tells whether a message consumer is currently registered with this inbox.
             *
             * @return true if a message consumer has been set and not cleared again
             */
            @Override
            public boolean hasAMessageConsumer() {
                boolean consumerRegistered = messageConsumer != null;
                return consumerRegistered;
            }

            /**
             * Tells whether this inbox currently holds a queue consumer.
             *
             * @return true if a {@link DurableQueueConsumer} reference is present
             */
            @Override
            public boolean isConsumingMessages() {
                boolean queueConsumerPresent = durableQueueConsumer != null;
                return queueConsumerPresent;
            }

            /**
             * Stops consumption and clears the registered message consumer. Does nothing when no
             * message consumer is registered.
             * <ul>
             *   <li>{@code SingleGlobalConsumer}: cancels the asynchronous lock acquisition for the inbox lock.</li>
             *   <li>{@code GlobalCompetingConsumers}: cancels and clears the queue consumer, if present.</li>
             * </ul>
             *
             * @return this inbox
             * @throws IllegalStateException if the consumption mode is unknown
             */
            @Override
            public Inbox stopConsuming() {
                if (messageConsumer == null) {
                    return this;
                }
                log.info("Stop Consuming from Inbox '{}'", config.inboxName);
                switch (config.messageConsumptionMode) {
                    case SingleGlobalConsumer -> {
                        LockName inboxLock = config.inboxName.asLockName();
                        log.info("CancelAsyncLockAcquiring FencedLock '{}' for Inbox '{}'", inboxLock, config.inboxName);
                        DurableQueueBasedInboxes.this.fencedLockManager.cancelAsyncLockAcquiring(inboxLock);
                    }
                    case GlobalCompetingConsumers -> {
                        if (durableQueueConsumer != null) {
                            log.info("Stopping Non-Exclusive DurableQueueConsumer for Inbox '{}': {}", config.inboxName, durableQueueConsumer);
                            durableQueueConsumer.cancel();
                            durableQueueConsumer = null;
                        }
                    }
                    default -> throw new IllegalStateException("Unexpected messageConsumptionMode: " + config.messageConsumptionMode);
                }
                messageConsumer = null;
                return this;
            }

            /**
             * Returns the name of this inbox as given in its configuration.
             *
             * @return the inbox name
             */
            @Override
            public InboxName name() {
                InboxName configuredName = config.inboxName;
                return configuredName;
            }

            /**
             * Purges the durable queue that backs this inbox.
             */
            @Override
            public void deleteAllMessages() {
                DurableQueues queues = DurableQueueBasedInboxes.this.durableQueues;
                queues.purgeQueue(inboxQueueName);
            }

            /**
             * Queues a received message on the inbox queue. When the durable queues run in
             * {@code FullyTransactional} mode the message is queued inside a unit of work.
             *
             * @param message the received message
             * @return this inbox
             */
            @Override
            public Inbox addMessageReceived(Message message) {
                DurableQueues queues = DurableQueueBasedInboxes.this.durableQueues;
                if (queues.getTransactionalMode() != TransactionalMode.FullyTransactional) {
                    queues.queueMessage(inboxQueueName, message);
                    return this;
                }
                queues.getUnitOfWorkFactory().get().usingUnitOfWork(() -> queues.queueMessage(inboxQueueName, message));
                return this;
            }

            /**
             * Queues a received message on the inbox queue with a delivery delay. When the durable
             * queues run in {@code FullyTransactional} mode the message is queued inside a unit of work.
             *
             * @param message       the received message
             * @param deliveryDelay the delay passed on to the durable queue
             * @return this inbox
             */
            @Override
            public Inbox addMessageReceived(Message message, Duration deliveryDelay) {
                DurableQueues queues = DurableQueueBasedInboxes.this.durableQueues;
                if (queues.getTransactionalMode() != TransactionalMode.FullyTransactional) {
                    queues.queueMessage(inboxQueueName, message, deliveryDelay);
                    return this;
                }
                queues.getUnitOfWorkFactory().get().usingUnitOfWork(() -> queues.queueMessage(inboxQueueName, message, deliveryDelay));
                return this;
            }

            /**
             * Queues a batch of received messages on the inbox queue. When the durable queues run in
             * {@code FullyTransactional} mode the batch is queued inside a unit of work.
             *
             * @param messages the received messages
             * @return this inbox
             */
            @Override
            public Inbox addMessagesReceived(List<Message> messages) {
                DurableQueues queues = DurableQueueBasedInboxes.this.durableQueues;
                if (queues.getTransactionalMode() != TransactionalMode.FullyTransactional) {
                    queues.queueMessages(inboxQueueName, messages);
                    return this;
                }
                queues.getUnitOfWorkFactory().get().usingUnitOfWork(() -> queues.queueMessages(inboxQueueName, messages));
                return this;
            }

            /**
             * Queues a batch of received messages on the inbox queue with a delivery delay. When the
             * durable queues run in {@code FullyTransactional} mode the batch is queued inside a unit of work.
             *
             * @param messages      the received messages
             * @param deliveryDelay the delay passed on to the durable queue
             * @return this inbox
             */
            @Override
            public Inbox addMessagesReceived(List<Message> messages, Duration deliveryDelay) {
                DurableQueues queues = DurableQueueBasedInboxes.this.durableQueues;
                if (queues.getTransactionalMode() != TransactionalMode.FullyTransactional) {
                    queues.queueMessages(inboxQueueName, messages, deliveryDelay);
                    return this;
                }
                queues.getUnitOfWorkFactory().get().usingUnitOfWork(() -> queues.queueMessages(inboxQueueName, messages, deliveryDelay));
                return this;
            }

            /**
             * Registers a consumer on the inbox queue using the redelivery policy and parallelism
             * from the configuration. In {@code SingleGlobalConsumer} mode each queued message gets
             * the lock's current token stored in its metadata before it is handled.
             *
             * @param lock the acquired lock; only dereferenced in {@code SingleGlobalConsumer} mode
             *             (the competing-consumers path passes null)
             * @return the consumer handle returned by the durable queues
             */
            private DurableQueueConsumer consumeFromDurableQueue(FencedLock lock) {
                DurableQueues queues = DurableQueueBasedInboxes.this.durableQueues;
                return queues.consumeFromQueue(inboxQueueName, config.redeliveryPolicy, config.numberOfParallelMessageConsumers, incoming -> {
                    boolean exclusiveConsumer = config.messageConsumptionMode == MessageConsumptionMode.SingleGlobalConsumer;
                    if (exclusiveConsumer) {
                        incoming.getMetaData().put(MessageMetaData.FENCED_LOCK_TOKEN, lock.getCurrentToken().toString());
                    }
                    handleMessage(incoming);
                });
            }

            /**
             * Hands the message carried by the queued message to the registered message consumer,
             * inside a unit of work when the durable queues provide a unit-of-work factory.
             *
             * @param queuedMessage the queued message being delivered
             * @throws IllegalStateException if no message consumer is registered at delivery time
             */
            private void handleMessage(QueuedMessage queuedMessage) {
                // Read the field once: stopConsuming() sets it to null, and a delivery already in
                // progress would otherwise fail with a bare NullPointerException.
                Consumer<Message> consumer = this.messageConsumer;
                if (consumer == null) {
                    throw new IllegalStateException("Inbox '" + this.config.inboxName + "' has no message consumer - cannot handle queued message");
                }
                if (DurableQueueBasedInboxes.this.durableQueues.getUnitOfWorkFactory().isPresent()) {
                    DurableQueueBasedInboxes.this.durableQueues.getUnitOfWorkFactory().get().usingUnitOfWork(() -> consumer.accept(queuedMessage.getMessage()));
                } else {
                    consumer.accept(queuedMessage.getMessage());
                }
            }

            /**
             * Returns the total number of messages the durable queues report as queued for the
             * inbox queue.
             *
             * @return the number of queued messages
             */
            @Override
            public long getNumberOfUndeliveredMessages() {
                DurableQueues queues = DurableQueueBasedInboxes.this.durableQueues;
                return queues.getTotalMessagesQueuedFor(inboxQueueName);
            }

            /**
             * Renders the inbox with its configuration and queue name.
             *
             * @return a description of the form {@code DurableQueueBasedInbox{config=..., inboxQueueName=...}}
             */
            @Override
            public String toString() {
                return "DurableQueueBasedInbox{config=" + config
                       + ", inboxQueueName=" + inboxQueueName
                       + "}";
            }
        }
    }
}

