/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker;

import java.util.LinkedHashMap;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.consumers.consumer.offset.ConsumerPartitionAssignmentState;

public class KafkaConsumerOffsetMover {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerOffsetMover.class);
    private final SubscriptionName subscriptionName;
    private KafkaConsumer consumer;
    private ConsumerPartitionAssignmentState partitionAssignmentState;

    public KafkaConsumerOffsetMover(SubscriptionName subscriptionName, KafkaConsumer consumer, ConsumerPartitionAssignmentState partitionAssignmentState) {
        this.subscriptionName = subscriptionName;
        this.consumer = consumer;
        this.partitionAssignmentState = partitionAssignmentState;
    }

    public PartitionOffsets move(PartitionOffsets offsets) {
        PartitionOffsets movedOffsets = new PartitionOffsets();
        for (PartitionOffset offset : offsets) {
            if (!this.move(offset)) continue;
            movedOffsets.add(offset);
        }
        this.commit(movedOffsets);
        if (!movedOffsets.isEmpty()) {
            this.partitionAssignmentState.incrementTerm(this.subscriptionName);
        }
        return movedOffsets;
    }

    private boolean move(PartitionOffset offset) {
        try {
            TopicPartition tp = new TopicPartition(offset.getTopic().asString(), offset.getPartition());
            if (this.consumer.assignment().contains(tp)) {
                logger.info("Moving offset for assigned partition {} on subscription {}", (Object)offset.getPartition(), (Object)this.subscriptionName);
                this.consumer.seek(tp, offset.getOffset());
                return true;
            }
            logger.info("Not assigned to partition {} on subscription {}", (Object)offset.getPartition(), (Object)this.subscriptionName);
            return false;
        }
        catch (IllegalStateException ex) {
            logger.error("Failed to move offset for subscription={}, partition={}, offset={}", new Object[]{this.subscriptionName, offset.getPartition(), offset.getOffset(), ex});
            return false;
        }
    }

    private void commit(PartitionOffsets partitionOffsets) {
        try {
            LinkedHashMap<TopicPartition, OffsetAndMetadata> offsetsToCommit = new LinkedHashMap<TopicPartition, OffsetAndMetadata>();
            for (PartitionOffset partitionOffset : partitionOffsets) {
                offsetsToCommit.put(new TopicPartition(partitionOffset.getTopic().asString(), partitionOffset.getPartition()), new OffsetAndMetadata(partitionOffset.getOffset()));
            }
            if (!offsetsToCommit.isEmpty()) {
                this.consumer.commitSync(offsetsToCommit);
            }
        }
        catch (Exception e) {
            logger.error("Failed to commit offsets while trying to move them for subscription {}", (Object)this.subscriptionName, (Object)e);
        }
    }
}

