/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConfigurationWrapper;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataEventConsumerConfiguration;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataEventConsumerProperty;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnector;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnectorAuditCode;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaOpenMetadataEventConsumer
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataEventConsumer.class);
    private OMRSAuditLog auditLog;
    private final long recoverySleepTimeSec;
    private final long pollTimeout;
    private final long maxQueueSize;
    private KafkaOpenMetadataEventConsumerConfiguration config;
    private KafkaConsumer<String, String> consumer;
    private String topicToSubscribe;
    private String localServerId;
    private KafkaOpenMetadataTopicConnector connector;
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
    private long maxNextPollTimestampToAvoidConsumerTimeout = 0L;
    private final long maxMsBetweenPolls;
    private final long consumerTimeoutPreventionSafetyWindowMs;
    private Boolean running = true;

    public KafkaOpenMetadataEventConsumer(String topicName, String localServerId, KafkaOpenMetadataEventConsumerConfiguration config, Properties kafkaConsumerProperties, KafkaOpenMetadataTopicConnector connector, OMRSAuditLog auditLog) {
        this.auditLog = auditLog;
        this.consumer = new KafkaConsumer(kafkaConsumerProperties);
        this.topicToSubscribe = topicName;
        this.consumer.subscribe(Collections.singletonList(this.topicToSubscribe), (ConsumerRebalanceListener)new HandleRebalance());
        this.connector = connector;
        this.localServerId = localServerId;
        String actionDescription = "initialize";
        KafkaOpenMetadataTopicConnectorAuditCode auditCode = KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_CONSUMER_PROPERTIES;
        auditLog.logRecord("initialize", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(topicName, connector.getPrintableProperties(kafkaConsumerProperties)), null, auditCode.getSystemAction(), auditCode.getUserAction());
        this.maxMsBetweenPolls = new KafkaConfigurationWrapper(kafkaConsumerProperties).getMaxPollIntervalMs();
        this.recoverySleepTimeSec = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.RECOVERY_SLEEP_TIME);
        this.maxQueueSize = config.getIntProperty(KafkaOpenMetadataEventConsumerProperty.MAX_QUEUE_SIZE);
        this.consumerTimeoutPreventionSafetyWindowMs = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.CONSUMER_TIMEOUT_PREVENTION_SAFETY_WINDOW_MS);
        this.pollTimeout = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.POLL_TIMEOUT);
    }

    public void stop() {
        this.running = false;
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
    }

    private void updateNextMaxPollTimestamp() {
        this.maxNextPollTimestampToAvoidConsumerTimeout = System.currentTimeMillis() + this.maxMsBetweenPolls - this.consumerTimeoutPreventionSafetyWindowMs;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        String actionDescription = "run";
        while (this.isRunning()) {
            KafkaOpenMetadataTopicConnectorAuditCode auditCode;
            try {
                boolean pollRequired = System.currentTimeMillis() > this.maxNextPollTimestampToAvoidConsumerTimeout;
                int nUnprocessedEvents = this.connector.getNumberOfUnprocessedEvents();
                if (!pollRequired && (long)nUnprocessedEvents > this.maxQueueSize) {
                    log.warn("Skipping Kafka polling since unprocessed message queue size {} is greater than {}", (Object)nUnprocessedEvents, (Object)this.maxQueueSize);
                    this.awaitNextPollingTime();
                    continue;
                }
                this.updateNextMaxPollTimestamp();
                ConsumerRecords records = this.consumer.poll(this.pollTimeout);
                log.debug("Found records: " + records.count());
                for (ConsumerRecord record : records) {
                    String json = (String)record.value();
                    log.debug("Received message: " + json);
                    if (!this.localServerId.equals(record.key())) {
                        try {
                            this.connector.distributeToListeners(json);
                        }
                        catch (Exception error) {
                            log.error(String.format("Error distributing inbound event: %s", error.getMessage()), (Throwable)error);
                            if (this.auditLog != null) {
                                auditCode = KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_DISTRIBUTING_EVENT;
                                this.auditLog.logRecord("run", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(this.topicToSubscribe, error.getClass().getName(), json, error.getMessage()), null, auditCode.getSystemAction(), auditCode.getUserAction());
                            }
                        }
                    } else {
                        log.debug("Ignoring message with key: " + (String)record.key() + " and value " + (String)record.value());
                    }
                    TopicPartition partition = new TopicPartition(record.topic(), record.partition());
                    this.currentOffsets.put(partition, new OffsetAndMetadata(record.offset() + 1L));
                }
            }
            catch (WakeupException e) {
                log.debug("Received wakeup call, proceeding with graceful shutdown", (Throwable)e);
            }
            catch (Exception error) {
                log.error(String.format("Unexpected error: %s", error.getMessage()), (Throwable)error);
                if (this.auditLog != null) {
                    auditCode = KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_RECEIVING_EVENT;
                    this.auditLog.logRecord("run", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(this.topicToSubscribe, error.getClass().getName(), error.getMessage()), null, auditCode.getSystemAction(), auditCode.getUserAction());
                }
                this.recoverAfterError();
            }
            finally {
                this.awaitNextPollingTime();
            }
        }
        if (this.consumer != null) {
            try {
                this.consumer.commitSync(this.currentOffsets);
            }
            finally {
                this.consumer.close();
            }
            this.consumer = null;
        }
    }

    private void awaitNextPollingTime() {
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException e) {
            log.error(String.format("Interruption error: %s", e.getMessage()), (Throwable)e);
        }
    }

    protected void recoverAfterError() {
        log.info(String.format("Waiting %s seconds to recover", this.recoverySleepTimeSec));
        try {
            Thread.sleep(this.recoverySleepTimeSec * 1000L);
        }
        catch (InterruptedException e1) {
            log.debug("Interrupted while recovering", (Throwable)e1);
        }
    }

    public void safeCloseConsumer() {
        this.stopRunning();
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
    }

    private synchronized boolean isRunning() {
        return this.running;
    }

    private synchronized void stopRunning() {
        this.running = false;
    }

    private class HandleRebalance
    implements ConsumerRebalanceListener {
        private HandleRebalance() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            log.info("Lost partitions in rebalance. Committing current offsets:" + KafkaOpenMetadataEventConsumer.this.currentOffsets);
            KafkaOpenMetadataEventConsumer.this.consumer.commitSync(KafkaOpenMetadataEventConsumer.this.currentOffsets);
        }
    }
}

