/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.receiver.kafka;

import java.time.Clock;
import java.util.Properties;
import javax.inject.Inject;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.message.wrapper.MessageContentWrapper;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceiver;
import pl.allegro.tech.hermes.consumers.consumer.receiver.ReceiverFactory;
import pl.allegro.tech.hermes.consumers.consumer.receiver.kafka.KafkaMessageReceiver;
import pl.allegro.tech.hermes.domain.topic.schema.SchemaRepository;

public class KafkaMessageReceiverFactory
implements ReceiverFactory {
    private final ConfigFactory configFactory;
    private final MessageContentWrapper messageContentWrapper;
    private final HermesMetrics hermesMetrics;
    private final Clock clock;
    private final KafkaNamesMapper kafkaNamesMapper;
    private final SchemaRepository schemaRepository;

    @Inject
    public KafkaMessageReceiverFactory(ConfigFactory configFactory, MessageContentWrapper messageContentWrapper, HermesMetrics hermesMetrics, Clock clock, KafkaNamesMapper kafkaNamesMapper, SchemaRepository schemaRepository) {
        this.configFactory = configFactory;
        this.messageContentWrapper = messageContentWrapper;
        this.hermesMetrics = hermesMetrics;
        this.clock = clock;
        this.kafkaNamesMapper = kafkaNamesMapper;
        this.schemaRepository = schemaRepository;
    }

    @Override
    public MessageReceiver createMessageReceiver(Topic receivingTopic, Subscription subscription) {
        return this.create(receivingTopic, this.createConsumerConfig(this.kafkaNamesMapper.toConsumerGroupId(subscription.toSubscriptionName())), subscription);
    }

    MessageReceiver create(Topic receivingTopic, ConsumerConfig consumerConfig, Subscription subscription) {
        return new KafkaMessageReceiver(receivingTopic, Consumer.createJavaConsumerConnector((ConsumerConfig)consumerConfig), this.messageContentWrapper, this.hermesMetrics.timer("read-latency"), this.clock, this.kafkaNamesMapper, this.configFactory.getIntProperty(Configs.KAFKA_STREAM_COUNT), this.configFactory.getIntProperty(Configs.KAFKA_CONSUMER_TIMEOUT_MS), subscription.toSubscriptionName(), this.schemaRepository);
    }

    private ConsumerConfig createConsumerConfig(ConsumerGroupId groupId) {
        Properties props = new Properties();
        props.put("group.id", groupId.asString());
        props.put("zookeeper.connect", this.configFactory.getStringProperty(Configs.KAFKA_ZOOKEEPER_CONNECT_STRING));
        props.put("zookeeper.connection.timeout.ms", this.configFactory.getIntPropertyAsString(Configs.ZOOKEEPER_CONNECTION_TIMEOUT));
        props.put("zookeeper.session.timeout.ms", this.configFactory.getIntPropertyAsString(Configs.ZOOKEEPER_SESSION_TIMEOUT));
        props.put("auto.commit.enable", "false");
        props.put("fetch.wait.max.ms", "10000");
        props.put("consumer.timeout.ms", this.configFactory.getIntPropertyAsString(Configs.KAFKA_CONSUMER_TIMEOUT_MS));
        props.put("auto.offset.reset", this.configFactory.getStringProperty(Configs.KAFKA_CONSUMER_AUTO_OFFSET_RESET));
        props.put("offsets.storage", this.configFactory.getStringProperty(Configs.KAFKA_CONSUMER_OFFSETS_STORAGE));
        props.put("dual.commit.enabled", Boolean.toString(this.configFactory.getBooleanProperty(Configs.KAFKA_CONSUMER_DUAL_COMMIT_ENABLED)));
        props.put("rebalance.max.retries", this.configFactory.getIntPropertyAsString(Configs.KAFKA_CONSUMER_REBALANCE_MAX_RETRIES));
        props.put("rebalance.backoff.ms", this.configFactory.getIntPropertyAsString(Configs.KAFKA_CONSUMER_REBALANCE_BACKOFF));
        return new ConsumerConfig(props);
    }
}

