/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.shaded.org.apache.kafka09.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.graylog.shaded.org.apache.kafka09.clients.ClientResponse;
import org.graylog.shaded.org.apache.kafka09.clients.Metadata;
import org.graylog.shaded.org.apache.kafka09.clients.consumer.CommitFailedException;
import org.graylog.shaded.org.apache.kafka09.clients.consumer.ConsumerRebalanceListener;
import org.graylog.shaded.org.apache.kafka09.clients.consumer.OffsetAndMetadata;
import org.graylog.shaded.org.apache.kafka09.clients.consumer.OffsetCommitCallback;
import org.graylog.shaded.org.apache.kafka09.clients.consumer.internals.AbstractCoordinator;
import org.graylog.shaded.org.apache.kafka09.clients.consumer.internals.ConsumerNetworkClient;
import org.graylog.shaded.org.apache.kafka09.clients.consumer.internals.ConsumerProtocol;
import org.graylog.shaded.org.apache.kafka09.clients.consumer.internals.DelayedTask;
import org.graylog.shaded.org.apache.kafka09.clients.consumer.internals.PartitionAssignor;
import org.graylog.shaded.org.apache.kafka09.clients.consumer.internals.RequestFuture;
import org.graylog.shaded.org.apache.kafka09.clients.consumer.internals.RequestFutureListener;
import org.graylog.shaded.org.apache.kafka09.clients.consumer.internals.SendFailedException;
import org.graylog.shaded.org.apache.kafka09.clients.consumer.internals.SubscriptionState;
import org.graylog.shaded.org.apache.kafka09.common.Cluster;
import org.graylog.shaded.org.apache.kafka09.common.KafkaException;
import org.graylog.shaded.org.apache.kafka09.common.MetricName;
import org.graylog.shaded.org.apache.kafka09.common.TopicPartition;
import org.graylog.shaded.org.apache.kafka09.common.errors.GroupAuthorizationException;
import org.graylog.shaded.org.apache.kafka09.common.errors.TopicAuthorizationException;
import org.graylog.shaded.org.apache.kafka09.common.errors.WakeupException;
import org.graylog.shaded.org.apache.kafka09.common.metrics.Measurable;
import org.graylog.shaded.org.apache.kafka09.common.metrics.MetricConfig;
import org.graylog.shaded.org.apache.kafka09.common.metrics.Metrics;
import org.graylog.shaded.org.apache.kafka09.common.metrics.Sensor;
import org.graylog.shaded.org.apache.kafka09.common.metrics.stats.Avg;
import org.graylog.shaded.org.apache.kafka09.common.metrics.stats.Count;
import org.graylog.shaded.org.apache.kafka09.common.metrics.stats.Max;
import org.graylog.shaded.org.apache.kafka09.common.metrics.stats.Rate;
import org.graylog.shaded.org.apache.kafka09.common.protocol.ApiKeys;
import org.graylog.shaded.org.apache.kafka09.common.protocol.Errors;
import org.graylog.shaded.org.apache.kafka09.common.requests.JoinGroupRequest;
import org.graylog.shaded.org.apache.kafka09.common.requests.OffsetCommitRequest;
import org.graylog.shaded.org.apache.kafka09.common.requests.OffsetCommitResponse;
import org.graylog.shaded.org.apache.kafka09.common.requests.OffsetFetchRequest;
import org.graylog.shaded.org.apache.kafka09.common.requests.OffsetFetchResponse;
import org.graylog.shaded.org.apache.kafka09.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ConsumerCoordinator
extends AbstractCoordinator {
    // Class-wide logger; the nested handler, callback and task classes below log through it as well.
    private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class);
    // Assignors supplied at construction; serialized into join-group metadata and looked up by name.
    private final List<PartitionAssignor> assignors;
    // Metadata handle: an update is requested and a listener is registered on it in the constructor.
    private final Metadata metadata;
    // Partition count per group-subscribed topic as of the last metadata update; used to detect changes.
    private final MetadataSnapshot metadataSnapshot;
    // Holder for the commit-latency sensor; its constructor also registers the assigned-partitions metric.
    private final ConsumerCoordinatorMetrics sensors;
    // Subscription/assignment state that this coordinator reads and updates.
    private final SubscriptionState subscriptions;
    // Callback used by commitOffsetsAsync when the caller passes a null callback.
    private final OffsetCommitCallback defaultOffsetCommitCallback;
    // Whether periodic auto-commit was requested at construction time.
    private final boolean autoCommitEnabled;
    // Periodic auto-commit task; null when autoCommitEnabled is false.
    private final AutoCommitTask autoCommitTask;

    /**
     * Builds the coordinator: forwards the group/heartbeat settings to the base class, stores the
     * collaborators, requests a metadata update, registers the metadata listener, creates the
     * auto-commit task (only when auto-commit is enabled) and finally creates the metrics holder.
     */
    public ConsumerCoordinator(ConsumerNetworkClient client, String groupId, int sessionTimeoutMs, int heartbeatIntervalMs, List<PartitionAssignor> assignors, Metadata metadata, SubscriptionState subscriptions, Metrics metrics, String metricGrpPrefix, Map<String, String> metricTags, Time time, long retryBackoffMs, OffsetCommitCallback defaultOffsetCommitCallback, boolean autoCommitEnabled, long autoCommitIntervalMs) {
        super(client, groupId, sessionTimeoutMs, heartbeatIntervalMs, metrics, metricGrpPrefix, metricTags, time, retryBackoffMs);

        // Plain state that has no side effects to set up.
        this.assignors = assignors;
        this.subscriptions = subscriptions;
        this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
        this.autoCommitEnabled = autoCommitEnabled;
        this.metadataSnapshot = new MetadataSnapshot();

        // Metadata wiring: ask for an update first, then hook up the listener.
        this.metadata = metadata;
        metadata.requestUpdate();
        addMetadataListener();

        // The task only exists when auto-commit is on; callers guard on autoCommitEnabled.
        if (autoCommitEnabled) {
            this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
        } else {
            this.autoCommitTask = null;
        }

        this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
    }

    /**
     * Returns the protocol type string for this coordinator.
     *
     * @return the constant {@code "consumer"}
     */
    @Override
    public String protocolType() {
        return "consumer";
    }

    /**
     * Builds one protocol-metadata entry per configured assignor. Each entry pairs the assignor's
     * name with the serialized subscription that assignor produces for the current topic subscription.
     *
     * @return the entries, in the same order as the configured assignors
     */
    @Override
    public List<JoinGroupRequest.ProtocolMetadata> metadata() {
        List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<JoinGroupRequest.ProtocolMetadata>();
        for (PartitionAssignor assignor : assignors) {
            ByteBuffer serialized = ConsumerProtocol.serializeSubscription(assignor.subscription(subscriptions.subscription()));
            protocols.add(new JoinGroupRequest.ProtocolMetadata(assignor.name(), serialized));
        }
        return protocols;
    }

    /**
     * Registers a metadata listener that, on every update:
     * <ol>
     *   <li>throws {@link TopicAuthorizationException} if the cluster reports unauthorized topics;</li>
     *   <li>for pattern subscriptions, re-subscribes to every cluster topic matching the pattern and
     *       pushes the resulting group subscription back into the metadata;</li>
     *   <li>flags a reassignment when the partition-count snapshot changed and partitions are
     *       auto-assigned.</li>
     * </ol>
     */
    private void addMetadataListener() {
        metadata.addListener(new Metadata.Listener() {

            @Override
            public void onMetadataUpdate(Cluster cluster) {
                if (!cluster.unauthorizedTopics().isEmpty()) {
                    throw new TopicAuthorizationException(new HashSet<String>(cluster.unauthorizedTopics()));
                }

                if (subscriptions.hasPatternSubscription()) {
                    List<String> matching = new ArrayList<String>();
                    for (String candidate : cluster.topics()) {
                        if (subscriptions.getSubscribedPattern().matcher(candidate).matches()) {
                            matching.add(candidate);
                        }
                    }
                    subscriptions.changeSubscription(matching);
                    metadata.setTopics(subscriptions.groupSubscription());
                }

                // The snapshot is always refreshed; the auto-assignment check only runs when it changed.
                boolean partitionCountsChanged = metadataSnapshot.update(subscriptions, cluster);
                if (partitionCountsChanged && subscriptions.partitionsAutoAssigned()) {
                    subscriptions.needReassignment();
                }
            }
        });
    }

    /**
     * Finds the first configured assignor whose name equals {@code name}.
     *
     * @param name assignor name to search for
     * @return the matching assignor, or {@code null} when none matches
     */
    private PartitionAssignor lookupAssignor(String name) {
        PartitionAssignor match = null;
        for (PartitionAssignor candidate : assignors) {
            if (candidate.name().equals(name)) {
                match = candidate;
                break;
            }
        }
        return match;
    }

    /**
     * Applies the assignment received after a join: deserializes it, marks committed offsets as
     * needing a refresh, installs the partitions, notifies the assignor, re-enables auto-commit
     * (when configured) and finally invokes the user's rebalance listener.
     *
     * <p>A {@link WakeupException} from the listener is propagated; any other exception from it is
     * logged and swallowed.
     *
     * @throws IllegalStateException if no configured assignor is named {@code assignmentStrategy}
     */
    @Override
    protected void onJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer) {
        final PartitionAssignor chosen = lookupAssignor(assignmentStrategy);
        if (chosen == null) {
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
        }

        final PartitionAssignor.Assignment received = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
        subscriptions.needRefreshCommits();
        subscriptions.assignFromSubscribed(received.partitions());
        chosen.onAssignment(received);

        if (autoCommitEnabled) {
            autoCommitTask.enable();
        }

        final ConsumerRebalanceListener rebalanceListener = subscriptions.listener();
        log.debug("Setting newly assigned partitions {}", subscriptions.assignedPartitions());
        try {
            // The listener receives a copy of the assigned partitions, not the live collection.
            Set<TopicPartition> assignedCopy = new HashSet<TopicPartition>(subscriptions.assignedPartitions());
            rebalanceListener.onPartitionsAssigned(assignedCopy);
        } catch (WakeupException wakeup) {
            throw wakeup;
        } catch (Exception failure) {
            log.error("User provided listener " + rebalanceListener.getClass().getName() + " failed on partition assignment: ", failure);
        }
    }

    /**
     * Computes the group assignment with the selected assignor.
     *
     * <p>All member subscriptions are deserialized, the union of their topics is registered as the
     * group subscription and pushed into the metadata, fresh metadata is ensured, and the assignor is
     * then run against the fetched cluster. Each resulting assignment is serialized per member.
     *
     * @param allSubscriptions serialized subscription per member id
     * @return serialized assignment per member id
     * @throws IllegalStateException if no configured assignor is named {@code assignmentStrategy}
     */
    @Override
    protected Map<String, ByteBuffer> performAssignment(String leaderId, String assignmentStrategy, Map<String, ByteBuffer> allSubscriptions) {
        PartitionAssignor chosen = lookupAssignor(assignmentStrategy);
        if (chosen == null) {
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
        }

        Set<String> topicsAcrossGroup = new HashSet<String>();
        Map<String, PartitionAssignor.Subscription> subscriptionByMember = new HashMap<String, PartitionAssignor.Subscription>();
        for (Map.Entry<String, ByteBuffer> raw : allSubscriptions.entrySet()) {
            PartitionAssignor.Subscription decoded = ConsumerProtocol.deserializeSubscription(raw.getValue());
            subscriptionByMember.put(raw.getKey(), decoded);
            topicsAcrossGroup.addAll(decoded.topics());
        }

        subscriptions.groupSubscribe(topicsAcrossGroup);
        metadata.setTopics(subscriptions.groupSubscription());
        client.ensureFreshMetadata();

        log.debug("Performing {} assignment for subscriptions {}", chosen.name(), subscriptionByMember);
        Map<String, PartitionAssignor.Assignment> computed = chosen.assign(metadata.fetch(), subscriptionByMember);
        log.debug("Finished assignment: {}", computed);

        Map<String, ByteBuffer> encodedByMember = new HashMap<String, ByteBuffer>();
        for (Map.Entry<String, PartitionAssignor.Assignment> memberAssignment : computed.entrySet()) {
            ByteBuffer encoded = ConsumerProtocol.serializeAssignment(memberAssignment.getValue());
            encodedByMember.put(memberAssignment.getKey(), encoded);
        }
        return encodedByMember;
    }

    /**
     * Runs before (re)joining the group: performs a final synchronous auto-commit when auto-commit is
     * enabled, hands the currently assigned partitions to the user's rebalance listener as revoked,
     * and then flags that a reassignment is needed.
     *
     * <p>A {@link WakeupException} from the listener is propagated; any other exception from it is
     * logged and swallowed.
     */
    @Override
    protected void onJoinPrepare(int generation, String memberId) {
        maybeAutoCommitOffsetsSync();

        final ConsumerRebalanceListener rebalanceListener = subscriptions.listener();
        log.debug("Revoking previously assigned partitions {}", subscriptions.assignedPartitions());
        try {
            // The listener receives a copy of the assigned partitions, not the live collection.
            Set<TopicPartition> revokedCopy = new HashSet<TopicPartition>(subscriptions.assignedPartitions());
            rebalanceListener.onPartitionsRevoked(revokedCopy);
        } catch (WakeupException wakeup) {
            throw wakeup;
        } catch (Exception failure) {
            log.error("User provided listener " + rebalanceListener.getClass().getName() + " failed on partition revocation: ", failure);
        }

        subscriptions.needReassignment();
    }

    /**
     * Reports whether the group must be rejoined.
     *
     * @return {@code false} when partitions are not auto-assigned; otherwise {@code true} if either
     *         the base class requests a rejoin or the subscription state needs a partition assignment
     */
    @Override
    public boolean needRejoin() {
        if (!subscriptions.partitionsAutoAssigned()) {
            return false;
        }
        return super.needRejoin() || subscriptions.partitionAssignmentNeeded();
    }

    /**
     * Re-fetches committed offsets for the assigned partitions when the subscription state says a
     * refresh is needed. Fetched offsets are applied only to partitions that are still assigned, and
     * the refresh is then marked as done.
     */
    public void refreshCommittedOffsetsIfNeeded() {
        if (!subscriptions.refreshCommitsNeeded()) {
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> fetched = fetchCommittedOffsets(subscriptions.assignedPartitions());
        for (Map.Entry<TopicPartition, OffsetAndMetadata> committed : fetched.entrySet()) {
            TopicPartition partition = committed.getKey();
            if (subscriptions.isAssigned(partition)) {
                subscriptions.committed(partition, committed.getValue());
            }
        }
        subscriptions.commitsRefreshed();
    }

    /**
     * Fetches committed offsets for {@code partitions}, blocking until a request succeeds.
     *
     * <p>Each attempt first ensures the coordinator is known, sends the fetch and polls until the
     * future completes. Retriable failures are retried after sleeping for the retry backoff; a
     * non-retriable failure is rethrown.
     *
     * @param partitions partitions to look up
     * @return the value of the first successful fetch
     */
    public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
        for (;;) {
            ensureCoordinatorKnown();
            RequestFuture<Map<TopicPartition, OffsetAndMetadata>> pending = sendOffsetFetchRequest(partitions);
            client.poll(pending);

            if (pending.succeeded()) {
                return pending.value();
            }
            if (pending.isRetriable()) {
                time.sleep(retryBackoffMs);
                continue;
            }
            throw pending.exception();
        }
    }

    /**
     * Ensures the group is active, but only when partitions are auto-assigned; otherwise does nothing.
     */
    public void ensurePartitionAssignment() {
        if (!subscriptions.partitionsAutoAssigned()) {
            return;
        }
        ensureActiveGroup();
    }

    /**
     * Closes the coordinator: disables wakeups on the client, attempts a final synchronous
     * auto-commit (when auto-commit is enabled) and always delegates to the base-class close.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     */
    @Override
    public void close() {
        client.disableWakeups();
        try {
            maybeAutoCommitOffsetsSync();
        } finally {
            // Runs even when the final commit throws.
            super.close();
        }
    }

    /**
     * Sends an offset commit without waiting for the result.
     *
     * <p>Committed offsets are first marked as needing a refresh. When the request future completes,
     * the callback is invoked with the offsets and either {@code null} (success) or the failure.
     *
     * @param offsets  offsets to commit
     * @param callback callback to notify; when {@code null}, the default callback given at
     *                 construction is used instead
     */
    public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        subscriptions.needRefreshCommits();
        RequestFuture<Void> pending = sendOffsetCommitRequest(offsets);

        final OffsetCommitCallback notifyTarget;
        if (callback != null) {
            notifyTarget = callback;
        } else {
            notifyTarget = defaultOffsetCommitCallback;
        }

        pending.addListener(new RequestFutureListener<Void>() {

            @Override
            public void onSuccess(Void ignored) {
                notifyTarget.onComplete(offsets, null);
            }

            @Override
            public void onFailure(RuntimeException failure) {
                notifyTarget.onComplete(offsets, failure);
            }
        });
    }

    /**
     * Commits {@code offsets}, blocking until a commit request succeeds.
     *
     * <p>An empty map returns immediately. Otherwise each attempt ensures the coordinator is known,
     * sends the commit and polls until the future completes. Retriable failures are retried after
     * sleeping for the retry backoff; a non-retriable failure is rethrown.
     *
     * @param offsets offsets to commit
     */
    public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (offsets.isEmpty()) {
            return;
        }
        for (;;) {
            ensureCoordinatorKnown();
            RequestFuture<Void> pending = sendOffsetCommitRequest(offsets);
            client.poll(pending);

            if (pending.succeeded()) {
                return;
            }
            if (pending.isRetriable()) {
                time.sleep(retryBackoffMs);
                continue;
            }
            throw pending.exception();
        }
    }

    /**
     * Performs a best-effort synchronous commit of all consumed offsets when auto-commit is enabled;
     * does nothing otherwise.
     *
     * <p>The periodic auto-commit task is disabled before committing. A {@link WakeupException} is
     * propagated to the caller; any other failure is logged at WARN level and swallowed.
     */
    private void maybeAutoCommitOffsetsSync() {
        if (this.autoCommitEnabled) {
            this.autoCommitTask.disable();
            try {
                this.commitOffsetsSync(this.subscriptions.allConsumed());
            }
            catch (WakeupException e) {
                throw e;
            }
            catch (Exception e) {
                // The "{}" placeholder is required: without it SLF4J drops the argument and the
                // failure reason never reaches the log.
                log.warn("Auto offset commit failed: {}", e.getMessage());
            }
        }
    }

    /**
     * Builds and sends an offset-commit request to the coordinator.
     *
     * @param offsets offsets to commit
     * @return a coordinator-not-available future when the coordinator is unknown; an already
     *         successful future when {@code offsets} is empty; otherwise the future of the sent
     *         request composed with an {@link OffsetCommitResponseHandler}
     */
    private RequestFuture<Void> sendOffsetCommitRequest(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // The coordinator check deliberately comes before the empty-map shortcut.
        if (coordinatorUnknown()) {
            return RequestFuture.coordinatorNotAvailable();
        }
        if (offsets.isEmpty()) {
            return RequestFuture.voidSuccess();
        }

        Map<TopicPartition, OffsetCommitRequest.PartitionData> payload = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
        for (Map.Entry<TopicPartition, OffsetAndMetadata> requested : offsets.entrySet()) {
            OffsetAndMetadata position = requested.getValue();
            OffsetCommitRequest.PartitionData partitionData = new OffsetCommitRequest.PartitionData(position.offset(), position.metadata());
            payload.put(requested.getKey(), partitionData);
        }

        OffsetCommitRequest commitRequest = new OffsetCommitRequest(groupId, generation, memberId, -1L, payload);
        log.trace("Sending offset-commit request with {} to {}", offsets, coordinator);
        return client.send(coordinator, ApiKeys.OFFSET_COMMIT, commitRequest).compose(new OffsetCommitResponseHandler(offsets));
    }

    /**
     * Builds and sends an offset-fetch request to the coordinator.
     *
     * @param partitions partitions whose committed offsets are requested
     * @return a coordinator-not-available future when the coordinator is unknown; otherwise the
     *         future of the sent request composed with an {@link OffsetFetchResponseHandler}
     */
    private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
        if (coordinatorUnknown()) {
            return RequestFuture.coordinatorNotAvailable();
        }

        log.debug("Fetching committed offsets for partitions: {}", partitions);
        OffsetFetchRequest fetchRequest = new OffsetFetchRequest(groupId, new ArrayList<TopicPartition>(partitions));
        return client.send(coordinator, ApiKeys.OFFSET_FETCH, fetchRequest).compose(new OffsetFetchResponseHandler());
    }

    /**
     * Remembers the partition count of every group-subscribed topic as seen at the last update, so
     * that a later update can tell whether anything changed.
     */
    private static class MetadataSnapshot {
        private Map<String, Integer> partitionsPerTopic = new HashMap<String, Integer>();

        private MetadataSnapshot() {
        }

        /**
         * Recomputes the per-topic partition counts for the group subscription from {@code cluster}
         * and stores them when they differ from the remembered ones.
         *
         * @return {@code true} if the counts changed (and were stored), {@code false} otherwise
         */
        public boolean update(SubscriptionState subscription, Cluster cluster) {
            Map<String, Integer> latest = new HashMap<String, Integer>();
            for (String topic : subscription.groupSubscription()) {
                latest.put(topic, cluster.partitionCountForTopic(topic));
            }

            if (latest.equals(partitionsPerTopic)) {
                return false;
            }
            partitionsPerTopic = latest;
            return true;
        }
    }

    /**
     * Registers this coordinator's metrics: a commit-latency sensor (average, max and rate) and a
     * gauge reporting the number of currently assigned partitions.
     */
    private class ConsumerCoordinatorMetrics {
        public final Metrics metrics;
        public final String metricGrpName;
        public final Sensor commitLatency;

        public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
            this.metrics = metrics;
            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";

            Sensor latency = metrics.sensor("commit-latency");
            this.commitLatency = latency;
            latency.add(new MetricName("commit-latency-avg", this.metricGrpName, "The average time taken for a commit request", tags), new Avg());
            latency.add(new MetricName("commit-latency-max", this.metricGrpName, "The max time taken for a commit request", tags), new Max());
            latency.add(new MetricName("commit-rate", this.metricGrpName, "The number of commit calls per second", tags), new Rate(new Count()));

            // Gauge evaluated on demand against the live subscription state.
            Measurable assignedPartitionCount = new Measurable() {

                @Override
                public double measure(MetricConfig config, long now) {
                    return ConsumerCoordinator.this.subscriptions.assignedPartitions().size();
                }
            };
            MetricName assignedPartitionsName = new MetricName("assigned-partitions", this.metricGrpName, "The number of partitions currently assigned to this consumer", tags);
            metrics.addMetric(assignedPartitionsName, assignedPartitionCount);
        }
    }

    /**
     * Turns an offset-fetch response into a map of committed offsets, or fails the future on the
     * first partition-level error encountered.
     */
    private class OffsetFetchResponseHandler
    extends AbstractCoordinator.CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, OffsetAndMetadata>> {
        private OffsetFetchResponseHandler() {
        }

        /** Wraps the raw response body in an {@link OffsetFetchResponse}. */
        @Override
        public OffsetFetchResponse parse(ClientResponse response) {
            return new OffsetFetchResponse(response.responseBody());
        }

        /**
         * Completes {@code future} with every partition that reports a non-negative offset.
         * Partitions with a negative offset are skipped (no committed offset). The first partition
         * carrying an error fails the future and stops processing:
         * <ul>
         *   <li>{@code GROUP_LOAD_IN_PROGRESS}: raised as-is;</li>
         *   <li>{@code NOT_COORDINATOR_FOR_GROUP}: the coordinator is marked dead, then raised;</li>
         *   <li>{@code UNKNOWN_MEMBER_ID} / {@code ILLEGAL_GENERATION}: a reassignment is flagged,
         *       then raised;</li>
         *   <li>anything else: raised as a {@link KafkaException}.</li>
         * </ul>
         */
        @Override
        public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
            HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>(response.responseData().size());
            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry2 : response.responseData().entrySet()) {
                TopicPartition tp = entry2.getKey();
                OffsetFetchResponse.PartitionData data = entry2.getValue();
                if (data.hasError()) {
                    log.debug("Error fetching offset for topic-partition {}: {}", (Object)tp, (Object)Errors.forCode(data.errorCode).exception().getMessage());
                    if (data.errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) {
                        future.raise(Errors.GROUP_LOAD_IN_PROGRESS);
                    } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
                        ConsumerCoordinator.this.coordinatorDead();
                        future.raise(Errors.NOT_COORDINATOR_FOR_GROUP);
                    } else if (data.errorCode == Errors.UNKNOWN_MEMBER_ID.code() || data.errorCode == Errors.ILLEGAL_GENERATION.code()) {
                        ConsumerCoordinator.this.subscriptions.needReassignment();
                        future.raise(Errors.forCode(data.errorCode));
                    } else {
                        future.raise(new KafkaException("Unexpected error in fetch offset response: " + Errors.forCode(data.errorCode).exception().getMessage()));
                    }
                    // One failing partition fails the whole fetch; remaining entries are not inspected.
                    return;
                }
                if (data.offset >= 0L) {
                    offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata));
                    continue;
                }
                // Parameterized form: the message is only built when debug logging is enabled.
                log.debug("No committed offset for partition {}", (Object)tp);
            }
            future.complete(offsets);
        }
    }

    private class OffsetCommitResponseHandler
    extends AbstractCoordinator.CoordinatorResponseHandler<OffsetCommitResponse, Void> {
        private final Map<TopicPartition, OffsetAndMetadata> offsets;

        public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
            this.offsets = offsets;
        }

        @Override
        public OffsetCommitResponse parse(ClientResponse response) {
            return new OffsetCommitResponse(response.responseBody());
        }

        @Override
        public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
            ((ConsumerCoordinator)ConsumerCoordinator.this).sensors.commitLatency.record(this.response.requestLatencyMs());
            HashSet<String> unauthorizedTopics = new HashSet<String>();
            for (Map.Entry<TopicPartition, Short> entry2 : commitResponse.responseData().entrySet()) {
                TopicPartition tp = entry2.getKey();
                OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
                long offset = offsetAndMetadata.offset();
                Errors error = Errors.forCode(entry2.getValue());
                if (error == Errors.NONE) {
                    log.debug("Committed offset {} for partition {}", (Object)offset, (Object)tp);
                    if (!ConsumerCoordinator.this.subscriptions.isAssigned(tp)) continue;
                    ConsumerCoordinator.this.subscriptions.committed(tp, offsetAndMetadata);
                    continue;
                }
                if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    log.error("Unauthorized to commit for group {}", (Object)ConsumerCoordinator.this.groupId);
                    future.raise(new GroupAuthorizationException(ConsumerCoordinator.this.groupId));
                    return;
                }
                if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                    unauthorizedTopics.add(tp.topic());
                    continue;
                }
                if (error == Errors.OFFSET_METADATA_TOO_LARGE || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
                    log.info("Offset commit for group {} failed on partition {} due to {}, will retry", new Object[]{ConsumerCoordinator.this.groupId, tp, error});
                    future.raise(error);
                    return;
                }
                if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                    log.info("Offset commit for group {} failed due to {}, will retry", (Object)ConsumerCoordinator.this.groupId, (Object)error);
                    future.raise(error);
                    return;
                }
                if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR_FOR_GROUP || error == Errors.REQUEST_TIMED_OUT) {
                    log.info("Offset commit for group {} failed due to {}, will find new coordinator and retry", (Object)ConsumerCoordinator.this.groupId, (Object)error);
                    ConsumerCoordinator.this.coordinatorDead();
                    future.raise(error);
                    return;
                }
                if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION || error == Errors.REBALANCE_IN_PROGRESS) {
                    log.error("Error {} occurred while committing offsets for group {}", (Object)error, (Object)ConsumerCoordinator.this.groupId);
                    ConsumerCoordinator.this.subscriptions.needReassignment();
                    future.raise(new CommitFailedException("Commit cannot be completed due to group rebalance"));
                    return;
                }
                log.error("Error committing partition {} at offset {}: {}", new Object[]{tp, offset, error.exception().getMessage()});
                future.raise(new KafkaException("Unexpected error in commit: " + error.exception().getMessage()));
                return;
            }
            if (!unauthorizedTopics.isEmpty()) {
                log.error("Unauthorized to commit to topics {}", unauthorizedTopics);
                future.raise(new TopicAuthorizationException(unauthorizedTopics));
            } else {
                future.complete(null);
            }
        }
    }

    public static class DefaultOffsetCommitCallback
    implements OffsetCommitCallback {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                log.error("Offset commit failed.", (Throwable)exception);
            }
        }
    }

    /**
     * Delayed task that commits all consumed offsets asynchronously and re-schedules itself on the
     * client for as long as it stays enabled.
     */
    private class AutoCommitTask implements DelayedTask {
        private final long interval;
        private boolean enabled = false;
        private boolean requestInFlight = false;

        public AutoCommitTask(long interval) {
            this.interval = interval;
        }

        /**
         * Enables the task if it is not enabled yet. Any existing schedule entry is removed first;
         * a new run is scheduled one interval from now unless a commit is still in flight.
         */
        public void enable() {
            if (enabled) {
                return;
            }
            client.unschedule(this);
            enabled = true;
            if (requestInFlight) {
                // The pending commit's callback re-schedules the task once it completes.
                return;
            }
            client.schedule(this, interval + time.milliseconds());
        }

        /** Disables the task and removes it from the client's schedule. */
        public void disable() {
            enabled = false;
            client.unschedule(this);
        }

        /** Schedules the next run at {@code at}, but only while the task is enabled. */
        private void reschedule(long at) {
            if (!enabled) {
                return;
            }
            client.schedule(this, at);
        }

        /**
         * Runs one auto-commit cycle. Does nothing when disabled. When the coordinator is unknown,
         * retries after the retry backoff. Otherwise sends an asynchronous commit whose callback
         * decides when the next run happens.
         */
        @Override
        public void run(final long now) {
            if (!enabled) {
                return;
            }
            if (coordinatorUnknown()) {
                log.debug("Cannot auto-commit offsets now since the coordinator is unknown, will retry after backoff");
                client.schedule(this, now + retryBackoffMs);
                return;
            }

            requestInFlight = true;
            commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {

                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    requestInFlight = false;

                    if (exception instanceof SendFailedException) {
                        log.debug("Failed to send automatic offset commit, will retry immediately");
                        reschedule(now);
                        return;
                    }
                    if (exception != null) {
                        log.warn("Auto offset commit failed: {}", exception.getMessage());
                    }
                    // Success and non-send failures both wait one full interval.
                    reschedule(now + interval);
                }
            });
        }
    }
}

