/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import jersey.repackaged.com.google.common.collect.Lists;
import kafka.api.RequestOrResponse;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.network.BlockingChannel;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopicName;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.util.HostnameResolver;
import pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker.BlockingChannelFactory;
import pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker.BrokerOffsetCommitErrors;
import pl.allegro.tech.hermes.consumers.consumer.receiver.kafka.broker.CannotCommitOffsetToBrokerException;

/**
 * Stores and reads consumer offsets in the Kafka broker using the blocking-channel offset
 * commit / offset fetch API.
 *
 * <p>Channels used for committing are cached per {@link Subscription} and disconnected when
 * they are evicted from the cache; offset lookups ({@link #find}) open a dedicated, short-lived
 * channel for every call.
 */
public class BrokerOffsetsRepository {
    private static final String EMPTY_METADATA = "";
    private static final int CORRELATION_ID = 0;
    private static final short VERSION_ID = 1;

    private final BlockingChannelFactory blockingChannelFactory;
    private final Clock clock;
    private final KafkaNamesMapper kafkaNamesMapper;
    private final LoadingCache<Subscription, BlockingChannel> channels;
    private final String clientId;

    /**
     * Creates the repository with the channel expiration time read from
     * {@link Configs#KAFKA_CONSUMER_OFFSET_COMMITTER_BROKER_CONNECTION_EXPIRATION}.
     */
    @Inject
    public BrokerOffsetsRepository(BlockingChannelFactory blockingChannelFactory, Clock clock, HostnameResolver hostnameResolver, ConfigFactory configFactory, KafkaNamesMapper kafkaNamesMapper) {
        this(blockingChannelFactory, clock, hostnameResolver, kafkaNamesMapper, configFactory.getIntProperty(Configs.KAFKA_CONSUMER_OFFSET_COMMITTER_BROKER_CONNECTION_EXPIRATION));
    }

    /**
     * Creates the repository.
     *
     * @param blockingChannelFactory factory of broker channels, one per consumer group
     * @param clock source of the timestamp attached to committed offsets
     * @param hostnameResolver used to build the client id sent with every request
     * @param kafkaNamesMapper maps subscriptions to Kafka consumer group ids
     * @param channelExpTime number of seconds after the last access after which a cached
     *        commit channel is evicted and disconnected
     */
    public BrokerOffsetsRepository(final BlockingChannelFactory blockingChannelFactory, Clock clock, HostnameResolver hostnameResolver, final KafkaNamesMapper kafkaNamesMapper, int channelExpTime) {
        this.blockingChannelFactory = blockingChannelFactory;
        this.clock = clock;
        this.kafkaNamesMapper = kafkaNamesMapper;
        this.clientId = clientId(hostnameResolver);
        this.channels = CacheBuilder.newBuilder()
                .expireAfterAccess(channelExpTime, TimeUnit.SECONDS)
                // Evicted or invalidated channels must be closed, otherwise the connection leaks.
                .<Subscription, BlockingChannel>removalListener(notification -> notification.getValue().disconnect())
                .build(new CacheLoader<Subscription, BlockingChannel>() {
                    @Override
                    public BlockingChannel load(Subscription key) {
                        BlockingChannel channel = blockingChannelFactory.create(kafkaNamesMapper.toConsumerGroupId(key));
                        channel.connect();
                        return channel;
                    }
                });
    }

    /**
     * Commits the given offset for the subscription's consumer group.
     *
     * <p>The cached channel of the subscription is invalidated when the commit throws, or when
     * the broker answers with a "not coordinator" / "coordinator not available" error, so that
     * the next call creates a fresh channel.
     *
     * @throws ExecutionException when the channel for the subscription cannot be loaded
     * @throws CannotCommitOffsetToBrokerException when the broker response contains errors
     */
    public void save(Subscription subscription, PartitionOffset partitionOffset) throws ExecutionException {
        OffsetCommitRequest commitRequest = createCommitRequest(subscription, partitionOffset);
        OffsetCommitResponse commitResponse;
        try {
            commitResponse = commitOffset(subscription, commitRequest);
        } catch (Exception e) {
            // The channel may be in an undefined state after a failed exchange; drop it.
            channels.invalidate(subscription);
            throw e;
        }

        if (commitResponse.hasError()) {
            boolean coordinatorUnavailable = commitResponse.errors().values().stream()
                    .map(error -> (Short) error)
                    .anyMatch(error -> error == ErrorMapping.NotCoordinatorForConsumerCode()
                            || error == ErrorMapping.ConsumerCoordinatorNotAvailableCode());
            if (coordinatorUnavailable) {
                channels.invalidate(subscription);
            }
            throw new CannotCommitOffsetToBrokerException(new BrokerOffsetCommitErrors(commitResponse.errors()));
        }
    }

    /**
     * Commits the given offset only when the broker holds no offset for the partition
     * (reported as {@code -1}) or the stored offset is greater than the given one.
     *
     * @throws ExecutionException when the channel for the subscription cannot be loaded
     * @throws CannotCommitOffsetToBrokerException when the broker response contains errors
     */
    public void saveIfOffsetInThePast(Subscription subscription, PartitionOffset partitionOffset) throws ExecutionException {
        long currentOffset = find(subscription, partitionOffset.getTopic(), partitionOffset.getPartition());
        if (currentOffset == -1L || currentOffset > partitionOffset.getOffset()) {
            save(subscription, partitionOffset);
        }
    }

    /** Sends the commit request over the subscription's cached channel and reads the response. */
    private OffsetCommitResponse commitOffset(Subscription subscription, OffsetCommitRequest commitRequest) throws ExecutionException {
        BlockingChannel channel = channels.get(subscription);
        channel.send(commitRequest.underlying());
        return OffsetCommitResponse.readFrom(channel.receive().buffer());
    }

    /** Builds a commit request carrying a single partition offset. */
    private OffsetCommitRequest createCommitRequest(Subscription subscription, PartitionOffset partitionOffset) {
        Map<TopicAndPartition, OffsetAndMetadata> offset = createOffset(partitionOffset);
        String groupId = kafkaNamesMapper.toConsumerGroupId(subscription).asString();
        return new OffsetCommitRequest(groupId, offset, CORRELATION_ID, clientId, VERSION_ID);
    }

    /** Wraps the partition offset, stamped with the current clock time, in a single-entry map. */
    private Map<TopicAndPartition, OffsetAndMetadata> createOffset(PartitionOffset partitionOffset) {
        Map<TopicAndPartition, OffsetAndMetadata> offset = new LinkedHashMap<>();
        TopicAndPartition topicAndPartition = new TopicAndPartition(partitionOffset.getTopic().asString(), partitionOffset.getPartition());
        offset.put(topicAndPartition, new OffsetAndMetadata(partitionOffset.getOffset(), EMPTY_METADATA, clock.millis()));
        return offset;
    }

    /**
     * Fetches the offset stored in the broker for the subscription's consumer group and the
     * given topic partition.
     *
     * <p>A dedicated channel is opened for the lookup and is always disconnected afterwards,
     * also when the request fails.
     *
     * @return the offset reported by the broker for the partition
     */
    public long find(Subscription subscription, KafkaTopicName kafkaTopicName, int partitionId) {
        ConsumerGroupId groupId = kafkaNamesMapper.toConsumerGroupId(subscription);
        TopicAndPartition topicAndPartition = new TopicAndPartition(kafkaTopicName.asString(), partitionId);
        List<TopicAndPartition> partitions = new ArrayList<>(1);
        partitions.add(topicAndPartition);
        OffsetFetchRequest fetchRequest = new OffsetFetchRequest(groupId.asString(), partitions, VERSION_ID, CORRELATION_ID, clientId);

        BlockingChannel channel = blockingChannelFactory.create(groupId);
        try {
            channel.connect();
            channel.send(fetchRequest.underlying());
            OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
            Map<TopicAndPartition, OffsetMetadataAndError> offsets = fetchResponse.offsets();
            return offsets.get(topicAndPartition).offset();
        } finally {
            // Release the connection even when send/receive/parsing throws.
            channel.disconnect();
        }
    }

    /** Builds the client id: resolved hostname, an underscore, and a random UUID. */
    private String clientId(HostnameResolver hostnameResolver) {
        return hostnameResolver.resolve() + "_" + UUID.randomUUID();
    }
}

