/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.management.config.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.avro.Schema;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.common.admin.AdminTool;
import pl.allegro.tech.hermes.common.broker.BrokerStorage;
import pl.allegro.tech.hermes.common.broker.ZookeeperBrokerStorage;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.SimpleConsumerPool;
import pl.allegro.tech.hermes.common.kafka.SimpleConsumerPoolConfig;
import pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator;
import pl.allegro.tech.hermes.common.message.wrapper.MessageContentWrapper;
import pl.allegro.tech.hermes.domain.topic.schema.SchemaRepository;
import pl.allegro.tech.hermes.management.config.TopicProperties;
import pl.allegro.tech.hermes.management.config.kafka.KafkaClustersProperties;
import pl.allegro.tech.hermes.management.config.kafka.KafkaProperties;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaBrokerTopicManagement;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaRawMessageReader;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaSingleMessageReader;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.OffsetsAvailableChecker;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.retransmit.KafkaRetransmissionService;

@Configuration
@EnableConfigurationProperties(value={KafkaClustersProperties.class})
public class KafkaConfiguration {
    @Autowired
    KafkaClustersProperties kafkaClustersProperties;
    @Autowired
    TopicProperties topicProperties;
    @Autowired
    ObjectMapper mapper;
    @Autowired
    MessageContentWrapper messageContentWrapper;
    @Autowired
    SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator;
    @Autowired
    AdminTool adminTool;
    @Autowired
    SchemaRepository<Schema> avroSchemaRepository;
    private final List<ZkClient> zkClients = new ArrayList<ZkClient>();
    private final List<CuratorFramework> curators = new ArrayList<CuratorFramework>();

    @Bean
    MultiDCAwareService multiDCAwareService() {
        List<BrokersClusterService> clusters = this.kafkaClustersProperties.getClusters().stream().map(kafkaProperties -> {
            KafkaNamesMapper kafkaNamesMapper = StringUtils.isEmpty((String)kafkaProperties.getNamespace()) ? new KafkaNamesMapper(this.kafkaClustersProperties.getDefaultNamespace()) : new KafkaNamesMapper(kafkaProperties.getNamespace());
            BrokerStorage storage = this.brokersStorage(this.curatorFramework((KafkaProperties)kafkaProperties));
            KafkaBrokerTopicManagement brokerTopicManagement = new KafkaBrokerTopicManagement(this.topicProperties, this.zkClient((KafkaProperties)kafkaProperties), kafkaNamesMapper);
            SimpleConsumerPool simpleConsumerPool = this.simpleConsumersPool((KafkaProperties)kafkaProperties, storage);
            KafkaRawMessageReader kafkaRawMessageReader = new KafkaRawMessageReader(simpleConsumerPool);
            KafkaRetransmissionService retransmissionService = new KafkaRetransmissionService(storage, kafkaRawMessageReader, this.messageContentWrapper, this.subscriptionOffsetChangeIndicator, simpleConsumerPool, kafkaNamesMapper);
            return new BrokersClusterService(kafkaProperties.getClusterName(), new KafkaSingleMessageReader(kafkaRawMessageReader, this.avroSchemaRepository), retransmissionService, brokerTopicManagement, kafkaNamesMapper, new OffsetsAvailableChecker(simpleConsumerPool, storage));
        }).collect(Collectors.toList());
        return new MultiDCAwareService(clusters, this.adminTool);
    }

    @PreDestroy
    public void shutdown() {
        this.curators.forEach(CuratorFramework::close);
        this.zkClients.forEach(ZkClient::close);
    }

    private ZkClient zkClient(KafkaProperties kafkaProperties) {
        ZkClient zkClient = new ZkClient(kafkaProperties.getConnectionString(), kafkaProperties.getSessionTimeout(), kafkaProperties.getConnectionTimeout(), (ZkSerializer)ZKStringSerializer$.MODULE$);
        zkClient.waitUntilConnected();
        this.zkClients.add(zkClient);
        return zkClient;
    }

    private CuratorFramework curatorFramework(KafkaProperties kafkaProperties) {
        CuratorFramework curator = CuratorFrameworkFactory.newClient((String)kafkaProperties.getConnectionString(), (RetryPolicy)new RetryNTimes(kafkaProperties.getRetryTimes(), kafkaProperties.getRetrySleep()));
        curator.start();
        this.curators.add(curator);
        return curator;
    }

    private BrokerStorage brokersStorage(CuratorFramework curatorFramework) {
        return new ZookeeperBrokerStorage(curatorFramework, this.mapper);
    }

    private SimpleConsumerPool simpleConsumersPool(KafkaProperties kafkaProperties, BrokerStorage brokerStorage) {
        SimpleConsumerPoolConfig config = new SimpleConsumerPoolConfig(kafkaProperties.getSimpleConsumer().getCacheExpiration(), kafkaProperties.getSimpleConsumer().getTimeout(), kafkaProperties.getSimpleConsumer().getBufferSize(), kafkaProperties.getSimpleConsumer().getNamePrefix());
        return new SimpleConsumerPool(config, brokerStorage);
    }
}

