/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward;

import dk.cloudcreate.essentials.components.foundation.fencedlock.FencedLock;
import dk.cloudcreate.essentials.components.foundation.fencedlock.FencedLockManager;
import dk.cloudcreate.essentials.components.foundation.fencedlock.LockCallback;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.Inbox;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.InboxConfig;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.InboxName;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.MessageConsumptionMode;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.Message;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.MessageMetaData;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.TransactionalMode;
import dk.cloudcreate.essentials.reactive.command.CommandBus;
import dk.cloudcreate.essentials.shared.FailFast;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

public interface Inboxes {
    /**
     * Returns the {@link Inbox} for the given configuration, creating it when it does not exist yet.
     * <p>
     * In {@link DurableQueueBasedInboxes} the inbox is looked up by the configuration's inbox name, and a
     * newly created inbox has no message consumer attached.
     *
     * @param inboxConfig the configuration that identifies the inbox and is used to create it if needed
     * @return the existing or newly created inbox
     */
    public Inbox getOrCreateInbox(InboxConfig inboxConfig);

    /**
     * Returns the {@link Inbox} for the given configuration, creating it with the supplied message
     * consumer when it does not exist yet.
     * <p>
     * In {@link DurableQueueBasedInboxes} a newly created inbox starts consuming with
     * {@code messageConsumer} right away. If an inbox with the same name already exists it is returned
     * unchanged and {@code messageConsumer} is not applied to it.
     *
     * @param inboxConfig     the configuration that identifies the inbox and is used to create it if needed
     * @param messageConsumer the consumer that receives the messages of a newly created inbox
     * @return the existing or newly created inbox
     */
    public Inbox getOrCreateInbox(InboxConfig inboxConfig, Consumer<Message> messageConsumer);

    /**
     * Returns the {@link Inbox} for the given configuration, wiring it to a consumer that sends the
     * payload of every received message to the given command bus.
     *
     * @param inboxConfig the configuration that identifies the inbox and is used to create it if needed
     * @param forwardTo   the command bus that receives each message payload; must not be {@code null}
     * @return the inbox returned by {@link #getOrCreateInbox(InboxConfig, Consumer)}
     */
    default public Inbox getOrCreateInbox(InboxConfig inboxConfig, CommandBus forwardTo) {
        FailFast.requireNonNull(forwardTo, "No forwardTo command bus provided");
        // Adapt the command bus to the Consumer-based overload: only the payload is forwarded.
        Consumer<Message> payloadForwarder = received -> forwardTo.send(received.getPayload());
        return getOrCreateInbox(inboxConfig, payloadForwarder);
    }

    /**
     * Returns the inboxes known to this instance.
     *
     * @return the collection of inboxes
     */
    public Collection<Inbox> getInboxes();

    /**
     * Factory for an {@link Inboxes} instance backed by {@link DurableQueueBasedInboxes}.
     *
     * @param durableQueues     the durable queues handed to the {@link DurableQueueBasedInboxes} constructor
     * @param fencedLockManager the fenced lock manager handed to the {@link DurableQueueBasedInboxes} constructor
     * @return a new {@link DurableQueueBasedInboxes}
     */
    public static Inboxes durableQueueBasedInboxes(DurableQueues durableQueues, FencedLockManager fencedLockManager) {
        final Inboxes queueBackedInboxes = new DurableQueueBasedInboxes(durableQueues, fencedLockManager);
        return queueBackedInboxes;
    }

    public static class DurableQueueBasedInboxes
    implements Inboxes {
        /** Durable queues used to store, count and consume the inbox messages. */
        private final DurableQueues durableQueues;
        /** Lock manager used by inboxes whose consumption mode is {@code SingleGlobalConsumer}. */
        private final FencedLockManager fencedLockManager;
        /** Inboxes created so far, keyed by inbox name. The map is never reassigned, hence {@code final}. */
        private final ConcurrentMap<InboxName, Inbox> inboxes = new ConcurrentHashMap<>();

        /**
         * Creates an inbox registry on top of the given durable queues and fenced lock manager.
         *
         * @param durableQueues     the durable queues backing every inbox; must not be {@code null}
         * @param fencedLockManager the lock manager used by the inboxes; must not be {@code null}
         */
        public DurableQueueBasedInboxes(DurableQueues durableQueues, FencedLockManager fencedLockManager) {
            // Validate both collaborators up front, then store them.
            FailFast.requireNonNull(durableQueues, "No durableQueues instance provided");
            FailFast.requireNonNull(fencedLockManager, "No fencedLockManager instance provided");
            this.durableQueues = durableQueues;
            this.fencedLockManager = fencedLockManager;
        }

        /**
         * Returns the inbox registered under the configuration's inbox name. If none is registered, a
         * {@link DurableQueueBasedInbox} is created with {@code messageConsumer} and registered.
         * An already registered inbox is returned unchanged; {@code messageConsumer} is then not used.
         *
         * @param inboxConfig     the inbox configuration; must not be {@code null}
         * @param messageConsumer the consumer passed to a newly created inbox
         * @return the registered inbox for the configuration's inbox name
         */
        @Override
        public Inbox getOrCreateInbox(InboxConfig inboxConfig, Consumer<Message> messageConsumer) {
            FailFast.requireNonNull(inboxConfig, "No inboxConfig provided");
            final InboxName requestedName = inboxConfig.getInboxName();
            return inboxes.computeIfAbsent(requestedName,
                                           absentName -> new DurableQueueBasedInbox(inboxConfig, messageConsumer));
        }

        /**
         * Returns the inbox registered under the configuration's inbox name. If none is registered, a
         * {@link DurableQueueBasedInbox} without a message consumer is created and registered.
         *
         * @param inboxConfig the inbox configuration; must not be {@code null}
         * @return the registered inbox for the configuration's inbox name
         */
        @Override
        public Inbox getOrCreateInbox(InboxConfig inboxConfig) {
            FailFast.requireNonNull(inboxConfig, "No inboxConfig provided");
            final InboxName requestedName = inboxConfig.getInboxName();
            return inboxes.computeIfAbsent(requestedName,
                                           absentName -> new DurableQueueBasedInbox(inboxConfig));
        }

        /**
         * Returns a read-only view of the inboxes created through this instance.
         * <p>
         * The view is backed by the internal registry, so inboxes created later show up in it. Attempts
         * to modify it throw {@link UnsupportedOperationException}. This keeps callers from removing an
         * inbox from the registry behind this class's back.
         *
         * @return an unmodifiable view of the registered inboxes
         */
        @Override
        public Collection<Inbox> getInboxes() {
            return Collections.unmodifiableCollection(this.inboxes.values());
        }

        public class DurableQueueBasedInbox
        implements Inbox {
            /** Receives each consumed message; {@code null} until one is set, and reset to {@code null} by {@code stopConsuming()}. */
            private Consumer<Message> messageConsumer;
            /** Name of the durable queue backing this inbox, derived from the configured inbox name in the constructor. */
            public final QueueName inboxQueueName;
            /** The configuration this inbox was created with. */
            public final InboxConfig config;
            /**
             * Handle returned when consumption from the durable queue was started. It is cleared by
             * {@code stopConsuming()} in {@code GlobalCompetingConsumers} mode.
             * NOTE(review): in {@code SingleGlobalConsumer} mode this is assigned from the lock callback,
             * presumably on another thread. Confirm whether the field needs to be volatile.
             */
            private DurableQueueConsumer durableQueueConsumer;

            /**
             * Creates the inbox and immediately starts consuming with the given consumer
             * (see {@link #consume(Consumer)}).
             *
             * @param config          the inbox configuration; must not be {@code null}
             * @param messageConsumer the consumer that receives the inbox messages
             */
            public DurableQueueBasedInbox(InboxConfig config, Consumer<Message> messageConsumer) {
                this(config);
                // A fresh inbox has no consumer yet, so this registers the consumer and starts consumption.
                consume(messageConsumer);
            }

            /**
             * Creates the inbox without a message consumer; nothing is consumed until one is set and
             * consumption is started.
             *
             * @param config the inbox configuration; must not be {@code null}
             */
            public DurableQueueBasedInbox(InboxConfig config) {
                FailFast.requireNonNull(config, "No inbox config provided");
                this.config = config;
                // The backing queue is named after the inbox.
                this.inboxQueueName = this.config.inboxName.asQueueName();
            }

            /**
             * Registers the message consumer and starts consuming messages.
             *
             * @param messageConsumer the consumer that receives the inbox messages; must not be {@code null}
             * @return this inbox
             * @throws IllegalStateException if a message consumer is already registered
             */
            @Override
            public Inbox consume(Consumer<Message> messageConsumer) {
                final boolean consumerAlreadyRegistered = this.messageConsumer != null;
                if (consumerAlreadyRegistered) {
                    throw new IllegalStateException("Inbox already has a message consumer");
                }
                setMessageConsumer(messageConsumer);
                startConsuming();
                return this;
            }

            /**
             * Stores the message consumer without starting consumption.
             *
             * @param messageConsumer the consumer that receives the inbox messages; must not be {@code null}
             * @return this inbox
             */
            @Override
            public Inbox setMessageConsumer(Consumer<Message> messageConsumer) {
                FailFast.requireNonNull(messageConsumer, "No messageConsumer provided");
                this.messageConsumer = messageConsumer;
                return this;
            }

            /**
             * Starts consuming messages from the inbox queue according to the configured consumption mode.
             * <ul>
             *   <li>{@code SingleGlobalConsumer}: asks the fenced lock manager to acquire the inbox lock
             *       asynchronously. Consumption starts when the lock is acquired and is cancelled when
             *       the lock is released.</li>
             *   <li>{@code GlobalCompetingConsumers}: starts consuming immediately, without a lock.</li>
             * </ul>
             *
             * @return this inbox
             * @throws IllegalStateException if no message consumer has been set, or the consumption mode
             *                               is not one of the two modes above
             */
            @Override
            public Inbox startConsuming() {
                if (this.messageConsumer == null) {
                    throw new IllegalStateException("No message consumer specified. Please call #setMessageConsumer");
                }
                switch (this.config.messageConsumptionMode) {
                    case SingleGlobalConsumer: {
                        DurableQueueBasedInboxes.this.fencedLockManager.acquireLockAsync(this.config.inboxName.asLockName(), LockCallback.builder().onLockAcquired(lock -> {
                            this.durableQueueConsumer = this.consumeFromDurableQueue((FencedLock)lock);
                        }).onLockReleased(lock -> {
                            // The release callback may fire without an active consumer, so guard against
                            // null. Clear the reference so isConsumingMessages() stops reporting true
                            // once the lock (and with it the consumer) is gone.
                            DurableQueueConsumer activeConsumer = this.durableQueueConsumer;
                            if (activeConsumer != null) {
                                this.durableQueueConsumer = null;
                                activeConsumer.cancel();
                            }
                        }).build());
                        break;
                    }
                    case GlobalCompetingConsumers: {
                        // No lock in this mode, hence the null lock argument.
                        this.durableQueueConsumer = this.consumeFromDurableQueue(null);
                        break;
                    }
                    default: {
                        throw new IllegalStateException("Unexpected messageConsumptionMode: " + this.config.messageConsumptionMode);
                    }
                }
                return this;
            }

            /**
             * Tells whether a message consumer is currently registered on this inbox.
             *
             * @return {@code true} if a message consumer is set
             */
            @Override
            public boolean hasAMessageConsumer() {
                final Consumer<Message> registeredConsumer = this.messageConsumer;
                return registeredConsumer != null;
            }

            /**
             * Tells whether this inbox currently holds a durable queue consumer handle.
             *
             * @return {@code true} if a durable queue consumer handle is set
             */
            @Override
            public boolean isConsumingMessages() {
                final DurableQueueConsumer currentConsumer = this.durableQueueConsumer;
                return currentConsumer != null;
            }

            /**
             * Stops consuming messages and forgets the registered message consumer. Does nothing when no
             * message consumer is registered.
             * <ul>
             *   <li>{@code SingleGlobalConsumer}: cancels the asynchronous lock acquiring for the inbox lock.</li>
             *   <li>{@code GlobalCompetingConsumers}: cancels and clears the durable queue consumer, if any.</li>
             * </ul>
             *
             * @return this inbox
             * @throws IllegalStateException if the consumption mode is not one of the two modes above
             */
            @Override
            public Inbox stopConsuming() {
                if (messageConsumer == null) {
                    // Nothing registered, nothing to stop.
                    return this;
                }

                switch (config.messageConsumptionMode) {
                    case SingleGlobalConsumer:
                        fencedLockManager.cancelAsyncLockAcquiring(config.inboxName.asLockName());
                        break;
                    case GlobalCompetingConsumers:
                        if (durableQueueConsumer != null) {
                            durableQueueConsumer.cancel();
                            durableQueueConsumer = null;
                        }
                        break;
                    default:
                        throw new IllegalStateException("Unexpected messageConsumptionMode: " + config.messageConsumptionMode);
                }

                messageConsumer = null;
                return this;
            }

            /**
             * Returns the name of this inbox as given in its configuration.
             *
             * @return the configured inbox name
             */
            @Override
            public InboxName name() {
                final InboxName configuredName = config.inboxName;
                return configuredName;
            }

            /**
             * Queues a received message on this inbox's durable queue. When the durable queues run in
             * {@code FullyTransactional} mode, the message is queued inside a unit of work obtained from
             * their unit-of-work factory; otherwise it is queued directly.
             *
             * @param message the received message to store in the inbox
             * @return this inbox
             */
            @Override
            public Inbox addMessageReceived(Message message) {
                final boolean fullyTransactional = durableQueues.getTransactionalMode() == TransactionalMode.FullyTransactional;
                if (!fullyTransactional) {
                    durableQueues.queueMessage(inboxQueueName, message);
                    return this;
                }
                durableQueues.getUnitOfWorkFactory().get().usingUnitOfWork(() -> durableQueues.queueMessage(inboxQueueName, message));
                return this;
            }

            /**
             * Starts a durable queue consumer for the inbox queue, using the configured redelivery policy
             * and number of parallel consumers. In {@code SingleGlobalConsumer} mode the lock's current
             * token is written into each message's metadata before the message is handled.
             *
             * @param lock the fenced lock held for this inbox; only read in {@code SingleGlobalConsumer}
             *             mode (callers pass {@code null} in the other mode)
             * @return the handle of the started queue consumer
             */
            private DurableQueueConsumer consumeFromDurableQueue(FencedLock lock) {
                return durableQueues.consumeFromQueue(inboxQueueName, config.redeliveryPolicy, config.numberOfParallelMessageConsumers, delivered -> {
                    if (MessageConsumptionMode.SingleGlobalConsumer == config.messageConsumptionMode) {
                        delivered.getMetaData().put(MessageMetaData.FENCED_LOCK_TOKEN, lock.getCurrentToken().toString());
                    }
                    handleMessage(delivered);
                });
            }

            /**
             * Passes the queued message's {@link Message} to the registered message consumer. When the
             * durable queues expose a unit-of-work factory, the consumer is invoked inside a unit of work.
             *
             * @param queuedMessage the message delivered by the durable queue
             */
            private void handleMessage(QueuedMessage queuedMessage) {
                if (!durableQueues.getUnitOfWorkFactory().isPresent()) {
                    messageConsumer.accept(queuedMessage.getMessage());
                    return;
                }
                durableQueues.getUnitOfWorkFactory().get().usingUnitOfWork(() -> messageConsumer.accept(queuedMessage.getMessage()));
            }

            /**
             * Returns the total number of messages the durable queues report as queued for this inbox's queue.
             *
             * @return the number of queued messages for the inbox queue
             */
            @Override
            public long getNumberOfUndeliveredMessages() {
                final long queuedForInbox = durableQueues.getTotalMessagesQueuedFor(inboxQueueName);
                return queuedForInbox;
            }

            /**
             * Returns a description containing the inbox configuration and the backing queue name.
             *
             * @return the string form of this inbox
             */
            @Override
            public String toString() {
                final StringBuilder description = new StringBuilder("DurableQueueBasedInbox{config=");
                description.append(config);
                description.append(", inboxQueueName=");
                description.append(inboxQueueName);
                description.append("}");
                return description.toString();
            }
        }
    }
}

