/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.governanceservers.openlineage.admin;

import org.odpi.openmetadata.adminservices.configuration.properties.OpenLineageServerConfig;
import org.odpi.openmetadata.adminservices.configuration.registration.GovernanceServicesDescription;
import org.odpi.openmetadata.adminservices.ffdc.exception.OMAGConfigurationErrorException;
import org.odpi.openmetadata.frameworks.connectors.ConnectorBroker;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectionCheckedException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.OCFCheckedExceptionBase;
import org.odpi.openmetadata.frameworks.connectors.properties.beans.Connection;
import org.odpi.openmetadata.governanceservers.openlineage.OpenLineageGraph;
import org.odpi.openmetadata.governanceservers.openlineage.auditlog.OpenLineageServerAuditCode;
import org.odpi.openmetadata.governanceservers.openlineage.buffergraph.BufferGraph;
import org.odpi.openmetadata.governanceservers.openlineage.ffdc.OpenLineageException;
import org.odpi.openmetadata.governanceservers.openlineage.ffdc.OpenLineageServerErrorCode;
import org.odpi.openmetadata.governanceservers.openlineage.handlers.OpenLineageHandler;
import org.odpi.openmetadata.governanceservers.openlineage.listeners.InTopicListener;
import org.odpi.openmetadata.governanceservers.openlineage.maingraph.MainGraph;
import org.odpi.openmetadata.governanceservers.openlineage.server.OpenLineageServerInstance;
import org.odpi.openmetadata.governanceservers.openlineage.services.StoringServices;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLog;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicConnector;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OpenLineageServerOperationalServices {
    private static final Logger log = LoggerFactory.getLogger(OpenLineageServerOperationalServices.class);
    private String localServerName;
    private String localServerUserId;
    private String localServerPassword;
    private int maxPageSize;
    private OpenLineageServerConfig openLineageServerConfig;
    private OpenLineageServerInstance openLineageServerInstance = null;
    private OMRSAuditLog auditLog = null;
    private OpenMetadataTopicConnector inTopicConnector;

    public OpenLineageServerOperationalServices(String localServerName, String localServerUserId, String localServerPassword, int maxPageSize) {
        this.localServerName = localServerName;
        this.localServerUserId = localServerUserId;
        this.localServerPassword = localServerPassword;
        this.maxPageSize = maxPageSize;
    }

    public void initialize(OpenLineageServerConfig openLineageServerConfig, OMRSAuditLog auditLog) throws OMAGConfigurationErrorException {
        String actionDescription = "Initialize Open lineage Services";
        String methodName = "OpenLineageServerOperationalServices.initialize";
        this.openLineageServerConfig = openLineageServerConfig;
        this.auditLog = auditLog;
        OpenLineageServerAuditCode auditCode = OpenLineageServerAuditCode.SERVER_INITIALIZING;
        this.logAudit(auditCode, "Initialize Open lineage Services");
        if (openLineageServerConfig == null) {
            this.logAudit(OpenLineageServerAuditCode.NO_CONFIG_DOC, "Initialize Open lineage Services");
            this.throwError(OpenLineageServerErrorCode.NO_CONFIG_DOC, "OpenLineageServerOperationalServices.initialize");
        }
        Connection bufferGraphConnection = openLineageServerConfig.getOpenLineageBufferGraphConnection();
        Connection mainGraphConnection = openLineageServerConfig.getOpenLineageMainGraphConnection();
        BufferGraph bufferGraphConnector = (BufferGraph)this.getGraphConnector(bufferGraphConnection);
        MainGraph mainGraphConnector = (MainGraph)this.getGraphConnector(mainGraphConnection);
        try {
            bufferGraphConnector.initializeGraphDB();
            mainGraphConnector.initializeGraphDB();
        }
        catch (OpenLineageException e) {
            this.logAudit(OpenLineageServerAuditCode.CANNOT_OPEN_GRAPH_DB, "Initialize Open lineage Services");
            this.toOMAGConfigurationErrorException((OCFCheckedExceptionBase)e);
        }
        Object mainGraph = mainGraphConnector.getMainGraph();
        bufferGraphConnector.setMainGraph(mainGraph);
        try {
            bufferGraphConnector.start();
        }
        catch (ConnectorCheckedException e) {
            log.error("Could not start the buffer graph connector.");
            this.logAudit(OpenLineageServerAuditCode.ERROR_INITIALIZING_CONNECTOR, "Initialize Open lineage Services");
            this.toOMAGConfigurationErrorException((OCFCheckedExceptionBase)e);
        }
        try {
            mainGraphConnector.start();
        }
        catch (ConnectorCheckedException e) {
            log.error("Could not start the main graph connector.");
            this.logAudit(OpenLineageServerAuditCode.ERROR_INITIALIZING_CONNECTOR, "Initialize Open lineage Services");
            this.toOMAGConfigurationErrorException((OCFCheckedExceptionBase)e);
        }
        StoringServices storingServices = new StoringServices(bufferGraphConnector);
        OpenLineageHandler openLineageHandler = new OpenLineageHandler(mainGraphConnector);
        this.openLineageServerInstance = new OpenLineageServerInstance(this.localServerName, GovernanceServicesDescription.OPEN_LINEAGE_SERVICES.getServiceName(), this.maxPageSize, openLineageHandler);
        this.startEventBus(storingServices);
    }

    private OpenLineageGraph getGraphConnector(Connection connection) throws OMAGConfigurationErrorException {
        String actionDescription = "Get Open Lineage graph connector";
        String methodName = "OpenLineageServerOperationalServices.getGraphConnector";
        if (connection != null) {
            log.info("Found connection: {}", (Object)connection);
            try {
                ConnectorBroker connectorBroker = new ConnectorBroker();
                return (OpenLineageGraph)connectorBroker.getConnector(connection);
            }
            catch (ConnectionCheckedException | ConnectorCheckedException error) {
                log.error("Unable to initialize graph connector.", error);
                this.logAudit(OpenLineageServerAuditCode.ERROR_INITIALIZING_GRAPH_CONNECTOR, "Get Open Lineage graph connector");
                this.toOMAGConfigurationErrorException((OCFCheckedExceptionBase)error);
            }
        }
        return null;
    }

    private void startEventBus(StoringServices storingServices) throws OMAGConfigurationErrorException {
        String actionDescription = "Start event bus";
        String methodName = "OpenLineageServerOperationalServices.startEventBus";
        this.inTopicConnector = this.getTopicConnector(this.openLineageServerConfig.getInTopicConnection(), this.auditLog);
        if (this.inTopicConnector == null) {
            this.throwError(OpenLineageServerErrorCode.NO_IN_TOPIC_CONNECTOR, "OpenLineageServerOperationalServices.startEventBus");
        } else {
            InTopicListener governanceEventListener = new InTopicListener(storingServices, this.auditLog);
            this.inTopicConnector.registerListener((OpenMetadataTopicListener)governanceEventListener);
            this.startTopic(this.inTopicConnector);
            this.logAudit(OpenLineageServerAuditCode.SERVER_INITIALIZED, "Start event bus");
        }
    }

    private OpenMetadataTopicConnector getTopicConnector(Connection topicConnection, OMRSAuditLog auditLog) throws OMAGConfigurationErrorException {
        String actionDescription = "getGraphConnector";
        try {
            ConnectorBroker connectorBroker = new ConnectorBroker();
            OpenMetadataTopicConnector topicConnector = (OpenMetadataTopicConnector)connectorBroker.getConnector(topicConnection);
            topicConnector.setAuditLog(auditLog);
            return topicConnector;
        }
        catch (ConnectionCheckedException | ConnectorCheckedException error) {
            this.logAudit(OpenLineageServerAuditCode.ERROR_INITIALIZING_CONNECTOR, "getGraphConnector");
            this.toOMAGConfigurationErrorException((OCFCheckedExceptionBase)error);
            return null;
        }
    }

    private void startTopic(OpenMetadataTopicConnector topic) throws OMAGConfigurationErrorException {
        String actionDescription = "Start OpenMetadataTopicConnector topic connection";
        try {
            topic.start();
        }
        catch (ConnectorCheckedException error) {
            this.logAudit(OpenLineageServerAuditCode.ERROR_INITIALIZING_OPEN_LINEAGE_TOPIC_CONNECTION, "Start OpenMetadataTopicConnector topic connection");
            this.toOMAGConfigurationErrorException((OCFCheckedExceptionBase)error);
        }
    }

    public boolean disconnect(boolean permanent) {
        try {
            this.inTopicConnector.disconnect();
        }
        catch (ConnectorCheckedException e) {
            log.error("Error disconnecting Open lineages Services In Topic Connector");
            return false;
        }
        if (this.openLineageServerInstance != null) {
            this.openLineageServerInstance.shutdown();
        }
        String actionDescription = "shutdown";
        OpenLineageServerAuditCode auditCode = OpenLineageServerAuditCode.SERVER_SHUTDOWN;
        this.logAudit(auditCode, "shutdown");
        return true;
    }

    private void logAudit(OpenLineageServerAuditCode auditCode, String actionDescription) {
        this.auditLog.logRecord(actionDescription, auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(this.localServerName), null, auditCode.getSystemAction(), auditCode.getUserAction());
    }

    private void throwError(OpenLineageServerErrorCode errorCode, String methodName) throws OMAGConfigurationErrorException {
        String errorMessage = errorCode.getErrorMessageId() + errorCode.getFormattedErrorMessage(new String[]{this.localServerName});
        throw new OMAGConfigurationErrorException(errorCode.getHTTPErrorCode(), this.getClass().getName(), methodName, errorMessage, errorCode.getSystemAction(), errorCode.getUserAction());
    }

    private void toOMAGConfigurationErrorException(OCFCheckedExceptionBase error) throws OMAGConfigurationErrorException {
        throw new OMAGConfigurationErrorException(error.getReportedHTTPCode(), error.getReportingClassName(), error.getReportingActionDescription(), error.getErrorMessage(), error.getReportedSystemAction(), error.getReportedUserAction());
    }
}

