/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker;

import com.codahale.metrics.Timer;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Sets;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import jersey.repackaged.com.google.common.collect.Lists;
import kafka.api.RequestOrResponse;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.network.BlockingChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.common.util.HostnameResolver;
import pl.allegro.tech.hermes.consumers.consumer.offset.FailedToCommitOffsets;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetsToCommit;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartition;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;
import pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker.BlockingChannelFactory;
import pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker.BrokerOffsetCommitErrors;
import pl.allegro.tech.hermes.consumers.consumer.receiver.kafka.broker.CannotCommitOffsetToBrokerException;

public class BrokerOffsetsRepository {
    private static final Logger logger = LoggerFactory.getLogger(BrokerOffsetsRepository.class);
    private static final String EMPTY_METADATA = "";
    private static final int CORRELATION_ID = 0;
    private static final short VERSION_ID = 1;
    private final BlockingChannelFactory blockingChannelFactory;
    private final KafkaNamesMapper kafkaNamesMapper;
    private final Clock clock;
    private final HermesMetrics metrics;
    private final LoadingCache<SubscriptionName, BlockingChannel> channels;
    private final String clientId;

    @Inject
    public BrokerOffsetsRepository(BlockingChannelFactory blockingChannelFactory, KafkaNamesMapper kafkaNamesMapper, Clock clock, HostnameResolver hostnameResolver, HermesMetrics hermesMetrics, ConfigFactory configFactory) {
        this(blockingChannelFactory, configFactory.getIntProperty(Configs.KAFKA_CONSUMER_OFFSET_COMMITTER_BROKER_CONNECTION_EXPIRATION), kafkaNamesMapper, clock, hostnameResolver, hermesMetrics);
    }

    public BrokerOffsetsRepository(final BlockingChannelFactory blockingChannelFactory, int channelCacheExpiryTimeSeconds, final KafkaNamesMapper kafkaNamesMapper, Clock clock, HostnameResolver hostnameResolver, HermesMetrics hermesMetrics) {
        this.blockingChannelFactory = blockingChannelFactory;
        this.clock = clock;
        this.kafkaNamesMapper = kafkaNamesMapper;
        this.metrics = hermesMetrics;
        this.clientId = this.clientId(hostnameResolver);
        this.channels = CacheBuilder.newBuilder().expireAfterAccess((long)channelCacheExpiryTimeSeconds, TimeUnit.SECONDS).removalListener(notification -> ((BlockingChannel)notification.getValue()).disconnect()).build((CacheLoader)new CacheLoader<SubscriptionName, BlockingChannel>(){

            public BlockingChannel load(SubscriptionName key) {
                BlockingChannel channel = blockingChannelFactory.create(kafkaNamesMapper.toConsumerGroupId(key));
                channel.connect();
                return channel;
            }
        });
    }

    private String clientId(HostnameResolver hostnameResolver) {
        return hostnameResolver.resolve() + "_" + UUID.randomUUID();
    }

    public FailedToCommitOffsets commit(OffsetsToCommit offsetsToCommit) {
        FailedToCommitOffsets failedOffsets = new FailedToCommitOffsets();
        for (SubscriptionName subscriptionName : offsetsToCommit.subscriptionNames()) {
            Set<SubscriptionPartitionOffset> subscriptionOffsets = offsetsToCommit.batchFor(subscriptionName);
            try {
                Timer.Context c = this.metrics.timer("offset-committer.single-commit.kafka").time();
                Throwable throwable = null;
                try {
                    this.commit(subscriptionName, subscriptionOffsets);
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (c == null) continue;
                    if (throwable != null) {
                        try {
                            c.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    c.close();
                }
            }
            catch (Exception exception) {
                logger.warn("Failed to commit offsets for subscription {}", (Object)subscriptionName, (Object)exception);
                failedOffsets.add(subscriptionOffsets);
            }
        }
        return failedOffsets;
    }

    private void commit(SubscriptionName subscriptionName, Set<SubscriptionPartitionOffset> subscriptionPartitionOffsets) {
        OffsetCommitResponse commitResponse;
        ConsumerGroupId consumerGroupId = this.kafkaNamesMapper.toConsumerGroupId(subscriptionName);
        OffsetCommitRequest commitRequest = this.createCommitRequest(consumerGroupId, subscriptionPartitionOffsets);
        try {
            commitResponse = this.commitOffset(subscriptionName, commitRequest);
        }
        catch (Exception exception) {
            this.channels.invalidate((Object)subscriptionName);
            throw new CannotCommitOffsetToBrokerException(subscriptionName, exception);
        }
        if (commitResponse.hasError()) {
            commitResponse.errors().values().stream().map(error -> (Short)error).filter(error -> error == ErrorMapping.NotCoordinatorForConsumerCode() || error == ErrorMapping.ConsumerCoordinatorNotAvailableCode()).findAny().ifPresent(error -> this.channels.invalidate((Object)subscriptionName));
            throw new CannotCommitOffsetToBrokerException(subscriptionName, new BrokerOffsetCommitErrors(commitResponse.errors()));
        }
    }

    private OffsetCommitRequest createCommitRequest(ConsumerGroupId consumerGroupId, Set<SubscriptionPartitionOffset> offsets) {
        Map<TopicAndPartition, OffsetAndMetadata> offset = this.createOffset(offsets);
        return new OffsetCommitRequest(consumerGroupId.asString(), offset, 0, this.clientId, 1);
    }

    private Map<TopicAndPartition, OffsetAndMetadata> createOffset(Set<SubscriptionPartitionOffset> partitionOffsets) {
        LinkedHashMap<TopicAndPartition, OffsetAndMetadata> offsetsData = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
        for (SubscriptionPartitionOffset partitionOffset : partitionOffsets) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(partitionOffset.getKafkaTopicName().asString(), partitionOffset.getPartition());
            offsetsData.put(topicAndPartition, new OffsetAndMetadata(partitionOffset.getOffset(), EMPTY_METADATA, this.clock.millis()));
        }
        return offsetsData;
    }

    private OffsetCommitResponse commitOffset(SubscriptionName subscription, OffsetCommitRequest commitRequest) throws ExecutionException {
        BlockingChannel channel = (BlockingChannel)this.channels.get((Object)subscription);
        channel.send((RequestOrResponse)commitRequest.underlying());
        return OffsetCommitResponse.readFrom((ByteBuffer)channel.receive().buffer());
    }

    public void moveOffset(SubscriptionPartitionOffset subscriptionPartitionOffset) {
        long currentOffset = this.findOffset(subscriptionPartitionOffset.getSubscriptionPartition());
        if (currentOffset == -1L || currentOffset > subscriptionPartitionOffset.getOffset()) {
            this.commit(subscriptionPartitionOffset.getSubscriptionName(), Sets.newHashSet((Object[])new SubscriptionPartitionOffset[]{subscriptionPartitionOffset}));
        } else {
            logger.warn("Tried to move offset for subscription {} and partition {} to {} which is in the future. Current offset: {}", new Object[]{subscriptionPartitionOffset.getSubscriptionName(), subscriptionPartitionOffset.getPartition(), subscriptionPartitionOffset.getOffset(), currentOffset});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long findOffset(SubscriptionPartition subscriptionPartition) {
        ConsumerGroupId groupId = this.kafkaNamesMapper.toConsumerGroupId(subscriptionPartition.getSubscriptionName());
        BlockingChannel channel = this.blockingChannelFactory.create(groupId);
        try {
            channel.connect();
            TopicAndPartition topicAndPartition = new TopicAndPartition(subscriptionPartition.getKafkaTopicName().asString(), subscriptionPartition.getPartition());
            ArrayList partitions = Lists.newArrayList((Object[])new TopicAndPartition[]{topicAndPartition});
            OffsetFetchRequest fetchRequest = new OffsetFetchRequest(groupId.asString(), (List)partitions, 1, 0, this.clientId);
            channel.send((RequestOrResponse)fetchRequest.underlying());
            OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom((ByteBuffer)channel.receive().buffer());
            Map result = fetchResponse.offsets();
            OffsetMetadataAndError offset = (OffsetMetadataAndError)result.get(topicAndPartition);
            long l = offset.offset();
            return l;
        }
        finally {
            channel.disconnect();
        }
    }
}

