/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataEventConsumer;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.frameworks.connectors.properties.AdditionalProperties;
import org.odpi.openmetadata.frameworks.connectors.properties.ConnectionProperties;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaOpenMetadataTopicConnector
extends OpenMetadataTopicConnector {
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataTopicConnector.class);
    private Properties producerProps;
    private Properties consumerProps;
    private Thread consumerThread;
    private static String KAFKA_TOPIC_ID = "kafka.omrs.topic.id";
    private String outTopic;
    private KafkaOpenMetadataEventConsumer consumer;
    private List<String> eventList = Collections.synchronizedList(new ArrayList());

    public KafkaOpenMetadataTopicConnector() {
        this.producerProps = new Properties();
        this.consumerProps = new Properties();
    }

    public void initialize(String connectorInstanceId, ConnectionProperties connectionProperties) {
        super.initialize(connectorInstanceId, connectionProperties);
        AdditionalProperties additionalProperties = connectionProperties.getAdditionalProperties();
        this.initializeKafkaProperties(additionalProperties);
        String consumerGroupId = "omrsConsumerGroup-" + UUID.randomUUID().toString();
        this.consumerProps.put("group.id", consumerGroupId);
        this.connectorInstanceId = connectorInstanceId;
        this.outTopic = this.producerProps.getProperty(KAFKA_TOPIC_ID);
        this.consumer = new KafkaOpenMetadataEventConsumer(this.consumerProps, this);
        this.consumerThread = new Thread(this.consumer);
        this.consumerThread.start();
    }

    private void initializeKafkaProperties(AdditionalProperties additionalProperties) {
        if (additionalProperties != null) {
            Map propertiesMap = (Map)additionalProperties.getProperty("producer");
            for (Map.Entry entry : propertiesMap.entrySet()) {
                this.producerProps.setProperty((String)entry.getKey(), (String)entry.getValue());
            }
            propertiesMap = (Map)additionalProperties.getProperty("consumer");
            for (Map.Entry entry : propertiesMap.entrySet()) {
                this.consumerProps.setProperty((String)entry.getKey(), (String)entry.getValue());
            }
        }
    }

    public void start() throws ConnectorCheckedException {
        super.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendEvent(String event) throws ConnectorCheckedException {
        KafkaProducer producer = new KafkaProducer(this.producerProps);
        try {
            log.debug("sending message {0}" + event);
            ProducerRecord record = new ProducerRecord(this.outTopic, (Object)event);
            producer.send(record).get();
        }
        catch (InterruptedException | ExecutionException e) {
            log.error("Exception in sendevent", (Throwable)e);
            e.printStackTrace();
        }
        finally {
            producer.flush();
            producer.close();
        }
    }

    protected List<String> checkForEvents() {
        ArrayList<String> newEvents = null;
        if (this.eventList != null & !this.eventList.isEmpty()) {
            log.debug("checking for events {0}" + this.eventList);
            newEvents = new ArrayList<String>();
            newEvents.addAll(this.eventList);
            this.eventList.removeAll(newEvents);
        }
        return newEvents;
    }

    public void distributeToListeners(String event) {
        log.debug("distribute event to listeners" + event);
        this.eventList.add(event);
    }

    public void disconnect() throws ConnectorCheckedException {
        super.disconnect();
        this.consumer.safeCloseConsumer();
    }
}

