/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnector;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaOpenMetadataEventConsumer
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataEventConsumer.class);
    private KafkaConsumer<String, String> consumer;
    private String topicToSubscribe;
    private static String CONSUMER_TOPIC_ID = "kafka.omrs.topic.id";
    private static long recoverySleepTimeSec = 10L;
    private static final long DEFAULT_POLL_TIMEOUT = 1000L;
    private KafkaOpenMetadataTopicConnector connector;
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
    private Boolean running = true;
    private ArrayList<OpenMetadataTopicListener> topicListeners;

    public KafkaOpenMetadataEventConsumer(Properties consumerProps, ArrayList<OpenMetadataTopicListener> topicListeners) {
        this.consumer = new KafkaConsumer(consumerProps);
        this.topicToSubscribe = consumerProps.getProperty(CONSUMER_TOPIC_ID);
        this.consumer.subscribe(Arrays.asList(this.topicToSubscribe), (ConsumerRebalanceListener)new HandleRebalance());
        this.topicListeners = topicListeners;
    }

    public KafkaOpenMetadataEventConsumer(Properties consumerProps, KafkaOpenMetadataTopicConnector connector) {
        this.consumer = new KafkaConsumer(consumerProps);
        this.topicToSubscribe = consumerProps.getProperty(CONSUMER_TOPIC_ID);
        this.consumer.subscribe(Arrays.asList(this.topicToSubscribe), (ConsumerRebalanceListener)new HandleRebalance());
        this.connector = connector;
    }

    public void stop() {
        this.running = false;
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    @Override
    public void run() {
        while (this.running.booleanValue()) {
            ConsumerRecords records = this.consumer.poll(1000L);
            log.debug("Found records " + records.count());
            for (ConsumerRecord record : records) {
                String json = (String)record.value();
                log.debug("Received message: ", (Object)json);
                TopicPartition partition = new TopicPartition(record.topic(), record.partition());
                this.currentOffsets.put(partition, new OffsetAndMetadata(record.offset() + 1L));
                try {
                    this.connector.distributeToListeners(json);
                }
                catch (Exception e) {
                    log.error(String.format("Error distributing event: %s", e.getMessage()), (Throwable)e);
                    e.printStackTrace(System.err);
                }
            }
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                log.error(String.format("Interruption error: %s", e.getMessage()), (Throwable)e);
            }
            continue;
            catch (WakeupException e) {
                log.debug("Received wakeup call, proceeding with graceful shutdown", (Throwable)e);
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException e2) {
                    log.error(String.format("Interruption error: %s", e2.getMessage()), (Throwable)e2);
                }
                continue;
            }
            catch (Exception e) {
                log.error(String.format("Unexpected error: %s", e.getMessage()), (Throwable)e);
                this.recoverAfterError();
                {
                    catch (Throwable throwable) {
                        try {
                            Thread.sleep(1000L);
                        }
                        catch (InterruptedException e3) {
                            log.error(String.format("Interruption error: %s", e3.getMessage()), (Throwable)e3);
                        }
                        throw throwable;
                    }
                }
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException e4) {
                    log.error(String.format("Interruption error: %s", e4.getMessage()), (Throwable)e4);
                }
            }
        }
    }

    protected void recoverAfterError() {
        log.info(String.format("Waiting %s seconds to recover", recoverySleepTimeSec));
        try {
            Thread.sleep(recoverySleepTimeSec * 1000L);
        }
        catch (InterruptedException e1) {
            log.debug("Interrupted while recovering", (Throwable)e1);
        }
    }

    public void safeCloseConsumer() {
        if (this.consumer != null) {
            try {
                this.consumer.commitSync(this.currentOffsets);
            }
            finally {
                this.consumer.close();
            }
            this.consumer = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stopConsumption() {
        Boolean bl = this.running;
        synchronized (bl) {
            this.running = false;
        }
    }

    private class HandleRebalance
    implements ConsumerRebalanceListener {
        private HandleRebalance() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            log.info("Lost partitions in rebalance. Committing current offsets:" + KafkaOpenMetadataEventConsumer.this.currentOffsets);
            KafkaOpenMetadataEventConsumer.this.consumer.commitSync(KafkaOpenMetadataEventConsumer.this.currentOffsets);
        }
    }
}

