/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.management.config.kafka;

import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.common.broker.BrokerStorage;
import pl.allegro.tech.hermes.common.broker.KafkaBrokerStorage;
import pl.allegro.tech.hermes.common.kafka.KafkaConsumerPool;
import pl.allegro.tech.hermes.common.kafka.KafkaConsumerPoolConfig;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator;
import pl.allegro.tech.hermes.management.config.TopicProperties;
import pl.allegro.tech.hermes.management.config.kafka.KafkaClustersProperties;
import pl.allegro.tech.hermes.management.config.kafka.KafkaNamesMappers;
import pl.allegro.tech.hermes.management.config.kafka.KafkaProperties;
import pl.allegro.tech.hermes.management.config.kafka.MultipleDcKafkaNamesMappersFactory;
import pl.allegro.tech.hermes.management.config.subscription.SubscriptionProperties;
import pl.allegro.tech.hermes.management.domain.dc.DatacenterBoundRepositoryHolder;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.subscription.consumergroup.ConsumerGroupManager;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaBrokerTopicManagement;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerGroupManager;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaRawMessageReader;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaSingleMessageReader;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.LogEndOffsetChecker;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.NoOpConsumerGroupManager;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.OffsetsAvailableChecker;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.retransmit.KafkaRetransmissionService;
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperRepositoryManager;
import pl.allegro.tech.hermes.schema.SchemaRepository;
import tech.allegro.schema.json2avro.converter.AvroJsonConverter;

@Configuration
@EnableConfigurationProperties(value={KafkaClustersProperties.class})
public class KafkaConfiguration
implements MultipleDcKafkaNamesMappersFactory {
    @Autowired
    KafkaClustersProperties kafkaClustersProperties;
    @Autowired
    TopicProperties topicProperties;
    @Autowired
    SubscriptionProperties subscriptionProperties;
    @Autowired
    ZookeeperRepositoryManager zookeeperRepositoryManager;
    @Autowired
    MultiDatacenterRepositoryCommandExecutor multiDcExecutor;

    @Bean
    MultiDCAwareService multiDCAwareService(KafkaNamesMappers kafkaNamesMappers, SchemaRepository schemaRepository, Clock clock, AvroJsonConverter avroJsonConverter) {
        List<DatacenterBoundRepositoryHolder<SubscriptionOffsetChangeIndicator>> repositories = this.zookeeperRepositoryManager.getRepositories(SubscriptionOffsetChangeIndicator.class);
        List<BrokersClusterService> clusters = this.kafkaClustersProperties.getClusters().stream().map(kafkaProperties -> {
            KafkaNamesMapper kafkaNamesMapper = kafkaNamesMappers.getMapper(kafkaProperties.getQualifiedClusterName());
            AdminClient brokerAdminClient = this.brokerAdminClient((KafkaProperties)kafkaProperties);
            BrokerStorage storage = this.brokersStorage(brokerAdminClient);
            KafkaBrokerTopicManagement brokerTopicManagement = new KafkaBrokerTopicManagement(this.topicProperties, brokerAdminClient, kafkaNamesMapper, kafkaProperties.getDatacenter());
            KafkaConsumerPool consumerPool = this.kafkaConsumersPool((KafkaProperties)kafkaProperties, storage, kafkaProperties.getBrokerList());
            KafkaRawMessageReader kafkaRawMessageReader = new KafkaRawMessageReader(consumerPool, kafkaProperties.getKafkaConsumer().getPollTimeoutMillis());
            SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator = this.getRepository(repositories, (KafkaProperties)kafkaProperties);
            KafkaRetransmissionService retransmissionService = new KafkaRetransmissionService(storage, subscriptionOffsetChangeIndicator, consumerPool, kafkaNamesMapper);
            KafkaSingleMessageReader messageReader = new KafkaSingleMessageReader(kafkaRawMessageReader, schemaRepository, avroJsonConverter);
            return new BrokersClusterService(kafkaProperties.getDatacenter(), kafkaProperties.getQualifiedClusterName(), messageReader, retransmissionService, brokerTopicManagement, kafkaNamesMapper, new OffsetsAvailableChecker(consumerPool, storage), new LogEndOffsetChecker(consumerPool), brokerAdminClient, this.createConsumerGroupManager((KafkaProperties)kafkaProperties, kafkaNamesMapper, brokerAdminClient));
        }).collect(Collectors.toList());
        return new MultiDCAwareService(clusters, clock, Duration.ofMillis(this.subscriptionProperties.getIntervalBetweenCheckinIfOffsetsMovedInMillis()), Duration.ofSeconds(this.subscriptionProperties.getOffsetsMovedTimeoutInSeconds()), this.multiDcExecutor);
    }

    private ConsumerGroupManager createConsumerGroupManager(KafkaProperties kafkaProperties, KafkaNamesMapper kafkaNamesMapper, AdminClient kafkaAdminClient) {
        return this.subscriptionProperties.isCreateConsumerGroupManuallyEnabled() ? new KafkaConsumerGroupManager(kafkaNamesMapper, kafkaProperties.getQualifiedClusterName(), kafkaProperties.getBrokerList(), kafkaProperties, kafkaAdminClient) : new NoOpConsumerGroupManager();
    }

    private SubscriptionOffsetChangeIndicator getRepository(List<DatacenterBoundRepositoryHolder<SubscriptionOffsetChangeIndicator>> repositories, KafkaProperties kafkaProperties) {
        if (repositories.size() == 1) {
            return repositories.get(0).getRepository();
        }
        return (SubscriptionOffsetChangeIndicator)repositories.stream().filter(repository -> kafkaProperties.getDatacenter().equals(repository.getDatacenterName())).findFirst().orElseThrow(() -> new IllegalArgumentException(String.format("Kafka cluster dc name '%s' not matched with Zookeeper dc names: %s", kafkaProperties.getDatacenter(), repositories.stream().map(DatacenterBoundRepositoryHolder::getDatacenterName).collect(Collectors.joining(","))))).getRepository();
    }

    @Bean
    @ConditionalOnMissingBean
    KafkaNamesMappers kafkaNameMappers() {
        return this.createDefaultKafkaNamesMapper(this.kafkaClustersProperties);
    }

    private BrokerStorage brokersStorage(AdminClient kafkaAdminClient) {
        return new KafkaBrokerStorage(kafkaAdminClient);
    }

    private KafkaConsumerPool kafkaConsumersPool(KafkaProperties kafkaProperties, BrokerStorage brokerStorage, String configuredBootstrapServers) {
        KafkaConsumerPoolConfig config = new KafkaConsumerPoolConfig(kafkaProperties.getKafkaConsumer().getCacheExpirationSeconds(), kafkaProperties.getKafkaConsumer().getBufferSizeBytes(), kafkaProperties.getKafkaConsumer().getFetchMaxWaitMillis(), kafkaProperties.getKafkaConsumer().getFetchMinBytes(), kafkaProperties.getKafkaConsumer().getNamePrefix(), kafkaProperties.getKafkaConsumer().getConsumerGroupName(), kafkaProperties.getAuthentication().isEnabled(), kafkaProperties.getAuthentication().getMechanism(), kafkaProperties.getAuthentication().getProtocol(), kafkaProperties.getAuthentication().getJaasConfig());
        return new KafkaConsumerPool(config, brokerStorage, configuredBootstrapServers);
    }

    private AdminClient brokerAdminClient(KafkaProperties kafkaProperties) {
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaProperties.getBrokerList());
        props.put("security.protocol", "PLAINTEXT");
        props.put("request.timeout.ms", (Object)kafkaProperties.getKafkaServerRequestTimeoutMillis());
        if (kafkaProperties.getAuthentication().isEnabled()) {
            props.put("sasl.mechanism", kafkaProperties.getAuthentication().getMechanism());
            props.put("security.protocol", kafkaProperties.getAuthentication().getProtocol());
            props.put("sasl.jaas.config", kafkaProperties.getAuthentication().getJaasConfig());
        }
        return AdminClient.create((Properties)props);
    }
}

