/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.odpi.openmetadata.frameworks.connectors.ConnectorBase;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.frameworks.connectors.properties.AdditionalProperties;
import org.odpi.openmetadata.frameworks.connectors.properties.EndpointProperties;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditCode;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLog;
import org.odpi.openmetadata.repositoryservices.connectors.auditable.AuditableConnector;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopic;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicListener;
import org.odpi.openmetadata.repositoryservices.ffdc.OMRSErrorCode;
import org.odpi.openmetadata.repositoryservices.ffdc.exception.OMRSLogicErrorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Abstract connector that polls a topic for events on a dedicated thread and
 * passes each received event to every registered {@link OpenMetadataTopicListener}.
 *
 * <p>Subclasses supply the actual retrieval of events by implementing
 * {@link #checkForEvents()}. The polling thread is created by {@link #start()}
 * and leaves its loop after {@link #disconnect()} clears the running flag.
 *
 * <p>Listeners may be registered from a different thread than the polling
 * thread, so the listener list is a {@link CopyOnWriteArrayList}: iteration in
 * the polling thread works on a snapshot and never observes a concurrent add.
 */
public abstract class OpenMetadataTopicConnector
extends ConnectorBase
implements OpenMetadataTopic,
Runnable,
AuditableConnector {
    private static final Logger log = LoggerFactory.getLogger(OpenMetadataTopicConnector.class);
    private static final String defaultThreadName = "OpenMetadataTopicListener";
    private static final String defaultTopicName = "OpenMetadataTopic";

    /** Loop control flag; written by start()/disconnect(), read by the polling thread. */
    private volatile boolean keepRunning = false;

    /** Registered listeners; copy-on-write so registration cannot break an in-flight iteration. */
    private final List<OpenMetadataTopicListener> topicListeners = new CopyOnWriteArrayList<OpenMetadataTopicListener>();

    private String listenerThreadName = defaultThreadName;
    private String topicName = defaultTopicName;

    /** Pause between polls, in milliseconds (passed to Thread.sleep). */
    private int sleepTime = 100;

    protected OMRSAuditLog auditLog = null;

    /**
     * Stores the audit log used for the listener start/shutdown records and for
     * event processing errors.
     *
     * @param auditLog audit log destination
     */
    @Override
    public void setAuditLog(OMRSAuditLog auditLog) {
        this.auditLog = auditLog;
    }

    /**
     * Body of the polling thread. Writes a start record, then repeatedly polls
     * for events and sleeps until the running flag is cleared, and finally
     * writes a shutdown record.
     */
    @Override
    public void run() {
        this.logListenerStatus(OMRSAuditCode.OPEN_METADATA_TOPIC_LISTENER_START);
        while (this.keepRunning) {
            try {
                this.pollAndDistribute();
                Thread.sleep(this.sleepTime);
            }
            catch (InterruptedException wakeUp) {
                // An interrupt is treated as a request to poll again straight away;
                // the loop exit is governed solely by keepRunning.
                log.info("Wake up for more events");
            }
        }
        this.logListenerStatus(OMRSAuditCode.OPEN_METADATA_TOPIC_LISTENER_SHUTDOWN);
    }

    /**
     * Performs one poll: fetches events from the subclass and distributes each
     * non-null event. Any failure is logged and swallowed so that a single bad
     * poll does not terminate the polling thread.
     */
    private void pollAndDistribute() {
        try {
            List<String> receivedEvents = this.checkForEvents();
            if (receivedEvents == null) {
                return;
            }
            for (String event : receivedEvents) {
                if (event != null) {
                    this.distributeEvent(event);
                }
            }
        }
        catch (Throwable error) {
            // Deliberately broad: the polling thread must survive whatever the subclass throws.
            log.error("Bad exception from checkForEvents", error);
        }
    }

    /**
     * Writes a listener lifecycle record to the audit log. When no audit log
     * has been set the record is skipped (with a warning) instead of failing
     * with a NullPointerException that would kill the polling thread.
     *
     * @param auditCode the audit code describing the lifecycle event
     */
    private void logListenerStatus(OMRSAuditCode auditCode) {
        OMRSAuditLog activeAuditLog = this.auditLog;
        if (activeAuditLog == null) {
            log.warn("No audit log set for {}; skipping audit record {}", this.listenerThreadName, auditCode);
            return;
        }
        Object connection = this.getConnection();
        // The connection may be absent (start() also tolerates that), so only describe it when present.
        String connectionDescription = connection == null ? null : connection.toString();
        activeAuditLog.logRecord(this.listenerThreadName, auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(this.topicName), connectionDescription, auditCode.getSystemAction(), auditCode.getUserAction());
    }

    /**
     * Passes one event to every registered listener. A failure in one listener
     * is recorded and does not prevent delivery to the remaining listeners.
     *
     * @param event the event payload to deliver
     */
    private void distributeEvent(String event) {
        final String actionDescription = "distributeEvent";
        for (OpenMetadataTopicListener topicListener : this.topicListeners) {
            try {
                topicListener.processEvent(event);
            }
            catch (Throwable error) {
                OMRSAuditLog activeAuditLog = this.auditLog;
                if (activeAuditLog == null) {
                    // Without an audit log, fall back to the class logger so the failure is not lost
                    // and the remaining listeners still receive the event.
                    log.error("Listener failed to process event from topic {}", this.topicName, error);
                    continue;
                }
                OMRSAuditCode auditCode = OMRSAuditCode.EVENT_PROCESSING_ERROR;
                activeAuditLog.logRecord(actionDescription, auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(event, error.toString()), null, auditCode.getSystemAction(), auditCode.getUserAction());
            }
        }
    }

    /**
     * Looks for new events on the topic. Called once per polling cycle from the
     * polling thread.
     *
     * @return the events received since the last call; may be null or empty,
     *         and null elements are skipped by the caller
     */
    protected abstract List<String> checkForEvents();

    /**
     * Adds a listener that will receive every subsequently distributed event.
     * If the connection has an endpoint, the topic name is refreshed from the
     * endpoint address before it is returned.
     *
     * @param topicListener listener to add
     * @return the current topic name
     * @throws OMRSLogicErrorException if {@code topicListener} is null
     */
    @Override
    public String registerListener(OpenMetadataTopicListener topicListener) {
        if (topicListener == null) {
            String methodName = "registerListener";
            OMRSErrorCode errorCode = OMRSErrorCode.NULL_OPEN_METADATA_TOPIC_LISTENER;
            String errorMessage = errorCode.getErrorMessageId() + errorCode.getFormattedErrorMessage(this.listenerThreadName, this.topicName);
            throw new OMRSLogicErrorException(errorCode.getHTTPErrorCode(), this.getClass().getName(), methodName, errorMessage, errorCode.getSystemAction(), errorCode.getUserAction());
        }
        this.topicListeners.add(topicListener);
        if (this.connectionProperties != null) {
            EndpointProperties endpoint = this.connectionProperties.getEndpoint();
            if (endpoint != null) {
                this.topicName = endpoint.getAddress();
            }
        }
        return this.topicName;
    }

    /**
     * Starts the connector: reads the topic name from the connection endpoint
     * and an optional Integer "sleepTime" additional property, then launches
     * the polling thread.
     *
     * @throws ConnectorCheckedException propagated from the superclass start
     */
    @Override
    public void start() throws ConnectorCheckedException {
        super.start();
        this.keepRunning = true;
        if (this.connectionProperties != null) {
            EndpointProperties endpoint = this.connectionProperties.getEndpoint();
            if (endpoint != null) {
                this.topicName = endpoint.getAddress();
                this.listenerThreadName = defaultThreadName + ": " + this.topicName;
            }
            AdditionalProperties additionalProperties = this.connectionProperties.getAdditionalProperties();
            if (additionalProperties != null) {
                Object configuredSleepTime = additionalProperties.getProperty("sleepTime");
                if (configuredSleepTime instanceof Integer) {
                    int requestedSleepTime = (Integer)configuredSleepTime;
                    if (requestedSleepTime >= 0) {
                        this.sleepTime = requestedSleepTime;
                    } else {
                        // Thread.sleep rejects negative values with IllegalArgumentException,
                        // which would terminate the polling thread; keep the current value instead.
                        log.warn("Ignoring negative sleepTime {} for topic {}; using {}", requestedSleepTime, this.topicName, this.sleepTime);
                    }
                }
            }
        }
        Thread listenerThread = new Thread(this, this.listenerThreadName);
        listenerThread.start();
    }

    /**
     * Disconnects the connector and clears the running flag so the polling
     * thread leaves its loop once its current cycle finishes.
     *
     * @throws ConnectorCheckedException propagated from the superclass disconnect
     */
    @Override
    public void disconnect() throws ConnectorCheckedException {
        super.disconnect();
        this.keepRunning = false;
    }
}

