/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.endpoint.receiver.kafka;

import de.otto.synapse.channel.ChannelDurationBehind;
import de.otto.synapse.info.MessageReceiverNotification;
import de.otto.synapse.info.MessageReceiverStatus;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

class ChannelDurationBehindHandler
implements ConsumerRebalanceListener {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelDurationBehindHandler.class);
    private final AtomicReference<ChannelDurationBehind> channelDurationBehind = new AtomicReference();
    private final String channelName;
    private final ApplicationEventPublisher eventPublisher;
    private final KafkaConsumer<String, String> kafkaConsumer;
    private final Clock clock;

    ChannelDurationBehindHandler(String channelName, ApplicationEventPublisher eventPublisher, KafkaConsumer<String, String> kafkaConsumer) {
        this(channelName, eventPublisher, Clock.systemDefaultZone(), kafkaConsumer);
    }

    ChannelDurationBehindHandler(String channelName, ApplicationEventPublisher eventPublisher, Clock clock, KafkaConsumer<String, String> kafkaConsumer) {
        this.channelName = channelName;
        this.eventPublisher = eventPublisher;
        this.kafkaConsumer = kafkaConsumer;
        this.channelDurationBehind.set(ChannelDurationBehind.unknown());
        this.clock = clock;
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        LOG.info("Revoked " + partitions + " Kafka partitions: " + partitions);
        partitions.forEach(p -> {
            String shardName = "" + p.partition();
            this.channelDurationBehind.getAndUpdate(previous -> ChannelDurationBehind.copyOf((ChannelDurationBehind)previous).without(shardName).build());
        });
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        Set shardNames = partitions.stream().map(TopicPartition::partition).map(String::valueOf).collect(Collectors.toSet());
        this.channelDurationBehind.getAndUpdate(previous -> ChannelDurationBehind.copyOf((ChannelDurationBehind)previous).withAllUnknown(shardNames).build());
    }

    public void update(TopicPartition topicPartition, long lastOffsetRead, Instant lastTimestampRead) {
        Map endOffsets = this.kafkaConsumer.endOffsets(Collections.singletonList(topicPartition));
        Duration durationBehind = (Long)endOffsets.get(topicPartition) - 1L > lastOffsetRead ? Duration.between(lastTimestampRead, this.clock.instant()) : Duration.ZERO;
        Duration finalDurationBehind = durationBehind;
        this.channelDurationBehind.updateAndGet(behind -> ChannelDurationBehind.copyOf((ChannelDurationBehind)behind).with("" + topicPartition.partition(), finalDurationBehind).build());
        LOG.debug("Read from '{}:{}', durationBehind={}", new Object[]{this.channelName, topicPartition.partition(), durationBehind});
        if (this.eventPublisher != null) {
            this.eventPublisher.publishEvent((Object)MessageReceiverNotification.builder().withChannelName(this.channelName).withChannelDurationBehind(this.channelDurationBehind.get()).withStatus(MessageReceiverStatus.RUNNING).withMessage("Reading from Kafka stream.").build());
        }
    }

    ChannelDurationBehind getChannelDurationBehind() {
        return this.channelDurationBehind.get();
    }

    public void noRecordsReceived() {
        ChannelDurationBehind zeroChannelDurationBehind = ChannelDurationBehind.channelDurationBehind().with(this.channelName, Duration.ZERO).build();
        this.channelDurationBehind.updateAndGet(behind -> zeroChannelDurationBehind);
        if (this.eventPublisher != null) {
            this.eventPublisher.publishEvent((Object)MessageReceiverNotification.builder().withChannelName(this.channelName).withChannelDurationBehind(zeroChannelDurationBehind).withStatus(MessageReceiverStatus.RUNNING).withMessage("Reading from Kafka stream.").build());
        }
    }
}

