/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.offset.kafka;

import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.inject.Inject;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.common.broker.BrokerStorage;
import pl.allegro.tech.hermes.common.kafka.SimpleConsumerPool;
import pl.allegro.tech.hermes.common.kafka.SimpleConsumerPoolException;

/**
 * Reads the latest offsets of Kafka partitions by sending an offset request to the
 * leader broker of each partition.
 *
 * <p>Leaders are looked up through {@link BrokerStorage} and contacted with
 * {@link SimpleConsumer} instances taken from {@link SimpleConsumerPool}. Reading is
 * best-effort: failures are logged at WARN level and the affected partitions are simply
 * missing from the result.
 */
public class KafkaLatestOffsetReader {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLatestOffsetReader.class);

    /** Number of offsets requested per partition; only the first returned offset is used. */
    private static final int MAXIMUM_NUMBER_OF_OFFSETS = 1;

    private final BrokerStorage brokerStorage;
    private final SimpleConsumerPool simpleConsumerPool;

    /** Request info shared by every partition in every request built by this reader. */
    private final PartitionOffsetRequestInfo partitionOffsetRequestInfo =
            new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), MAXIMUM_NUMBER_OF_OFFSETS);

    /**
     * @param brokerStorage      source of the partition-to-leader assignment
     * @param simpleConsumerPool pool providing a consumer for a given leader id
     */
    @Inject
    public KafkaLatestOffsetReader(BrokerStorage brokerStorage, SimpleConsumerPool simpleConsumerPool) {
        this.brokerStorage = brokerStorage;
        this.simpleConsumerPool = simpleConsumerPool;
    }

    /**
     * Reads the latest offset of every given partition.
     *
     * @param topicAndPartitionSet partitions to read offsets for
     * @return latest offset per partition; a partition is absent when its leader could not
     *         be queried or the leader returned no offset for it
     */
    public Map<TopicAndPartition, Long> read(Set<TopicAndPartition> topicAndPartitionSet) {
        Multimap<Integer, TopicAndPartition> leadersForPartitions =
                this.brokerStorage.readLeadersForPartitions(topicAndPartitionSet);
        Map<Integer, OffsetRequest> requestForLeaders = this.createRequestsForLeaders(leadersForPartitions);
        return this.readLatestOffsetsFromLeaders(requestForLeaders, leadersForPartitions);
    }

    /**
     * Builds one offset request per leader, covering all partitions led by that broker.
     * Leaders for which no consumer can be obtained from the pool are logged and skipped.
     *
     * @param leadersForPartitions partitions grouped by leader id
     * @return offset request per leader id
     */
    private Map<Integer, OffsetRequest> createRequestsForLeaders(Multimap<Integer, TopicAndPartition> leadersForPartitions) {
        Map<Integer, OffsetRequest> offsetRequestsForLeaders =
                new HashMap<Integer, OffsetRequest>(leadersForPartitions.keySet().size());
        for (Integer leaderId : leadersForPartitions.keySet()) {
            try {
                // The request carries the client id of the consumer that will later send it.
                String clientId = this.simpleConsumerPool.get(leaderId).clientId();
                offsetRequestsForLeaders.put(
                        leaderId, this.createOffsetRequestForLeader(clientId, leadersForPartitions.get(leaderId)));
            }
            catch (SimpleConsumerPoolException e) {
                LOGGER.warn("Error while getting simple consumer from pool for leader " + leaderId, e);
            }
        }
        return offsetRequestsForLeaders;
    }

    /**
     * Creates a latest-offset request for the given partitions.
     *
     * @param clientId           client id to put into the request
     * @param topicAndPartitions partitions to include in the request
     * @return the offset request
     */
    private OffsetRequest createOffsetRequestForLeader(String clientId, Collection<TopicAndPartition> topicAndPartitions) {
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(topicAndPartitions.size());
        for (TopicAndPartition topicAndPartition : topicAndPartitions) {
            requestInfo.put(topicAndPartition, this.partitionOffsetRequestInfo);
        }
        return new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientId);
    }

    /**
     * Sends every prepared request to its leader and merges the answers into one map.
     *
     * @param offsetRequestForLeaders prepared request per leader id
     * @param leadersForPartitions    partitions grouped by leader id
     * @return latest offsets collected from all leaders that answered
     */
    private Map<TopicAndPartition, Long> readLatestOffsetsFromLeaders(Map<Integer, OffsetRequest> offsetRequestForLeaders, Multimap<Integer, TopicAndPartition> leadersForPartitions) {
        Map<TopicAndPartition, Long> offsets = Maps.newHashMap();
        for (Map.Entry<Integer, OffsetRequest> entry : offsetRequestForLeaders.entrySet()) {
            Integer leaderId = entry.getKey();
            offsets.putAll(this.readLatestOffsetsFromLeader(leaderId, entry.getValue(), leadersForPartitions.get(leaderId)));
        }
        return offsets;
    }

    /**
     * Queries a single leader for the latest offsets of its partitions.
     *
     * @param leaderId           id of the leader broker
     * @param offsetRequest      request to send
     * @param topicAndPartitions partitions expected in the response
     * @return latest offsets read from the response, or an empty map when anything fails
     */
    private Map<TopicAndPartition, Long> readLatestOffsetsFromLeader(Integer leaderId, OffsetRequest offsetRequest, Collection<TopicAndPartition> topicAndPartitions) {
        try {
            SimpleConsumer simpleConsumer = this.simpleConsumerPool.get(leaderId);
            OffsetResponse response = simpleConsumer.getOffsetsBefore(offsetRequest);
            return this.readOffsetResponse(response, topicAndPartitions);
        }
        catch (Exception e) {
            // Deliberately broad: a failure on one broker must not prevent reading from the others.
            LOGGER.warn("Something went wrong while reading data for broker with id " + leaderId, e);
            return Maps.newHashMap();
        }
    }

    /**
     * Extracts the first returned offset of each partition from the response.
     *
     * <p>A partition for which the response carries no offsets is logged and skipped, so
     * that it does not discard the offsets of the remaining partitions of the same leader.
     *
     * @param offsetsResponse response received from the leader
     * @param partitions      partitions to look up in the response
     * @return latest offset per partition that had at least one offset in the response
     */
    private Map<TopicAndPartition, Long> readOffsetResponse(OffsetResponse offsetsResponse, Collection<TopicAndPartition> partitions) {
        Map<TopicAndPartition, Long> offsets = Maps.newHashMap();
        for (TopicAndPartition topicAndPartition : partitions) {
            long[] partitionOffsets = offsetsResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition());
            if (partitionOffsets.length == 0) {
                LOGGER.warn("No latest offset returned for " + topicAndPartition + ", error code "
                        + offsetsResponse.errorCode(topicAndPartition.topic(), topicAndPartition.partition()));
                continue;
            }
            offsets.put(topicAndPartition, partitionOffsets[0]);
        }
        return offsets;
    }
}

