/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.management.infrastructure.kafka.service;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.ConsumerGroup;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.management.domain.message.RetransmissionService;
import pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager;
import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement;
import pl.allegro.tech.hermes.management.domain.topic.SingleMessageReader;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MovingSubscriptionOffsetsValidationException;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.ConsumerGroupsDescriber;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerManager;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.LogEndOffsetChecker;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.OffsetsAvailableChecker;

public class BrokersClusterService {
    private static final Logger logger = LoggerFactory.getLogger(BrokersClusterService.class);
    private final String clusterName;
    private final SingleMessageReader singleMessageReader;
    private final RetransmissionService retransmissionService;
    private final BrokerTopicManagement brokerTopicManagement;
    private final KafkaNamesMapper kafkaNamesMapper;
    private final OffsetsAvailableChecker offsetsAvailableChecker;
    private final ConsumerGroupsDescriber consumerGroupsDescriber;
    private final AdminClient adminClient;
    private final ConsumerGroupManager consumerGroupManager;
    private final KafkaConsumerManager kafkaConsumerManager;

    public BrokersClusterService(String clusterName, SingleMessageReader singleMessageReader, RetransmissionService retransmissionService, BrokerTopicManagement brokerTopicManagement, KafkaNamesMapper kafkaNamesMapper, OffsetsAvailableChecker offsetsAvailableChecker, LogEndOffsetChecker logEndOffsetChecker, AdminClient adminClient, ConsumerGroupManager consumerGroupManager, KafkaConsumerManager kafkaConsumerManager) {
        this.clusterName = clusterName;
        this.singleMessageReader = singleMessageReader;
        this.retransmissionService = retransmissionService;
        this.brokerTopicManagement = brokerTopicManagement;
        this.kafkaNamesMapper = kafkaNamesMapper;
        this.offsetsAvailableChecker = offsetsAvailableChecker;
        this.consumerGroupsDescriber = new ConsumerGroupsDescriber(kafkaNamesMapper, adminClient, logEndOffsetChecker, clusterName);
        this.adminClient = adminClient;
        this.consumerGroupManager = consumerGroupManager;
        this.kafkaConsumerManager = kafkaConsumerManager;
    }

    public String getClusterName() {
        return this.clusterName;
    }

    public void manageTopic(Consumer<BrokerTopicManagement> manageFunction) {
        manageFunction.accept(this.brokerTopicManagement);
    }

    public String readMessageFromPrimary(Topic topic, Integer partition, Long offset) {
        return this.singleMessageReader.readMessageAsJson(topic, this.kafkaNamesMapper.toKafkaTopics(topic).getPrimary(), partition, offset);
    }

    public List<PartitionOffset> indicateOffsetChange(Topic topic, String subscriptionName, Long timestamp, boolean dryRun) {
        return this.retransmissionService.indicateOffsetChange(topic, subscriptionName, this.clusterName, timestamp, dryRun);
    }

    public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) {
        return this.kafkaNamesMapper.toKafkaTopics(topic).allMatch(this.offsetsAvailableChecker::areOffsetsAvailable);
    }

    public boolean topicExists(Topic topic) {
        return this.brokerTopicManagement.topicExists(topic);
    }

    public List<String> listTopicsFromCluster() {
        try {
            return new ArrayList<String>((Collection)this.adminClient.listTopics().names().get());
        }
        catch (InterruptedException | ExecutionException e) {
            logger.error("Failed to list topics names", (Throwable)e);
            return Collections.emptyList();
        }
    }

    public void removeTopicByName(String topicName) {
        this.adminClient.deleteTopics(Collections.singletonList(topicName));
    }

    public boolean areOffsetsMoved(Topic topic, String subscriptionName) {
        return this.retransmissionService.areOffsetsMoved(topic, subscriptionName, this.clusterName);
    }

    public boolean allSubscriptionsHaveConsumersAssigned(Topic topic, List<Subscription> subscriptions) {
        List<String> consumerGroupsForSubscriptions = subscriptions.stream().map(sub -> this.kafkaNamesMapper.toConsumerGroupId(sub.getQualifiedName()).asString()).collect(Collectors.toList());
        try {
            int requiredTotalNumberOfAssignments = this.numberOfPartitionsForTopic(topic) * subscriptions.size();
            return this.numberOfAssignmentsForConsumersGroups(consumerGroupsForSubscriptions) == requiredTotalNumberOfAssignments;
        }
        catch (Exception e) {
            logger.error("Failed to check assignments for topic " + topic.getQualifiedName() + " subscriptions", (Throwable)e);
            return false;
        }
    }

    public void createConsumerGroup(Topic topic, Subscription subscription) {
        this.consumerGroupManager.createConsumerGroup(topic, subscription);
    }

    public Optional<ConsumerGroup> describeConsumerGroup(Topic topic, String subscriptionName) {
        return this.consumerGroupsDescriber.describeConsumerGroup(topic, subscriptionName);
    }

    public void moveOffsetsToTheEnd(Topic topic, SubscriptionName subscription) {
        this.validateIfOffsetsCanBeMoved(topic, subscription);
        KafkaConsumer<byte[], byte[]> consumer = this.kafkaConsumerManager.createConsumer(subscription);
        String kafkaTopicName = this.kafkaNamesMapper.toKafkaTopics(topic).getPrimary().name().asString();
        Set<TopicPartition> topicPartitions = this.getTopicPartitions(consumer, kafkaTopicName);
        consumer.assign(topicPartitions);
        Map endOffsets = consumer.endOffsets(topicPartitions);
        Map<TopicPartition, OffsetAndMetadata> endOffsetsMetadata = this.buildOffsetsMetadata(endOffsets);
        consumer.commitSync(endOffsetsMetadata);
        consumer.close();
        logger.info("Successfully moved offset to the end position for subscription {} and consumer group {}", (Object)subscription.getQualifiedName(), (Object)this.kafkaNamesMapper.toConsumerGroupId(subscription));
    }

    private int numberOfAssignmentsForConsumersGroups(List<String> consumerGroupsIds) throws ExecutionException, InterruptedException {
        Collection consumerGroupsDescriptions = ((Map)this.adminClient.describeConsumerGroups(consumerGroupsIds).all().get()).values();
        Stream memberDescriptions = consumerGroupsDescriptions.stream().flatMap(desc -> desc.members().stream());
        return memberDescriptions.flatMap(memberDescription -> memberDescription.assignment().topicPartitions().stream()).collect(Collectors.toList()).size();
    }

    private void validateIfOffsetsCanBeMoved(Topic topic, SubscriptionName subscription) {
        this.describeConsumerGroup(topic, subscription.getName()).ifPresentOrElse(group -> {
            if (!group.getMembers().isEmpty()) {
                String s = String.format("Consumer group %s for subscription %s has still active members.", group.getGroupId(), subscription.getQualifiedName());
                throw new MovingSubscriptionOffsetsValidationException(s);
            }
        }, () -> {
            String s = String.format("No consumer group for subscription %s exists.", subscription.getQualifiedName());
            throw new MovingSubscriptionOffsetsValidationException(s);
        });
    }

    private int numberOfPartitionsForTopic(Topic topic) throws ExecutionException, InterruptedException {
        List kafkaTopicsNames = this.kafkaNamesMapper.toKafkaTopics(topic).stream().map(kafkaTopic -> kafkaTopic.name().asString()).collect(Collectors.toList());
        return ((Map)this.adminClient.describeTopics(kafkaTopicsNames).all().get()).values().stream().map(v -> v.partitions().size()).reduce(0, Integer::sum);
    }

    private Set<TopicPartition> getTopicPartitions(KafkaConsumer<byte[], byte[]> consumer, String kafkaTopicName) {
        return consumer.partitionsFor(kafkaTopicName).stream().map(info -> new TopicPartition(info.topic(), info.partition())).collect(Collectors.toSet());
    }

    private Map<TopicPartition, OffsetAndMetadata> buildOffsetsMetadata(Map<TopicPartition, Long> offsets) {
        return offsets.entrySet().stream().map(entry -> ImmutablePair.of((Object)((TopicPartition)entry.getKey()), (Object)new OffsetAndMetadata(((Long)entry.getValue()).longValue()))).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
    }
}

