/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward;

import dk.cloudcreate.essentials.components.foundation.fencedlock.FencedLock;
import dk.cloudcreate.essentials.components.foundation.fencedlock.FencedLockManager;
import dk.cloudcreate.essentials.components.foundation.fencedlock.LockCallback;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.MessageConsumptionMode;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.Outbox;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.OutboxConfig;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.OutboxName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.Message;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.MessageMetaData;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.reactive.EventHandler;
import dk.cloudcreate.essentials.shared.FailFast;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

public interface Outboxes {
    public Outbox getOrCreateOutbox(OutboxConfig var1, Consumer<Message> var2);

    public Outbox getOrCreateOutbox(OutboxConfig var1);

    default public Outbox getOrCreateForwardingOutbox(OutboxConfig outboxConfig, EventHandler eventHandler) {
        FailFast.requireNonNull((Object)eventHandler, (String)"No eventHandler provided");
        return this.getOrCreateOutbox(outboxConfig, message -> eventHandler.handle(message.getPayload()));
    }

    public Collection<Outbox> getOutboxes();

    public static Outboxes durableQueueBasedOutboxes(DurableQueues durableQueues, FencedLockManager fencedLockManager) {
        return new DurableQueueBasedOutboxes(durableQueues, fencedLockManager);
    }

    public static class DurableQueueBasedOutboxes
    implements Outboxes {
        private final DurableQueues durableQueues;
        private final FencedLockManager fencedLockManager;
        private ConcurrentMap<OutboxName, Outbox> outboxes = new ConcurrentHashMap<OutboxName, Outbox>();

        public DurableQueueBasedOutboxes(DurableQueues durableQueues, FencedLockManager fencedLockManager) {
            this.durableQueues = (DurableQueues)FailFast.requireNonNull((Object)durableQueues, (String)"No durableQueues instance provided");
            this.fencedLockManager = (FencedLockManager)FailFast.requireNonNull((Object)fencedLockManager, (String)"No fencedLockManager instance provided");
        }

        @Override
        public Outbox getOrCreateOutbox(OutboxConfig outboxConfig) {
            FailFast.requireNonNull((Object)((Object)outboxConfig.getOutboxName()), (String)"No outboxName provided");
            return this.outboxes.computeIfAbsent(outboxConfig.getOutboxName(), outboxName_ -> new DurableQueueBasedOutbox(outboxConfig));
        }

        @Override
        public Outbox getOrCreateOutbox(OutboxConfig outboxConfig, Consumer<Message> messageConsumer) {
            FailFast.requireNonNull((Object)((Object)outboxConfig.getOutboxName()), (String)"No outboxName provided");
            return this.outboxes.computeIfAbsent(outboxConfig.getOutboxName(), outboxName_ -> new DurableQueueBasedOutbox(outboxConfig, messageConsumer));
        }

        @Override
        public Collection<Outbox> getOutboxes() {
            return this.outboxes.values();
        }

        public class DurableQueueBasedOutbox
        implements Outbox {
            private Consumer<Message> messageConsumer;
            public final QueueName outboxQueueName;
            public final OutboxConfig config;
            private DurableQueueConsumer durableQueueConsumer;

            public DurableQueueBasedOutbox(OutboxConfig config, Consumer<Message> messageConsumer) {
                this(config);
                this.consume(messageConsumer);
            }

            public DurableQueueBasedOutbox(OutboxConfig config) {
                this.config = (OutboxConfig)FailFast.requireNonNull((Object)config, (String)"No outboxConfig provided");
                this.outboxQueueName = config.outboxName.asQueueName();
            }

            @Override
            public Outbox setMessageConsumer(Consumer<Message> messageConsumer) {
                this.messageConsumer = (Consumer)FailFast.requireNonNull(messageConsumer, (String)"No messageConsumer provided");
                return this;
            }

            @Override
            public Outbox consume(Consumer<Message> messageConsumer) {
                if (this.messageConsumer != null) {
                    throw new IllegalStateException("Outbox already has a message consumer");
                }
                this.setMessageConsumer(messageConsumer);
                this.startConsuming();
                return this;
            }

            @Override
            public boolean hasAMessageConsumer() {
                return this.messageConsumer != null;
            }

            @Override
            public boolean isConsumingMessages() {
                return this.durableQueueConsumer != null;
            }

            @Override
            public Outbox startConsuming() {
                if (this.messageConsumer == null) {
                    throw new IllegalStateException("No message consumer specified. Please call #setMessageConsumer");
                }
                switch (this.config.messageConsumptionMode) {
                    case SingleGlobalConsumer: {
                        DurableQueueBasedOutboxes.this.fencedLockManager.acquireLockAsync(this.config.outboxName.asLockName(), LockCallback.builder().onLockAcquired(lock -> {
                            this.durableQueueConsumer = this.consumeFromDurableQueue((FencedLock)lock);
                        }).onLockReleased(lock -> this.durableQueueConsumer.cancel()).build());
                        break;
                    }
                    case GlobalCompetingConsumers: {
                        this.durableQueueConsumer = this.consumeFromDurableQueue(null);
                        break;
                    }
                    default: {
                        throw new IllegalStateException("Unexpected messageConsumptionMode: " + this.config.messageConsumptionMode);
                    }
                }
                return this;
            }

            @Override
            public Outbox stopConsuming() {
                if (this.messageConsumer != null) {
                    switch (this.config.messageConsumptionMode) {
                        case SingleGlobalConsumer: {
                            DurableQueueBasedOutboxes.this.fencedLockManager.cancelAsyncLockAcquiring(this.config.outboxName.asLockName());
                            break;
                        }
                        case GlobalCompetingConsumers: {
                            if (this.durableQueueConsumer == null) break;
                            this.durableQueueConsumer.cancel();
                            this.durableQueueConsumer = null;
                            break;
                        }
                        default: {
                            throw new IllegalStateException("Unexpected messageConsumptionMode: " + this.config.messageConsumptionMode);
                        }
                    }
                    this.messageConsumer = null;
                }
                return this;
            }

            @Override
            public OutboxName name() {
                return this.config.outboxName;
            }

            @Override
            public Outbox sendMessage(Message payload) {
                DurableQueueBasedOutboxes.this.durableQueues.queueMessage(this.outboxQueueName, payload);
                return this;
            }

            @Override
            public Outbox sendMessage(Message payload, Duration deliveryDelay) {
                DurableQueueBasedOutboxes.this.durableQueues.queueMessage(this.outboxQueueName, payload, deliveryDelay);
                return this;
            }

            private DurableQueueConsumer consumeFromDurableQueue(FencedLock lock) {
                return DurableQueueBasedOutboxes.this.durableQueues.consumeFromQueue(this.outboxQueueName, this.config.redeliveryPolicy, this.config.numberOfParallelMessageConsumers, queuedMessage -> {
                    if (this.config.messageConsumptionMode == MessageConsumptionMode.SingleGlobalConsumer) {
                        queuedMessage.getMetaData().put(MessageMetaData.FENCED_LOCK_TOKEN, lock.getCurrentToken().toString());
                    }
                    this.handleMessage(queuedMessage);
                });
            }

            private void handleMessage(QueuedMessage queuedMessage) {
                if (DurableQueueBasedOutboxes.this.durableQueues.getUnitOfWorkFactory().isPresent()) {
                    DurableQueueBasedOutboxes.this.durableQueues.getUnitOfWorkFactory().get().usingUnitOfWork(() -> this.messageConsumer.accept(queuedMessage.getMessage()));
                } else {
                    this.messageConsumer.accept(queuedMessage.getMessage());
                }
            }

            @Override
            public long getNumberOfOutgoingMessages() {
                return DurableQueueBasedOutboxes.this.durableQueues.getTotalMessagesQueuedFor(this.outboxQueueName);
            }

            public String toString() {
                return "DurableQueueBasedOutbox{config=" + this.config + ", outboxQueueName=" + this.outboxQueueName + "}";
            }
        }
    }
}

