/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.receiver.kafka;

import java.time.Clock;
import java.util.Properties;
import javax.inject.Inject;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.consumer.filtering.FilteredMessageHandler;
import pl.allegro.tech.hermes.consumers.consumer.filtering.chain.FilterChainFactory;
import pl.allegro.tech.hermes.consumers.consumer.idleTime.ExponentiallyGrowingIdleTimeCalculator;
import pl.allegro.tech.hermes.consumers.consumer.offset.ConsumerPartitionAssignmentState;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetQueue;
import pl.allegro.tech.hermes.consumers.consumer.rate.ConsumerRateLimiter;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceiver;
import pl.allegro.tech.hermes.consumers.consumer.receiver.ReceiverFactory;
import pl.allegro.tech.hermes.consumers.consumer.receiver.ThrottlingMessageReceiver;
import pl.allegro.tech.hermes.consumers.consumer.receiver.kafka.FilteringMessageReceiver;
import pl.allegro.tech.hermes.consumers.consumer.receiver.kafka.KafkaSingleThreadedMessageReceiver;
import pl.allegro.tech.hermes.consumers.consumer.receiver.kafka.MessageContentReaderFactory;
import pl.allegro.tech.hermes.tracker.consumers.Trackers;

public class KafkaMessageReceiverFactory
implements ReceiverFactory {
    private final ConfigFactory configs;
    private final MessageContentReaderFactory messageContentReaderFactory;
    private final HermesMetrics hermesMetrics;
    private OffsetQueue offsetQueue;
    private final Clock clock;
    private final KafkaNamesMapper kafkaNamesMapper;
    private final FilterChainFactory filterChainFactory;
    private final Trackers trackers;
    private final ConsumerPartitionAssignmentState consumerPartitionAssignmentState;

    @Inject
    public KafkaMessageReceiverFactory(ConfigFactory configs, MessageContentReaderFactory messageContentReaderFactory, HermesMetrics hermesMetrics, OffsetQueue offsetQueue, Clock clock, KafkaNamesMapper kafkaNamesMapper, FilterChainFactory filterChainFactory, Trackers trackers, ConsumerPartitionAssignmentState consumerPartitionAssignmentState) {
        this.configs = configs;
        this.messageContentReaderFactory = messageContentReaderFactory;
        this.hermesMetrics = hermesMetrics;
        this.offsetQueue = offsetQueue;
        this.clock = clock;
        this.kafkaNamesMapper = kafkaNamesMapper;
        this.filterChainFactory = filterChainFactory;
        this.trackers = trackers;
        this.consumerPartitionAssignmentState = consumerPartitionAssignmentState;
    }

    @Override
    public MessageReceiver createMessageReceiver(Topic topic, Subscription subscription, ConsumerRateLimiter consumerRateLimiter) {
        MessageReceiver receiver = new KafkaSingleThreadedMessageReceiver(this.createKafkaConsumer(topic, subscription), this.messageContentReaderFactory.provide(topic), this.hermesMetrics, this.kafkaNamesMapper, topic, subscription, this.clock, this.configs.getIntProperty(Configs.CONSUMER_RECEIVER_POOL_TIMEOUT), this.configs.getIntProperty(Configs.CONSUMER_RECEIVER_READ_QUEUE_CAPACITY), this.consumerPartitionAssignmentState);
        if (this.configs.getBooleanProperty(Configs.CONSUMER_RECEIVER_WAIT_BETWEEN_UNSUCCESSFUL_POLLS)) {
            ExponentiallyGrowingIdleTimeCalculator idleTimeCalculator = new ExponentiallyGrowingIdleTimeCalculator(this.configs.getIntProperty(Configs.CONSUMER_RECEIVER_INITIAL_IDLE_TIME), this.configs.getIntProperty(Configs.CONSUMER_RECEIVER_MAX_IDLE_TIME));
            receiver = new ThrottlingMessageReceiver(receiver, idleTimeCalculator, subscription, this.hermesMetrics);
        }
        if (this.configs.getBooleanProperty(Configs.CONSUMER_FILTERING_ENABLED)) {
            FilteredMessageHandler filteredMessageHandler = new FilteredMessageHandler(this.offsetQueue, consumerRateLimiter, this.trackers, this.hermesMetrics);
            receiver = new FilteringMessageReceiver(receiver, filteredMessageHandler, this.filterChainFactory, subscription);
        }
        return receiver;
    }

    private KafkaConsumer<byte[], byte[]> createKafkaConsumer(Topic topic, Subscription subscription) {
        ConsumerGroupId groupId = this.kafkaNamesMapper.toConsumerGroupId(subscription.getQualifiedName());
        Properties props = new Properties();
        props.put("group.id", groupId.asString());
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("client.id", this.configs.getStringProperty(Configs.CONSUMER_CLIENT_ID) + "_" + groupId.asString());
        props.put("bootstrap.servers", this.configs.getStringProperty(Configs.KAFKA_BROKER_LIST));
        props.put("auto.offset.reset", this.configs.getStringProperty(Configs.KAFKA_CONSUMER_AUTO_OFFSET_RESET_CONFIG));
        props.put("session.timeout.ms", (Object)this.configs.getIntProperty(Configs.KAFKA_CONSUMER_SESSION_TIMEOUT_MS_CONFIG));
        props.put("heartbeat.interval.ms", (Object)this.configs.getIntProperty(Configs.KAFKA_CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG));
        props.put("metadata.max.age.ms", (Object)this.configs.getIntProperty(Configs.KAFKA_CONSUMER_METADATA_MAX_AGE_CONFIG));
        props.put("max.partition.fetch.bytes", (Object)this.getMaxPartitionFetch(topic, this.configs));
        props.put("send.buffer.bytes", (Object)this.configs.getIntProperty(Configs.KAFKA_CONSUMER_SEND_BUFFER_CONFIG));
        props.put("receive.buffer.bytes", (Object)this.configs.getIntProperty(Configs.KAFKA_CONSUMER_RECEIVE_BUFFER_CONFIG));
        props.put("fetch.min.bytes", (Object)this.configs.getIntProperty(Configs.KAFKA_CONSUMER_FETCH_MIN_BYTES_CONFIG));
        props.put("fetch.max.wait.ms", (Object)this.configs.getIntProperty(Configs.KAFKA_CONSUMER_FETCH_MAX_WAIT_MS_CONFIG));
        props.put("reconnect.backoff.ms", (Object)this.configs.getIntProperty(Configs.KAFKA_CONSUMER_RECONNECT_BACKOFF_MS_CONFIG));
        props.put("retry.backoff.ms", (Object)this.configs.getIntProperty(Configs.KAFKA_CONSUMER_RETRY_BACKOFF_MS_CONFIG));
        props.put("check.crcs", (Object)this.configs.getBooleanProperty(Configs.KAFKA_CONSUMER_CHECK_CRCS_CONFIG));
        props.put("metrics.sample.window.ms", (Object)this.configs.getIntProperty(Configs.KAFKA_CONSUMER_METRICS_SAMPLE_WINDOW_MS_CONFIG));
        props.put("metrics.num.samples", (Object)this.configs.getIntProperty(Configs.KAFKA_CONSUMER_METRICS_NUM_SAMPLES_CONFIG));
        props.put("request.timeout.ms", (Object)this.configs.getIntProperty(Configs.KAFKA_CONSUMER_REQUEST_TIMEOUT_MS_CONFIG));
        props.put("connections.max.idle.ms", (Object)this.configs.getIntProperty(Configs.KAFKA_CONSUMER_CONNECTIONS_MAX_IDLE_MS_CONFIG));
        props.put("max.poll.records", (Object)this.configs.getIntProperty(Configs.KAFKA_CONSUMER_MAX_POLL_RECORDS_CONFIG));
        props.put("max.poll.interval.ms", (Object)this.configs.getIntProperty(Configs.KAFKA_CONSUMER_MAX_POLL_INTERVAL_CONFIG));
        return new KafkaConsumer(props);
    }

    private int getMaxPartitionFetch(Topic topic, ConfigFactory configs) {
        if (configs.getBooleanProperty(Configs.CONSUMER_USE_TOPIC_MESSAGE_SIZE)) {
            int topicMessageSize = topic.getMaxMessageSize();
            int min = configs.getIntProperty(Configs.KAFKA_CONSUMER_MAX_PARTITION_FETCH_MIN_BYTES_CONFIG);
            int max = configs.getIntProperty(Configs.KAFKA_CONSUMER_MAX_PARTITION_FETCH_MAX_BYTES_CONFIG);
            return Math.max(Math.min(topicMessageSize, max), min);
        }
        return configs.getIntProperty(Configs.KAFKA_CONSUMER_MAX_PARTITION_FETCH_MAX_BYTES_CONFIG);
    }
}

