/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor;

import java.util.List;
import java.util.Optional;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.consumer.Consumer;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetCommitter;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetsStorage;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageCommitter;
import pl.allegro.tech.hermes.consumers.message.undelivered.UndeliveredMessageLogPersister;
import pl.allegro.tech.hermes.consumers.supervisor.CloseableSubscriptionLock;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumerFactory;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumerHolder;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumersExecutorService;
import pl.allegro.tech.hermes.consumers.supervisor.SubscriptionLocks;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;
import pl.allegro.tech.hermes.domain.topic.TopicRepository;

public class ConsumersSupervisor {
    private static final Logger logger = LoggerFactory.getLogger(ConsumersSupervisor.class);
    private final SubscriptionRepository subscriptionRepository;
    private final TopicRepository topicRepository;
    private final SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator;
    private final ConsumersExecutorService executor;
    private final ConsumerFactory consumerFactory;
    private final List<OffsetsStorage> offsetsStorages;
    private final List<MessageCommitter> messageCommitters;
    private final HermesMetrics hermesMetrics;
    private final OffsetCommitter offsetCommitter;
    private final ConsumerHolder consumerHolder;
    private final SubscriptionLocks subscriptionsLocks;
    private final String brokersClusterName;
    private final UndeliveredMessageLogPersister undeliveredMessageLogPersister;

    @Inject
    public ConsumersSupervisor(ConfigFactory configFactory, SubscriptionRepository subscriptionRepository, TopicRepository topicRepository, SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator, ConsumersExecutorService executor, ConsumerFactory consumerFactory, List<MessageCommitter> messageCommitters, List<OffsetsStorage> offsetsStorages, HermesMetrics hermesMetrics, UndeliveredMessageLogPersister undeliveredMessageLogPersister) {
        this.subscriptionRepository = subscriptionRepository;
        this.topicRepository = topicRepository;
        this.subscriptionOffsetChangeIndicator = subscriptionOffsetChangeIndicator;
        this.executor = executor;
        this.consumerFactory = consumerFactory;
        this.offsetsStorages = offsetsStorages;
        this.messageCommitters = messageCommitters;
        this.hermesMetrics = hermesMetrics;
        this.undeliveredMessageLogPersister = undeliveredMessageLogPersister;
        this.subscriptionsLocks = new SubscriptionLocks();
        this.consumerHolder = new ConsumerHolder();
        this.offsetCommitter = new OffsetCommitter(this.consumerHolder, messageCommitters, configFactory);
        this.brokersClusterName = configFactory.getStringProperty(Configs.KAFKA_CLUSTER_NAME);
    }

    public void assignConsumerForSubscription(Subscription subscription) {
        try (CloseableSubscriptionLock subscriptionLock = this.subscriptionsLocks.lock(subscription);){
            if (subscription.getState() == Subscription.State.PENDING) {
                this.createAndExecuteConsumerIfNotExists(subscription);
                this.activateSubscription(subscription);
            } else if (subscription.getState() == Subscription.State.ACTIVE) {
                this.createAndExecuteConsumerIfNotExists(subscription);
            } else {
                logger.info("Got subscription created event for inactive subscription {}", (Object)subscription.getId());
            }
        }
        catch (Exception e) {
            logger.error("Failed to create subscription " + subscription.getName(), (Throwable)e);
        }
    }

    public void deleteConsumerForSubscriptionName(SubscriptionName subscription) {
        try (CloseableSubscriptionLock subscriptionLock = this.subscriptionsLocks.lock(subscription);){
            this.deleteConsumerIfExists(subscription, true);
            this.hermesMetrics.removeMetrics(subscription);
        }
        catch (Exception e) {
            logger.error("Failed to remove subscription " + subscription.getId(), (Throwable)e);
        }
    }

    @Deprecated
    public void notifyConsumerOnSubscriptionUpdate(Subscription modifiedSubscription) {
        try (CloseableSubscriptionLock subscriptionLock = this.subscriptionsLocks.lock(modifiedSubscription);){
            Optional<Consumer> consumerOptional = this.consumerHolder.get(modifiedSubscription.getTopicName(), modifiedSubscription.getName());
            Subscription.State oldState = consumerOptional.map(consumer -> consumer.getSubscription().getState()).orElse(Subscription.State.SUSPENDED);
            if (this.subscriptionStateChanged(modifiedSubscription, oldState)) {
                this.handleSubscriptionStateChange(oldState, modifiedSubscription.getState(), modifiedSubscription);
            }
            consumerOptional.ifPresent(consumer -> consumer.updateSubscription(modifiedSubscription));
        }
        catch (Exception e) {
            logger.error("Failed to update subscription " + modifiedSubscription.getId(), (Throwable)e);
        }
    }

    public void updateSubscription(Subscription modifiedSubscription) {
        try (CloseableSubscriptionLock subscriptionLock = this.subscriptionsLocks.lock(modifiedSubscription);){
            this.consumerHolder.get(modifiedSubscription.getTopicName(), modifiedSubscription.getName()).ifPresent(consumer -> consumer.updateSubscription(modifiedSubscription));
        }
    }

    private void activateSubscription(Subscription subscription) {
        subscription.setState(Subscription.State.ACTIVE);
        this.subscriptionRepository.updateSubscription(subscription);
    }

    @Deprecated
    private boolean subscriptionStateChanged(Subscription modifiedSubscription, Subscription.State oldState) {
        return oldState != modifiedSubscription.getState() && oldState != Subscription.State.PENDING;
    }

    @Deprecated
    private void handleSubscriptionStateChange(Subscription.State oldState, Subscription.State newState, Subscription modifiedSubscription) throws Exception {
        logger.info("Changing state from {} to {} for subscription {}", new Object[]{oldState, newState, modifiedSubscription.getId()});
        switch (newState) {
            case PENDING: {
                if (oldState.equals((Object)Subscription.State.PENDING)) break;
                this.createAndExecuteConsumerIfNotExists(modifiedSubscription);
                this.activateSubscription(modifiedSubscription);
                break;
            }
            case ACTIVE: {
                if (oldState.equals((Object)Subscription.State.ACTIVE)) break;
                this.createAndExecuteConsumerIfNotExists(modifiedSubscription);
                break;
            }
            case SUSPENDED: {
                if (!oldState.equals((Object)Subscription.State.ACTIVE)) break;
                this.deleteConsumerIfExists(modifiedSubscription.toSubscriptionName(), false);
                break;
            }
        }
    }

    public void start() throws Exception {
        this.offsetCommitter.start();
        this.undeliveredMessageLogPersister.start();
    }

    public void shutdown() throws InterruptedException {
        for (Consumer consumer : this.consumerHolder) {
            consumer.stopConsuming();
        }
        this.executor.shutdown();
        this.offsetCommitter.shutdown();
        this.undeliveredMessageLogPersister.shutdown();
    }

    private void createAndExecuteConsumerIfNotExists(Subscription subscription) {
        if (this.consumerHolder.contains(subscription.getTopicName(), subscription.getName())) {
            logger.warn("Consumer for {} already exists, ignoring", (Object)subscription.getId());
        } else {
            this.createAndExecuteConsumer(subscription);
        }
    }

    private void deleteConsumerIfExists(SubscriptionName subscription, boolean removeOffsets) throws Exception {
        this.deleteConsumerIfExists(subscription.getTopicName(), subscription.getName(), removeOffsets);
    }

    private void deleteConsumerIfExists(TopicName topicName, String subscriptionName, boolean removeOffsets) throws Exception {
        if (this.consumerHolder.contains(topicName, subscriptionName)) {
            logger.info("Deleting consumer for {}", (Object)Subscription.getId((TopicName)topicName, (String)subscriptionName));
            Consumer consumer = this.consumerHolder.get(topicName, subscriptionName).get();
            consumer.stopConsuming();
            consumer.waitUntilStopped();
            this.consumerHolder.remove(topicName, subscriptionName);
            if (removeOffsets) {
                this.removeOffsets(topicName, subscriptionName, consumer.getOffsetsToCommit());
            }
        }
    }

    private void createAndExecuteConsumer(Subscription subscription) {
        logger.info("Creating consumer for {}", (Object)subscription.getId());
        try {
            Consumer consumer = this.consumerFactory.createConsumer(subscription);
            this.consumerHolder.add(subscription.getTopicName(), subscription.getName(), consumer);
            this.executor.execute(consumer);
        }
        catch (Exception ex) {
            logger.info("Failed to create consumer for subscription {} ", (Object)subscription.getId(), (Object)ex);
        }
    }

    private void removeOffsets(TopicName topicName, String subscriptionName, List<PartitionOffset> offsetsToRemove) throws Exception {
        for (PartitionOffset partitionOffset : offsetsToRemove) {
            for (MessageCommitter messageCommitter : this.messageCommitters) {
                messageCommitter.removeOffset(topicName, subscriptionName, partitionOffset.getTopic(), partitionOffset.getPartition());
            }
        }
    }

    public void retransmit(SubscriptionName subscriptionName) throws Exception {
        try (CloseableSubscriptionLock subscriptionLock = this.subscriptionsLocks.lock(subscriptionName);){
            logger.info("Starting retransmission for subscription {}", (Object)subscriptionName);
            this.deleteConsumerIfExists(subscriptionName, false);
            PartitionOffsets offsets = this.subscriptionOffsetChangeIndicator.getSubscriptionOffsets(this.topicRepository.getTopicDetails(subscriptionName.getTopicName()), subscriptionName.getName(), this.brokersClusterName);
            for (PartitionOffset partitionOffset : offsets) {
                for (OffsetsStorage s : this.offsetsStorages) {
                    s.setSubscriptionOffset(subscriptionName, partitionOffset);
                }
            }
            this.createAndExecuteConsumer(this.subscriptionRepository.getSubscriptionDetails(subscriptionName.getTopicName(), subscriptionName.getName()));
            logger.info("Finished retransmission for subscription {}", (Object)subscriptionName);
        }
        catch (Exception e) {
            logger.error("Error while doing retransmission for subscription {}", (Object)subscriptionName, (Object)e);
        }
    }

    public void restartConsumer(SubscriptionName subscriptionName) throws Exception {
        logger.info("Restarting consumer for subscription {}", (Object)subscriptionName);
        this.deleteConsumerIfExists(subscriptionName, false);
        this.createAndExecuteConsumer(this.subscriptionRepository.getSubscriptionDetails(subscriptionName.getTopicName(), subscriptionName.getName()));
    }
}

