/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnector;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnectorAuditCode;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLog;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditingComponent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaOpenMetadataEventConsumer
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataEventConsumer.class);
    private static final OMRSAuditLog auditLog = new OMRSAuditLog(OMRSAuditingComponent.OPEN_METADATA_TOPIC_CONNECTOR);
    private static final long recoverySleepTimeSec = 10L;
    private static final long defaultPollTimeout = 1000L;
    private KafkaConsumer<String, String> consumer;
    private String topicToSubscribe;
    private String localServerId;
    private KafkaOpenMetadataTopicConnector connector;
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
    private Boolean running = true;

    public KafkaOpenMetadataEventConsumer(String topicName, String localServerId, Properties consumerProperties, KafkaOpenMetadataTopicConnector connector) {
        this.consumer = new KafkaConsumer(consumerProperties);
        this.topicToSubscribe = topicName;
        this.consumer.subscribe(Collections.singletonList(this.topicToSubscribe), (ConsumerRebalanceListener)new HandleRebalance());
        this.connector = connector;
        this.localServerId = localServerId;
        String actionDescription = "initialize";
        KafkaOpenMetadataTopicConnectorAuditCode auditCode = KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_CONSUMER_PROPERTIES;
        auditLog.logRecord("initialize", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(consumerProperties.toString()), null, auditCode.getSystemAction(), auditCode.getUserAction());
    }

    public void stop() {
        this.running = false;
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    @Override
    public void run() {
        String actionDescription = "run";
        while (this.running.booleanValue()) {
            KafkaOpenMetadataTopicConnectorAuditCode auditCode;
            ConsumerRecords records = this.consumer.poll(1000L);
            log.debug("Found records: " + records.count());
            for (ConsumerRecord record : records) {
                String json = (String)record.value();
                log.debug("Received message: " + json);
                if (!this.localServerId.equals(record.key())) {
                    try {
                        this.connector.distributeToListeners(json);
                    }
                    catch (Exception error) {
                        log.error(String.format("Error distributing inbound event: %s", error.getMessage()), (Throwable)error);
                        auditCode = KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_DISTRIBUTING_EVENT;
                        auditLog.logRecord("run", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(this.topicToSubscribe, error.getClass().getName(), json, error.getMessage()), null, auditCode.getSystemAction(), auditCode.getUserAction());
                    }
                } else {
                    log.debug("Ignoring message with key: " + (String)record.key() + " and value " + (String)record.value());
                }
                TopicPartition partition = new TopicPartition(record.topic(), record.partition());
                this.currentOffsets.put(partition, new OffsetAndMetadata(record.offset() + 1L));
            }
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                log.error(String.format("Interruption error: %s", e.getMessage()), (Throwable)e);
            }
            continue;
            catch (WakeupException e) {
                log.debug("Received wakeup call, proceeding with graceful shutdown", (Throwable)e);
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException e2) {
                    log.error(String.format("Interruption error: %s", e2.getMessage()), (Throwable)e2);
                }
                continue;
            }
            catch (Exception error) {
                log.error(String.format("Unexpected error: %s", error.getMessage()), (Throwable)error);
                auditCode = KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_RECEIVING_EVENT;
                auditLog.logRecord("run", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(this.topicToSubscribe, error.getClass().getName(), error.getMessage()), null, auditCode.getSystemAction(), auditCode.getUserAction());
                this.recoverAfterError();
                {
                    catch (Throwable throwable) {
                        try {
                            Thread.sleep(1000L);
                        }
                        catch (InterruptedException e) {
                            log.error(String.format("Interruption error: %s", e.getMessage()), (Throwable)e);
                        }
                        throw throwable;
                    }
                }
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException e) {
                    log.error(String.format("Interruption error: %s", e.getMessage()), (Throwable)e);
                }
            }
        }
    }

    protected void recoverAfterError() {
        log.info(String.format("Waiting %s seconds to recover", 10L));
        try {
            Thread.sleep(10000L);
        }
        catch (InterruptedException e1) {
            log.debug("Interrupted while recovering", (Throwable)e1);
        }
    }

    public void safeCloseConsumer() {
        if (this.consumer != null) {
            try {
                this.stopConsumption();
                this.consumer.commitSync(this.currentOffsets);
            }
            finally {
                this.consumer.close();
            }
            this.consumer = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stopConsumption() {
        Boolean bl = this.running;
        synchronized (bl) {
            this.running = false;
        }
    }

    private class HandleRebalance
    implements ConsumerRebalanceListener {
        private HandleRebalance() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            log.info("Lost partitions in rebalance. Committing current offsets:" + KafkaOpenMetadataEventConsumer.this.currentOffsets);
            KafkaOpenMetadataEventConsumer.this.consumer.commitSync(KafkaOpenMetadataEventConsumer.this.currentOffsets);
        }
    }
}

