/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.management.infrastructure.kafka.service;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
import pl.allegro.tech.hermes.common.kafka.KafkaTopics;
import pl.allegro.tech.hermes.management.config.TopicProperties;
import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement;
import pl.allegro.tech.hermes.management.infrastructure.kafka.BrokersClusterCommunicationException;

public class KafkaBrokerTopicManagement
implements BrokerTopicManagement {
    private final TopicProperties topicProperties;
    private final AdminClient kafkaAdminClient;
    private final KafkaNamesMapper kafkaNamesMapper;
    private final String datacenterName;
    private static final Logger logger = LoggerFactory.getLogger(KafkaBrokerTopicManagement.class);

    public KafkaBrokerTopicManagement(TopicProperties topicProperties, AdminClient kafkaAdminClient, KafkaNamesMapper kafkaNamesMapper, String datacenterName) {
        this.topicProperties = topicProperties;
        this.kafkaAdminClient = kafkaAdminClient;
        this.kafkaNamesMapper = kafkaNamesMapper;
        this.datacenterName = datacenterName;
    }

    @Override
    public void createTopic(Topic topic) {
        Map<String, String> config = this.createTopicConfig(topic.getRetentionTime().getDurationInMillis(), this.topicProperties);
        this.kafkaNamesMapper.toKafkaTopics(topic).stream().map(k -> this.kafkaAdminClient.createTopics(Collections.singletonList(new NewTopic(k.name().asString(), this.getPartitionsForDatacenter(this.datacenterName), (short)this.topicProperties.getReplicationFactor()).configs(config)))).map(CreateTopicsResult::all).forEach(this::waitForKafkaFuture);
    }

    @Override
    public void removeTopic(Topic topic) {
        this.kafkaNamesMapper.toKafkaTopics(topic).stream().map(k -> this.kafkaAdminClient.deleteTopics(Collections.singletonList(k.name().asString()))).map(DeleteTopicsResult::all).forEach(future -> {
            logger.info("Removing topic: {} from Kafka dc: {}", (Object)topic, (Object)this.datacenterName);
            long start = System.currentTimeMillis();
            this.waitForKafkaFuture((KafkaFuture)future);
            logger.info("Removed topic: {} from Kafka dc: {} in {} ms", new Object[]{topic, this.datacenterName, System.currentTimeMillis() - start});
        });
    }

    @Override
    public void updateTopic(Topic topic) {
        Map<String, String> config = this.createTopicConfig(topic.getRetentionTime().getDurationInMillis(), this.topicProperties);
        KafkaTopics kafkaTopics = this.kafkaNamesMapper.toKafkaTopics(topic);
        if (this.isMigrationToNewKafkaTopic(kafkaTopics)) {
            KafkaFuture createTopicsFuture = this.kafkaAdminClient.createTopics(Collections.singletonList(new NewTopic(kafkaTopics.getPrimary().name().asString(), this.getPartitionsForDatacenter(this.datacenterName), (short)this.topicProperties.getReplicationFactor()).configs(config))).all();
            this.waitForKafkaFuture(createTopicsFuture);
        } else {
            this.doUpdateTopic(kafkaTopics.getPrimary(), config);
        }
        kafkaTopics.getSecondary().ifPresent(secondary -> this.doUpdateTopic((KafkaTopic)secondary, config));
    }

    @Override
    public boolean topicExists(Topic topic) {
        return this.kafkaNamesMapper.toKafkaTopics(topic).allMatch(this::doesTopicExist);
    }

    private boolean isMigrationToNewKafkaTopic(KafkaTopics kafkaTopics) {
        return kafkaTopics.getSecondary().isPresent() && !this.doesTopicExist(kafkaTopics.getPrimary());
    }

    private boolean doesTopicExist(KafkaTopic topic) {
        KafkaFuture topicExistsFuture = this.kafkaAdminClient.listTopics().names().thenApply(names -> names.contains(topic.name().asString()));
        return (Boolean)this.waitForKafkaFuture(topicExistsFuture);
    }

    private void doUpdateTopic(KafkaTopic topic, Map<String, String> configMap) {
        ConfigResource topicConfigResource = new ConfigResource(ConfigResource.Type.TOPIC, topic.name().asString());
        Collection configEntries = configMap.entrySet().stream().map(entry -> new ConfigEntry((String)entry.getKey(), (String)entry.getValue())).collect(Collectors.toList());
        HashMap<ConfigResource, Config> configUpdates = new HashMap<ConfigResource, Config>();
        configUpdates.put(topicConfigResource, new Config(configEntries));
        KafkaFuture updateTopicFuture = this.kafkaAdminClient.alterConfigs(configUpdates).all();
        this.waitForKafkaFuture(updateTopicFuture);
    }

    private Map<String, String> createTopicConfig(long retentionPolicy, TopicProperties topicProperties) {
        HashMap<String, String> props = new HashMap<String, String>();
        props.put("retention.ms", String.valueOf(retentionPolicy));
        props.put("unclean.leader.election.enable", Boolean.toString(topicProperties.isUncleanLeaderElectionEnabled()));
        props.put("max.message.bytes", String.valueOf(topicProperties.getMaxMessageSize()));
        return props;
    }

    private <T> T waitForKafkaFuture(KafkaFuture<T> future) {
        try {
            return (T)future.get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new BrokersClusterCommunicationException(e);
        }
    }

    private int getPartitionsForDatacenter(String datacenterName) {
        return this.topicProperties.getPartitionsPerDc().getOrDefault(datacenterName, this.topicProperties.getPartitions());
    }
}

