/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.endpoint.receiver.kafka;

import com.google.common.collect.Sets;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.info.MessageReceiverNotification;
import de.otto.synapse.info.MessageReceiverStatus;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

class ConsumerRebalanceHandler
implements ConsumerRebalanceListener {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerRebalanceHandler.class);
    private final String channelName;
    private final ChannelPosition channelPosition;
    private final ApplicationEventPublisher eventPublisher;
    private final KafkaConsumer<String, String> kafkaConsumer;
    private final Set<String> currentPartitions = Sets.newConcurrentHashSet();
    private final AtomicBoolean shardsAssignedAndPositioned = new AtomicBoolean(false);

    ConsumerRebalanceHandler(String channelName, ChannelPosition startFrom, ApplicationEventPublisher eventPublisher, KafkaConsumer<String, String> kafkaConsumer) {
        this.channelName = channelName;
        this.channelPosition = startFrom;
        this.eventPublisher = eventPublisher;
        this.kafkaConsumer = kafkaConsumer;
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        LOG.info("Revoked " + partitions + " Kafka partitions: " + partitions);
        partitions.forEach(p -> {
            String shardName = "" + p.partition();
            this.currentPartitions.remove(shardName);
        });
        this.shardsAssignedAndPositioned.set(!this.currentPartitions.isEmpty());
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        LOG.info("Assigned " + partitions + " Kafka partitions: " + partitions);
        partitions.forEach(p -> this.currentPartitions.add("" + p.partition()));
        for (TopicPartition partition : partitions) {
            String shardName = "" + partition.partition();
            ShardPosition shardPosition = this.channelPosition.shard(shardName);
            TopicPartition topicPartition = new TopicPartition(this.channelName, partition.partition());
            switch (shardPosition.startFrom()) {
                case POSITION: {
                    this.kafkaConsumer.seek(topicPartition, (long)(Integer.parseInt(shardPosition.position()) + 1));
                    break;
                }
                case AT_POSITION: {
                    this.kafkaConsumer.seek(topicPartition, (long)Integer.parseInt(shardPosition.position()));
                    break;
                }
                case HORIZON: {
                    this.kafkaConsumer.seekToBeginning(Collections.singletonList(topicPartition));
                    break;
                }
                case TIMESTAMP: {
                    Map<TopicPartition, Long> query = Collections.singletonMap(topicPartition, shardPosition.timestamp().toEpochMilli());
                    Map offsets = this.kafkaConsumer.offsetsForTimes(query);
                    this.kafkaConsumer.seek(topicPartition, ((OffsetAndTimestamp)offsets.get(topicPartition)).offset());
                }
            }
            LOG.info("Reading from channel={}, shard={}, position={}", new Object[]{this.channelName, shardName, shardPosition});
        }
        this.shardsAssignedAndPositioned.set(true);
        if (this.eventPublisher != null) {
            this.eventPublisher.publishEvent((Object)MessageReceiverNotification.builder().withChannelName(this.channelName).withStatus(MessageReceiverStatus.STARTED).withMessage("Received shards from Kafka.").build());
        }
    }

    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        LOG.warn("Lost " + partitions + " Kafka partitions: " + partitions);
        partitions.forEach(p -> {
            String shardName = "" + p.partition();
            this.currentPartitions.remove(shardName);
        });
        this.shardsAssignedAndPositioned.set(!this.currentPartitions.isEmpty());
    }

    public Set<String> getCurrentPartitions() {
        return this.currentPartitions;
    }

    public boolean shardsAssignedAndPositioned() {
        return this.shardsAssignedAndPositioned.get();
    }
}

