/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.management.domain.detection;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import java.time.Clock;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import pl.allegro.tech.hermes.api.OwnerId;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.management.config.detection.InactiveTopicsDetectionProperties;
import pl.allegro.tech.hermes.management.domain.detection.InactiveTopic;
import pl.allegro.tech.hermes.management.domain.detection.InactiveTopicWithOwner;
import pl.allegro.tech.hermes.management.domain.detection.InactiveTopicsDetectionService;
import pl.allegro.tech.hermes.management.domain.detection.InactiveTopicsNotifier;
import pl.allegro.tech.hermes.management.domain.detection.InactiveTopicsStorageService;
import pl.allegro.tech.hermes.management.domain.detection.NotificationResult;
import pl.allegro.tech.hermes.management.domain.topic.TopicService;

@Component
public class InactiveTopicsDetectionJob {
    private final TopicService topicService;
    private final InactiveTopicsStorageService inactiveTopicsStorageService;
    private final InactiveTopicsDetectionService inactiveTopicsDetectionService;
    private final Optional<InactiveTopicsNotifier> notifier;
    private final InactiveTopicsDetectionProperties properties;
    private final Clock clock;
    private final MeterRegistry meterRegistry;
    private static final Logger logger = LoggerFactory.getLogger(InactiveTopicsDetectionJob.class);

    public InactiveTopicsDetectionJob(TopicService topicService, InactiveTopicsStorageService inactiveTopicsStorageService, InactiveTopicsDetectionService inactiveTopicsDetectionService, Optional<InactiveTopicsNotifier> notifier, InactiveTopicsDetectionProperties properties, Clock clock, MeterRegistry meterRegistry) {
        this.topicService = topicService;
        this.inactiveTopicsStorageService = inactiveTopicsStorageService;
        this.inactiveTopicsDetectionService = inactiveTopicsDetectionService;
        this.properties = properties;
        this.clock = clock;
        this.meterRegistry = meterRegistry;
        if (notifier.isEmpty()) {
            logger.info("Inactive topics notifier bean is absent");
        }
        this.notifier = notifier;
    }

    public void detectAndNotify() {
        List<Topic> topics = this.topicService.getAllTopics();
        List<String> qualifiedTopicNames = topics.stream().map(Topic::getQualifiedName).toList();
        List<InactiveTopic> historicalInactiveTopics = this.inactiveTopicsStorageService.getInactiveTopics();
        List<InactiveTopic> foundInactiveTopics = this.detectInactiveTopics(qualifiedTopicNames, historicalInactiveTopics);
        Map<Boolean, List<InactiveTopic>> groupedByNeedOfNotification = foundInactiveTopics.stream().collect(Collectors.groupingBy(this.inactiveTopicsDetectionService::shouldBeNotified));
        List<InactiveTopic> topicsToNotify = groupedByNeedOfNotification.getOrDefault(true, List.of());
        List topicsToSkipNotification = groupedByNeedOfNotification.getOrDefault(false, List.of());
        List<InactiveTopic> notifiedTopics = this.notify(this.enrichWithOwner(topicsToNotify, topics));
        List<InactiveTopic> processedTopics = this.limitHistory(Stream.concat(notifiedTopics.stream(), topicsToSkipNotification.stream()).toList());
        this.measureInactiveTopics(processedTopics);
        this.inactiveTopicsStorageService.markAsInactive(processedTopics);
    }

    private List<InactiveTopic> detectInactiveTopics(List<String> qualifiedTopicNames, List<InactiveTopic> historicalInactiveTopics) {
        Map<String, InactiveTopic> historicalInactiveTopicsByName = this.groupByName(historicalInactiveTopics);
        return qualifiedTopicNames.stream().map(qualifiedTopicName -> this.inactiveTopicsDetectionService.detectInactiveTopic(TopicName.fromQualifiedName((String)qualifiedTopicName), Optional.ofNullable((InactiveTopic)historicalInactiveTopicsByName.get(qualifiedTopicName)))).map(opt -> opt.orElse(null)).filter(Objects::nonNull).toList();
    }

    private Map<String, InactiveTopic> groupByName(List<InactiveTopic> inactiveTopics) {
        return inactiveTopics.stream().collect(Collectors.toMap(InactiveTopic::qualifiedTopicName, v -> v, (v1, v2) -> v1));
    }

    private List<InactiveTopicWithOwner> enrichWithOwner(List<InactiveTopic> inactiveTopics, List<Topic> topics) {
        HashMap ownerByTopicName = new HashMap();
        topics.forEach(topic -> ownerByTopicName.put(topic.getQualifiedName(), topic.getOwner()));
        return inactiveTopics.stream().map(inactiveTopic -> new InactiveTopicWithOwner((InactiveTopic)inactiveTopic, (OwnerId)ownerByTopicName.get(inactiveTopic.qualifiedTopicName()))).toList();
    }

    private List<InactiveTopic> notify(List<InactiveTopicWithOwner> inactiveTopics) {
        if (inactiveTopics.isEmpty()) {
            logger.info("No inactive topics to notify");
            return List.of();
        }
        if (this.notifier.isPresent()) {
            logger.info("Notifying {} inactive topics", (Object)inactiveTopics.size());
            NotificationResult result = this.notifier.get().notify(inactiveTopics);
            Instant now = this.clock.instant();
            return inactiveTopics.stream().map(InactiveTopicWithOwner::topic).map(topic -> result.isSuccess(topic.qualifiedTopicName()) ? topic.notificationSent(now) : topic).toList();
        }
        logger.info("Skipping notification of {} inactive topics", (Object)inactiveTopics.size());
        return inactiveTopics.stream().map(InactiveTopicWithOwner::topic).toList();
    }

    private List<InactiveTopic> limitHistory(List<InactiveTopic> inactiveTopics) {
        return inactiveTopics.stream().map(topic -> topic.limitNotificationsHistory(this.properties.notificationsHistoryLimit())).toList();
    }

    private void measureInactiveTopics(List<InactiveTopic> processedTopics) {
        processedTopics.stream().collect(Collectors.groupingBy(topic -> topic.notificationTimestampsMs().size(), Collectors.counting())).forEach((notificationsCount, topicsCount) -> {
            Tags tags = Tags.of((String)"notifications", (String)notificationsCount.toString());
            this.meterRegistry.gauge("inactive-topics", (Iterable)tags, (Number)topicsCount);
        });
    }
}

