/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import jersey.repackaged.com.google.common.collect.Lists;
import kafka.api.RequestOrResponse;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.network.BlockingChannel;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.time.Clock;
import pl.allegro.tech.hermes.common.util.HostnameResolver;
import pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker.BlockingChannelFactory;
import pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker.BrokerOffsetCommitErrors;
import pl.allegro.tech.hermes.consumers.consumer.receiver.kafka.broker.CannotCommitOffsetToBrokerException;
import pl.allegro.tech.hermes.domain.subscription.offset.PartitionOffset;

public class BrokerOffsetsRepository {
    private static final String EMPTY_METADATA = "";
    private static final int CORRELATION_ID = 0;
    private static final short VERSION_ID = 1;
    private final BlockingChannelFactory blockingChannelFactory;
    private final Clock clock;
    private final LoadingCache<Subscription, BlockingChannel> channels;
    private final String clientId;

    @Inject
    public BrokerOffsetsRepository(BlockingChannelFactory blockingChannelFactory, Clock clock, HostnameResolver hostnameResolver, ConfigFactory configFactory) {
        this(blockingChannelFactory, clock, hostnameResolver, configFactory.getIntProperty(Configs.KAFKA_CONSUMER_OFFSET_COMMITTER_BROKER_CONNECTION_EXPIRATION));
    }

    public BrokerOffsetsRepository(final BlockingChannelFactory blockingChannelFactory, Clock clock, HostnameResolver hostnameResolver, int channelExpTime) {
        this.blockingChannelFactory = blockingChannelFactory;
        this.clock = clock;
        this.clientId = this.clientId(hostnameResolver);
        this.channels = CacheBuilder.newBuilder().expireAfterAccess((long)channelExpTime, TimeUnit.SECONDS).removalListener(notification -> ((BlockingChannel)notification.getValue()).disconnect()).build((CacheLoader)new CacheLoader<Subscription, BlockingChannel>(){

            public BlockingChannel load(Subscription key) {
                BlockingChannel channel = blockingChannelFactory.create(key.getId());
                channel.connect();
                return channel;
            }
        });
    }

    public void save(Subscription subscription, PartitionOffset partitionOffset) throws ExecutionException {
        OffsetCommitRequest commitRequest = this.createCommitRequest(subscription, partitionOffset);
        OffsetCommitResponse commitResponse = this.commitOffset(subscription, commitRequest);
        if (commitResponse.hasError()) {
            commitResponse.errors().values().stream().map(e -> Short.parseShort(e.toString())).filter(e -> e == ErrorMapping.NotCoordinatorForConsumerCode() || e == ErrorMapping.NotCoordinatorForConsumerCode()).forEach(e -> this.channels.invalidate((Object)subscription));
            throw new CannotCommitOffsetToBrokerException(new BrokerOffsetCommitErrors(commitResponse.errors()));
        }
    }

    private OffsetCommitResponse commitOffset(Subscription subscription, OffsetCommitRequest commitRequest) throws ExecutionException {
        BlockingChannel channel = (BlockingChannel)this.channels.get((Object)subscription);
        channel.send((RequestOrResponse)commitRequest.underlying());
        return OffsetCommitResponse.readFrom((ByteBuffer)channel.receive().buffer());
    }

    private OffsetCommitRequest createCommitRequest(Subscription subscription, PartitionOffset partitionOffset) {
        Map<TopicAndPartition, OffsetAndMetadata> offset = this.createOffset(subscription, partitionOffset);
        return new OffsetCommitRequest(subscription.getId(), offset, 0, this.clientId, 1);
    }

    private Map<TopicAndPartition, OffsetAndMetadata> createOffset(Subscription subscription, PartitionOffset partitionOffset) {
        LinkedHashMap<TopicAndPartition, OffsetAndMetadata> offset = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
        TopicAndPartition topicAndPartition = new TopicAndPartition(subscription.getTopicName().qualifiedName(), partitionOffset.getPartition());
        offset.put(topicAndPartition, new OffsetAndMetadata(partitionOffset.getOffset(), EMPTY_METADATA, this.clock.getTime()));
        return offset;
    }

    public long find(Subscription subscription, int partitionId) {
        String groupId = subscription.getId();
        BlockingChannel channel = this.blockingChannelFactory.create(groupId);
        channel.connect();
        TopicAndPartition topicAndPartition = new TopicAndPartition(subscription.getQualifiedTopicName(), partitionId);
        ArrayList partitions = Lists.newArrayList((Object[])new TopicAndPartition[]{topicAndPartition});
        OffsetFetchRequest fetchRequest = new OffsetFetchRequest(groupId, (List)partitions, 1, 0, this.clientId);
        channel.send((RequestOrResponse)fetchRequest.underlying());
        OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom((ByteBuffer)channel.receive().buffer());
        Map result = fetchResponse.offsets();
        OffsetMetadataAndError offset = (OffsetMetadataAndError)result.get(topicAndPartition);
        channel.disconnect();
        return offset.offset();
    }

    private String clientId(HostnameResolver hostnameResolver) {
        return hostnameResolver.resolve() + "_" + UUID.randomUUID();
    }
}

