/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaConfigurationWrapper;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaIncomingEvent;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataEventConsumerConfiguration;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataEventConsumerProperty;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnector;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnectorAuditCode;
import org.odpi.openmetadata.frameworks.auditlog.AuditLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaOpenMetadataEventConsumer
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataEventConsumer.class);
    private final AuditLog auditLog;
    private final long recoverySleepTimeSec;
    private final long pollTimeout;
    private final long maxQueueSize;
    private KafkaConsumer<String, String> consumer;
    private final String topicToSubscribe;
    private final String localServerId;
    private final KafkaOpenMetadataTopicConnector connector;
    private long nextMessageProcessingStatusCheckTime = System.currentTimeMillis();
    private long maxNextPollTimestampToAvoidConsumerTimeout = 0L;
    private final long maxMsBetweenPolls;
    private boolean initialPartitionAssignment = true;
    private final long consumerTimeoutPreventionSafetyWindowMs;
    private final long messageProcessingStatusCheckIntervalMs;
    private final long messageProcessingTimeoutMs;
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new ConcurrentHashMap<TopicPartition, OffsetAndMetadata>();
    private final Map<TopicPartition, BlockingDeque<KafkaIncomingEvent>> unprocessedEventQueues = new ConcurrentHashMap<TopicPartition, BlockingDeque<KafkaIncomingEvent>>();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final boolean isAutoCommitEnabled;
    private final long startTime = System.currentTimeMillis();
    private long countIgnoredMessages = 0L;
    private long countReceivedMessages = 0L;
    private long countCommits = 0L;
    private long countMessagesToProcess = 0L;
    private long countMessagesFailedToProcess = 0L;

    KafkaOpenMetadataEventConsumer(String topicName, String localServerId, KafkaOpenMetadataEventConsumerConfiguration config, Properties kafkaConsumerProperties, KafkaOpenMetadataTopicConnector connector, AuditLog auditLog) {
        this.auditLog = auditLog;
        this.consumer = new KafkaConsumer(kafkaConsumerProperties);
        this.topicToSubscribe = topicName;
        this.consumer.subscribe(Collections.singletonList(this.topicToSubscribe), (ConsumerRebalanceListener)new HandleRebalance(auditLog));
        this.connector = connector;
        this.localServerId = localServerId;
        String actionDescription = "initialize";
        if (auditLog != null) {
            auditLog.logMessage("initialize", KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_CONSUMER_PROPERTIES.getMessageDefinition(Integer.toString(kafkaConsumerProperties.size()), topicName), kafkaConsumerProperties.toString());
        }
        this.maxMsBetweenPolls = new KafkaConfigurationWrapper(kafkaConsumerProperties).getMaxPollIntervalMs();
        this.recoverySleepTimeSec = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.RECOVERY_SLEEP_TIME);
        this.maxQueueSize = config.getIntProperty(KafkaOpenMetadataEventConsumerProperty.MAX_QUEUE_SIZE);
        this.consumerTimeoutPreventionSafetyWindowMs = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.CONSUMER_TIMEOUT_PREVENTION_SAFETY_WINDOW_MS);
        this.pollTimeout = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.POLL_TIMEOUT);
        this.isAutoCommitEnabled = KafkaOpenMetadataEventConsumer.getBooleanProperty(kafkaConsumerProperties, "enable.auto.commit", false);
        this.messageProcessingStatusCheckIntervalMs = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.COMMIT_CHECK_INTERVAL_MS);
        long messageTimeoutMins = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.CONSUMER_EVENT_PROCESSING_TIMEOUT_MINS);
        this.messageProcessingTimeoutMs = messageTimeoutMins < 0L ? messageTimeoutMins : TimeUnit.MILLISECONDS.convert(messageTimeoutMins, TimeUnit.MINUTES);
    }

    private static boolean getBooleanProperty(Properties p, String name, boolean defaultValue) {
        String value = p.getProperty(name);
        if (value == null) {
            return defaultValue;
        }
        return Boolean.parseBoolean(value);
    }

    private void updateNextMaxPollTimestamp() {
        this.maxNextPollTimestampToAvoidConsumerTimeout = System.currentTimeMillis() + this.maxMsBetweenPolls - this.consumerTimeoutPreventionSafetyWindowMs;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        String actionDescription = "run";
        Thread.currentThread().setName(this.topicToSubscribe + "/" + Thread.currentThread().getName());
        log.info("Main loop started for topic {}", (Object)this.topicToSubscribe);
        while (this.isRunning()) {
            try {
                this.checkForFullyProcessedMessagesIfNeeded();
                boolean pollRequired = System.currentTimeMillis() > this.maxNextPollTimestampToAvoidConsumerTimeout;
                int nUnprocessedEvents = this.connector.getNumberOfUnprocessedEvents();
                if (!pollRequired && (long)nUnprocessedEvents > this.maxQueueSize) {
                    log.debug("Skipping Kafka polling since unprocessed message queue size {} is greater than {}", (Object)nUnprocessedEvents, (Object)this.maxQueueSize);
                    this.awaitNextPollingTime();
                    continue;
                }
                this.updateNextMaxPollTimestamp();
                Duration pollDuration = Duration.ofMillis(this.pollTimeout);
                ConsumerRecords records = this.consumer.poll(pollDuration);
                log.debug("Found records: {}", (Object)records.count());
                for (ConsumerRecord consumerRecord : records) {
                    String json = (String)consumerRecord.value();
                    log.debug("Received message: {}", (Object)json);
                    ++this.countReceivedMessages;
                    log.debug("Metrics: receivedMessages: {}", (Object)this.countReceivedMessages);
                    KafkaIncomingEvent event = new KafkaIncomingEvent(json, consumerRecord.offset());
                    String recordKey = (String)consumerRecord.key();
                    String recordValue = (String)consumerRecord.value();
                    if (!this.localServerId.equals(recordKey)) {
                        try {
                            this.addUnprocessedEvent(consumerRecord.partition(), consumerRecord.topic(), event);
                            this.connector.distributeToListeners(event);
                            ++this.countMessagesToProcess;
                            log.debug("Metrics: messagesToProcess: {}", (Object)this.countMessagesToProcess);
                        }
                        catch (Exception error) {
                            ++this.countMessagesFailedToProcess;
                            log.debug("Metrics: messagesFailedToProcess: {}", (Object)this.countMessagesFailedToProcess);
                            log.warn("Error distributing inbound event: {}", (Object)error.getMessage());
                            if (this.auditLog != null) {
                                this.auditLog.logException("run", KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_DISTRIBUTING_EVENT.getMessageDefinition(this.topicToSubscribe, error.getClass().getName(), json, error.getMessage()), (Throwable)error);
                            }
                        }
                    } else {
                        log.debug("Ignoring message with key: {} and value: {}", (Object)recordKey, (Object)recordValue);
                        ++this.countIgnoredMessages;
                        log.debug("Metrics: ignoredMessages: {}", (Object)this.countIgnoredMessages);
                    }
                    if (!this.isAutoCommitEnabled) continue;
                    TopicPartition partition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                    this.currentOffsets.put(partition, new OffsetAndMetadata(consumerRecord.offset() + 1L));
                    ++this.countCommits;
                    log.debug("Metrics: messageCommits: {}", (Object)this.countCommits);
                }
            }
            catch (WakeupException e) {
                log.debug("Received wakeup call, proceeding with graceful shutdown");
            }
            catch (Exception error) {
                log.warn("Unexpected error: {}", (Object)error.getMessage());
                if (this.auditLog != null) {
                    this.auditLog.logException("run", KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_RECEIVING_EVENT.getMessageDefinition(this.topicToSubscribe, error.getClass().getName(), error.getMessage()), (Throwable)error);
                }
                this.recoverAfterError();
            }
            finally {
                this.awaitNextPollingTime();
            }
        }
        if (this.consumer != null) {
            try {
                boolean changesCommitted = this.checkForFullyProcessedMessages();
                if (!changesCommitted) {
                    if (!this.isAutoCommitEnabled) {
                        int nUnprocessedMessages = this.getNumberOfUnprocessedMessages();
                        if (nUnprocessedMessages > 0) {
                            log.warn("Consumer shut down before all message processing completed! unprocessed messages: {}", (Object)nUnprocessedMessages);
                        } else {
                            log.info("All messages processed.  Consumer is shutting down.");
                        }
                    }
                    log.info("Committing current offset {} before shutdown.", this.currentOffsets);
                    try {
                        this.consumer.commitSync(this.currentOffsets);
                    }
                    catch (WakeupException nUnprocessedMessages) {
                    }
                    catch (Exception error) {
                        if (this.auditLog != null) {
                            this.auditLog.logException("consumer.commitSync", KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_COMMITTING_OFFSETS.getMessageDefinition(error.getClass().getName(), this.topicToSubscribe, error.getMessage()), (Throwable)error);
                        }
                    }
                }
            }
            finally {
                this.consumer.close();
            }
            this.consumer = null;
        }
        log.info("Exiting main loop for topic {} & cleaning up", (Object)this.topicToSubscribe);
    }

    private void addUnprocessedEvent(int partition, String topic, KafkaIncomingEvent event) {
        if (this.isAutoCommitEnabled) {
            return;
        }
        TopicPartition key = new TopicPartition(topic, partition);
        BlockingDeque<KafkaIncomingEvent> queue = this.unprocessedEventQueues.get(key);
        if (queue == null) {
            queue = new LinkedBlockingDeque<KafkaIncomingEvent>();
            this.unprocessedEventQueues.put(key, queue);
        }
        queue.add(event);
    }

    private void checkForFullyProcessedMessagesIfNeeded() {
        if (this.isAutoCommitEnabled) {
            return;
        }
        if (System.currentTimeMillis() >= this.nextMessageProcessingStatusCheckTime) {
            this.checkForFullyProcessedMessages();
            this.nextMessageProcessingStatusCheckTime = System.currentTimeMillis() + this.messageProcessingStatusCheckIntervalMs;
        }
    }

    private boolean checkForFullyProcessedMessages() {
        block6: {
            if (this.isAutoCommitEnabled) {
                return false;
            }
            log.debug("Checking for fully processed messages whose offsets need to be committed");
            HashMap<TopicPartition, OffsetAndMetadata> commitData = new HashMap<TopicPartition, OffsetAndMetadata>();
            for (Map.Entry<TopicPartition, BlockingDeque<KafkaIncomingEvent>> entry : this.unprocessedEventQueues.entrySet()) {
                Queue queue = entry.getValue();
                KafkaIncomingEvent mostRecentProcessedEvent = this.removeFullyProcessedEventsFromBeginningOfQueue(queue);
                if (mostRecentProcessedEvent == null) continue;
                OffsetAndMetadata omd = new OffsetAndMetadata(mostRecentProcessedEvent.getOffset() + 1L);
                commitData.put(entry.getKey(), omd);
            }
            if (!commitData.isEmpty()) {
                this.currentOffsets.putAll(commitData);
                log.debug("Committing: {}", commitData);
                try {
                    this.consumer.commitSync(commitData);
                    return true;
                }
                catch (WakeupException wakeupException) {
                }
                catch (Exception error) {
                    if (this.auditLog == null) break block6;
                    this.auditLog.logException("checkForFullyProcessedMessages.commitSync", KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_COMMITTING_OFFSETS.getMessageDefinition(error.getClass().getName(), this.topicToSubscribe, error.getMessage()), (Throwable)error);
                }
            }
        }
        return false;
    }

    private KafkaIncomingEvent removeFullyProcessedEventsFromBeginningOfQueue(Queue<KafkaIncomingEvent> queue) {
        KafkaIncomingEvent lastRemoved = null;
        while (this.isFirstEventFullyProcessed(queue)) {
            lastRemoved = queue.remove();
            log.debug("Message with offset {} has been fully processed.", (Object)lastRemoved.getOffset());
            ++this.countCommits;
            log.debug("Metrics: commits: {}", (Object)this.countCommits);
        }
        KafkaIncomingEvent firstEvent = queue.peek();
        if (firstEvent != null) {
            log.debug("Waiting for completing of processing of message with offset {}", (Object)firstEvent.getOffset());
        }
        return lastRemoved;
    }

    private boolean isFirstEventFullyProcessed(Queue<KafkaIncomingEvent> queue) {
        KafkaIncomingEvent firstEvent = queue.peek();
        if (firstEvent == null) {
            return false;
        }
        if (this.messageProcessingTimeoutMs >= 0L && firstEvent.hasTimeElapsedSinceCreation(this.messageProcessingTimeoutMs)) {
            log.warn("Processing of message at offset {} timed out.", (Object)firstEvent.getOffset());
            return true;
        }
        return firstEvent.isFullyProcessed();
    }

    private int getNumberOfUnprocessedMessages() {
        if (this.isAutoCommitEnabled) {
            return 0;
        }
        int result = 0;
        for (Queue queue : this.unprocessedEventQueues.values()) {
            if (queue.isEmpty()) continue;
            ++result;
        }
        return result;
    }

    private void awaitNextPollingTime() {
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException e) {
            log.debug("Interrupted whilst sleeping:");
            Thread.currentThread().interrupt();
        }
    }

    private void recoverAfterError() {
        log.info("Waiting {} seconds to recover", (Object)this.recoverySleepTimeSec);
        try {
            Thread.sleep(this.recoverySleepTimeSec * 1000L);
        }
        catch (InterruptedException e1) {
            log.debug("Interrupted while recovering");
            Thread.currentThread().interrupt();
        }
    }

    void safeCloseConsumer() {
        log.debug("Closing consumer");
        this.stopRunning();
        if (this.consumer != null) {
            log.debug("Waking up consumer thread");
            this.consumer.wakeup();
        }
    }

    private boolean isRunning() {
        return this.running.get();
    }

    private void stopRunning() {
        log.debug("Set running to false");
        this.running.set(false);
    }

    private class HandleRebalance
    implements ConsumerRebalanceListener {
        AuditLog auditLog;

        public HandleRebalance(AuditLog auditLog) {
            this.auditLog = auditLog;
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            try {
                if (KafkaOpenMetadataEventConsumer.this.initialPartitionAssignment) {
                    log.debug("Received initial PartitionsAssigned event");
                    long partitionCount = partitions.size();
                    if (partitionCount != 1L) {
                        log.warn("Received PartitionsAssigned event with {} partitions. This is not supported.", (Object)partitionCount);
                    } else {
                        KafkaOpenMetadataEventConsumer.this.initialPartitionAssignment = false;
                        TopicPartition partition = partitions.iterator().next();
                        int partitionID = partition.partition();
                        String partitionTopic = partition.topic();
                        long reqStartTime = KafkaOpenMetadataEventConsumer.this.startTime;
                        log.info("Querying for offset by timestamp: {}", (Object)reqStartTime);
                        OffsetAndTimestamp otByStartTime = (OffsetAndTimestamp)KafkaOpenMetadataEventConsumer.this.consumer.offsetsForTimes(Collections.singletonMap(partition, reqStartTime)).get(partition);
                        if (otByStartTime != null) {
                            long maxOffsetWanted = otByStartTime.offset();
                            log.info("Earliest offset found for {} is {}", (Object)reqStartTime, (Object)otByStartTime.timestamp());
                            long currentOffset = KafkaOpenMetadataEventConsumer.this.consumer.position(partition);
                            if (currentOffset > maxOffsetWanted) {
                                log.info("Seeking to {} for partition {} and topic {} as current offset {} is too late", new Object[]{maxOffsetWanted, partitionID, partitionTopic, currentOffset});
                                KafkaOpenMetadataEventConsumer.this.consumer.seek(partition, maxOffsetWanted);
                            } else {
                                log.info("Not Seeking to {} for partition {} and topic {} as current offset {} is older", new Object[]{maxOffsetWanted, partitionID, partitionTopic, currentOffset});
                            }
                        } else {
                            log.info("No missed events found for partition {} and topic {}", (Object)partitionID, (Object)partitionTopic);
                        }
                    }
                } else {
                    log.debug("PartitionsAssigned Event - no action needed");
                }
            }
            catch (Exception e) {
                log.info("Error correcting seek position, continuing with defaults. Exception: {}", (Object)e.getMessage());
            }
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            block7: {
                String methodName = "onPartitionsRevoked.commitSync";
                if (!KafkaOpenMetadataEventConsumer.this.currentOffsets.isEmpty()) {
                    log.info("Lost partitions in rebalance. Committing current offsets: {}", KafkaOpenMetadataEventConsumer.this.currentOffsets);
                    try {
                        KafkaOpenMetadataEventConsumer.this.consumer.commitSync(KafkaOpenMetadataEventConsumer.this.currentOffsets);
                    }
                    catch (WakeupException wakeupException) {
                    }
                    catch (CommitFailedException error) {
                        if (this.auditLog != null) {
                            this.auditLog.logMessage("onPartitionsRevoked.commitSync", KafkaOpenMetadataTopicConnectorAuditCode.FAILED_TO_COMMIT_CONSUMED_EVENTS.getMessageDefinition());
                        }
                        break block7;
                    }
                    catch (Exception error) {
                        if (this.auditLog != null) {
                            this.auditLog.logException("onPartitionsRevoked.commitSync", KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_COMMITTING_OFFSETS.getMessageDefinition(error.getClass().getName(), KafkaOpenMetadataEventConsumer.this.topicToSubscribe, error.getMessage()), (Throwable)error);
                        }
                        break block7;
                    }
                }
                log.debug("PartitionsRevoked Event - no action needed");
            }
        }
    }
}

