/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Clock;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.CommonConsumerParameters;
import pl.allegro.tech.hermes.consumers.consumer.offset.ConsumerPartitionAssignmentState;
import pl.allegro.tech.hermes.consumers.health.ConsumerMonitor;
import pl.allegro.tech.hermes.consumers.message.undelivered.UndeliveredMessageLogPersister;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumerFactory;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumersExecutorService;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumersSupervisor;
import pl.allegro.tech.hermes.consumers.supervisor.process.ConsumerProcessFactory;
import pl.allegro.tech.hermes.consumers.supervisor.process.ConsumerProcessSupervisor;
import pl.allegro.tech.hermes.consumers.supervisor.process.Retransmitter;
import pl.allegro.tech.hermes.consumers.supervisor.process.Signal;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;

/**
 * {@link ConsumersSupervisor} that turns every lifecycle request into a {@link Signal} handed to a
 * {@link ConsumerProcessSupervisor}, and runs that supervisor periodically on a dedicated
 * single-thread scheduled executor.
 */
public class NonblockingConsumersSupervisor implements ConsumersSupervisor {
    private static final Logger logger = LoggerFactory.getLogger(NonblockingConsumersSupervisor.class);

    private final ConsumerProcessSupervisor backgroundProcess;
    private final UndeliveredMessageLogPersister undeliveredMessageLogPersister;
    private final Duration backgroundSupervisorInterval;
    private final SubscriptionRepository subscriptionRepository;
    private final ScheduledExecutorService scheduledExecutor;

    /**
     * Builds the background process supervisor and its scheduler, and registers two entries in
     * {@code monitor}: {@code "subscriptions"} and {@code "subscriptionsCount"}.
     *
     * @param commonConsumerParameters source of the supervision interval, unhealthy-after and
     *     kill-after settings, and the signal processing queue size
     * @param executor executor passed to the background process supervisor
     * @param consumerFactory factory passed to the consumer process factory
     * @param consumerPartitionAssignmentState accepted for interface compatibility; not read here
     * @param retransmitter retransmitter passed to the consumer process factory
     * @param undeliveredMessageLogPersister started in {@link #start()}, stopped in {@link #shutdown()}
     * @param subscriptionRepository used to move PENDING subscriptions to ACTIVE
     * @param metrics metrics facade passed to the background process supervisor
     * @param monitor monitor in which the running-subscriptions suppliers are registered
     * @param clock clock shared by the background process supervisor and the process factory
     */
    public NonblockingConsumersSupervisor(CommonConsumerParameters commonConsumerParameters, ConsumersExecutorService executor, ConsumerFactory consumerFactory, ConsumerPartitionAssignmentState consumerPartitionAssignmentState, Retransmitter retransmitter, UndeliveredMessageLogPersister undeliveredMessageLogPersister, SubscriptionRepository subscriptionRepository, MetricsFacade metrics, ConsumerMonitor monitor, Clock clock) {
        this.undeliveredMessageLogPersister = undeliveredMessageLogPersister;
        this.subscriptionRepository = subscriptionRepository;
        this.backgroundSupervisorInterval = commonConsumerParameters.getBackgroundSupervisor().getInterval();
        this.backgroundProcess = new ConsumerProcessSupervisor(executor, clock, metrics, new ConsumerProcessFactory(retransmitter, consumerFactory, commonConsumerParameters.getBackgroundSupervisor().getUnhealthyAfter(), clock), commonConsumerParameters.getSignalProcessingQueueSize(), commonConsumerParameters.getBackgroundSupervisor().getKillAfter());
        this.scheduledExecutor = this.createExecutorForSupervision();
        monitor.register("subscriptions", this.backgroundProcess::runningSubscriptionsStatus);
        monitor.register("subscriptionsCount", this.backgroundProcess::countRunningProcesses);
    }

    /**
     * Creates the single-thread scheduler used for periodic supervision.
     *
     * <p>The uncaught-exception handler only sees throwables that escape the worker thread itself.
     * Exceptions thrown by a scheduled task are captured by the executor in the task's future, which
     * is why {@link #start()} guards the supervision run explicitly.
     */
    private ScheduledExecutorService createExecutorForSupervision() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("NonblockingConsumersSupervisor-%d")
                .setUncaughtExceptionHandler((t, e) -> logger.error("Exception from supervisor with name {}", t.getName(), e))
                .build();
        return Executors.newSingleThreadScheduledExecutor(threadFactory);
    }

    /**
     * Sends a START signal for the subscription and, if the subscription is PENDING, marks it ACTIVE
     * in the repository. Runtime failures are logged and not propagated.
     */
    @Override
    public void assignConsumerForSubscription(Subscription subscription) {
        try {
            Signal start = Signal.of(Signal.SignalType.START, subscription.getQualifiedName(), subscription);
            this.backgroundProcess.accept(start);
            if (subscription.getState() == Subscription.State.PENDING) {
                this.subscriptionRepository.updateSubscriptionState(subscription.getTopicName(), subscription.getName(), Subscription.State.ACTIVE);
            }
        } catch (RuntimeException e) {
            logger.error("Error during assigning subscription {} to consumer", subscription.getQualifiedName(), e);
        }
    }

    /** Logs the request and sends a STOP signal for the given subscription name. */
    @Override
    public void deleteConsumerForSubscriptionName(SubscriptionName subscription) {
        Signal stop = Signal.of(Signal.SignalType.STOP, subscription);
        logger.info("Deleting consumer for {}. {}", subscription, stop.getLogWithIdAndType());
        this.backgroundProcess.accept(stop);
    }

    /** Sends an UPDATE_TOPIC signal carrying {@code topic} for the given subscription. */
    @Override
    public void updateTopic(Subscription subscription, Topic topic) {
        this.backgroundProcess.accept(Signal.of(Signal.SignalType.UPDATE_TOPIC, subscription.getQualifiedName(), topic));
    }

    /** Sends an UPDATE_SUBSCRIPTION signal carrying the subscription itself. */
    @Override
    public void updateSubscription(Subscription subscription) {
        this.backgroundProcess.accept(Signal.of(Signal.SignalType.UPDATE_SUBSCRIPTION, subscription.getQualifiedName(), subscription));
    }

    /** Sends a RETRANSMIT signal for the given subscription name. */
    @Override
    public void retransmit(SubscriptionName subscription) {
        this.backgroundProcess.accept(Signal.of(Signal.SignalType.RETRANSMIT, subscription));
    }

    /** Returns the set of consumers reported by the background process supervisor. */
    @Override
    public Set<SubscriptionName> runningConsumers() {
        return this.backgroundProcess.existingConsumers();
    }

    /**
     * Schedules the background process supervisor at a fixed rate (initial delay and period both
     * equal to the configured interval) and starts the undelivered message log persister.
     */
    @Override
    public void start() {
        long intervalMillis = this.backgroundSupervisorInterval.toMillis();
        // scheduleAtFixedRate suppresses all subsequent executions once a run throws, and the
        // exception never reaches the thread's uncaught-exception handler. Catch and log here so
        // a single failed run does not silently end supervision.
        Runnable guardedSupervision = () -> {
            try {
                this.backgroundProcess.run();
            } catch (RuntimeException e) {
                logger.error("Exception from supervisor with name {}", Thread.currentThread().getName(), e);
            }
        };
        this.scheduledExecutor.scheduleAtFixedRate(guardedSupervision, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        this.undeliveredMessageLogPersister.start();
    }

    /**
     * Shuts down the background process supervisor, the scheduler, and the undelivered message log
     * persister, in that order. Each step runs even if an earlier one throws; the exception still
     * propagates to the caller afterwards.
     */
    @Override
    public void shutdown() {
        try {
            this.backgroundProcess.shutdown();
        } finally {
            try {
                this.scheduledExecutor.shutdown();
            } finally {
                this.undeliveredMessageLogPersister.shutdown();
            }
        }
    }
}

