/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import java.util.Collection;
import java.util.List;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.admin.AdminOperationsCallback;
import pl.allegro.tech.hermes.common.admin.zookeeper.ZookeeperAdminCache;
import pl.allegro.tech.hermes.common.broker.BrokerStorage;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.consumer.Consumer;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetCommitter;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageCommitter;
import pl.allegro.tech.hermes.consumers.message.undelivered.UndeliveredMessageLogPersister;
import pl.allegro.tech.hermes.consumers.subscription.cache.SubscriptionCallback;
import pl.allegro.tech.hermes.consumers.subscription.cache.SubscriptionsCache;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumerFactory;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumerHolder;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumersExecutorService;
import pl.allegro.tech.hermes.consumers.supervisor.SubscriptionLock;
import pl.allegro.tech.hermes.consumers.supervisor.SubscriptionLocks;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;
import pl.allegro.tech.hermes.domain.subscription.offset.PartitionOffset;
import pl.allegro.tech.hermes.domain.subscription.offset.PartitionOffsets;
import pl.allegro.tech.hermes.domain.subscription.offset.SubscriptionOffsetChangeIndicator;

/**
 * Creates, tracks and stops {@link Consumer} instances in reaction to subscription
 * lifecycle callbacks ({@link SubscriptionCallback}) and admin operations
 * ({@link AdminOperationsCallback}).
 *
 * <p>Running consumers are kept in a {@link ConsumerHolder}; creation and deletion of a
 * consumer for a given subscription are serialized on a per-subscription
 * {@link SubscriptionLock} obtained from {@link SubscriptionLocks}.
 */
public class ConsumersSupervisor
implements SubscriptionCallback,
AdminOperationsCallback {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumersSupervisor.class);
    private final SubscriptionRepository subscriptionRepository;
    private final SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator;
    private final ConsumersExecutorService executor;
    private final ConsumerFactory consumerFactory;
    private final BrokerStorage brokerStorage;
    private final MessageCommitter messageCommitter;
    private final HermesMetrics hermesMetrics;
    private final OffsetCommitter offsetCommitter;
    private final ConsumerHolder consumerHolder;
    private final SubscriptionsCache subscriptionsCache;
    private final ZookeeperAdminCache adminCache;
    private final SubscriptionLocks subscriptionsLocks;
    private final String brokersClusterName;
    private final UndeliveredMessageLogPersister undeliveredMessageLogPersister;

    /**
     * Wires the injected collaborators and builds the supervisor-owned helpers:
     * a fresh {@link SubscriptionLocks}, a fresh {@link ConsumerHolder} and an
     * {@link OffsetCommitter} working on that holder. The brokers cluster name is
     * read from {@link Configs#KAFKA_CLUSTER_NAME}.
     */
    @Inject
    public ConsumersSupervisor(ConfigFactory configFactory, SubscriptionRepository subscriptionRepository, SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator, ConsumersExecutorService executor, ConsumerFactory consumerFactory, MessageCommitter messageCommitter, BrokerStorage brokerStorage, SubscriptionsCache subscriptionsCache, HermesMetrics hermesMetrics, ZookeeperAdminCache adminCache, UndeliveredMessageLogPersister undeliveredMessageLogPersister) {
        this.subscriptionRepository = subscriptionRepository;
        this.subscriptionOffsetChangeIndicator = subscriptionOffsetChangeIndicator;
        this.executor = executor;
        this.consumerFactory = consumerFactory;
        this.brokerStorage = brokerStorage;
        this.messageCommitter = messageCommitter;
        this.subscriptionsCache = subscriptionsCache;
        this.adminCache = adminCache;
        this.hermesMetrics = hermesMetrics;
        this.undeliveredMessageLogPersister = undeliveredMessageLogPersister;
        this.subscriptionsLocks = new SubscriptionLocks();
        this.consumerHolder = new ConsumerHolder();
        this.offsetCommitter = new OffsetCommitter(this.consumerHolder, messageCommitter, configFactory);
        this.brokersClusterName = configFactory.getStringProperty(Configs.KAFKA_CLUSTER_NAME);
    }

    /**
     * Starts a consumer for a newly created subscription.
     *
     * <p>A {@code PENDING} subscription gets a consumer and is then switched to
     * {@code ACTIVE} in the repository; an {@code ACTIVE} one only gets a consumer;
     * any other state is logged and ignored. Failures are logged, never rethrown.
     *
     * @param subscription the subscription that was created
     */
    @Override
    public void onSubscriptionCreated(Subscription subscription) {
        try {
            Subscription.State state = subscription.getState();
            if (state == Subscription.State.PENDING) {
                createAndExecuteConsumerIfNotExists(subscription);
                activateSubscription(subscription);
            } else if (state == Subscription.State.ACTIVE) {
                createAndExecuteConsumerIfNotExists(subscription);
            } else {
                LOGGER.info("Got subscription created event for inactive subscription {}", subscription.getName());
            }
        } catch (Exception e) {
            LOGGER.error("Failed to create subscription " + subscription.getName(), e);
        }
    }

    /**
     * Stops and forgets the consumer of a removed subscription, removes its offsets
     * and drops the subscription's metrics. Failures are logged, never rethrown.
     *
     * @param subscription the subscription that was removed
     */
    @Override
    public void onSubscriptionRemoved(Subscription subscription) {
        try {
            deleteConsumer(subscription, true);
            hermesMetrics.removeMetrics(subscription);
        } catch (Exception e) {
            LOGGER.error("Failed to remove subscription " + subscription.getName(), e);
        }
    }

    /**
     * Reacts to a modified subscription: applies a state transition if there is one and
     * pushes the new definition to the consumer that was registered when this call began.
     *
     * <p>When no consumer is registered the previous state is treated as
     * {@code SUSPENDED}. Failures are logged, never rethrown.
     *
     * @param modifiedSubscription the subscription after modification
     */
    @Override
    public void onSubscriptionChanged(Subscription modifiedSubscription) {
        try {
            Optional<Consumer> consumer = consumerHolder.get(modifiedSubscription.getTopicName(), modifiedSubscription.getName());
            // No consumer registered means nothing is running, which is handled like SUSPENDED.
            Subscription.State oldState = consumer.isPresent()
                    ? consumer.get().getSubscription().getState()
                    : Subscription.State.SUSPENDED;
            if (subscriptionStateChanged(modifiedSubscription, oldState)) {
                handleSubscriptionStateChange(oldState, modifiedSubscription.getState(), modifiedSubscription);
            }
            // Deliberately uses the lookup result from before the state change.
            if (consumer.isPresent()) {
                consumer.get().updateSubscription(modifiedSubscription);
            }
        } catch (Exception e) {
            LOGGER.error("Failed to update subscription " + modifiedSubscription.getName(), e);
        }
    }

    /** Marks the subscription {@code ACTIVE} and persists it through the repository. */
    private void activateSubscription(Subscription subscription) {
        subscription.setState(Subscription.State.ACTIVE);
        subscriptionRepository.updateSubscription(subscription);
    }

    /**
     * Tells whether a state transition has to be handled.
     *
     * @return {@code true} when the new state differs from {@code oldState} and
     *         {@code oldState} is not {@code PENDING}
     */
    private boolean subscriptionStateChanged(Subscription modifiedSubscription, Subscription.State oldState) {
        return oldState != Subscription.State.PENDING && oldState != modifiedSubscription.getState();
    }

    /**
     * Applies the side effects of a subscription state transition.
     *
     * <ul>
     *   <li>to {@code PENDING}: create a consumer (if absent) and activate the subscription;</li>
     *   <li>to {@code ACTIVE}: create a consumer (if absent);</li>
     *   <li>to {@code SUSPENDED} from {@code ACTIVE}: stop the consumer, keeping its offsets.</li>
     * </ul>
     *
     * @param oldState state before the change
     * @param newState state after the change
     * @param modifiedSubscription the subscription after modification
     * @throws Exception when stopping the consumer fails
     */
    private void handleSubscriptionStateChange(Subscription.State oldState, Subscription.State newState, Subscription modifiedSubscription) throws Exception {
        LOGGER.info("Changing subscription state from {} to {}", oldState, newState);
        switch (newState) {
            case PENDING:
                if (oldState != Subscription.State.PENDING) {
                    createAndExecuteConsumerIfNotExists(modifiedSubscription);
                    activateSubscription(modifiedSubscription);
                }
                // Without this break the ACTIVE branch ran as well and only produced a
                // spurious "already exists" warning for the consumer created just above.
                break;
            case ACTIVE:
                if (oldState != Subscription.State.ACTIVE) {
                    createAndExecuteConsumerIfNotExists(modifiedSubscription);
                }
                break;
            case SUSPENDED:
                if (oldState == Subscription.State.ACTIVE) {
                    deleteConsumer(modifiedSubscription, false);
                }
                break;
            default:
                break;
        }
    }

    /**
     * Starts the subscriptions cache with this supervisor as its only callback, starts the
     * admin cache and registers this supervisor on it, then starts the offset committer
     * and the undelivered message log persister.
     *
     * @throws Exception when any of the started components fails to start
     */
    public void start() throws Exception {
        subscriptionsCache.start(ImmutableList.of(this));
        adminCache.start();
        adminCache.addCallback(this);
        offsetCommitter.start();
        undeliveredMessageLogPersister.start();
    }

    /**
     * Asks every registered consumer to stop consuming, then shuts down the executor,
     * the offset committer and the undelivered message log persister.
     *
     * @throws InterruptedException propagated from the components being shut down
     */
    public void shutdown() throws InterruptedException {
        for (Consumer consumer : consumerHolder) {
            consumer.stopConsuming();
        }
        executor.shutdown();
        offsetCommitter.shutdown();
        undeliveredMessageLogPersister.shutdown();
    }

    /**
     * Creates and schedules a consumer for the subscription unless one is already
     * registered, in which case a warning is logged. Runs under the subscription's lock.
     */
    private void createAndExecuteConsumerIfNotExists(Subscription subscription) {
        SubscriptionLock subscriptionLock = subscriptionsLocks.getLock(subscription.getId());
        synchronized (subscriptionLock) {
            boolean alreadyRunning = consumerHolder.get(subscription.getTopicName(), subscription.getName()).isPresent();
            if (alreadyRunning) {
                LOGGER.warn("Consumer for {} already exists, ignoring", subscription.getId());
            } else {
                createAndExecuteConsumer(subscription);
            }
        }
    }

    /** Convenience overload delegating to {@link #deleteConsumer(TopicName, String, boolean)}. */
    private void deleteConsumer(Subscription subscription, boolean removeOffsets) throws Exception {
        deleteConsumer(subscription.getTopicName(), subscription.getName(), removeOffsets);
    }

    /** Convenience overload delegating to {@link #deleteConsumer(TopicName, String, boolean)}. */
    private void deleteConsumer(SubscriptionName subscription, boolean removeOffsets) throws Exception {
        deleteConsumer(subscription.getTopicName(), subscription.getName(), removeOffsets);
    }

    /**
     * Stops the consumer registered for the given subscription (if any) and removes it
     * from the holder. Runs under the subscription's lock.
     *
     * @param topicName topic of the subscription
     * @param subscriptionName name of the subscription
     * @param removeOffsets when {@code true}, the offsets reported by the stopped consumer
     *        are removed through the message committer
     * @throws Exception when removing offsets fails
     */
    private void deleteConsumer(TopicName topicName, String subscriptionName, boolean removeOffsets) throws Exception {
        SubscriptionLock subscriptionLock = subscriptionsLocks.getLock(Subscription.getId(topicName, subscriptionName));
        synchronized (subscriptionLock) {
            Optional<Consumer> maybeConsumer = consumerHolder.get(topicName, subscriptionName);
            if (!maybeConsumer.isPresent()) {
                return;
            }
            Consumer consumer = maybeConsumer.get();
            consumer.stopConsuming();
            consumerHolder.remove(topicName, subscriptionName);
            if (removeOffsets) {
                removeOffsets(topicName, subscriptionName, consumer.getOffsetsToCommit());
            }
        }
    }

    /** Builds a consumer, registers it in the holder and hands it to the executor. */
    private void createAndExecuteConsumer(Subscription subscription) {
        Consumer consumer = consumerFactory.createConsumer(subscription);
        consumerHolder.add(subscription.getTopicName(), subscription.getName(), consumer);
        executor.execute(consumer);
    }

    /** Removes the offset of every listed partition through the message committer. */
    private void removeOffsets(TopicName topicName, String subscriptionName, List<PartitionOffset> offsetsToRemove) throws Exception {
        for (PartitionOffset partitionOffset : offsetsToRemove) {
            messageCommitter.removeOffset(topicName, subscriptionName, partitionOffset.getPartition());
        }
    }

    /**
     * Restarts consumption of a subscription from the offsets requested for retransmission:
     * stops the current consumer (keeping committed offsets), writes the offsets read from
     * the offset change indicator into broker storage, then creates a new consumer from
     * the subscription details stored in the repository.
     *
     * <p>NOTE(review): presumably the {@link AdminOperationsCallback} entry point - confirm
     * against the interface. The new consumer is created without taking the
     * per-subscription lock.
     *
     * @param subscription name of the subscription to retransmit
     * @throws Exception when stopping the consumer or updating offsets fails
     */
    public void onRetransmissionStarts(SubscriptionName subscription) throws Exception {
        deleteConsumer(subscription, false);
        PartitionOffsets offsets = subscriptionOffsetChangeIndicator.getSubscriptionOffsets(subscription.getTopicName(), subscription.getName(), brokersClusterName);
        for (PartitionOffset partitionOffset : offsets) {
            brokerStorage.setSubscriptionOffset(subscription.getTopicName(), subscription.getName(), partitionOffset.getPartition(), Long.valueOf(partitionOffset.getOffset()));
        }
        createAndExecuteConsumer(subscriptionRepository.getSubscriptionDetails(subscription.getTopicName(), subscription.getName()));
    }
}

