/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;
import pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker.KafkaConsumerOffsetMover;
import pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker.PartitionNotAssignedException;

public class PartitionAssigningAwareRetransmitter
implements ConsumerRebalanceListener {
    private static final Logger logger = LoggerFactory.getLogger(PartitionAssigningAwareRetransmitter.class);
    private final OffsetMover offsetMover;
    private final SubscriptionName subscriptionName;
    private final BlockingQueue<SubscriptionPartitionOffset> retransmissionQueue;

    public PartitionAssigningAwareRetransmitter(SubscriptionName subscriptionName, int queueSize, KafkaConsumer consumer) {
        this(subscriptionName, queueSize, new KafkaConsumerOffsetMover(consumer));
    }

    public PartitionAssigningAwareRetransmitter(SubscriptionName subscriptionName, int queueSize, OffsetMover offsetMover) {
        this.offsetMover = offsetMover;
        this.subscriptionName = subscriptionName;
        this.retransmissionQueue = new ArrayBlockingQueue<SubscriptionPartitionOffset>(queueSize);
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        if (!this.retransmissionQueue.isEmpty()) {
            logger.info("Detected scheduled retransmission for subscription {}", (Object)this.subscriptionName);
            this.moveScheduledOffsets();
        }
    }

    public void moveOffsetOrSchedule(SubscriptionPartitionOffset offset) {
        try {
            logger.info("Moving offset for subscription {} {}", (Object)offset.getSubscriptionName(), (Object)offset.toString());
            this.offsetMover.move(offset);
        }
        catch (PartitionNotAssignedException ex) {
            logger.info("Failed to move offset right now, reason: {}", (Object)ex.getMessage());
            boolean scheduled = this.retransmissionQueue.offer(offset);
            if (scheduled) {
                logger.info("Scheduled retransmission for subscription {} on next rebalance, offset {}", (Object)this.subscriptionName, (Object)offset.toString());
            }
            logger.info("Failed to schedule new retransmission for subscription {},there is already retransmission scheduled on next rebalance.", (Object)this.subscriptionName);
        }
    }

    public boolean isQueueEmpty() {
        return this.retransmissionQueue.isEmpty();
    }

    private void moveScheduledOffsets() {
        ArrayList offsets = new ArrayList();
        this.retransmissionQueue.drainTo(offsets);
        offsets.forEach(offset -> {
            try {
                this.offsetMover.move((SubscriptionPartitionOffset)offset);
            }
            catch (Exception ex) {
                logger.info("Still cannot move offset after rebalance for partition {} for subscription {}, possibly owned by different node", new Object[]{offset.getPartition(), offset.getSubscriptionName(), ex});
            }
        });
    }

    static interface OffsetMover {
        public void move(SubscriptionPartitionOffset var1) throws PartitionNotAssignedException;
    }
}

