/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.foundation.messaging.queue;

import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.BatchMessageFetchingCapableDurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.CentralizedMessageFetcher;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessageHandler;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.ConsumeFromQueue;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CentralizedMessageFetcherDurableQueueConsumer
implements DurableQueueConsumer {
    private static final Logger log = LoggerFactory.getLogger(CentralizedMessageFetcherDurableQueueConsumer.class);
    private final QueueName queueName;
    private final String consumerName;
    private final QueuedMessageHandler messageHandler;
    private final AtomicBoolean started;
    private final RedeliveryPolicy redeliveryPolicy;
    private final Consumer<DurableQueueConsumer> removeDurableQueueConsumer;
    private final ExecutorService workerPool;
    private final int parallelConsumers;
    private final CentralizedMessageFetcher centralizedMessageFetcher;

    public CentralizedMessageFetcherDurableQueueConsumer(ConsumeFromQueue consumeFromQueue, BatchMessageFetchingCapableDurableQueues durableQueues, Consumer<DurableQueueConsumer> removeDurableQueueConsumer, CentralizedMessageFetcher centralizedMessageFetcher) {
        FailFast.requireNonNull((Object)consumeFromQueue, (String)"No consumeFromQueue provided");
        FailFast.requireNonNull((Object)durableQueues, (String)"No durableQueues provided");
        this.queueName = consumeFromQueue.queueName;
        this.consumerName = consumeFromQueue.consumerName;
        this.messageHandler = consumeFromQueue.queueMessageHandler;
        this.redeliveryPolicy = consumeFromQueue.getRedeliveryPolicy();
        this.removeDurableQueueConsumer = (Consumer)FailFast.requireNonNull(removeDurableQueueConsumer, (String)"No removeDurableQueueConsumer provided");
        this.started = new AtomicBoolean(false);
        this.parallelConsumers = consumeFromQueue.getParallelConsumers();
        this.centralizedMessageFetcher = (CentralizedMessageFetcher)FailFast.requireNonNull((Object)centralizedMessageFetcher, (String)"No centralizedMessageFetcher provided");
        this.workerPool = consumeFromQueue.getConsumerExecutorService().orElseGet(() -> Executors.newScheduledThreadPool(consumeFromQueue.getParallelConsumers(), new ThreadFactoryBuilder().nameFormat("Queue-" + String.valueOf((Object)this.queueName) + "-Worker-%d").daemon(true).build()));
        log.info("[{}] '{}' - Created CentralizedMessageFetcherDurableQueueConsumer with {} workers", new Object[]{this.queueName, this.consumerName, this.parallelConsumers});
    }

    public void start() {
        if (this.started.compareAndSet(false, true)) {
            log.info("[{}] '{}' - Starting with {} workers", new Object[]{this.queueName, this.consumerName, this.parallelConsumers});
            this.centralizedMessageFetcher.registerConsumer(this.queueName, this, this.messageHandler, this.workerPool, this.parallelConsumers);
        }
    }

    public void stop() {
        if (this.started.compareAndSet(true, false)) {
            log.info("[{}] '{}' - Stopping", (Object)this.queueName, (Object)this.consumerName);
            if (!this.workerPool.isShutdown()) {
                this.workerPool.shutdown();
            }
            this.removeDurableQueueConsumer.accept(this);
            if (this.centralizedMessageFetcher.containsConsumerFor(this.queueName)) {
                this.centralizedMessageFetcher.unregisterConsumer(this.queueName);
            }
            log.info("[{}] '{}' - Stopped", (Object)this.queueName, (Object)this.consumerName);
        }
    }

    public boolean isStarted() {
        return this.started.get();
    }

    @Override
    public QueueName queueName() {
        return this.queueName;
    }

    @Override
    public String consumerName() {
        return this.consumerName;
    }

    @Override
    public void cancel() {
        this.stop();
    }

    @Override
    public RedeliveryPolicy getRedeliveryPolicy() {
        return this.redeliveryPolicy;
    }

    public String toString() {
        return "CentralizedMessageFetcherDurableQueueConsumer{queueName=" + String.valueOf((Object)this.queueName) + ", consumerName='" + this.consumerName + "', started=" + String.valueOf(this.started) + "}";
    }
}

