/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.receiver.kafka;

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaParameters;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.CommonConsumerParameters;
import pl.allegro.tech.hermes.consumers.consumer.filtering.FilteredMessageHandler;
import pl.allegro.tech.hermes.consumers.consumer.idletime.ExponentiallyGrowingIdleTimeCalculator;
import pl.allegro.tech.hermes.consumers.consumer.load.SubscriptionLoadRecorder;
import pl.allegro.tech.hermes.consumers.consumer.offset.ConsumerPartitionAssignmentState;
import pl.allegro.tech.hermes.consumers.consumer.offset.PendingOffsetsAppender;
import pl.allegro.tech.hermes.consumers.consumer.rate.ConsumerRateLimiter;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceiver;
import pl.allegro.tech.hermes.consumers.consumer.receiver.ReceiverFactory;
import pl.allegro.tech.hermes.consumers.consumer.receiver.ThrottlingMessageReceiver;
import pl.allegro.tech.hermes.consumers.consumer.receiver.kafka.FilteringMessageReceiver;
import pl.allegro.tech.hermes.consumers.consumer.receiver.kafka.KafkaConsumerParameters;
import pl.allegro.tech.hermes.consumers.consumer.receiver.kafka.KafkaConsumerRecordToMessageConverterFactory;
import pl.allegro.tech.hermes.consumers.consumer.receiver.kafka.KafkaReceiverParameters;
import pl.allegro.tech.hermes.consumers.consumer.receiver.kafka.KafkaSingleThreadedMessageReceiver;
import pl.allegro.tech.hermes.consumers.consumer.receiver.kafka.PartitionAssignmentStrategy;
import pl.allegro.tech.hermes.domain.filtering.chain.FilterChainFactory;
import pl.allegro.tech.hermes.tracker.consumers.Trackers;

/**
 * {@link ReceiverFactory} that builds Kafka-backed {@link MessageReceiver}s.
 *
 * <p>Every call to {@link #createMessageReceiver} creates a new {@link KafkaConsumer} configured from the
 * parameter objects passed to the constructor, wraps it in a {@link KafkaSingleThreadedMessageReceiver}
 * and then optionally decorates it with a {@link ThrottlingMessageReceiver} and a
 * {@link FilteringMessageReceiver}, depending on the flags exposed by {@link KafkaReceiverParameters}.
 */
public class KafkaMessageReceiverFactory
implements ReceiverFactory {
    private final CommonConsumerParameters commonConsumerParameters;
    private final KafkaParameters kafkaParameters;
    private final KafkaReceiverParameters consumerReceiverParameters;
    private final KafkaConsumerParameters kafkaConsumerParameters;
    private final KafkaConsumerRecordToMessageConverterFactory messageConverterFactory;
    private final MetricsFacade metricsFacade;
    private final KafkaNamesMapper kafkaNamesMapper;
    private final FilterChainFactory filterChainFactory;
    private final Trackers trackers;
    private final ConsumerPartitionAssignmentState consumerPartitionAssignmentState;

    /**
     * Stores the collaborators used later to configure consumers and assemble receivers.
     * No validation or other work is performed here.
     */
    public KafkaMessageReceiverFactory(
            CommonConsumerParameters commonConsumerParameters,
            KafkaReceiverParameters consumerReceiverParameters,
            KafkaConsumerParameters kafkaConsumerParameters,
            KafkaParameters kafkaParameters,
            KafkaConsumerRecordToMessageConverterFactory messageConverterFactory,
            MetricsFacade metricsFacade,
            KafkaNamesMapper kafkaNamesMapper,
            FilterChainFactory filterChainFactory,
            Trackers trackers,
            ConsumerPartitionAssignmentState consumerPartitionAssignmentState) {
        this.commonConsumerParameters = commonConsumerParameters;
        this.consumerReceiverParameters = consumerReceiverParameters;
        this.kafkaConsumerParameters = kafkaConsumerParameters;
        this.kafkaParameters = kafkaParameters;
        this.messageConverterFactory = messageConverterFactory;
        this.metricsFacade = metricsFacade;
        this.kafkaNamesMapper = kafkaNamesMapper;
        this.filterChainFactory = filterChainFactory;
        this.trackers = trackers;
        this.consumerPartitionAssignmentState = consumerPartitionAssignmentState;
    }

    /**
     * Creates the receiver chain for a subscription.
     *
     * <p>The base receiver is always a {@link KafkaSingleThreadedMessageReceiver}. It is wrapped in a
     * {@link ThrottlingMessageReceiver} when {@code isWaitBetweenUnsuccessfulPolls()} is set, and the
     * result is wrapped in a {@link FilteringMessageReceiver} when {@code isFilteringEnabled()} is set,
     * so the filtering receiver, when present, is the outermost one.
     *
     * @param topic topic the subscription belongs to; also consulted for the fetch size
     * @param subscription subscription to consume for
     * @param consumerRateLimiter passed to the filtered-message handler when filtering rate limiting is enabled
     * @param loadReporter passed to the base Kafka receiver
     * @param metrics metrics facade passed to the throttling and filtering decorators
     * @param pendingOffsetsAppender passed to the filtered-message handler
     * @return the assembled receiver
     */
    @Override
    public MessageReceiver createMessageReceiver(Topic topic, Subscription subscription, ConsumerRateLimiter consumerRateLimiter, SubscriptionLoadRecorder loadReporter, MetricsFacade metrics, PendingOffsetsAppender pendingOffsetsAppender) {
        MessageReceiver receiver = createKafkaSingleThreadedMessageReceiver(topic, subscription, loadReporter);
        if (consumerReceiverParameters.isWaitBetweenUnsuccessfulPolls()) {
            receiver = createThrottlingMessageReceiver(receiver, subscription, metrics);
        }
        if (consumerReceiverParameters.isFilteringEnabled()) {
            receiver = createFilteringMessageReceiver(receiver, consumerRateLimiter, subscription, metrics, pendingOffsetsAppender);
        }
        return receiver;
    }

    /** Builds the base receiver around a freshly created {@link KafkaConsumer}. */
    private MessageReceiver createKafkaSingleThreadedMessageReceiver(Topic topic, Subscription subscription, SubscriptionLoadRecorder loadReporter) {
        // Note: this receiver is given the factory-level metrics facade, not the per-call one.
        return new KafkaSingleThreadedMessageReceiver(
                createKafkaConsumer(topic, subscription),
                messageConverterFactory,
                metricsFacade,
                kafkaNamesMapper,
                topic,
                subscription,
                consumerReceiverParameters.getPoolTimeout(),
                consumerReceiverParameters.getReadQueueCapacity(),
                loadReporter,
                consumerPartitionAssignmentState);
    }

    /**
     * Wraps {@code receiver} in a {@link ThrottlingMessageReceiver} whose idle-time calculator is seeded
     * with the configured initial and maximum idle times (in milliseconds).
     */
    private MessageReceiver createThrottlingMessageReceiver(MessageReceiver receiver, Subscription subscription, MetricsFacade metrics) {
        ExponentiallyGrowingIdleTimeCalculator idleTimeCalculator = new ExponentiallyGrowingIdleTimeCalculator(
                consumerReceiverParameters.getInitialIdleTime().toMillis(),
                consumerReceiverParameters.getMaxIdleTime().toMillis());
        return new ThrottlingMessageReceiver(receiver, idleTimeCalculator, subscription.getQualifiedName(), metrics);
    }

    /** Wraps {@code receiver} in a {@link FilteringMessageReceiver} using this factory's filter chain factory. */
    private MessageReceiver createFilteringMessageReceiver(MessageReceiver receiver, ConsumerRateLimiter consumerRateLimiter, Subscription subscription, MetricsFacade metrics, PendingOffsetsAppender pendingOffsetsAppender) {
        boolean filteringRateLimitEnabled = consumerReceiverParameters.isFilteringRateLimiterEnabled();
        // The handler receives the rate limiter only when filtering rate limiting is enabled; otherwise null.
        FilteredMessageHandler filteredMessageHandler = new FilteredMessageHandler(
                filteringRateLimitEnabled ? consumerRateLimiter : null,
                pendingOffsetsAppender,
                trackers,
                metrics,
                subscription.getQualifiedName());
        return new FilteringMessageReceiver(receiver, filteredMessageHandler, filterChainFactory, subscription);
    }

    /**
     * Creates a byte-array keyed and valued consumer for the subscription's consumer group.
     *
     * <p>The group id is derived from the subscription's qualified name through the names mapper, and the
     * client id is the configured client id joined with that group id by an underscore.
     */
    private KafkaConsumer<byte[], byte[]> createKafkaConsumer(Topic topic, Subscription subscription) {
        ConsumerGroupId groupId = kafkaNamesMapper.toConsumerGroupId(subscription.getQualifiedName());
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaParameters.getBrokerList());
        props.put("client.id", consumerReceiverParameters.getClientId() + "_" + groupId.asString());
        props.put("group.id", groupId.asString());
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        addKafkaAuthorizationParameters(props);
        addKafkaConsumerParameters(props, topic);
        // Diamond instead of a raw type so the result is a checked KafkaConsumer<byte[], byte[]>.
        return new KafkaConsumer<>(props);
    }

    /** Adds the SASL / security settings to {@code props} when authentication is enabled; otherwise does nothing. */
    private void addKafkaAuthorizationParameters(Properties props) {
        if (kafkaParameters.isAuthenticationEnabled()) {
            props.put("sasl.mechanism", kafkaParameters.getAuthenticationMechanism());
            props.put("security.protocol", kafkaParameters.getAuthenticationProtocol());
            props.put("sasl.jaas.config", kafkaParameters.getJaasConfig());
        }
    }

    /**
     * Copies the tunable consumer settings into {@code props}.
     *
     * <p>Duration-based settings are written as {@code int} milliseconds via {@link #toIntMillis(long)}.
     */
    private void addKafkaConsumerParameters(Properties props, Topic topic) {
        props.put("auto.offset.reset", kafkaConsumerParameters.getAutoOffsetReset());
        props.put("session.timeout.ms", toIntMillis(kafkaConsumerParameters.getSessionTimeout().toMillis()));
        props.put("heartbeat.interval.ms", toIntMillis(kafkaConsumerParameters.getHeartbeatInterval().toMillis()));
        props.put("metadata.max.age.ms", toIntMillis(kafkaConsumerParameters.getMetadataMaxAge().toMillis()));
        props.put("max.partition.fetch.bytes", getMaxPartitionFetch(topic));
        props.put("send.buffer.bytes", kafkaConsumerParameters.getSendBufferBytes());
        props.put("receive.buffer.bytes", kafkaConsumerParameters.getReceiveBufferBytes());
        props.put("fetch.min.bytes", kafkaConsumerParameters.getFetchMinBytes());
        props.put("fetch.max.wait.ms", toIntMillis(kafkaConsumerParameters.getFetchMaxWait().toMillis()));
        props.put("reconnect.backoff.ms", toIntMillis(kafkaConsumerParameters.getReconnectBackoff().toMillis()));
        props.put("retry.backoff.ms", toIntMillis(kafkaConsumerParameters.getRetryBackoff().toMillis()));
        props.put("check.crcs", kafkaConsumerParameters.isCheckCrcs());
        props.put("metrics.sample.window.ms", toIntMillis(kafkaConsumerParameters.getMetricsSampleWindow().toMillis()));
        props.put("metrics.num.samples", kafkaConsumerParameters.getMetricsNumSamples());
        props.put("request.timeout.ms", toIntMillis(kafkaConsumerParameters.getRequestTimeout().toMillis()));
        props.put("connections.max.idle.ms", toIntMillis(kafkaConsumerParameters.getConnectionsMaxIdle().toMillis()));
        props.put("max.poll.records", kafkaConsumerParameters.getMaxPollRecords());
        props.put("max.poll.interval.ms", toIntMillis(kafkaConsumerParameters.getMaxPollInterval().toMillis()));
        props.put("partition.assignment.strategy", getPartitionAssignmentStrategies());
    }

    /**
     * Narrows a millisecond count to {@code int}, saturating at {@link Integer#MIN_VALUE} and
     * {@link Integer#MAX_VALUE}.
     *
     * <p>A plain {@code (int)} cast would silently wrap values outside the {@code int} range into an
     * unrelated (possibly negative) number; values that already fit are returned unchanged.
     *
     * @param millis duration in milliseconds
     * @return {@code millis} limited to the {@code int} range
     */
    private static int toIntMillis(long millis) {
        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, millis));
    }

    /**
     * Computes the value for {@code max.partition.fetch.bytes}.
     *
     * <p>When {@code isUseTopicMessageSizeEnabled()} is set, the topic's max message size is capped at the
     * configured maximum and then raised to at least the configured minimum (so the minimum wins if it
     * exceeds the maximum). Otherwise the configured maximum is returned.
     */
    private int getMaxPartitionFetch(Topic topic) {
        if (!commonConsumerParameters.isUseTopicMessageSizeEnabled()) {
            return kafkaConsumerParameters.getMaxPartitionFetchMax();
        }
        int topicMessageSize = topic.getMaxMessageSize();
        int min = kafkaConsumerParameters.getMaxPartitionFetchMin();
        int max = kafkaConsumerParameters.getMaxPartitionFetchMax();
        return Math.max(Math.min(topicMessageSize, max), min);
    }

    /** Returns the class names of the configured partition assignors, in configuration order. */
    private List<String> getPartitionAssignmentStrategies() {
        return kafkaConsumerParameters.getPartitionAssignmentStrategies().stream()
                .map(PartitionAssignmentStrategy::getAssignorClass)
                .map(Class::getName)
                .collect(Collectors.toList());
    }
}

