/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.endpoint.receiver.kafka;

import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ChannelResponse;
import de.otto.synapse.channel.ShardResponse;
import de.otto.synapse.endpoint.MessageInterceptorRegistry;
import de.otto.synapse.endpoint.receiver.AbstractMessageLogReceiverEndpoint;
import de.otto.synapse.endpoint.receiver.kafka.ChannelDurationBehindHandler;
import de.otto.synapse.endpoint.receiver.kafka.ConsumerRebalanceHandler;
import de.otto.synapse.endpoint.receiver.kafka.ConsumerRebalanceListeners;
import de.otto.synapse.endpoint.receiver.kafka.KafkaDecoder;
import de.otto.synapse.endpoint.receiver.kafka.KafkaRecordsConsumer;
import de.otto.synapse.info.MessageReceiverStatus;
import de.otto.synapse.logging.LogHelper;
import java.time.Duration;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

public class KafkaMessageLogReceiverEndpoint
extends AbstractMessageLogReceiverEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageLogReceiverEndpoint.class);
    private static final long KAFKA_CONSUMER_POLLING_DURATION = 1000L;
    private static final int LOG_MESSAGE_COUNTER_EVERY_NTH_MESSAGE = 10000;
    private final Consumer<String, String> kafkaConsumer;
    private final ExecutorService executorService;
    private final ApplicationEventPublisher eventPublisher;
    private final MessageInterceptorRegistry interceptorRegistry;
    final AtomicBoolean stopSignal = new AtomicBoolean(false);

    public KafkaMessageLogReceiverEndpoint(String channelName, MessageInterceptorRegistry interceptorRegistry, Consumer<String, String> kafkaConsumer, ExecutorService executorService, ApplicationEventPublisher eventPublisher) {
        super(channelName, interceptorRegistry, eventPublisher);
        this.kafkaConsumer = kafkaConsumer;
        this.executorService = executorService;
        this.eventPublisher = eventPublisher;
        this.interceptorRegistry = interceptorRegistry;
    }

    @Nonnull
    public CompletableFuture<ChannelPosition> consumeUntil(@Nonnull ChannelPosition startFrom, @Nonnull Predicate<ShardResponse> stopCondition) {
        this.publishEvent(MessageReceiverStatus.STARTING, "Consuming messages from Kafka.", null);
        ChannelDurationBehindHandler durationBehindHandler = new ChannelDurationBehindHandler(this.getChannelName(), startFrom, this.eventPublisher, this.kafkaConsumer);
        ConsumerRebalanceHandler rebalanceHandler = new ConsumerRebalanceHandler(this.getChannelName(), startFrom, this.eventPublisher, this.kafkaConsumer);
        KafkaRecordsConsumer recordsConsumer = new KafkaRecordsConsumer(this.getChannelName(), startFrom, this.interceptorRegistry, this.getMessageDispatcher(), durationBehindHandler, rebalanceHandler::getCurrentPartitions, new KafkaDecoder());
        Set subscription = this.kafkaConsumer.subscription();
        if (!subscription.isEmpty()) {
            if (!subscription.contains(this.getChannelName())) {
                LOG.error("KafkaConsumer is already subscribed to " + subscription);
                throw new IllegalStateException("Unable to consume channel " + this.getChannelName() + " using KafkaConsumer that is subscribed to " + subscription);
            }
            this.kafkaConsumer.unsubscribe();
        }
        this.kafkaConsumer.subscribe(Collections.singletonList(this.getChannelName()), (ConsumerRebalanceListener)ConsumerRebalanceListeners.of(durationBehindHandler, rebalanceHandler));
        return ((CompletableFuture)CompletableFuture.supplyAsync(() -> this.processMessages(startFrom, stopCondition, rebalanceHandler, recordsConsumer), this.executorService).thenApply(channelPosition -> {
            this.publishEvent(MessageReceiverStatus.FINISHED, "Finished consuming messages from Kafka", null);
            return channelPosition;
        })).exceptionally(throwable -> {
            LOG.error("Failed to consume from Kafka stream {}: {}", (Object)this.getChannelName(), (Object)throwable.getMessage());
            this.publishEvent(MessageReceiverStatus.FAILED, "Failed to consume messages from Kafka: " + throwable.getMessage(), null);
            this.stop();
            throw new RuntimeException(throwable.getMessage(), (Throwable)throwable);
        });
    }

    private ChannelPosition processMessages(ChannelPosition startFrom, Predicate<ShardResponse> stopCondition, ConsumerRebalanceHandler rebalanceHandler, KafkaRecordsConsumer recordsConsumer) {
        long firstMessageLogTime = System.currentTimeMillis();
        AtomicLong shardMessagesCounter = new AtomicLong(0L);
        AtomicLong previousMessageLogTime = new AtomicLong(System.currentTimeMillis());
        AtomicLong previousLoggedMessageCounterMod = new AtomicLong(0L);
        AtomicLong previousLoggedMessageCounter = new AtomicLong(0L);
        AtomicBoolean stopConditionMet = new AtomicBoolean(false);
        ChannelPosition channelPosition = startFrom;
        try {
            do {
                ConsumerRecords records = this.kafkaConsumer.poll(Duration.ofMillis(1000L));
                if (!rebalanceHandler.shardsAssignedAndPositioned()) continue;
                ChannelResponse channelResponse = recordsConsumer.apply((ConsumerRecords<String, String>)records);
                channelPosition = channelResponse.getChannelPosition();
                stopConditionMet.set(channelResponse.getShardResponses().stream().allMatch(stopCondition));
                this.kafkaConsumer.commitAsync();
                int responseMessagesCounter = records.count();
                long totalMessagesCounter = shardMessagesCounter.addAndGet(responseMessagesCounter);
                if ((totalMessagesCounter <= 0L || totalMessagesCounter <= previousLoggedMessageCounterMod.get() + 10000L) && !stopConditionMet.get()) continue;
                double messagesPerSecond = LogHelper.calculateMessagesPerSecond((long)previousMessageLogTime.getAndSet(System.currentTimeMillis()), (long)(totalMessagesCounter - previousLoggedMessageCounter.get()));
                LOG.info("Read {} messages ({} per sec) from '{}', durationBehind={}, totalMessages={}", new Object[]{responseMessagesCounter, String.format("%.2f", messagesPerSecond), this.getChannelName(), channelResponse.getChannelDurationBehind(), totalMessagesCounter});
                if (stopConditionMet.get() || this.stopSignal.get()) {
                    LOG.info("Stop reading of channel={}, stopCondition={}, stopSignal={}, durationBehind={}", new Object[]{this.getChannelName(), stopConditionMet, this.stopSignal.get(), channelResponse.getChannelDurationBehind()});
                }
                previousLoggedMessageCounterMod.set(totalMessagesCounter - totalMessagesCounter % 10000L);
                previousLoggedMessageCounter.set(totalMessagesCounter);
            } while (!stopConditionMet.get() && !this.stopSignal.get());
        }
        catch (WakeupException e) {
            LOG.info("Shutting down Kafka consumer");
        }
        double totalMessagesPerSecond = LogHelper.calculateMessagesPerSecond((long)firstMessageLogTime, (long)shardMessagesCounter.get());
        LOG.info("Read a total of {} messages from '{}', totalMessagesPerSecond={}", new Object[]{shardMessagesCounter.get(), this.getChannelName(), String.format("%.2f", totalMessagesPerSecond)});
        return channelPosition;
    }

    public void stop() {
        LOG.info("Channel {} received stop signal.", (Object)this.getChannelName());
        this.stopSignal.set(true);
        this.kafkaConsumer.wakeup();
    }
}

