/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.management.infrastructure.kafka.service;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.ConsumerGroup;
import pl.allegro.tech.hermes.api.ConsumerGroupMember;
import pl.allegro.tech.hermes.api.ContentType;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicPartition;
import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
import pl.allegro.tech.hermes.common.kafka.KafkaTopicName;
import pl.allegro.tech.hermes.common.kafka.KafkaTopics;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.LogEndOffsetChecker;

class ConsumerGroupsDescriber {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerGroupsDescriber.class);
    private final KafkaNamesMapper kafkaNamesMapper;
    private final AdminClient adminClient;
    private final LogEndOffsetChecker logEndOffsetChecker;
    private final String clusterName;

    ConsumerGroupsDescriber(KafkaNamesMapper kafkaNamesMapper, AdminClient adminClient, LogEndOffsetChecker logEndOffsetChecker, String clusterName) {
        this.kafkaNamesMapper = kafkaNamesMapper;
        this.adminClient = adminClient;
        this.logEndOffsetChecker = logEndOffsetChecker;
        this.clusterName = clusterName;
    }

    Optional<ConsumerGroup> describeConsumerGroup(Topic topic, String subscriptionName) {
        ConsumerGroupId consumerGroupId = this.kafkaNamesMapper.toConsumerGroupId(new SubscriptionName(subscriptionName, topic.getName()));
        KafkaTopics kafkaTopics = this.kafkaNamesMapper.toKafkaTopics(topic);
        try {
            return this.describeConsumerGroup(consumerGroupId, kafkaTopics);
        }
        catch (Exception e) {
            logger.error("Failed to describe group with id: {}", (Object)consumerGroupId.asString(), (Object)e);
            return Optional.empty();
        }
    }

    private Optional<ConsumerGroup> describeConsumerGroup(ConsumerGroupId consumerGroupId, KafkaTopics kafkaTopics) throws ExecutionException, InterruptedException {
        Map<KafkaTopicName, ContentType> kafkaTopicContentTypes = kafkaTopics.stream().collect(Collectors.toMap(KafkaTopic::name, KafkaTopic::contentType));
        Map topicPartitionOffsets = (Map)this.adminClient.listConsumerGroupOffsets(consumerGroupId.asString()).partitionsToOffsetAndMetadata().get();
        Optional description = ((Map)this.adminClient.describeConsumerGroups(Collections.singletonList(consumerGroupId.asString())).all().get()).values().stream().findFirst();
        return description.map(d -> this.getKafkaConsumerGroup(topicPartitionOffsets, kafkaTopicContentTypes, (ConsumerGroupDescription)d));
    }

    private ConsumerGroup getKafkaConsumerGroup(Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> topicPartitionOffsets, Map<KafkaTopicName, ContentType> kafkaTopicContentTypes, ConsumerGroupDescription description) {
        Set groupMembers = description.members().stream().map(member -> this.getKafkaConsumerGroupMember(topicPartitionOffsets, kafkaTopicContentTypes, (MemberDescription)member)).collect(Collectors.toSet());
        return new ConsumerGroup(this.clusterName, description.groupId(), description.state().toString(), groupMembers);
    }

    private ConsumerGroupMember getKafkaConsumerGroupMember(Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> topicPartitionOffsets, Map<KafkaTopicName, ContentType> kafkaTopicContentTypes, MemberDescription member) {
        Set kafkaTopicPartitions = member.assignment().topicPartitions().stream().map(topicPartition -> {
            Optional offset = Optional.ofNullable(topicPartitionOffsets.get(topicPartition));
            return new TopicPartition(topicPartition.partition(), topicPartition.topic(), offset.map(OffsetAndMetadata::offset).orElse(0L).longValue(), this.logEndOffsetChecker.check((org.apache.kafka.common.TopicPartition)topicPartition), offset.map(OffsetAndMetadata::metadata).orElse(""), (ContentType)kafkaTopicContentTypes.get(KafkaTopicName.valueOf((String)topicPartition.topic())));
        }).collect(Collectors.toSet());
        return new ConsumerGroupMember(member.consumerId(), member.clientId(), ConsumerGroupsDescriber.toHostName(member.host()), kafkaTopicPartitions);
    }

    private static String toHostName(String inetAddressStringRepresentation) {
        String[] parts = inetAddressStringRepresentation.split("/");
        String ip = parts[parts.length - 1];
        try {
            InetAddress addr = InetAddress.getByName(ip);
            return addr.getHostName();
        }
        catch (UnknownHostException e) {
            return inetAddressStringRepresentation;
        }
    }
}

