/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Clock;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.consumer.Consumer;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetCommitter;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetQueue;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageCommitter;
import pl.allegro.tech.hermes.consumers.message.undelivered.UndeliveredMessageLogPersister;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumerFactory;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumersExecutorService;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumersSupervisor;
import pl.allegro.tech.hermes.consumers.supervisor.process.ConsumerProcessSupervisor;
import pl.allegro.tech.hermes.consumers.supervisor.process.Retransmitter;
import pl.allegro.tech.hermes.consumers.supervisor.process.Signal;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;

/**
 * {@link ConsumersSupervisor} that translates consumer lifecycle requests into {@link Signal}s
 * and hands them to a {@link ConsumerProcessSupervisor}. The process supervisor is executed
 * periodically on a dedicated single-threaded scheduler owned by this class.
 *
 * <p>This class also owns the lifecycle of an {@link OffsetCommitter} and of the injected
 * {@link UndeliveredMessageLogPersister}: both are started in {@link #start()} and stopped in
 * {@link #shutdown()}.
 */
public class NonblockingConsumersSupervisor implements ConsumersSupervisor {
    private static final Logger logger = LoggerFactory.getLogger(NonblockingConsumersSupervisor.class);

    private final ConsumerProcessSupervisor backgroundProcess;
    private final ConsumerFactory consumerFactory;
    private final UndeliveredMessageLogPersister undeliveredMessageLogPersister;
    private final ConfigFactory configs;
    private final OffsetCommitter offsetCommitter;
    private final SubscriptionRepository subscriptionRepository;
    private final ScheduledExecutorService scheduledExecutor;

    /**
     * Wires the supervisor together. Nothing is started here; call {@link #start()} to begin
     * periodic supervision.
     *
     * @param configFactory source of the commit-offset period and the supervision interval
     * @param executor executor handed to the background {@link ConsumerProcessSupervisor}
     * @param consumerFactory factory used to create a consumer for each assigned subscription
     * @param messageCommitters committers handed to the {@link OffsetCommitter}
     * @param offsetQueue offset queue handed to the {@link OffsetCommitter}
     * @param retransmitter retransmitter handed to the background process supervisor
     * @param undeliveredMessageLogPersister persister started and stopped together with this supervisor
     * @param subscriptionRepository repository used to update the state of pending subscriptions
     * @param metrics metrics handed to the offset committer and the background process supervisor
     * @param clock clock handed to the background process supervisor
     */
    @Inject
    public NonblockingConsumersSupervisor(ConfigFactory configFactory, ConsumersExecutorService executor, ConsumerFactory consumerFactory, List<MessageCommitter> messageCommitters, OffsetQueue offsetQueue, Retransmitter retransmitter, UndeliveredMessageLogPersister undeliveredMessageLogPersister, SubscriptionRepository subscriptionRepository, HermesMetrics metrics, Clock clock) {
        this.consumerFactory = consumerFactory;
        this.undeliveredMessageLogPersister = undeliveredMessageLogPersister;
        this.subscriptionRepository = subscriptionRepository;
        this.configs = configFactory;
        this.offsetCommitter = new OffsetCommitter(offsetQueue, messageCommitters, configFactory.getIntProperty(Configs.CONSUMER_COMMIT_OFFSET_PERIOD), metrics);
        this.backgroundProcess = new ConsumerProcessSupervisor(executor, retransmitter, clock, metrics, configFactory);
        this.scheduledExecutor = this.createExecutorForSupervision();
    }

    /**
     * Builds the single-threaded scheduler on which the background process runs.
     *
     * <p>The uncaught-exception handler only sees throwables that escape the worker thread
     * itself. Exceptions thrown by tasks submitted through {@code scheduleAtFixedRate} are
     * captured by the executor and never reach it, which is why {@link #start()} schedules a
     * guarded wrapper rather than the raw task.
     */
    private ScheduledExecutorService createExecutorForSupervision() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("NonblockingConsumersSupervisor-%d")
                .setUncaughtExceptionHandler((t, e) -> logger.error("Exception from supervisor with name {}", t.getName(), e))
                .build();
        return Executors.newSingleThreadScheduledExecutor(threadFactory);
    }

    /**
     * Creates a consumer for the subscription and submits a {@code START} signal for it. If the
     * subscription is in state {@code PENDING}, its state is then updated to {@code ACTIVE} in
     * the subscription repository.
     *
     * <p>Any exception raised along the way is logged and not rethrown.
     *
     * @param subscription subscription to start consuming
     */
    @Override
    public void assignConsumerForSubscription(Subscription subscription) {
        logger.info("Creating consumer for {}", subscription.getQualifiedName());
        try {
            Consumer consumer = this.consumerFactory.createConsumer(subscription);
            logger.info("Created consumer for {}", subscription.getQualifiedName());
            this.backgroundProcess.accept(Signal.of(Signal.SignalType.START, subscription.getQualifiedName(), consumer));
            if (subscription.getState() == Subscription.State.PENDING) {
                this.subscriptionRepository.updateSubscriptionState(subscription.getTopicName(), subscription.getName(), Subscription.State.ACTIVE);
            }
            logger.info("Consumer for {} was added for execution", subscription.getQualifiedName());
        } catch (Exception ex) {
            logger.error("Failed to create consumer for subscription {}", subscription.getQualifiedName(), ex);
        }
    }

    /**
     * Submits a {@code STOP} signal for the given subscription.
     *
     * @param subscription name of the subscription whose consumer should be stopped
     */
    @Override
    public void deleteConsumerForSubscriptionName(SubscriptionName subscription) {
        this.backgroundProcess.accept(Signal.of(Signal.SignalType.STOP, subscription));
    }

    /**
     * Submits an {@code UPDATE_TOPIC} signal carrying the new topic for the given subscription.
     *
     * @param subscription subscription whose consumer should receive the update
     * @param topic updated topic, sent as the signal payload
     */
    @Override
    public void updateTopic(Subscription subscription, Topic topic) {
        this.backgroundProcess.accept(Signal.of(Signal.SignalType.UPDATE_TOPIC, subscription.getQualifiedName(), topic));
    }

    /**
     * Submits an {@code UPDATE_SUBSCRIPTION} signal carrying the subscription itself as payload.
     *
     * @param subscription updated subscription
     */
    @Override
    public void updateSubscription(Subscription subscription) {
        this.backgroundProcess.accept(Signal.of(Signal.SignalType.UPDATE_SUBSCRIPTION, subscription.getQualifiedName(), subscription));
    }

    /**
     * Submits a {@code RETRANSMIT} signal for the given subscription.
     *
     * @param subscription name of the subscription to retransmit
     */
    @Override
    public void retransmit(SubscriptionName subscription) {
        this.backgroundProcess.accept(Signal.of(Signal.SignalType.RETRANSMIT, subscription));
    }

    /**
     * Submits a {@code RESTART} signal for the given subscription.
     *
     * @param subscription name of the subscription whose consumer should be restarted
     */
    @Override
    public void restartConsumer(SubscriptionName subscription) {
        this.backgroundProcess.accept(Signal.of(Signal.SignalType.RESTART, subscription));
    }

    /**
     * Schedules the background process at a fixed rate, using the configured supervision
     * interval (milliseconds) as both the initial delay and the period, then starts the offset
     * committer and the undelivered-message log persister.
     */
    @Override
    public void start() {
        long intervalMs = this.configs.getIntProperty(Configs.CONSUMER_BACKGROUND_SUPERVISOR_INTERVAL);
        // Schedule the guarded wrapper: scheduleAtFixedRate suppresses all subsequent executions
        // once a run throws, which would silently stop supervision for good.
        this.scheduledExecutor.scheduleAtFixedRate(this::runBackgroundProcessGuarded, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        this.offsetCommitter.start();
        this.undeliveredMessageLogPersister.start();
    }

    /** Runs one supervision cycle, logging (instead of propagating) any exception it throws. */
    private void runBackgroundProcessGuarded() {
        try {
            this.backgroundProcess.run();
        } catch (Exception ex) {
            logger.error("Exception from supervisor with name {}", Thread.currentThread().getName(), ex);
        }
    }

    /**
     * Shuts down, in order: the background process, the supervision scheduler, the offset
     * committer and the undelivered-message log persister.
     */
    @Override
    public void shutdown() {
        this.backgroundProcess.shutdown();
        this.scheduledExecutor.shutdown();
        this.offsetCommitter.shutdown();
        this.undeliveredMessageLogPersister.shutdown();
    }
}

