/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaConfigurationWrapper;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaIncomingEvent;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataEventConsumerConfiguration;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataEventConsumerProperty;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnector;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnectorAuditCode;
import org.odpi.openmetadata.frameworks.auditlog.AuditLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaOpenMetadataEventConsumer
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataEventConsumer.class);
    private final AuditLog auditLog;
    private final long recoverySleepTimeSec;
    private final long pollTimeout;
    private final long maxQueueSize;
    private KafkaConsumer<String, String> consumer;
    private final String topicToSubscribe;
    private final String localServerId;
    private final KafkaOpenMetadataTopicConnector connector;
    private long nextMessageProcessingStatusCheckTime = System.currentTimeMillis();
    private long maxNextPollTimestampToAvoidConsumerTimeout = 0L;
    private final long maxMsBetweenPolls;
    private final long consumerTimeoutPreventionSafetyWindowMs;
    private final long messageProcessingStatusCheckIntervalMs;
    private final long messageProcessingTimeoutMs;
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new ConcurrentHashMap<TopicPartition, OffsetAndMetadata>();
    private final Map<TopicPartition, BlockingDeque<KafkaIncomingEvent>> unprocessedEventQueues = new ConcurrentHashMap<TopicPartition, BlockingDeque<KafkaIncomingEvent>>();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final boolean isAutoCommitEnabled;

    KafkaOpenMetadataEventConsumer(String topicName, String localServerId, KafkaOpenMetadataEventConsumerConfiguration config, Properties kafkaConsumerProperties, KafkaOpenMetadataTopicConnector connector, AuditLog auditLog) {
        this.auditLog = auditLog;
        this.consumer = new KafkaConsumer(kafkaConsumerProperties);
        this.topicToSubscribe = topicName;
        this.consumer.subscribe(Collections.singletonList(this.topicToSubscribe), (ConsumerRebalanceListener)new HandleRebalance(auditLog));
        this.connector = connector;
        this.localServerId = localServerId;
        String actionDescription = "initialize";
        if (auditLog != null) {
            auditLog.logMessage("initialize", KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_CONSUMER_PROPERTIES.getMessageDefinition(Integer.toString(kafkaConsumerProperties.size()), topicName), kafkaConsumerProperties.toString());
        }
        this.maxMsBetweenPolls = new KafkaConfigurationWrapper(kafkaConsumerProperties).getMaxPollIntervalMs();
        this.recoverySleepTimeSec = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.RECOVERY_SLEEP_TIME);
        this.maxQueueSize = config.getIntProperty(KafkaOpenMetadataEventConsumerProperty.MAX_QUEUE_SIZE);
        this.consumerTimeoutPreventionSafetyWindowMs = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.CONSUMER_TIMEOUT_PREVENTION_SAFETY_WINDOW_MS);
        this.pollTimeout = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.POLL_TIMEOUT);
        this.isAutoCommitEnabled = KafkaOpenMetadataEventConsumer.getBooleanProperty(kafkaConsumerProperties, "enable.auto.commit", false);
        this.messageProcessingStatusCheckIntervalMs = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.COMMIT_CHECK_INTERVAL_MS);
        long messageTimeoutMins = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.CONSUMER_EVENT_PROCESSING_TIMEOUT_MINS);
        this.messageProcessingTimeoutMs = messageTimeoutMins < 0L ? messageTimeoutMins : TimeUnit.MILLISECONDS.convert(messageTimeoutMins, TimeUnit.MINUTES);
    }

    private static boolean getBooleanProperty(Properties p, String name, boolean defaultValue) {
        String value = p.getProperty(name);
        if (value == null) {
            return defaultValue;
        }
        return Boolean.parseBoolean(value);
    }

    private void updateNextMaxPollTimestamp() {
        this.maxNextPollTimestampToAvoidConsumerTimeout = System.currentTimeMillis() + this.maxMsBetweenPolls - this.consumerTimeoutPreventionSafetyWindowMs;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        String actionDescription = "run";
        while (this.isRunning()) {
            try {
                this.checkForFullyProcessedMessagesIfNeeded();
                boolean pollRequired = System.currentTimeMillis() > this.maxNextPollTimestampToAvoidConsumerTimeout;
                int nUnprocessedEvents = this.connector.getNumberOfUnprocessedEvents();
                if (!pollRequired && (long)nUnprocessedEvents > this.maxQueueSize) {
                    log.warn("Skipping Kafka polling since unprocessed message queue size {} is greater than {}", (Object)nUnprocessedEvents, (Object)this.maxQueueSize);
                    this.awaitNextPollingTime();
                    continue;
                }
                this.updateNextMaxPollTimestamp();
                Duration pollDuration = Duration.ofMillis(this.pollTimeout);
                ConsumerRecords records = this.consumer.poll(pollDuration);
                log.debug("Found records: " + records.count());
                for (ConsumerRecord record : records) {
                    String json = (String)record.value();
                    log.debug("Received message: " + json);
                    KafkaIncomingEvent event = new KafkaIncomingEvent(json, record.offset());
                    if (!this.localServerId.equals(record.key())) {
                        try {
                            this.addUnprocessedEvent(record.partition(), record.topic(), event);
                            this.connector.distributeToListeners(event);
                        }
                        catch (Exception error) {
                            log.error(String.format("Error distributing inbound event: %s", error.getMessage()), (Throwable)error);
                            if (this.auditLog != null) {
                                this.auditLog.logException("run", KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_DISTRIBUTING_EVENT.getMessageDefinition(this.topicToSubscribe, error.getClass().getName(), json, error.getMessage()), (Throwable)error);
                            }
                        }
                    } else {
                        log.debug("Ignoring message with key: " + (String)record.key() + " and value " + (String)record.value());
                    }
                    if (!this.isAutoCommitEnabled) continue;
                    TopicPartition partition = new TopicPartition(record.topic(), record.partition());
                    this.currentOffsets.put(partition, new OffsetAndMetadata(record.offset() + 1L));
                }
            }
            catch (WakeupException e) {
                log.debug("Received wakeup call, proceeding with graceful shutdown", (Throwable)e);
            }
            catch (Exception error) {
                log.error(String.format("Unexpected error: %s", error.getMessage()), (Throwable)error);
                if (this.auditLog != null) {
                    this.auditLog.logException("run", KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_RECEIVING_EVENT.getMessageDefinition(this.topicToSubscribe, error.getClass().getName(), error.getMessage()), (Throwable)error);
                }
                this.recoverAfterError();
            }
            finally {
                this.awaitNextPollingTime();
            }
        }
        if (this.consumer != null) {
            try {
                boolean changesCommitted = this.checkForFullyProcessedMessages();
                if (!changesCommitted) {
                    if (!this.isAutoCommitEnabled) {
                        int nUnprocessedMessages = this.getNumberOfUnprocessedMessages();
                        if (nUnprocessedMessages > 0) {
                            log.error("Consumer was shut down before all message processing has completed!  There are " + nUnprocessedMessages + " messages whose processing is incomplete.");
                        } else {
                            log.info("All messages have been fully processed.  Consumer is shutting down safely.");
                        }
                    }
                    log.info("Committing current offsets before shutdown: " + this.currentOffsets);
                    try {
                        this.consumer.commitSync(this.currentOffsets);
                    }
                    catch (WakeupException nUnprocessedMessages) {
                    }
                    catch (Exception error) {
                        if (this.auditLog != null) {
                            this.auditLog.logException("consumer.commitSync", KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_COMMITTING_OFFSETS.getMessageDefinition(error.getClass().getName(), this.topicToSubscribe, error.getMessage()), (Throwable)error);
                        }
                    }
                }
            }
            finally {
                this.consumer.close();
            }
            this.consumer = null;
        }
    }

    private void addUnprocessedEvent(int partition, String topic, KafkaIncomingEvent event) {
        if (this.isAutoCommitEnabled) {
            return;
        }
        TopicPartition key = new TopicPartition(topic, partition);
        BlockingDeque<KafkaIncomingEvent> queue = this.unprocessedEventQueues.get(key);
        if (queue == null) {
            queue = new LinkedBlockingDeque<KafkaIncomingEvent>();
            this.unprocessedEventQueues.put(key, queue);
        }
        queue.add(event);
    }

    private boolean checkForFullyProcessedMessagesIfNeeded() {
        if (this.isAutoCommitEnabled) {
            return false;
        }
        if (System.currentTimeMillis() >= this.nextMessageProcessingStatusCheckTime) {
            boolean changesFound = this.checkForFullyProcessedMessages();
            this.nextMessageProcessingStatusCheckTime = System.currentTimeMillis() + this.messageProcessingStatusCheckIntervalMs;
            return changesFound;
        }
        return false;
    }

    private boolean checkForFullyProcessedMessages() {
        block6: {
            if (this.isAutoCommitEnabled) {
                return false;
            }
            log.info("Checking for fully processed messages whose offsets need to be committed");
            HashMap<TopicPartition, OffsetAndMetadata> commitData = new HashMap<TopicPartition, OffsetAndMetadata>();
            for (Map.Entry<TopicPartition, BlockingDeque<KafkaIncomingEvent>> entry : this.unprocessedEventQueues.entrySet()) {
                Queue queue = entry.getValue();
                KafkaIncomingEvent mostRecentProcessedEvent = this.removeFullyProcessedEventsFromBeginningOfQueue(queue);
                if (mostRecentProcessedEvent == null) continue;
                OffsetAndMetadata omd = new OffsetAndMetadata(mostRecentProcessedEvent.getOffset() + 1L);
                commitData.put(entry.getKey(), omd);
            }
            if (!commitData.isEmpty()) {
                this.currentOffsets.putAll(commitData);
                log.info("Committing: " + commitData);
                try {
                    this.consumer.commitSync(commitData);
                    return true;
                }
                catch (WakeupException wakeupException) {
                }
                catch (Exception error) {
                    if (this.auditLog == null) break block6;
                    this.auditLog.logException("checkForFullyProcessedMessages.commitSync", KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_COMMITTING_OFFSETS.getMessageDefinition(error.getClass().getName(), this.topicToSubscribe, error.getMessage()), (Throwable)error);
                }
            }
        }
        return false;
    }

    private KafkaIncomingEvent removeFullyProcessedEventsFromBeginningOfQueue(Queue<KafkaIncomingEvent> queue) {
        KafkaIncomingEvent lastRemoved = null;
        while (this.isFirstEventFullyProcessed(queue)) {
            lastRemoved = queue.remove();
            log.info("Message with offset " + lastRemoved.getOffset() + " has been fully processed.");
        }
        KafkaIncomingEvent firstEvent = queue.peek();
        if (firstEvent != null) {
            log.info("Waiting for completing of processing of message with offset " + firstEvent.getOffset());
        }
        return lastRemoved;
    }

    private boolean isFirstEventFullyProcessed(Queue<KafkaIncomingEvent> queue) {
        KafkaIncomingEvent firstEvent = queue.peek();
        if (firstEvent == null) {
            return false;
        }
        if (this.messageProcessingTimeoutMs >= 0L && firstEvent.hasTimeElapsedSinceCreation(this.messageProcessingTimeoutMs)) {
            log.warn("Processing of message at offset " + firstEvent.getOffset() + " timed out.");
            return true;
        }
        return firstEvent.isFullyProcessed();
    }

    private int getNumberOfUnprocessedMessages() {
        if (this.isAutoCommitEnabled) {
            return 0;
        }
        int result = 0;
        for (Queue queue : this.unprocessedEventQueues.values()) {
            if (queue.isEmpty()) continue;
            ++result;
        }
        return result;
    }

    private void awaitNextPollingTime() {
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException e) {
            log.error(String.format("Interruption error: %s", e.getMessage()), (Throwable)e);
        }
    }

    private void recoverAfterError() {
        log.info(String.format("Waiting %s seconds to recover", this.recoverySleepTimeSec));
        try {
            Thread.sleep(this.recoverySleepTimeSec * 1000L);
        }
        catch (InterruptedException e1) {
            log.debug("Interrupted while recovering", (Throwable)e1);
        }
    }

    void safeCloseConsumer() {
        this.stopRunning();
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
    }

    private boolean isRunning() {
        return this.running.get();
    }

    private void stopRunning() {
        this.running.set(false);
    }

    private class HandleRebalance
    implements ConsumerRebalanceListener {
        AuditLog auditLog = null;

        public HandleRebalance(AuditLog auditLog) {
            this.auditLog = auditLog;
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            block6: {
                String methodName = "onPartitionsRevoked.commitSync";
                if (!KafkaOpenMetadataEventConsumer.this.currentOffsets.isEmpty()) {
                    log.info("Lost partitions in rebalance. Committing current offsets:" + KafkaOpenMetadataEventConsumer.this.currentOffsets);
                    try {
                        KafkaOpenMetadataEventConsumer.this.consumer.commitSync(KafkaOpenMetadataEventConsumer.this.currentOffsets);
                    }
                    catch (WakeupException wakeupException) {
                    }
                    catch (CommitFailedException error) {
                        if (this.auditLog != null) {
                            this.auditLog.logMessage("onPartitionsRevoked.commitSync", KafkaOpenMetadataTopicConnectorAuditCode.FAILED_TO_COMMIT_CONSUMED_EVENTS.getMessageDefinition());
                        }
                    }
                    catch (Exception error) {
                        if (this.auditLog == null) break block6;
                        this.auditLog.logException("onPartitionsRevoked.commitSync", KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_COMMITTING_OFFSETS.getMessageDefinition(error.getClass().getName(), KafkaOpenMetadataEventConsumer.this.topicToSubscribe, error.getMessage()), (Throwable)error);
                    }
                }
            }
        }
    }
}

