/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.internals.AsyncClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.utils.LogContext;

/**
 * Asynchronous client that issues {@code OffsetsForLeaderEpoch} requests on behalf of the
 * consumer and classifies the per-partition results.
 *
 * <p>The request payload is a map from partition to the consumer's current
 * {@link SubscriptionState.FetchPosition}; the outcome is an {@link OffsetForEpochResult}
 * holding the end offsets that were returned successfully and the partitions that should be
 * retried.
 */
public class OffsetsForLeaderEpochClient
extends AsyncClient<Map<TopicPartition, SubscriptionState.FetchPosition>, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse, OffsetForEpochResult> {
    /**
     * Creates the client.
     *
     * @param client     network client handed to the {@link AsyncClient} base class
     * @param logContext logging context handed to the {@link AsyncClient} base class
     */
    OffsetsForLeaderEpochClient(ConsumerNetworkClient client, LogContext logContext) {
        super(client, logContext);
    }

    /**
     * Builds the consumer-side {@code OffsetsForLeaderEpoch} request for the given positions.
     *
     * <p>Only positions that carry an offset epoch are included; partitions whose
     * {@code offsetEpoch} is empty are left out of the request.
     *
     * @param node2       destination node (not used when building the request)
     * @param requestData fetch positions keyed by partition
     * @return a request builder covering every partition that has an offset epoch
     */
    @Override
    protected AbstractRequest.Builder<OffsetsForLeaderEpochRequest> prepareRequest(Node node2, Map<TopicPartition, SubscriptionState.FetchPosition> requestData) {
        Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> partitionData = new HashMap<>(requestData.size());
        requestData.forEach((topicPartition, fetchPosition) ->
                fetchPosition.offsetEpoch.ifPresent(fetchEpoch ->
                        partitionData.put(
                                topicPartition,
                                new OffsetsForLeaderEpochRequest.PartitionData(fetchPosition.currentLeader.epoch, fetchEpoch))));
        return OffsetsForLeaderEpochRequest.Builder.forConsumer(partitionData);
    }

    /**
     * Sorts every requested partition into "got an end offset", "retry later" or
     * "topic not authorized", based on the error code in the response.
     *
     * @param node2       node the response came from (not used here)
     * @param requestData the positions that were sent; its key set drives the iteration
     * @param response    the broker response
     * @return the successfully returned end offsets together with the partitions to retry
     * @throws TopicAuthorizationException if at least one partition reported
     *         {@code TOPIC_AUTHORIZATION_FAILED}; the exception lists the affected topics
     */
    @Override
    protected OffsetForEpochResult handleResponse(Node node2, Map<TopicPartition, SubscriptionState.FetchPosition> requestData, OffsetsForLeaderEpochResponse response) {
        Set<TopicPartition> partitionsToRetry = new HashSet<>();
        Set<String> unauthorizedTopics = new HashSet<>();
        Map<TopicPartition, EpochEndOffset> endOffsets = new HashMap<>();

        // The response map does not change while we iterate, so look it up once.
        Map<TopicPartition, EpochEndOffset> responses = response.responses();

        for (TopicPartition topicPartition : requestData.keySet()) {
            EpochEndOffset epochEndOffset = responses.get(topicPartition);
            if (epochEndOffset == null) {
                // The partition is skipped for this response but is still queued for retry.
                this.logger().warn("Missing partition {} from response, ignoring", topicPartition);
                partitionsToRetry.add(topicPartition);
                continue;
            }

            Errors error = epochEndOffset.error();
            switch (error) {
                case NONE:
                    this.logger().debug("Handling OffsetsForLeaderEpoch response for {}. Got offset {} for epoch {}", topicPartition, epochEndOffset.endOffset(), epochEndOffset.leaderEpoch());
                    endOffsets.put(topicPartition, epochEndOffset);
                    break;
                case NOT_LEADER_FOR_PARTITION:
                case REPLICA_NOT_AVAILABLE:
                case KAFKA_STORAGE_ERROR:
                case OFFSET_NOT_AVAILABLE:
                case LEADER_NOT_AVAILABLE:
                case FENCED_LEADER_EPOCH:
                case UNKNOWN_LEADER_EPOCH:
                    // All of these are handled the same way: log at debug level and retry.
                    this.logger().debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.", topicPartition, error);
                    partitionsToRetry.add(topicPartition);
                    break;
                case UNKNOWN_TOPIC_OR_PARTITION:
                    // Message names the request this client actually sends (was "ListOffset").
                    this.logger().warn("Received unknown topic or partition error in OffsetsForLeaderEpoch request for partition {}", topicPartition);
                    partitionsToRetry.add(topicPartition);
                    break;
                case TOPIC_AUTHORIZATION_FAILED:
                    // Collected and reported together once every partition has been examined.
                    unauthorizedTopics.add(topicPartition.topic());
                    break;
                default:
                    this.logger().warn("Attempt to fetch offsets for partition {} failed due to: {}, retrying.", topicPartition, error.message());
                    partitionsToRetry.add(topicPartition);
                    break;
            }
        }

        if (!unauthorizedTopics.isEmpty()) {
            throw new TopicAuthorizationException(unauthorizedTopics);
        }
        return new OffsetForEpochResult(endOffsets, partitionsToRetry);
    }

    /**
     * Outcome of handling one {@code OffsetsForLeaderEpoch} response: the end offsets that
     * came back without error and the partitions that need another attempt.
     *
     * <p>The collections passed to the constructor are stored and returned as-is (no copies).
     */
    public static class OffsetForEpochResult {
        private final Map<TopicPartition, EpochEndOffset> endOffsets;
        private final Set<TopicPartition> partitionsToRetry;

        /**
         * @param endOffsets             end offsets keyed by partition
         * @param partitionsNeedingRetry partitions whose lookup should be retried
         */
        OffsetForEpochResult(Map<TopicPartition, EpochEndOffset> endOffsets, Set<TopicPartition> partitionsNeedingRetry) {
            this.endOffsets = endOffsets;
            this.partitionsToRetry = partitionsNeedingRetry;
        }

        /**
         * @return end offsets keyed by partition for the lookups that succeeded
         */
        public Map<TopicPartition, EpochEndOffset> endOffsets() {
            return this.endOffsets;
        }

        /**
         * @return partitions whose lookup should be retried
         */
        public Set<TopicPartition> partitionsToRetry() {
            return this.partitionsToRetry;
        }
    }
}

