/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.management.domain.subscription;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Anonymizable;
import pl.allegro.tech.hermes.api.MessageTrace;
import pl.allegro.tech.hermes.api.OwnerId;
import pl.allegro.tech.hermes.api.PatchData;
import pl.allegro.tech.hermes.api.PersistentSubscriptionMetrics;
import pl.allegro.tech.hermes.api.Query;
import pl.allegro.tech.hermes.api.SentMessageTrace;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionHealth;
import pl.allegro.tech.hermes.api.SubscriptionMetrics;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.SubscriptionNameWithMetrics;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicMetrics;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.api.UnhealthySubscription;
import pl.allegro.tech.hermes.api.helpers.Patch;
import pl.allegro.tech.hermes.common.message.undelivered.UndeliveredMessageLog;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;
import pl.allegro.tech.hermes.management.domain.Auditor;
import pl.allegro.tech.hermes.management.domain.auth.RequestUser;
import pl.allegro.tech.hermes.management.domain.dc.DatacenterBoundRepositoryHolder;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.dc.RepositoryManager;
import pl.allegro.tech.hermes.management.domain.subscription.SubscriptionMetricsRepository;
import pl.allegro.tech.hermes.management.domain.subscription.SubscriptionOwnerCache;
import pl.allegro.tech.hermes.management.domain.subscription.SubscriptionRemover;
import pl.allegro.tech.hermes.management.domain.subscription.UnhealthySubscriptionGetException;
import pl.allegro.tech.hermes.management.domain.subscription.commands.CreateSubscriptionRepositoryCommand;
import pl.allegro.tech.hermes.management.domain.subscription.commands.UpdateSubscriptionRepositoryCommand;
import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthChecker;
import pl.allegro.tech.hermes.management.domain.subscription.validator.SubscriptionValidator;
import pl.allegro.tech.hermes.management.domain.topic.TopicService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;
import pl.allegro.tech.hermes.tracker.management.LogRepository;

public class SubscriptionService {
    private static final Logger logger = LoggerFactory.getLogger(SubscriptionService.class);
    private static final int LAST_MESSAGE_COUNT = 100;
    private final SubscriptionRepository subscriptionRepository;
    private final SubscriptionOwnerCache subscriptionOwnerCache;
    private final TopicService topicService;
    private final SubscriptionMetricsRepository metricsRepository;
    private final SubscriptionHealthChecker subscriptionHealthChecker;
    private final LogRepository logRepository;
    private final SubscriptionValidator subscriptionValidator;
    private final Auditor auditor;
    private final MultiDatacenterRepositoryCommandExecutor multiDcExecutor;
    private final MultiDCAwareService multiDCAwareService;
    private final RepositoryManager repositoryManager;
    private final long subscriptionHealthCheckTimeoutMillis;
    private final ExecutorService subscriptionHealthCheckExecutorService;
    private final SubscriptionRemover subscriptionRemover;

    public SubscriptionService(SubscriptionRepository subscriptionRepository, SubscriptionOwnerCache subscriptionOwnerCache, TopicService topicService, SubscriptionMetricsRepository metricsRepository, SubscriptionHealthChecker subscriptionHealthChecker, LogRepository logRepository, SubscriptionValidator subscriptionValidator, Auditor auditor, MultiDatacenterRepositoryCommandExecutor multiDcExecutor, MultiDCAwareService multiDCAwareService, RepositoryManager repositoryManager, ExecutorService unhealthyGetExecutorService, long unhealthyGetTimeoutMillis, SubscriptionRemover subscriptionRemover) {
        this.subscriptionRepository = subscriptionRepository;
        this.subscriptionOwnerCache = subscriptionOwnerCache;
        this.topicService = topicService;
        this.metricsRepository = metricsRepository;
        this.subscriptionHealthChecker = subscriptionHealthChecker;
        this.logRepository = logRepository;
        this.subscriptionValidator = subscriptionValidator;
        this.auditor = auditor;
        this.multiDcExecutor = multiDcExecutor;
        this.multiDCAwareService = multiDCAwareService;
        this.repositoryManager = repositoryManager;
        this.subscriptionHealthCheckExecutorService = unhealthyGetExecutorService;
        this.subscriptionHealthCheckTimeoutMillis = unhealthyGetTimeoutMillis;
        this.subscriptionRemover = subscriptionRemover;
    }

    public List<String> listSubscriptionNames(TopicName topicName) {
        return this.subscriptionRepository.listSubscriptionNames(topicName);
    }

    public List<String> listTrackedSubscriptionNames(TopicName topicName) {
        return this.listSubscriptions(topicName).stream().filter(Subscription::isTrackingEnabled).map(Subscription::getName).collect(Collectors.toList());
    }

    public List<String> listFilteredSubscriptionNames(TopicName topicName, Query<Subscription> query) {
        return query.filter(this.listSubscriptions(topicName)).map(Subscription::getName).collect(Collectors.toList());
    }

    public List<Subscription> listSubscriptions(TopicName topicName) {
        return this.subscriptionRepository.listSubscriptions(topicName);
    }

    public void createSubscription(Subscription subscription, RequestUser createdBy, String qualifiedTopicName) {
        this.auditor.beforeObjectCreation(createdBy.getUsername(), (Anonymizable)subscription);
        this.subscriptionValidator.checkCreation(subscription, createdBy);
        Topic topic = this.topicService.getTopicDetails(TopicName.fromQualifiedName((String)qualifiedTopicName));
        this.multiDCAwareService.createConsumerGroups(topic, subscription);
        this.multiDcExecutor.executeByUser(new CreateSubscriptionRepositoryCommand(subscription), createdBy);
        this.auditor.objectCreated(createdBy.getUsername(), (Anonymizable)subscription);
        this.subscriptionOwnerCache.onCreatedSubscription(subscription);
    }

    public Subscription getSubscriptionDetails(TopicName topicName, String subscriptionName) {
        Subscription subscription = this.subscriptionRepository.getSubscriptionDetails(topicName, subscriptionName).anonymize();
        subscription.setState(this.getEffectiveState(topicName, subscriptionName));
        return subscription;
    }

    private Subscription.State getEffectiveState(TopicName topicName, String subscriptionName) {
        Set<Subscription.State> states = this.loadSubscriptionStatesFromAllDc(topicName, subscriptionName);
        if (states.size() > 1) {
            logger.warn("Some states are out of sync: {}", states);
        }
        if (states.contains(Subscription.State.ACTIVE)) {
            return Subscription.State.ACTIVE;
        }
        if (states.contains(Subscription.State.SUSPENDED)) {
            return Subscription.State.SUSPENDED;
        }
        return Subscription.State.PENDING;
    }

    private Set<Subscription.State> loadSubscriptionStatesFromAllDc(TopicName topicName, String subscriptionName) {
        List<DatacenterBoundRepositoryHolder<SubscriptionRepository>> holders = this.repositoryManager.getRepositories(SubscriptionRepository.class);
        HashSet<Subscription.State> states = new HashSet<Subscription.State>();
        for (DatacenterBoundRepositoryHolder<SubscriptionRepository> holder : holders) {
            try {
                Subscription.State state = holder.getRepository().getSubscriptionDetails(topicName, subscriptionName).getState();
                states.add(state);
            }
            catch (Exception e) {
                logger.warn("Could not load state of subscription (topic: {}, name: {}) from DC {}.", new Object[]{topicName, subscriptionName, holder.getDatacenterName()});
            }
        }
        return states;
    }

    public void removeSubscription(TopicName topicName, String subscriptionName, RequestUser removedBy) {
        this.subscriptionRemover.removeSubscription(topicName, subscriptionName, removedBy);
    }

    public void updateSubscription(TopicName topicName, String subscriptionName, PatchData patch, RequestUser modifiedBy) {
        this.auditor.beforeObjectUpdate(modifiedBy.getUsername(), Subscription.class.getSimpleName(), new SubscriptionName(subscriptionName, topicName), patch);
        Subscription retrieved = this.subscriptionRepository.getSubscriptionDetails(topicName, subscriptionName);
        Subscription.State oldState = retrieved.getState();
        Subscription updated = (Subscription)Patch.apply((Object)retrieved, (PatchData)patch);
        this.revertStateIfChangedToPending(updated, oldState);
        this.subscriptionValidator.checkModification(updated, modifiedBy, retrieved);
        this.subscriptionOwnerCache.onUpdatedSubscription(retrieved, updated);
        if (!retrieved.equals((Object)updated)) {
            this.multiDcExecutor.executeByUser(new UpdateSubscriptionRepositoryCommand(updated), modifiedBy);
            this.auditor.objectUpdated(modifiedBy.getUsername(), (Anonymizable)retrieved, (Anonymizable)updated);
        }
    }

    private void revertStateIfChangedToPending(Subscription updated, Subscription.State oldState) {
        if (updated.getState() == Subscription.State.PENDING) {
            updated.setState(oldState);
        }
    }

    public void updateSubscriptionState(TopicName topicName, String subscriptionName, Subscription.State state, RequestUser modifiedBy) {
        if (state != Subscription.State.PENDING) {
            PatchData patchData = PatchData.patchData().set("state", (Object)state).build();
            this.auditor.beforeObjectUpdate(modifiedBy.getUsername(), Subscription.class.getSimpleName(), new SubscriptionName(subscriptionName, topicName), patchData);
            Subscription retrieved = this.subscriptionRepository.getSubscriptionDetails(topicName, subscriptionName);
            if (!retrieved.getState().equals((Object)state)) {
                Subscription updated = (Subscription)Patch.apply((Object)retrieved, (PatchData)patchData);
                this.multiDcExecutor.executeByUser(new UpdateSubscriptionRepositoryCommand(updated), modifiedBy);
                this.auditor.objectUpdated(modifiedBy.getUsername(), (Anonymizable)retrieved, (Anonymizable)updated);
            }
        }
    }

    public Subscription.State getSubscriptionState(TopicName topicName, String subscriptionName) {
        return this.getSubscriptionDetails(topicName, subscriptionName).getState();
    }

    public SubscriptionMetrics getSubscriptionMetrics(TopicName topicName, String subscriptionName) {
        this.subscriptionRepository.ensureSubscriptionExists(topicName, subscriptionName);
        return this.metricsRepository.loadMetrics(topicName, subscriptionName);
    }

    public PersistentSubscriptionMetrics getPersistentSubscriptionMetrics(TopicName topicName, String subscriptionName) {
        this.subscriptionRepository.ensureSubscriptionExists(topicName, subscriptionName);
        return this.metricsRepository.loadZookeeperMetrics(topicName, subscriptionName);
    }

    public SubscriptionHealth getSubscriptionHealth(TopicName topicName, String subscriptionName) {
        Subscription subscription = this.getSubscriptionDetails(topicName, subscriptionName);
        return this.getHealth(subscription);
    }

    public Optional<SentMessageTrace> getLatestUndeliveredMessage(TopicName topicName, String subscriptionName) {
        List<DatacenterBoundRepositoryHolder<UndeliveredMessageLog>> holders = this.repositoryManager.getRepositories(UndeliveredMessageLog.class);
        ArrayList traces = new ArrayList();
        for (DatacenterBoundRepositoryHolder<UndeliveredMessageLog> holder : holders) {
            try {
                holder.getRepository().last(topicName, subscriptionName).ifPresent(traces::add);
            }
            catch (Exception e) {
                logger.warn("Could not load latest undelivered message from DC: {}", (Object)holder.getDatacenterName());
            }
        }
        return traces.stream().max(Comparator.comparing(SentMessageTrace::getTimestamp));
    }

    public List<SentMessageTrace> getLatestUndeliveredMessagesTrackerLogs(TopicName topicName, String subscriptionName) {
        return this.logRepository.getLastUndeliveredMessages(topicName.qualifiedName(), subscriptionName, 100);
    }

    public List<MessageTrace> getMessageStatus(String qualifiedTopicName, String subscriptionName, String messageId) {
        return this.logRepository.getMessageStatus(qualifiedTopicName, subscriptionName, messageId);
    }

    public List<Subscription> querySubscription(Query<Subscription> query) {
        return query.filter(this.getAllSubscriptions()).collect(Collectors.toList());
    }

    public List<SubscriptionNameWithMetrics> querySubscriptionsMetrics(Query<SubscriptionNameWithMetrics> query) {
        List<Subscription> filteredSubscriptions = query.filterNames(this.getAllSubscriptions()).collect(Collectors.toList());
        return query.filter(this.getSubscriptionsMetrics(filteredSubscriptions)).collect(Collectors.toList());
    }

    public List<Subscription> getAllSubscriptions() {
        return this.topicService.getAllTopics().stream().map(Topic::getName).map(this::listSubscriptions).flatMap(Collection::stream).collect(Collectors.toList());
    }

    public List<Subscription> getForOwnerId(OwnerId ownerId) {
        Collection<SubscriptionName> subscriptionNames = this.subscriptionOwnerCache.get(ownerId);
        return this.subscriptionRepository.getSubscriptionDetails(subscriptionNames);
    }

    public List<UnhealthySubscription> getAllUnhealthy(boolean respectMonitoringSeverity, List<String> subscriptionNames, List<String> qualifiedTopicNames) {
        return this.getUnhealthyList(this.subscriptionOwnerCache.getAll(), respectMonitoringSeverity, subscriptionNames, qualifiedTopicNames);
    }

    public List<UnhealthySubscription> getUnhealthyForOwner(OwnerId ownerId, boolean respectMonitoringSeverity, List<String> subscriptionNames, List<String> qualifiedTopicNames) {
        return this.getUnhealthyList(this.subscriptionOwnerCache.get(ownerId), respectMonitoringSeverity, subscriptionNames, qualifiedTopicNames);
    }

    private List<UnhealthySubscription> getUnhealthyList(Collection<SubscriptionName> ownerSubscriptionNames, boolean respectMonitoringSeverity, List<String> subscriptionNames, List<String> qualifiedTopicNames) {
        try {
            return (List)((CompletableFuture)this.getSubscriptionDetails(ownerSubscriptionNames).thenComposeAsync(ownerSubscriptions -> {
                List<CompletableFuture<UnhealthySubscription>> unhealthySubscriptions = this.filterSubscriptions((Collection<Subscription>)ownerSubscriptions, respectMonitoringSeverity, subscriptionNames, qualifiedTopicNames);
                return CompletableFuture.allOf(unhealthySubscriptions.toArray(new CompletableFuture[0])).thenApply(v -> unhealthySubscriptions.stream().map(CompletableFuture::join).filter(Objects::nonNull).collect(Collectors.toList()));
            }, (Executor)this.subscriptionHealthCheckExecutorService)).get(this.subscriptionHealthCheckTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            throw new UnhealthySubscriptionGetException("Fetching unhealthy subscriptions timed out.");
        }
        catch (Exception e) {
            throw new UnhealthySubscriptionGetException("Fetching unhealthy subscriptions failed.", e);
        }
    }

    private CompletableFuture<List<Subscription>> getOwnerSubscriptions(OwnerId ownerId) {
        return CompletableFuture.supplyAsync(() -> this.getForOwnerId(ownerId));
    }

    private CompletableFuture<List<Subscription>> getSubscriptionDetails(Collection<SubscriptionName> subscriptionNames) {
        return CompletableFuture.supplyAsync(() -> this.subscriptionRepository.getSubscriptionDetails(subscriptionNames), this.subscriptionHealthCheckExecutorService);
    }

    private List<CompletableFuture<UnhealthySubscription>> filterSubscriptions(Collection<Subscription> subscriptions, boolean respectMonitoringSeverity, List<String> subscriptionNames, List<String> qualifiedTopicNames) {
        boolean shouldFilterBySubscriptionNames = CollectionUtils.isNotEmpty(subscriptionNames);
        boolean shouldFilterByQualifiedTopicNames = CollectionUtils.isNotEmpty(qualifiedTopicNames);
        Stream<Subscription> subscriptionStream = subscriptions.stream().filter(s -> this.filterBySeverityMonitorFlag(respectMonitoringSeverity, s.isSeverityNotImportant()));
        if (shouldFilterBySubscriptionNames) {
            subscriptionStream = subscriptionStream.filter(s -> this.filterBySubscriptionNames(subscriptionNames, s.getName()));
        }
        if (shouldFilterByQualifiedTopicNames) {
            subscriptionStream = subscriptionStream.filter(s -> this.filterByQualifiedTopicNames(qualifiedTopicNames, s.getQualifiedTopicName()));
        }
        return subscriptionStream.map(s -> CompletableFuture.supplyAsync(() -> this.getUnhealthy((Subscription)s), this.subscriptionHealthCheckExecutorService)).collect(Collectors.toList());
    }

    private boolean filterBySubscriptionNames(List<String> subscriptionNames, String subscriptionName) {
        return subscriptionNames.contains(subscriptionName);
    }

    private boolean filterByQualifiedTopicNames(List<String> qualifiedTopicNames, String qualifiedTopicName) {
        return qualifiedTopicNames.contains(qualifiedTopicName);
    }

    private boolean filterBySeverityMonitorFlag(boolean respectMonitoringSeverity, boolean isSeverityNotImportant) {
        return !respectMonitoringSeverity || !isSeverityNotImportant;
    }

    private SubscriptionHealth getHealth(Subscription subscription) {
        TopicName topicName = subscription.getTopicName();
        TopicMetrics topicMetrics = this.topicService.getTopicMetrics(topicName);
        SubscriptionMetrics subscriptionMetrics = this.getSubscriptionMetrics(topicName, subscription.getName());
        return this.subscriptionHealthChecker.checkHealth(subscription, topicMetrics, subscriptionMetrics);
    }

    private UnhealthySubscription getUnhealthy(Subscription subscription) {
        SubscriptionHealth subscriptionHealth = this.getHealth(subscription);
        if (subscriptionHealth.getStatus() == SubscriptionHealth.Status.UNHEALTHY) {
            return UnhealthySubscription.from((Subscription)subscription, (SubscriptionHealth)subscriptionHealth);
        }
        return null;
    }

    private List<SubscriptionNameWithMetrics> getSubscriptionsMetrics(List<Subscription> subscriptions) {
        return subscriptions.stream().map(s -> {
            SubscriptionMetrics metrics = this.metricsRepository.loadMetrics(s.getTopicName(), s.getName());
            return SubscriptionNameWithMetrics.from((SubscriptionMetrics)metrics, (String)s.getName(), (String)s.getQualifiedTopicName());
        }).collect(Collectors.toList());
    }
}

