/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.odpi.openmetadata.frameworks.auditlog.AuditLog;
import org.odpi.openmetadata.frameworks.auditlog.AuditLoggingComponent;
import org.odpi.openmetadata.frameworks.auditlog.ComponentDescription;
import org.odpi.openmetadata.frameworks.connectors.ConnectorBase;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.frameworks.connectors.properties.EndpointProperties;
import org.odpi.openmetadata.repositoryservices.connectors.omrstopic.InternalOMRSEventProcessingContext;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.IncomingEvent;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.IncomingEventState;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopic;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicListener;
import org.odpi.openmetadata.repositoryservices.ffdc.OMRSAuditCode;
import org.odpi.openmetadata.repositoryservices.ffdc.OMRSErrorCode;
import org.odpi.openmetadata.repositoryservices.ffdc.exception.OMRSLogicErrorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class OpenMetadataTopicConnector
extends ConnectorBase
implements OpenMetadataTopic,
Runnable,
AuditLoggingComponent {
    private static final Logger log = LoggerFactory.getLogger(OpenMetadataTopicConnector.class);
    private static final String defaultThreadName = "OpenMetadataTopicListener";
    private static final String defaultTopicName = "OpenMetadataTopic";
    private volatile boolean keepRunning = false;
    private final List<OpenMetadataTopicListener> topicListeners = new ArrayList<OpenMetadataTopicListener>();
    private String listenerThreadName = "OpenMetadataTopicListener";
    private String topicName = "OpenMetadataTopic";
    private int sleepTime = 100;
    protected AuditLog auditLog = null;

    @Override
    public void setAuditLog(AuditLog auditLog) {
        this.auditLog = auditLog;
    }

    @Override
    public ComponentDescription getConnectorComponentDescription() {
        if (this.auditLog != null && this.auditLog.getReport() != null) {
            return this.auditLog.getReport().getReportingComponent();
        }
        return null;
    }

    @Override
    public void run() {
        if (this.auditLog != null) {
            this.auditLog.logMessage(this.listenerThreadName, OMRSAuditCode.OPEN_METADATA_TOPIC_LISTENER_START.getMessageDefinition(this.topicName), this.getConnection().toString());
        }
        while (this.keepRunning) {
            try {
                try {
                    List<IncomingEvent> receivedEvents = this.checkForIncomingEvents();
                    if (receivedEvents != null && !receivedEvents.isEmpty()) {
                        for (IncomingEvent event : receivedEvents) {
                            if (event == null) continue;
                            this.distributeEvent(event);
                        }
                    }
                }
                catch (Exception error) {
                    log.error("Bad exception from checkForEvents", error);
                }
                Thread.sleep(this.sleepTime);
            }
            catch (InterruptedException wakeUp) {
                log.info("Wake up for more events");
            }
        }
        if (this.auditLog != null) {
            this.auditLog.logMessage(this.listenerThreadName, OMRSAuditCode.OPEN_METADATA_TOPIC_LISTENER_SHUTDOWN.getMessageDefinition(this.topicName), this.getConnection().toString());
        }
    }

    private void distributeEvent(IncomingEvent event) {
        InternalOMRSEventProcessingContext.clear();
        InternalOMRSEventProcessingContext.getInstance().setCurrentMessageId(event.getMessageId());
        for (OpenMetadataTopicListener topicListener : this.topicListeners) {
            try {
                topicListener.processEvent(event.getJson());
            }
            catch (Exception error) {
                String actionDescription = "distributeEvent";
                if (this.auditLog == null) continue;
                this.auditLog.logException("distributeEvent", OMRSAuditCode.EVENT_PROCESSING_ERROR.getMessageDefinition(event.getJson(), error.toString()), event.getJson(), (Throwable)error);
            }
        }
        event.setState(IncomingEventState.DISTRIBUTED_TO_ALL_TOPIC_LISTENERS);
        InternalOMRSEventProcessingContext context = InternalOMRSEventProcessingContext.getInstance();
        event.addAsyncProcessingResult(context.getOverallAsyncProcessingResult());
    }

    protected List<IncomingEvent> checkForIncomingEvents() {
        ArrayList<IncomingEvent> result = new ArrayList<IncomingEvent>();
        for (String event : this.checkForEvents()) {
            result.add(new IncomingEvent(event, String.valueOf(event.hashCode())));
        }
        return result;
    }

    @Deprecated
    protected List<String> checkForEvents() {
        return Collections.emptyList();
    }

    @Override
    public String registerListener(OpenMetadataTopicListener topicListener) {
        if (topicListener != null) {
            EndpointProperties endpoint;
            this.topicListeners.add(topicListener);
            if (this.connectionProperties != null && (endpoint = this.connectionProperties.getEndpoint()) != null) {
                this.topicName = endpoint.getAddress();
            }
            return this.topicName;
        }
        String methodName = "registerListener";
        throw new OMRSLogicErrorException(OMRSErrorCode.NULL_OPEN_METADATA_TOPIC_LISTENER.getMessageDefinition(this.listenerThreadName, this.topicName), this.getClass().getName(), "registerListener");
    }

    @Override
    public void start() throws ConnectorCheckedException {
        super.start();
        this.keepRunning = true;
        if (this.connectionProperties != null) {
            Object sleepTime;
            Map<String, Object> configurationProperties;
            EndpointProperties endpoint = this.connectionProperties.getEndpoint();
            if (endpoint != null) {
                this.topicName = endpoint.getAddress();
                this.listenerThreadName = "OpenMetadataTopicListener: " + this.topicName;
            }
            if ((configurationProperties = this.connectionProperties.getConfigurationProperties()) != null && (sleepTime = configurationProperties.get("sleepTime")) instanceof Integer) {
                this.sleepTime = (Integer)sleepTime;
            }
        }
        Thread listenerThread = new Thread((Runnable)this, this.listenerThreadName);
        listenerThread.start();
    }

    @Override
    public void disconnect() throws ConnectorCheckedException {
        super.disconnect();
        this.keepRunning = false;
    }
}

