/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataEventConsumer;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnectorAuditCode;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnectorErrorCode;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.frameworks.connectors.properties.AdditionalProperties;
import org.odpi.openmetadata.frameworks.connectors.properties.ConnectionProperties;
import org.odpi.openmetadata.frameworks.connectors.properties.EndpointProperties;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLog;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditingComponent;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaOpenMetadataTopicConnector
extends OpenMetadataTopicConnector {
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataTopicConnector.class);
    private static final OMRSAuditLog auditLog = new OMRSAuditLog(OMRSAuditingComponent.OPEN_METADATA_TOPIC_CONNECTOR);
    private Properties producerProperties = new Properties();
    private Properties consumerProperties = new Properties();
    private Thread consumerThread = null;
    private String outTopic = null;
    private String serverId = null;
    private KafkaOpenMetadataEventConsumer consumer = null;
    private List<String> incomingEventsList = Collections.synchronizedList(new ArrayList());

    public KafkaOpenMetadataTopicConnector() {
        this.producerProperties.put("bootstrap.servers", "localhost:9092");
        this.producerProperties.put("acks", "all");
        this.producerProperties.put("retries", (Object)1);
        this.producerProperties.put("batch.size", (Object)16384);
        this.producerProperties.put("linger.ms", (Object)0);
        this.producerProperties.put("buffer.memory", (Object)0x2000000);
        this.producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.consumerProperties.put("bootstrap.servers", "localhost:9092");
        this.consumerProperties.put("group.id", "test");
        this.consumerProperties.put("enable.auto.commit", "true");
        this.consumerProperties.put("auto.commit.interval.ms", "1000");
        this.consumerProperties.put("session.timeout.ms", "30000");
        this.consumerProperties.put("zookeeper.session.timeout.ms", (Object)400);
        this.consumerProperties.put("zookeeper.sync.time.ms", (Object)200);
        this.consumerProperties.put("fetch.message.max.bytes", (Object)0xA00000);
        this.consumerProperties.put("max.partition.fetch.bytes", (Object)0xA00000);
        this.consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public void initialize(String connectorInstanceId, ConnectionProperties connectionProperties) {
        String actionDescription = "initialize";
        super.initialize(connectorInstanceId, connectionProperties);
        EndpointProperties endpoint = connectionProperties.getEndpoint();
        if (endpoint != null) {
            this.outTopic = endpoint.getAddress();
            AdditionalProperties additionalProperties = connectionProperties.getAdditionalProperties();
            if (additionalProperties != null) {
                this.initializeKafkaProperties(additionalProperties);
                this.serverId = (String)additionalProperties.getProperty("local.server.id");
                this.consumerProperties.put("group.id", this.serverId);
                KafkaOpenMetadataTopicConnectorAuditCode auditCode = KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_INITIALIZING;
                auditLog.logRecord("initialize", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(this.outTopic, this.serverId), null, auditCode.getSystemAction(), auditCode.getUserAction());
                auditCode = KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_PRODUCER_PROPERTIES;
                auditLog.logRecord("initialize", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(this.producerProperties.toString()), null, auditCode.getSystemAction(), auditCode.getUserAction());
                this.consumer = new KafkaOpenMetadataEventConsumer(this.outTopic, this.serverId, this.consumerProperties, this);
                this.consumerThread = new Thread(this.consumer);
                this.consumerThread.start();
            } else {
                KafkaOpenMetadataTopicConnectorAuditCode auditCode = KafkaOpenMetadataTopicConnectorAuditCode.NULL_ADDITIONAL_PROPERTIES;
                auditLog.logRecord("initialize", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(this.outTopic), null, auditCode.getSystemAction(), auditCode.getUserAction());
            }
        } else {
            KafkaOpenMetadataTopicConnectorAuditCode auditCode = KafkaOpenMetadataTopicConnectorAuditCode.NO_TOPIC_NAME;
            auditLog.logRecord("initialize", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(new String[0]), null, auditCode.getSystemAction(), auditCode.getUserAction());
        }
    }

    private void initializeKafkaProperties(AdditionalProperties additionalProperties) {
        String actionDescription = "initializeKafkaProperties";
        try {
            Map propertiesMap;
            Object propertiesObject = additionalProperties.getProperty("producer");
            if (propertiesObject != null) {
                propertiesMap = (Map)propertiesObject;
                for (Map.Entry entry : propertiesMap.entrySet()) {
                    this.producerProperties.setProperty((String)entry.getKey(), (String)entry.getValue());
                }
            }
            if ((propertiesObject = additionalProperties.getProperty("consumer")) != null) {
                propertiesMap = (Map)propertiesObject;
                for (Map.Entry entry : propertiesMap.entrySet()) {
                    this.consumerProperties.setProperty((String)entry.getKey(), (String)entry.getValue());
                }
            }
        }
        catch (Throwable error) {
            KafkaOpenMetadataTopicConnectorAuditCode auditCode = KafkaOpenMetadataTopicConnectorAuditCode.UNABLE_TO_PARSE_ADDITIONAL_PROPERTIES;
            auditLog.logRecord("initializeKafkaProperties", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(this.outTopic, error.getClass().getName(), error.getMessage()), null, auditCode.getSystemAction(), auditCode.getUserAction());
        }
    }

    public void start() throws ConnectorCheckedException {
        super.start();
    }

    public void sendEvent(String event) throws ConnectorCheckedException {
        String methodName = "sendEvent";
        KafkaProducer producer = new KafkaProducer(this.producerProperties);
        try {
            log.debug("Sending message {0}" + event);
            ProducerRecord record = new ProducerRecord(this.outTopic, (Object)this.serverId, (Object)event);
            producer.send(record).get();
        }
        catch (InterruptedException | ExecutionException error) {
            log.error("Exception in sendEvent ", (Throwable)error);
            KafkaOpenMetadataTopicConnectorErrorCode errorCode = KafkaOpenMetadataTopicConnectorErrorCode.ERROR_SENDING_EVENT;
            String errorMessage = errorCode.getErrorMessageId() + errorCode.getFormattedErrorMessage(error.getClass().getName(), this.outTopic, error.getMessage());
            throw new ConnectorCheckedException(errorCode.getHTTPErrorCode(), ((Object)((Object)this)).getClass().getName(), "sendEvent", errorMessage, errorCode.getSystemAction(), errorCode.getUserAction(), (Throwable)error);
        }
        finally {
            producer.flush();
            producer.close();
        }
    }

    protected List<String> checkForEvents() {
        ArrayList<String> newEvents = null;
        if (this.incomingEventsList != null && !this.incomingEventsList.isEmpty()) {
            log.debug("checking for events {0}" + this.incomingEventsList);
            newEvents = new ArrayList<String>(this.incomingEventsList);
            this.incomingEventsList.removeAll(newEvents);
        }
        return newEvents;
    }

    void distributeToListeners(String event) {
        log.debug("distribute event to listeners" + event);
        this.incomingEventsList.add(event);
    }

    public void disconnect() throws ConnectorCheckedException {
        String actionDescription = "disconnect";
        super.disconnect();
        KafkaOpenMetadataTopicConnectorAuditCode auditCode = KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_SHUTDOWN;
        auditLog.logRecord("disconnect", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(this.outTopic), null, auditCode.getSystemAction(), auditCode.getUserAction());
        this.consumer.safeCloseConsumer();
    }
}

