/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.endpoint.receiver.kafka;

import de.otto.synapse.channel.ChannelDurationBehind;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.info.MessageReceiverNotification;
import de.otto.synapse.info.MessageReceiverStatus;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

class ChannelDurationBehindHandler
implements ConsumerRebalanceListener {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelDurationBehindHandler.class);
    private final List<TopicPartition> partitions = new CopyOnWriteArrayList<TopicPartition>();
    private final AtomicReference<ChannelDurationBehind> channelDurationBehind = new AtomicReference();
    private final String channelName;
    private final ChannelPosition startFrom;
    private final ApplicationEventPublisher eventPublisher;
    private final Consumer<String, String> kafkaConsumer;
    private final Clock clock;

    ChannelDurationBehindHandler(String channelName, ChannelPosition startFrom, ApplicationEventPublisher eventPublisher, Consumer<String, String> kafkaConsumer) {
        this(channelName, startFrom, eventPublisher, Clock.systemDefaultZone(), kafkaConsumer);
    }

    ChannelDurationBehindHandler(String channelName, ChannelPosition startFrom, ApplicationEventPublisher eventPublisher, Clock clock, Consumer<String, String> kafkaConsumer) {
        this.channelName = channelName;
        this.startFrom = startFrom;
        this.eventPublisher = eventPublisher;
        this.kafkaConsumer = kafkaConsumer;
        this.channelDurationBehind.set(ChannelDurationBehind.unknown());
        this.clock = clock;
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        LOG.info("Revoked " + partitions + " Kafka partitions: " + partitions);
        this.partitions.removeAll(partitions);
        partitions.forEach(p -> {
            String shardName = "" + p.partition();
            this.channelDurationBehind.getAndUpdate(previous -> ChannelDurationBehind.copyOf((ChannelDurationBehind)previous).without(shardName).build());
        });
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        this.partitions.addAll(partitions);
        this.channelDurationBehind.updateAndGet(channelDurationBehind -> this.updateOnPartitionChanged(this.startFrom, partitions, (ChannelDurationBehind)channelDurationBehind));
    }

    public void update(TopicPartition topicPartition, long lastOffsetRead, Instant lastTimestampRead) {
        Map endOffsets = this.kafkaConsumer.endOffsets(Collections.singletonList(topicPartition));
        Duration durationBehind = (Long)endOffsets.get(topicPartition) - 1L > lastOffsetRead ? Duration.between(lastTimestampRead, this.clock.instant()) : Duration.ZERO;
        Duration finalDurationBehind = durationBehind;
        this.channelDurationBehind.updateAndGet(behind -> ChannelDurationBehind.copyOf((ChannelDurationBehind)behind).with("" + topicPartition.partition(), finalDurationBehind).build());
        LOG.debug("Read from '{}:{}', durationBehind={}", new Object[]{this.channelName, topicPartition.partition(), durationBehind});
        if (this.eventPublisher != null) {
            this.eventPublisher.publishEvent((Object)MessageReceiverNotification.builder().withChannelName(this.channelName).withChannelDurationBehind(this.channelDurationBehind.get()).withStatus(MessageReceiverStatus.RUNNING).withMessage("Reading from Kafka stream.").build());
        }
    }

    private ChannelDurationBehind updateOnPartitionChanged(ChannelPosition startFrom, Collection<TopicPartition> partitions, ChannelDurationBehind channelDurationBehind) {
        Map topicPartitionToEndOffsetMap = this.kafkaConsumer.endOffsets(partitions);
        ChannelDurationBehind.Builder channelDurationBehindBuilder = ChannelDurationBehind.copyOf((ChannelDurationBehind)channelDurationBehind);
        topicPartitionToEndOffsetMap.forEach((topicPartition, endOffset) -> {
            String shard = "" + topicPartition.partition();
            String startPosition = startFrom.shard(shard).position();
            long startPos = this.startPosToLong(startPosition);
            if (startPos >= endOffset) {
                channelDurationBehindBuilder.with(shard, Duration.ZERO);
            } else {
                channelDurationBehindBuilder.with(shard, Duration.ofMillis(Long.MAX_VALUE));
            }
        });
        return channelDurationBehindBuilder.build();
    }

    private long startPosToLong(String startPosition) {
        if (startPosition == null || startPosition.equals("")) {
            return 0L;
        }
        return Long.parseLong(startPosition);
    }

    ChannelDurationBehind getChannelDurationBehind() {
        return this.channelDurationBehind.get();
    }
}

