/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataEventConsumer;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataEventConsumerConfiguration;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataEventProducer;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnectorAuditCode;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnectorErrorCode;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.frameworks.connectors.properties.EndpointProperties;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.IncomingEvent;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaOpenMetadataTopicConnector
extends OpenMetadataTopicConnector {
    /** Key under which the constructor stores the consumer's auto-commit setting. */
    static final String ENABLE_AUTO_COMMIT_PROPERTY = "enable.auto.commit";
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataTopicConnector.class);
    /** Properties handed to the event producer; seeded with defaults in the constructor. */
    private final Properties producerProperties = new Properties();
    /** Properties read from the "egeria_kafka_consumer" configuration section; no defaults are set here. */
    private final Properties consumerEgeriaProperties = new Properties();
    /** Properties handed to the event consumer; seeded with defaults in the constructor. */
    private final Properties consumerProperties = new Properties();
    /** Current consumer; null until initializeConsumerAndConsumerThread() has run. */
    private KafkaOpenMetadataEventConsumer consumer = null;
    /** Current producer; null until initializeProducerAndProducerThread() has run. */
    private KafkaOpenMetadataEventProducer producer = null;
    /** Topic name, taken from the endpoint address in initializeTopic(). */
    private String topicName = null;
    /** Value of the "local.server.id" configuration property; also used as the consumer "group.id". */
    private String serverId = null;
    /** Events added by distributeToListeners() and handed out by checkForIncomingEvents(); wrapped in a synchronized list. */
    private final List<IncomingEvent> incomingEventsList = Collections.synchronizedList(new ArrayList());
    /** Single-threaded executor created in start() that runs the consumer. */
    private KafkaConsumerExecutor consumerExecutor = null;
    /** Single-threaded executor created in start() that runs the producer. */
    private KafkaProducerExecutor producerExecutor = null;
    /** Thread-name prefix; the thread names built in this class start with the same text. */
    final String threadHeader = "Kafka-";
    /** Thread object wrapping the current consumer; it is passed to consumerExecutor as a Runnable. */
    Thread consumerThread;
    /** Thread object wrapping the current producer; it is passed to producerExecutor as a Runnable. */
    Thread producerThread;

    public KafkaOpenMetadataTopicConnector() {
        this.producerProperties.put("bootstrap.servers", "localhost:9092");
        this.producerProperties.put("acks", "all");
        this.producerProperties.put("retries", (Object)1);
        this.producerProperties.put("batch.size", (Object)16384);
        this.producerProperties.put("linger.ms", (Object)0);
        this.producerProperties.put("buffer.memory", (Object)0x2000000);
        this.producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producerProperties.put("bring.up.retries", "10");
        this.producerProperties.put("bring.up.minSleepTime", "5000");
        this.consumerProperties.put("bootstrap.servers", "localhost:9092");
        this.consumerProperties.put(ENABLE_AUTO_COMMIT_PROPERTY, "true");
        this.consumerProperties.put("auto.commit.interval.ms", "1000");
        this.consumerProperties.put("session.timeout.ms", "30000");
        this.consumerProperties.put("max.partition.fetch.bytes", (Object)0xA00000);
        this.consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumerProperties.put("bring.up.retries", "10");
        this.consumerProperties.put("bring.up.minSleepTime", "5000");
    }

    /**
     * Reads the connector's configuration from the connection properties.
     * <p>
     * The endpoint address becomes the topic name. When configuration properties are present,
     * their Kafka sections are merged over the defaults and the value of "local.server.id"
     * is stored as the server id and as the consumer "group.id". Each outcome (initializing,
     * no configuration properties, no endpoint) is reported to the audit log when one is set.
     */
    private void initializeTopic() {
        final String actionDescription = "initialize";

        super.initialize(this.connectorInstanceId, this.connectionProperties);

        EndpointProperties endpoint = this.connectionProperties.getEndpoint();
        if (endpoint == null) {
            if (this.auditLog != null) {
                this.auditLog.logMessage(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.NO_TOPIC_NAME.getMessageDefinition());
            }
            return;
        }

        this.topicName = endpoint.getAddress();

        Map<String, Object> configurationProperties = this.connectionProperties.getConfigurationProperties();
        if (configurationProperties == null) {
            if (this.auditLog != null) {
                this.auditLog.logMessage(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.NULL_ADDITIONAL_PROPERTIES.getMessageDefinition(this.topicName));
            }
            return;
        }

        this.initializeKafkaProperties(configurationProperties);
        this.serverId = (String) configurationProperties.get("local.server.id");

        // Properties is a Hashtable and rejects null values with a NullPointerException, so the
        // consumer group is only set when a server id was actually configured.
        // NOTE(review): without "group.id" the consumer presumably cannot subscribe - confirm
        // how KafkaOpenMetadataEventConsumer treats a missing group.
        if (this.serverId != null) {
            this.consumerProperties.put("group.id", this.serverId);
        }

        if (this.auditLog != null) {
            this.auditLog.logMessage(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_INITIALIZING.getMessageDefinition(this.topicName, this.serverId));
        }
    }

    /**
     * Merges the "producer", "consumer" and "egeria_kafka_consumer" sections of the supplied
     * configuration into the matching property sets of this connector.
     * <p>
     * Any exception raised while copying stops the merge and is reported to the audit log
     * (when one is set); it is not propagated to the caller.
     *
     * @param configurationProperties configuration properties of the connection
     */
    private void initializeKafkaProperties(Map<String, Object> configurationProperties) {
        final String actionDescription = "initializeKafkaProperties";

        try {
            this.copyProperties(configurationProperties.get("producer"), this.producerProperties);
            this.copyProperties(configurationProperties.get("consumer"), this.consumerProperties);
            this.copyProperties(configurationProperties.get("egeria_kafka_consumer"), this.consumerEgeriaProperties);
        } catch (Exception error) {
            if (this.auditLog != null) {
                this.auditLog.logMessage(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.UNABLE_TO_PARSE_CONFIG_PROPERTIES.getMessageDefinition(this.topicName, error.getClass().getName(), error.getMessage()));
            }
        }
    }

    /**
     * Copies every entry of a configuration section into {@code target}, storing each value
     * in its {@link String#valueOf(Object)} form.
     *
     * @param propertiesObject the section to copy; ignored when null, otherwise it must be a
     *                         {@link Map} with String keys (a ClassCastException is raised if not)
     * @param target           the property set that receives the entries
     */
    private void copyProperties(Object propertiesObject, Properties target) {
        if (propertiesObject == null) {
            return;
        }

        Map<?, ?> section = (Map<?, ?>) propertiesObject;
        section.forEach((key, value) -> target.setProperty((String) key, String.valueOf(value)));
    }

    /**
     * Starts the connector: loads the configuration, waits for the Kafka brokers, then creates
     * the consumer and producer, submits each to its own executor and finally calls
     * {@code super.start()}.
     * <p>
     * The producer's brokers are always checked. The consumer's brokers are checked as well
     * only when its "bootstrap.servers" value differs from the producer's and the producer
     * check succeeded.
     *
     * @throws ConnectorCheckedException when the brokers could not be reached; the last
     *                                   exception seen by the status checker is the cause
     */
    public void start() throws ConnectorCheckedException {
        this.initializeTopic();

        KafkaStatusChecker kafkaStatus = new KafkaStatusChecker();
        String consumerServers = this.consumerProperties.getProperty("bootstrap.servers");
        String producerServers = this.producerProperties.getProperty("bootstrap.servers");
        boolean sameBrokers = consumerServers.equals(producerServers);

        boolean brokersUp = kafkaStatus.waitForBrokers(this.producerProperties);
        if (brokersUp && !sameBrokers) {
            brokersUp = kafkaStatus.waitForBrokers(this.consumerProperties);
        }

        if (!brokersUp) {
            final String actionDescription = "waitForThisBroker";
            if (this.auditLog != null) {
                this.auditLog.logMessage(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_FAILED_INITIALIZING.getMessageDefinition(this.topicName));
            }

            Exception cause = kafkaStatus.getLastException();
            throw new ConnectorCheckedException(KafkaOpenMetadataTopicConnectorErrorCode.ERROR_ATTEMPTING_KAFKA_INITIALIZATION.getMessageDefinition(cause.getClass().getName(), this.topicName, cause.getMessage()), this.getClass().getName(), "KafkaMonitor.waitForThisBroker", (Throwable) cause);
        }

        // Consumer side first, then producer side, exactly one task per executor.
        this.initializeConsumerAndConsumerThread();
        this.consumerExecutor = new KafkaConsumerExecutor();
        this.consumerExecutor.execute(this.consumerThread);

        this.initializeProducerAndProducerThread();
        this.producerExecutor = new KafkaProducerExecutor();
        this.producerExecutor.execute(this.producerThread);

        super.start();
    }

    /**
     * Creates a fresh event consumer from the current consumer settings and wraps it in a
     * new thread object named after the topic. Nothing is started or submitted here.
     */
    private void initializeConsumerAndConsumerThread() {
        KafkaOpenMetadataEventConsumerConfiguration egeriaConsumerSettings =
                new KafkaOpenMetadataEventConsumerConfiguration(this.consumerEgeriaProperties, this.auditLog);
        String consumerThreadName = "Kafka-Consumer-" + this.topicName;

        this.consumer = new KafkaOpenMetadataEventConsumer(this.topicName, this.serverId, egeriaConsumerSettings, this.consumerProperties, this, this.auditLog);
        this.consumerThread = new Thread((Runnable) this.consumer, consumerThreadName);
    }

    /**
     * Creates a fresh event producer from the current producer settings and wraps it in a
     * new thread object named after the topic. Nothing is started or submitted here.
     */
    private void initializeProducerAndProducerThread() {
        String producerThreadName = "Kafka-Producer-" + this.topicName;

        this.producer = new KafkaOpenMetadataEventProducer(this.topicName, this.serverId, this.producerProperties, this, this.auditLog);
        this.producerThread = new Thread((Runnable) this.producer, producerThreadName);
    }

    /**
     * Hands an event to the producer. The call does nothing when no producer has been
     * created yet.
     *
     * @param event the event to send
     * @throws ConnectorCheckedException when the producer reports a failure
     */
    public void sendEvent(String event) throws ConnectorCheckedException {
        if (this.producer == null) {
            return;
        }

        this.producer.sendEvent(event);
    }

    /**
     * Drains the events buffered by {@link #distributeToListeners(IncomingEvent)}.
     * <p>
     * Copying and emptying the buffer happen as one step under the buffer's own lock, so an
     * event added concurrently is either part of this result or stays buffered for the next
     * call; it can be neither lost nor delivered twice.
     *
     * @return the buffered events in arrival order, or {@code null} when none are waiting
     */
    protected List<IncomingEvent> checkForIncomingEvents() {
        // Collections.synchronizedList guards single calls only; a compound operation has to
        // hold the list's monitor itself.
        synchronized (this.incomingEventsList) {
            if (this.incomingEventsList.isEmpty()) {
                return null;
            }

            log.debug("Checking for events.  Number of found events: {}", this.incomingEventsList.size());

            List<IncomingEvent> newEvents = new ArrayList<>(this.incomingEventsList);
            this.incomingEventsList.clear();
            return newEvents;
        }
    }

    /**
     * Buffers an incoming event until the next call to {@code checkForIncomingEvents()}.
     *
     * @param event the event to buffer
     */
    void distributeToListeners(IncomingEvent event) {
        // Parameterized logging: the event is only turned into text when debug is enabled.
        log.debug("distribute event to listeners{}", event);
        this.incomingEventsList.add(event);
    }

    /**
     * Shuts the connector down: asks the consumer and producer to close, joins both thread
     * objects, calls {@code super.disconnect()} and reports the shutdown to the audit log
     * when one is set.
     * <p>
     * An interrupt received while joining does not abort the shutdown; the interrupt status
     * of the calling thread is restored before this method returns.
     *
     * @throws ConnectorCheckedException when {@code super.disconnect()} fails
     */
    public void disconnect() throws ConnectorCheckedException {
        final String actionDescription = "disconnect";

        if (this.consumer != null) {
            this.consumer.safeCloseConsumer();
        }
        if (this.producer != null) {
            this.producer.safeCloseProducer();
        }

        // Non-short-circuit OR: both joins are always attempted.
        boolean interrupted = this.joinQuietlyOnDisconnect(this.consumerThread, "consumerThread.join", actionDescription);
        interrupted |= this.joinQuietlyOnDisconnect(this.producerThread, "producerThread.join", actionDescription);

        try {
            super.disconnect();
            if (this.auditLog != null) {
                this.auditLog.logMessage(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_SHUTDOWN.getMessageDefinition(this.topicName));
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Joins one thread during shutdown, reporting unexpected failures to the audit log.
     *
     * @param thread            the thread to join; ignored when null
     * @param command           name of the join operation, used in the audit message
     * @param actionDescription name of the calling action, used in the audit message
     * @return true when the join was cut short by an interrupt
     */
    private boolean joinQuietlyOnDisconnect(Thread thread, String command, String actionDescription) {
        if (thread == null) {
            return false;
        }

        try {
            thread.join();
        } catch (InterruptedException interruptedException) {
            // The caller restores the interrupt status once shutdown has completed.
            return true;
        } catch (Exception error) {
            if (this.auditLog != null) {
                this.auditLog.logException(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.UNEXPECTED_SHUTDOWN_EXCEPTION.getMessageDefinition(error.getClass().getName(), this.topicName, command, error.getMessage()), (Throwable) error);
            }
        }
        return false;
    }

    /**
     * Reports how many events are currently buffered and not yet handed out.
     *
     * @return the current size of the incoming event buffer
     */
    int getNumberOfUnprocessedEvents() {
        final int bufferedEvents = this.incomingEventsList.size();
        return bufferedEvents;
    }

    /**
     * Checks whether Kafka brokers can be reached and remembers the last exception seen
     * while trying, so that a failed start-up can report a cause.
     */
    private class KafkaStatusChecker {
        /** Last failure seen; starts as a placeholder so that it is never null. */
        private Exception lastException = new Exception();

        private KafkaStatusChecker() {
        }

        /**
         * Makes a single attempt to list the nodes of the cluster.
         *
         * @param connectionProperties properties used to create the admin client
         * @return true when the cluster reported at least one node; false when it reported
         *         none or the attempt failed (the failure is kept as the last exception)
         */
        boolean getRunningBrokers(Properties connectionProperties) {
            boolean found = false;
            boolean interrupted = false;
            AdminClient adminClient = null;
            try {
                adminClient = KafkaAdminClient.create(connectionProperties);
                DescribeClusterResult describeClusterResult = adminClient.describeCluster();
                Collection<?> brokers = describeClusterResult.nodes().get();
                found = !brokers.isEmpty();
            } catch (Exception e) {
                interrupted = e instanceof InterruptedException;
                this.lastException = e;
            } finally {
                if (adminClient != null) {
                    // Zero timeout: do not wait for the client to wind down.
                    adminClient.close(Duration.ZERO);
                }
                if (interrupted) {
                    // Restored after the close so the close itself is not disturbed.
                    Thread.currentThread().interrupt();
                }
            }
            return found;
        }

        /**
         * Repeatedly checks for running brokers, up to "bring.up.retries" times. Each failed
         * attempt, except the last, is padded so that it lasts at least
         * "bring.up.minSleepTime" milliseconds. The loop stops early when the calling thread
         * is interrupted.
         *
         * @param connectionProperties properties holding the retry settings and the broker
         *                             connection settings
         * @return true as soon as one attempt finds a broker; false otherwise, including
         *         when the retry settings cannot be parsed
         */
        public boolean waitForBrokers(Properties connectionProperties) {
            final String actionDescription = "waitForBrokers";
            boolean found = false;
            try {
                int napCount = Integer.parseInt(connectionProperties.getProperty("bring.up.retries"));
                int minSleepTime = Integer.parseInt(connectionProperties.getProperty("bring.up.minSleepTime"));
                for (int count = 0; count < napCount; ++count) {
                    if (KafkaOpenMetadataTopicConnector.this.auditLog != null) {
                        KafkaOpenMetadataTopicConnector.this.auditLog.logMessage(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.KAFKA_CONNECTION_RETRY.getMessageDefinition(String.valueOf(count + 1)));
                    }

                    Instant start = Instant.now();
                    if (this.getRunningBrokers(connectionProperties)) {
                        found = true;
                        break;
                    }
                    if (Thread.currentThread().isInterrupted()) {
                        break;
                    }

                    // Pause only when another attempt follows, and only for whatever part of
                    // the minimum time the failed attempt has not already used up.
                    if (count + 1 < napCount) {
                        long remaining = minSleepTime - Duration.between(start, Instant.now()).toMillis();
                        if (remaining > 0) {
                            Thread.sleep(remaining);
                        }
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.lastException = e;
            } catch (Exception e) {
                this.lastException = e;
            }
            return found;
        }

        /**
         * @return the most recent exception recorded by this checker, or the initial
         *         placeholder when none has been recorded
         */
        public Exception getLastException() {
            return this.lastException;
        }
    }

    private class KafkaConsumerExecutor
    extends ThreadPoolExecutor {
        KafkaConsumerExecutor() {
            super(1, 1, Long.MAX_VALUE, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<Runnable>(1));
        }

        @Override
        public void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            KafkaOpenMetadataTopicConnector.this.initializeConsumerAndConsumerThread();
            if (KafkaOpenMetadataTopicConnector.this.isActive()) {
                KafkaOpenMetadataTopicConnector.this.consumerExecutor.execute(KafkaOpenMetadataTopicConnector.this.consumerThread);
            }
        }
    }

    private class KafkaProducerExecutor
    extends ThreadPoolExecutor {
        KafkaProducerExecutor() {
            super(1, 1, Long.MAX_VALUE, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<Runnable>(1));
        }

        @Override
        public void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            KafkaOpenMetadataTopicConnector.this.initializeProducerAndProducerThread();
            if (KafkaOpenMetadataTopicConnector.this.isActive()) {
                KafkaOpenMetadataTopicConnector.this.producerExecutor.execute(KafkaOpenMetadataTopicConnector.this.producerThread);
            }
        }
    }
}

