/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor;

import com.google.common.collect.ImmutableList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.admin.AdminOperationsCallback;
import pl.allegro.tech.hermes.common.admin.zookeeper.ZookeeperAdminCache;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.consumer.Consumer;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetCommitter;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetsStorage;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageCommitter;
import pl.allegro.tech.hermes.consumers.message.undelivered.UndeliveredMessageLogPersister;
import pl.allegro.tech.hermes.consumers.subscription.cache.SubscriptionCallback;
import pl.allegro.tech.hermes.consumers.subscription.cache.SubscriptionsCache;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumerFactory;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumerHolder;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumersExecutorService;
import pl.allegro.tech.hermes.consumers.supervisor.SubscriptionLock;
import pl.allegro.tech.hermes.consumers.supervisor.SubscriptionLocks;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;
import pl.allegro.tech.hermes.domain.subscription.offset.PartitionOffset;
import pl.allegro.tech.hermes.domain.subscription.offset.PartitionOffsets;
import pl.allegro.tech.hermes.domain.subscription.offset.SubscriptionOffsetChangeIndicator;

public class ConsumersSupervisor
implements SubscriptionCallback,
AdminOperationsCallback {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumersSupervisor.class);
    private final SubscriptionRepository subscriptionRepository;
    private final SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator;
    private final ConsumersExecutorService executor;
    private final ConsumerFactory consumerFactory;
    private final List<OffsetsStorage> offsetsStorages;
    private final List<MessageCommitter> messageCommitters;
    private final HermesMetrics hermesMetrics;
    private final OffsetCommitter offsetCommitter;
    private final ConsumerHolder consumerHolder;
    private final SubscriptionsCache subscriptionsCache;
    private final ZookeeperAdminCache adminCache;
    private final SubscriptionLocks subscriptionsLocks;
    private final String brokersClusterName;
    private final UndeliveredMessageLogPersister undeliveredMessageLogPersister;

    @Inject
    public ConsumersSupervisor(ConfigFactory configFactory, SubscriptionRepository subscriptionRepository, SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator, ConsumersExecutorService executor, ConsumerFactory consumerFactory, List<MessageCommitter> messageCommitters, List<OffsetsStorage> offsetsStorages, SubscriptionsCache subscriptionsCache, HermesMetrics hermesMetrics, ZookeeperAdminCache adminCache, UndeliveredMessageLogPersister undeliveredMessageLogPersister) {
        this.subscriptionRepository = subscriptionRepository;
        this.subscriptionOffsetChangeIndicator = subscriptionOffsetChangeIndicator;
        this.executor = executor;
        this.consumerFactory = consumerFactory;
        this.offsetsStorages = offsetsStorages;
        this.messageCommitters = messageCommitters;
        this.subscriptionsCache = subscriptionsCache;
        this.adminCache = adminCache;
        this.hermesMetrics = hermesMetrics;
        this.undeliveredMessageLogPersister = undeliveredMessageLogPersister;
        this.subscriptionsLocks = new SubscriptionLocks();
        this.consumerHolder = new ConsumerHolder();
        this.offsetCommitter = new OffsetCommitter(this.consumerHolder, messageCommitters, configFactory);
        this.brokersClusterName = configFactory.getStringProperty(Configs.KAFKA_CLUSTER_NAME);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onSubscriptionCreated(Subscription subscription) {
        SubscriptionLock subscriptionLock = this.subscriptionsLocks.getLock(subscription);
        synchronized (subscriptionLock) {
            try {
                if (subscription.getState() == Subscription.State.PENDING) {
                    this.createAndExecuteConsumerIfNotExists(subscription);
                    this.activateSubscription(subscription);
                } else if (subscription.getState() == Subscription.State.ACTIVE) {
                    this.createAndExecuteConsumerIfNotExists(subscription);
                } else {
                    LOGGER.info("Got subscription created event for inactive subscription {}", (Object)subscription.getId());
                }
            }
            catch (Exception e) {
                LOGGER.error("Failed to create subscription " + subscription.getName(), (Throwable)e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onSubscriptionRemoved(Subscription subscription) {
        SubscriptionLock subscriptionLock = this.subscriptionsLocks.getLock(subscription);
        synchronized (subscriptionLock) {
            try {
                this.deleteConsumerIfExists(subscription, true);
                this.hermesMetrics.removeMetrics(subscription);
            }
            catch (Exception e) {
                LOGGER.error("Failed to remove subscription " + subscription.getId(), (Throwable)e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onSubscriptionChanged(Subscription modifiedSubscription) {
        SubscriptionLock subscriptionLock = this.subscriptionsLocks.getLock(modifiedSubscription);
        synchronized (subscriptionLock) {
            try {
                Optional<Consumer> consumerOptional = this.consumerHolder.get(modifiedSubscription.getTopicName(), modifiedSubscription.getName());
                Subscription.State oldState = consumerOptional.map(consumer -> consumer.getSubscription().getState()).orElse(Subscription.State.SUSPENDED);
                if (this.subscriptionStateChanged(modifiedSubscription, oldState)) {
                    this.handleSubscriptionStateChange(oldState, modifiedSubscription.getState(), modifiedSubscription);
                }
                consumerOptional.ifPresent(consumer -> consumer.updateSubscription(modifiedSubscription));
            }
            catch (Exception e) {
                LOGGER.error("Failed to update subscription " + modifiedSubscription.getId(), (Throwable)e);
            }
        }
    }

    private void activateSubscription(Subscription subscription) {
        subscription.setState(Subscription.State.ACTIVE);
        this.subscriptionRepository.updateSubscription(subscription);
    }

    private boolean subscriptionStateChanged(Subscription modifiedSubscription, Subscription.State oldState) {
        return oldState != modifiedSubscription.getState() && oldState != Subscription.State.PENDING;
    }

    private void handleSubscriptionStateChange(Subscription.State oldState, Subscription.State newState, Subscription modifiedSubscription) throws Exception {
        LOGGER.info("Changing state from {} to {} for subscription {}", new Object[]{oldState, newState, modifiedSubscription.getId()});
        switch (newState) {
            case PENDING: {
                if (oldState.equals((Object)Subscription.State.PENDING)) break;
                this.createAndExecuteConsumerIfNotExists(modifiedSubscription);
                this.activateSubscription(modifiedSubscription);
                break;
            }
            case ACTIVE: {
                if (oldState.equals((Object)Subscription.State.ACTIVE)) break;
                this.createAndExecuteConsumerIfNotExists(modifiedSubscription);
                break;
            }
            case SUSPENDED: {
                if (!oldState.equals((Object)Subscription.State.ACTIVE)) break;
                this.deleteConsumerIfExists(modifiedSubscription, false);
                break;
            }
        }
    }

    public void start() throws Exception {
        this.subscriptionsCache.start((Collection<? extends SubscriptionCallback>)ImmutableList.of((Object)this));
        this.adminCache.start();
        this.adminCache.addCallback((AdminOperationsCallback)this);
        this.offsetCommitter.start();
        this.undeliveredMessageLogPersister.start();
    }

    public void shutdown() throws InterruptedException {
        for (Consumer consumer : this.consumerHolder) {
            consumer.stopConsuming();
        }
        this.executor.shutdown();
        this.offsetCommitter.shutdown();
        this.undeliveredMessageLogPersister.shutdown();
    }

    private void createAndExecuteConsumerIfNotExists(Subscription subscription) {
        if (this.consumerHolder.contains(subscription.getTopicName(), subscription.getName())) {
            LOGGER.warn("Consumer for {} already exists, ignoring", (Object)subscription.getId());
        } else {
            this.createAndExecuteConsumer(subscription);
        }
    }

    private void deleteConsumerIfExists(Subscription subscription, boolean removeOffsets) throws Exception {
        this.deleteConsumerIfExists(subscription.getTopicName(), subscription.getName(), removeOffsets);
    }

    private void deleteConsumerIfExists(SubscriptionName subscription, boolean removeOffsets) throws Exception {
        this.deleteConsumerIfExists(subscription.getTopicName(), subscription.getName(), removeOffsets);
    }

    private void deleteConsumerIfExists(TopicName topicName, String subscriptionName, boolean removeOffsets) throws Exception {
        if (this.consumerHolder.contains(topicName, subscriptionName)) {
            LOGGER.info("Deleting consumer for {}", (Object)Subscription.getId((TopicName)topicName, (String)subscriptionName));
            Consumer consumer = this.consumerHolder.get(topicName, subscriptionName).get();
            consumer.stopConsuming();
            this.consumerHolder.remove(topicName, subscriptionName);
            if (removeOffsets) {
                this.removeOffsets(topicName, subscriptionName, consumer.getOffsetsToCommit());
            }
        }
    }

    private void createAndExecuteConsumer(Subscription subscription) {
        LOGGER.info("Creating consumer for {}", (Object)subscription.getId());
        Consumer consumer = this.consumerFactory.createConsumer(subscription);
        this.consumerHolder.add(subscription.getTopicName(), subscription.getName(), consumer);
        this.executor.execute(consumer);
    }

    private void removeOffsets(TopicName topicName, String subscriptionName, List<PartitionOffset> offsetsToRemove) throws Exception {
        for (PartitionOffset partitionOffset : offsetsToRemove) {
            for (MessageCommitter messageCommitter : this.messageCommitters) {
                messageCommitter.removeOffset(topicName, subscriptionName, partitionOffset.getPartition());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onRetransmissionStarts(SubscriptionName subscriptionName) throws Exception {
        SubscriptionLock subscriptionLock = this.subscriptionsLocks.getLock(subscriptionName);
        synchronized (subscriptionLock) {
            this.deleteConsumerIfExists(subscriptionName, false);
            PartitionOffsets offsets = this.subscriptionOffsetChangeIndicator.getSubscriptionOffsets(subscriptionName.getTopicName(), subscriptionName.getName(), this.brokersClusterName);
            for (PartitionOffset partitionOffset : offsets) {
                for (OffsetsStorage s : this.offsetsStorages) {
                    s.setSubscriptionOffset(Subscription.fromSubscriptionName((SubscriptionName)subscriptionName), partitionOffset);
                }
            }
            this.createAndExecuteConsumer(this.subscriptionRepository.getSubscriptionDetails(subscriptionName.getTopicName(), subscriptionName.getName()));
        }
    }
}

