/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.management.infrastructure.kafka.service;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.management.config.kafka.KafkaProperties;
import pl.allegro.tech.hermes.management.domain.subscription.consumergroup.ConsumerGroupManager;
import pl.allegro.tech.hermes.management.infrastructure.kafka.ConsumerGroupDeletionException;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerManager;

/**
 * {@link ConsumerGroupManager} implementation that manages Kafka consumer groups for
 * subscriptions on a single cluster.
 *
 * <p>A consumer group is created by assigning a short-lived consumer to every partition of the
 * topic's primary Kafka topic and committing the positions that consumer reports. A consumer
 * group is deleted through the supplied {@link AdminClient}.
 */
public class KafkaConsumerGroupManager implements ConsumerGroupManager {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerGroupManager.class);

    private final KafkaNamesMapper kafkaNamesMapper;
    private final String qualifiedClusterName;
    private final KafkaConsumerManager consumerManager;
    private final AdminClient kafkaAdminClient;

    /**
     * Creates a manager bound to one Kafka cluster.
     *
     * @param kafkaNamesMapper maps topics and subscriptions to Kafka topic names and group ids
     * @param qualifiedClusterName cluster name, used in log messages only
     * @param brokerList broker list handed to the internal {@link KafkaConsumerManager}
     * @param kafkaProperties Kafka settings handed to the internal {@link KafkaConsumerManager}
     * @param kafkaAdminClient admin client used to delete consumer groups
     */
    public KafkaConsumerGroupManager(KafkaNamesMapper kafkaNamesMapper, String qualifiedClusterName, String brokerList, KafkaProperties kafkaProperties, AdminClient kafkaAdminClient) {
        this.kafkaNamesMapper = kafkaNamesMapper;
        this.qualifiedClusterName = qualifiedClusterName;
        this.consumerManager = new KafkaConsumerManager(kafkaProperties, kafkaNamesMapper, brokerList);
        this.kafkaAdminClient = kafkaAdminClient;
    }

    /**
     * Creates the consumer group for {@code subscription} by committing the consumer's current
     * position for every partition of the topic's primary Kafka topic.
     *
     * <p>Failures while resolving partitions, committing offsets or closing the consumer are
     * logged and not propagated. An exception thrown while creating the consumer itself is not
     * caught here.
     *
     * @param topic topic whose primary Kafka topic partitions are committed
     * @param subscription subscription for which the consumer group is created
     */
    @Override
    public void createConsumerGroup(Topic topic, Subscription subscription) {
        logger.info("Creating consumer group for subscription {}, cluster: {}", subscription.getQualifiedName(), qualifiedClusterName);
        KafkaConsumer<byte[], byte[]> kafkaConsumer = consumerManager.createConsumer(subscription.getQualifiedName());
        try {
            try {
                commitCurrentPositions(kafkaConsumer, topic);
            } finally {
                // Always release the consumer, including when the commit path above fails.
                kafkaConsumer.close();
            }
            logger.info("Successfully created consumer group for subscription {}, cluster: {}", subscription.getQualifiedName(), qualifiedClusterName);
        } catch (Exception e) {
            logger.error("Failed to create consumer group for subscription {}, cluster: {}", subscription.getQualifiedName(), qualifiedClusterName, e);
        }
    }

    /**
     * Deletes the consumer group that corresponds to {@code subscriptionName}.
     *
     * <p>A group that does not exist is treated as already deleted: the situation is logged and
     * the method returns normally.
     *
     * @param subscriptionName subscription whose consumer group should be removed
     * @throws ConsumerGroupDeletionException if the deletion fails for any other reason or the
     *     calling thread is interrupted while waiting for the result
     */
    @Override
    public void deleteConsumerGroup(SubscriptionName subscriptionName) throws ConsumerGroupDeletionException {
        logger.info("Deleting consumer group for subscription {}, cluster: {}", subscriptionName, qualifiedClusterName);
        try {
            ConsumerGroupId groupId = kafkaNamesMapper.toConsumerGroupId(subscriptionName);
            kafkaAdminClient.deleteConsumerGroups(Collections.singletonList(groupId.asString())).all().get();
            logger.info("Successfully deleted consumer group for subscription {}, cluster: {}", subscriptionName, qualifiedClusterName);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            logger.error("Failed to delete consumer group for subscription {}, cluster: {}", subscriptionName, qualifiedClusterName, e);
            throw new ConsumerGroupDeletionException(subscriptionName, e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof GroupIdNotFoundException) {
                logger.info("Consumer group for subscription {} not found, cluster: {}", subscriptionName, qualifiedClusterName);
                return;
            }
            logger.error("Failed to delete consumer group for subscription {}, cluster: {}", subscriptionName, qualifiedClusterName, e);
            throw new ConsumerGroupDeletionException(subscriptionName, e);
        }
    }

    /**
     * Assigns {@code kafkaConsumer} to all partitions of the topic's primary Kafka topic and
     * synchronously commits the position the consumer reports for each of them.
     */
    private void commitCurrentPositions(KafkaConsumer<byte[], byte[]> kafkaConsumer, Topic topic) {
        String kafkaTopicName = kafkaNamesMapper.toKafkaTopics(topic).getPrimary().name().asString();
        Set<TopicPartition> topicPartitions = kafkaConsumer.partitionsFor(kafkaTopicName).stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toSet());
        logger.info("Received partitions: {}, cluster: {}", topicPartitions, qualifiedClusterName);
        kafkaConsumer.assign(topicPartitions);
        Map<TopicPartition, OffsetAndMetadata> topicPartitionByOffset = topicPartitions.stream()
                .collect(Collectors.toMap(
                        topicPartition -> topicPartition,
                        topicPartition -> new OffsetAndMetadata(kafkaConsumer.position(topicPartition))));
        kafkaConsumer.commitSync(topicPartitionByOffset);
    }
}

