/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.governanceservers.openlineage.admin;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.commons.collections4.CollectionUtils;
import org.odpi.openmetadata.accessservices.assetlineage.AssetLineage;
import org.odpi.openmetadata.accessservices.assetlineage.event.AssetLineageEventListener;
import org.odpi.openmetadata.accessservices.assetlineage.outtopic.connector.AssetLineageOutTopicClientConnector;
import org.odpi.openmetadata.adminservices.configuration.properties.OLSBackgroundJob;
import org.odpi.openmetadata.adminservices.configuration.properties.OLSSimplifiedAccessServiceConfig;
import org.odpi.openmetadata.adminservices.configuration.properties.OpenLineageServerConfig;
import org.odpi.openmetadata.adminservices.configuration.registration.GovernanceServicesDescription;
import org.odpi.openmetadata.adminservices.ffdc.exception.OMAGConfigurationErrorException;
import org.odpi.openmetadata.commonservices.ffdc.exceptions.PropertyServerException;
import org.odpi.openmetadata.commonservices.ocf.metadatamanagement.client.OCFRESTClient;
import org.odpi.openmetadata.commonservices.ocf.metadatamanagement.rest.ConnectionResponse;
import org.odpi.openmetadata.frameworks.auditlog.AuditLog;
import org.odpi.openmetadata.frameworks.connectors.Connector;
import org.odpi.openmetadata.frameworks.connectors.ConnectorBroker;
import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.OCFCheckedExceptionBase;
import org.odpi.openmetadata.frameworks.connectors.ffdc.UserNotAuthorizedException;
import org.odpi.openmetadata.frameworks.connectors.properties.beans.Connection;
import org.odpi.openmetadata.governanceservers.openlineage.OpenLineageGraphConnector;
import org.odpi.openmetadata.governanceservers.openlineage.OpenLineageQueryService;
import org.odpi.openmetadata.governanceservers.openlineage.auditlog.OpenLineageServerAuditCode;
import org.odpi.openmetadata.governanceservers.openlineage.ffdc.OpenLineageServerErrorCode;
import org.odpi.openmetadata.governanceservers.openlineage.graph.LineageGraph;
import org.odpi.openmetadata.governanceservers.openlineage.handlers.OpenLineageAssetContextHandler;
import org.odpi.openmetadata.governanceservers.openlineage.handlers.OpenLineageHandler;
import org.odpi.openmetadata.governanceservers.openlineage.listeners.OpenLineageInTopicListener;
import org.odpi.openmetadata.governanceservers.openlineage.scheduler.AssetLineageUpdateJob;
import org.odpi.openmetadata.governanceservers.openlineage.scheduler.AssetLineageUpdateJobConfiguration;
import org.odpi.openmetadata.governanceservers.openlineage.scheduler.JobConfiguration;
import org.odpi.openmetadata.governanceservers.openlineage.scheduler.LineageGraphJob;
import org.odpi.openmetadata.governanceservers.openlineage.server.OpenLineageServerInstance;
import org.odpi.openmetadata.governanceservers.openlineage.services.StoringServices;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OpenLineageServerOperationalServices {
    private static final Logger log = LoggerFactory.getLogger(OpenLineageServerOperationalServices.class);
    private static final String EMPTY_STRING = "";
    private static final int RETRIEVE_OUT_TOPIC_CONNECTION_TIMEOUT = 60000;
    private final String localServerName;
    private final String localServerUserId;
    private final String localServerPassword;
    private final int maxPageSize;
    private final String localServerId;
    private OpenLineageServerConfig openLineageServerConfig;
    private OpenLineageServerInstance openLineageServerInstance;
    private OMRSAuditLog auditLog;
    private OpenLineageGraphConnector lineageGraphConnector;
    private AssetLineageOutTopicClientConnector inTopicConnector;
    private AssetLineage assetLineageClient;
    private List<JobConfiguration> backgroundJobs;

    public OpenLineageServerOperationalServices(String localServerId, String localServerName, String localServerUserId, String localServerPassword, int maxPageSize) {
        this.localServerId = localServerId;
        this.localServerName = localServerName;
        this.localServerUserId = localServerUserId;
        this.localServerPassword = localServerPassword;
        this.maxPageSize = maxPageSize;
    }

    public void initialize(OpenLineageServerConfig openLineageServerConfig, OMRSAuditLog auditLog) throws OMAGConfigurationErrorException {
        this.openLineageServerConfig = openLineageServerConfig;
        this.auditLog = auditLog;
        String methodName = "initialize";
        String actionDescription = "Initialize Open lineage Services";
        this.logRecord(OpenLineageServerAuditCode.SERVER_INITIALIZING, "Initialize Open lineage Services");
        if (openLineageServerConfig == null) {
            this.throwOMAGConfigurationErrorException(OpenLineageServerErrorCode.NO_CONFIG_DOC, this.localServerName, "initialize", OpenLineageServerAuditCode.NO_CONFIG_DOC, "Initialize Open lineage Services");
        }
        this.validateAccessServiceConfig(openLineageServerConfig.getAccessServiceConfig(), "initialize");
        try {
            this.initializeOLS(openLineageServerConfig);
        }
        catch (OMAGConfigurationErrorException e) {
            throw e;
        }
        catch (Exception e) {
            this.exceptionToOMAGConfigurationError(e, OpenLineageServerErrorCode.ERROR_INITIALIZING_OLS, "initialize", OpenLineageServerAuditCode.ERROR_INITIALIZING_OLS, "Initialize Open lineage Services");
        }
    }

    private void validateAccessServiceConfig(OLSSimplifiedAccessServiceConfig accessServiceConfig, String methodName) throws OMAGConfigurationErrorException {
        String actionDescription = "Verify the access service configuration";
        if (accessServiceConfig.getServerName() == null || accessServiceConfig.getServerName().isEmpty()) {
            this.throwOMAGConfigurationErrorException(OpenLineageServerErrorCode.BAD_ACCESS_SERVICE_CONFIG, "serverName", methodName, OpenLineageServerAuditCode.BAD_ACCESS_SERVICE_CONFIG, actionDescription);
        }
        if (accessServiceConfig.getServerPlatformUrlRoot() == null || accessServiceConfig.getServerPlatformUrlRoot().isEmpty()) {
            this.throwOMAGConfigurationErrorException(OpenLineageServerErrorCode.BAD_ACCESS_SERVICE_CONFIG, "serverPlatformUrlRoot", methodName, OpenLineageServerAuditCode.BAD_ACCESS_SERVICE_CONFIG, actionDescription);
        }
        if (accessServiceConfig.getUser() == null || accessServiceConfig.getUser().isEmpty()) {
            this.throwOMAGConfigurationErrorException(OpenLineageServerErrorCode.BAD_ACCESS_SERVICE_CONFIG, "user", methodName, OpenLineageServerAuditCode.BAD_ACCESS_SERVICE_CONFIG, actionDescription);
        }
    }

    private void initializeOLS(OpenLineageServerConfig openLineageServerConfig) throws OMAGConfigurationErrorException, InvalidParameterException, InterruptedException {
        String methodName = "initializeOLS";
        String actionDescription = "Initialize Open lineage Services";
        Connection lineageGraphConnection = openLineageServerConfig.getLineageGraphConnection();
        OLSSimplifiedAccessServiceConfig accessServiceConfig = openLineageServerConfig.getAccessServiceConfig();
        Connection inTopicConnection = this.getAssetLineageOutTopicConnection("initializeOLS", accessServiceConfig);
        this.lineageGraphConnector = (OpenLineageGraphConnector)this.getConnector(lineageGraphConnection, OpenLineageServerErrorCode.ERROR_OBTAINING_LINEAGE_GRAPH_CONNECTOR, OpenLineageServerAuditCode.ERROR_OBTAINING_LINEAGE_GRAPH_CONNECTOR);
        this.inTopicConnector = (AssetLineageOutTopicClientConnector)this.getConnector(inTopicConnection, OpenLineageServerErrorCode.ERROR_OBTAINING_IN_TOPIC_CONNECTOR, OpenLineageServerAuditCode.ERROR_OBTAINING_IN_TOPIC_CONNECTOR);
        this.initializeAndStartConnectors();
        OpenLineageQueryService openLineageQueryService = this.lineageGraphConnector.getLineageQueryService();
        OpenLineageHandler openLineageHandler = new OpenLineageHandler(openLineageQueryService);
        this.initializeAndStartBackgroundJobs();
        this.openLineageServerInstance = new OpenLineageServerInstance(this.localServerName, GovernanceServicesDescription.OPEN_LINEAGE_SERVICES.getServiceName(), this.maxPageSize, openLineageHandler);
        this.logRecord(OpenLineageServerAuditCode.SERVER_INITIALIZED, "Initialize Open lineage Services");
    }

    private Connection getAssetLineageOutTopicConnection(String methodName, OLSSimplifiedAccessServiceConfig accessServiceConfig) throws InvalidParameterException, InterruptedException {
        String serverName = accessServiceConfig.getServerName();
        String serverPlatformURLRoot = accessServiceConfig.getServerPlatformUrlRoot();
        String serverPassword = accessServiceConfig.getPassword();
        String serverUserId = accessServiceConfig.getUser();
        OCFRESTClient restClient = serverPassword == null ? new OCFRESTClient(serverName, serverPlatformURLRoot) : new OCFRESTClient(serverName, serverPlatformURLRoot, serverUserId, serverPassword, (AuditLog)this.auditLog);
        ConnectionResponse restResult = this.getConnection(methodName, restClient, accessServiceConfig);
        while (restResult == null) {
            Thread.sleep(60000L);
            restResult = this.getConnection(methodName, restClient, accessServiceConfig);
        }
        return restResult.getConnection();
    }

    private ConnectionResponse getConnection(String methodName, OCFRESTClient restClient, OLSSimplifiedAccessServiceConfig accessServiceConfig) {
        String actionDescription = "Retrieve topic Asset Lineage out topic connection";
        String urlTemplate = "/servers/{0}/open-metadata/access-services/asset-lineage/users/{1}/topics/out-topic-connection/{2}";
        String serverName = accessServiceConfig.getServerName();
        String serverPlatformURLRoot = accessServiceConfig.getServerPlatformUrlRoot();
        String serverUserId = accessServiceConfig.getUser();
        ConnectionResponse restResult = null;
        try {
            restResult = restClient.callOCFConnectionGetRESTCall(methodName, serverPlatformURLRoot + "/servers/{0}/open-metadata/access-services/asset-lineage/users/{1}/topics/out-topic-connection/{2}", new Object[]{serverName, serverUserId, this.localServerId});
        }
        catch (InvalidParameterException | org.odpi.openmetadata.frameworks.connectors.ffdc.PropertyServerException | UserNotAuthorizedException e) {
            this.logException(OpenLineageServerAuditCode.COULD_NOT_RETRIEVE_TOPIC_CONNECTOR, "Retrieve topic Asset Lineage out topic connection", (Exception)e);
        }
        return restResult;
    }

    private void initializeAndStartBackgroundJobs() {
        Optional<OLSBackgroundJob> assetLineageUpdateJob;
        this.backgroundJobs = new ArrayList<JobConfiguration>();
        Optional<OLSBackgroundJob> lineageGraphJob = this.getJob("LineageGraphJob");
        if (this.isJobEnabled(lineageGraphJob)) {
            int lineageGraphJobInterval = this.getJobInterval(lineageGraphJob);
            this.backgroundJobs.add(new JobConfiguration(this.lineageGraphConnector, "LineageGraphJob", LineageGraphJob.class, lineageGraphJobInterval));
        }
        if (this.isJobEnabled(assetLineageUpdateJob = this.getJob("AssetLineageUpdateJob"))) {
            int assetLineageJobInterval = this.getJobInterval(assetLineageUpdateJob);
            String configAssetLineageLastUpdateTime = this.getDefaultValue(assetLineageUpdateJob);
            String assetLineageServerName = this.openLineageServerConfig.getAccessServiceConfig().getServerName();
            this.backgroundJobs.add(new AssetLineageUpdateJobConfiguration(this.lineageGraphConnector, "AssetLineageUpdateJob", AssetLineageUpdateJob.class, assetLineageJobInterval, configAssetLineageLastUpdateTime, this.assetLineageClient, assetLineageServerName, this.localServerUserId));
        }
        this.backgroundJobs.forEach(JobConfiguration::schedule);
    }

    private int getJobInterval(Optional<OLSBackgroundJob> job) {
        return job.map(OLSBackgroundJob::getJobInterval).orElse(120);
    }

    private boolean isJobEnabled(Optional<OLSBackgroundJob> job) {
        return job.map(OLSBackgroundJob::isJobEnabled).orElse(Boolean.TRUE);
    }

    private String getDefaultValue(Optional<OLSBackgroundJob> job) {
        return job.map(OLSBackgroundJob::getJobDefaultValue).orElse(EMPTY_STRING);
    }

    private Optional<OLSBackgroundJob> getJob(String name) {
        return this.openLineageServerConfig.getBackgroundJobs().stream().filter(job -> name.equals(job.getJobName())).findAny();
    }

    private Connector getConnector(Connection connection, OpenLineageServerErrorCode errorCode, OpenLineageServerAuditCode auditCode) throws OMAGConfigurationErrorException {
        String actionDescription = "Obtaining graph database connector";
        String methodName = "getGraphConnector";
        Connector connector = null;
        try {
            connector = new ConnectorBroker((AuditLog)this.auditLog).getConnector(connection);
        }
        catch (OCFCheckedExceptionBase e) {
            this.OCFCheckedExceptionToOMAGConfigurationError(e, auditCode, "Obtaining graph database connector");
        }
        catch (Exception e) {
            this.exceptionToOMAGConfigurationError(e, errorCode, "getGraphConnector", auditCode, "Obtaining graph database connector");
        }
        return connector;
    }

    private void initializeAndStartConnectors() throws OMAGConfigurationErrorException, InvalidParameterException {
        this.initializeGraphConnectorDB(this.lineageGraphConnector, OpenLineageServerErrorCode.ERROR_INITIALIZING_LINEAGE_GRAPH_CONNECTOR_DB, OpenLineageServerAuditCode.ERROR_INITIALIZING_LINEAGE_GRAPH_CONNECTOR_DB, "initializeLineageGraphConnector");
        this.startGraphConnector(this.lineageGraphConnector, OpenLineageServerErrorCode.ERROR_STARTING_LINEAGE_GRAPH_CONNECTOR, OpenLineageServerAuditCode.ERROR_STARTING_LINEAGE_GRAPH_CONNECTOR, "startLineageGraphConnector");
        this.startIntopicConnector();
    }

    private void initializeGraphConnectorDB(OpenLineageGraphConnector connector, OpenLineageServerErrorCode errorCode, OpenLineageServerAuditCode auditCode, String actionDescription) throws OMAGConfigurationErrorException {
        String methodName = "initializeGraphConnectorDB";
        try {
            connector.initializeGraphDB((AuditLog)this.auditLog);
        }
        catch (OCFCheckedExceptionBase e) {
            this.OCFCheckedExceptionToOMAGConfigurationError(e, auditCode, actionDescription);
        }
        catch (Exception e) {
            this.exceptionToOMAGConfigurationError(e, errorCode, "initializeGraphConnectorDB", auditCode, actionDescription);
        }
    }

    private void startGraphConnector(OpenLineageGraphConnector connector, OpenLineageServerErrorCode errorCode, OpenLineageServerAuditCode auditCode, String actionDescription) throws OMAGConfigurationErrorException {
        String methodName = "startGraphConnector";
        try {
            connector.start();
        }
        catch (OCFCheckedExceptionBase e) {
            this.OCFCheckedExceptionToOMAGConfigurationError(e, auditCode, actionDescription);
        }
        catch (Exception e) {
            this.exceptionToOMAGConfigurationError(e, errorCode, "startGraphConnector", auditCode, actionDescription);
        }
    }

    private void startIntopicConnector() throws OMAGConfigurationErrorException, InvalidParameterException {
        String actionDescription = "Start the Open Lineage Services in-topic listener";
        String methodName = "startIntopicConnector";
        OpenLineageServerAuditCode auditCode = OpenLineageServerAuditCode.ERROR_STARTING_IN_TOPIC_CONNECTOR;
        this.inTopicConnector.setAuditLog((AuditLog)this.auditLog);
        LineageGraph lineageGraph = this.lineageGraphConnector.getLineageStorageService();
        StoringServices storingServices = new StoringServices(lineageGraph);
        OLSSimplifiedAccessServiceConfig accessServiceConfig = this.openLineageServerConfig.getAccessServiceConfig();
        this.assetLineageClient = new AssetLineage(accessServiceConfig.getServerName(), accessServiceConfig.getServerPlatformUrlRoot());
        OpenLineageAssetContextHandler assetContextHandler = new OpenLineageAssetContextHandler(this.localServerUserId, this.assetLineageClient);
        OpenLineageInTopicListener openLineageInTopicListener = new OpenLineageInTopicListener(storingServices, assetContextHandler, this.auditLog);
        this.inTopicConnector.registerListener(this.localServerUserId, (AssetLineageEventListener)openLineageInTopicListener);
        try {
            this.inTopicConnector.start();
        }
        catch (OCFCheckedExceptionBase e) {
            this.OCFCheckedExceptionToOMAGConfigurationError(e, auditCode, "Start the Open Lineage Services in-topic listener");
        }
        catch (Exception e) {
            this.exceptionToOMAGConfigurationError(e, OpenLineageServerErrorCode.ERROR_STARTING_IN_TOPIC_CONNECTOR, "startIntopicConnector", auditCode, "Start the Open Lineage Services in-topic listener");
        }
        this.logRecord(OpenLineageServerAuditCode.SERVER_REGISTERED_WITH_IN_TOPIC, "Start the Open Lineage Services in-topic listener");
    }

    private void throwOMAGConfigurationErrorException(OpenLineageServerErrorCode errorCode, String errorDetails, String methodName, OpenLineageServerAuditCode auditCode, String actionDescription) throws OMAGConfigurationErrorException {
        String errorMessage = errorCode.getErrorMessageId() + errorCode.getFormattedErrorMessage(new String[]{errorDetails});
        OMAGConfigurationErrorException e = new OMAGConfigurationErrorException(errorCode.getHTTPErrorCode(), this.getClass().getName(), methodName, errorMessage, null, null, errorCode.getSystemAction(), errorCode.getUserAction(), null, null);
        this.logException(auditCode, actionDescription, (Exception)((Object)e));
        throw e;
    }

    private void exceptionToOMAGConfigurationError(Exception e, OpenLineageServerErrorCode errorCode, String methodName, OpenLineageServerAuditCode auditCode, String actionDescription) throws OMAGConfigurationErrorException {
        this.logException(auditCode, actionDescription, e);
        String errorMessage = errorCode.getErrorMessageId() + errorCode.getFormattedErrorMessage(new String[]{this.localServerName});
        throw new OMAGConfigurationErrorException(errorCode.getHTTPErrorCode(), this.getClass().getName(), methodName, errorMessage, null, null, errorCode.getSystemAction(), errorCode.getUserAction(), null, null);
    }

    private void exceptionToPropertyServerException(Exception e, OpenLineageServerErrorCode errorCode, String methodName, OpenLineageServerAuditCode auditCode, String actionDescription) throws PropertyServerException {
        this.logException(auditCode, actionDescription, e);
        String errorMessage = errorCode.getErrorMessageId() + errorCode.getFormattedErrorMessage(new String[]{this.localServerName});
        throw new PropertyServerException(errorCode.getHTTPErrorCode(), this.getClass().getName(), methodName, errorMessage, null, null, errorCode.getSystemAction(), errorCode.getUserAction(), null, null);
    }

    private void OCFCheckedExceptionToOMAGConfigurationError(OCFCheckedExceptionBase e, OpenLineageServerAuditCode auditCode, String actionDescription) throws OMAGConfigurationErrorException {
        this.logException(auditCode, actionDescription, (Exception)((Object)e));
        throw new OMAGConfigurationErrorException(e.getReportedHTTPCode(), e.getReportingClassName(), e.getReportingActionDescription(), e.getReportedErrorMessage(), e.getReportedErrorMessageId(), e.getReportedErrorMessageParameters(), e.getReportedSystemAction(), e.getReportedUserAction(), e.getReportedCaughtExceptionClassName(), e.getRelatedProperties());
    }

    public boolean shutdown() throws PropertyServerException {
        String actionDescription = "Shutting down the open lineage Services server.";
        this.logRecord(OpenLineageServerAuditCode.SERVER_SHUTTING_DOWN, actionDescription);
        this.stopBackgroundJob();
        this.disconnectInTopicConnector();
        this.disconnectGraphConnector(this.lineageGraphConnector, OpenLineageServerErrorCode.ERROR_DISCONNECTING_LINEAGE_GRAPH_CONNECTOR, OpenLineageServerAuditCode.ERROR_DISCONNECTING_LINEAGE_GRAPH_CONNECTOR, "Disconnecting lineage graph connection.");
        if (this.openLineageServerInstance != null) {
            this.openLineageServerInstance.shutdown();
        }
        this.logRecord(OpenLineageServerAuditCode.SERVER_SHUTDOWN, actionDescription);
        return true;
    }

    private void stopBackgroundJob() {
        if (CollectionUtils.isNotEmpty(this.backgroundJobs)) {
            this.backgroundJobs.forEach(JobConfiguration::stop);
        }
    }

    private void disconnectGraphConnector(OpenLineageGraphConnector connector, OpenLineageServerErrorCode errorCode, OpenLineageServerAuditCode auditCode, String actionDescription) throws PropertyServerException {
        String methodName = "disconnectGraphConnector";
        if (connector == null) {
            return;
        }
        try {
            connector.disconnect();
        }
        catch (Exception e) {
            this.exceptionToPropertyServerException(e, errorCode, "disconnectGraphConnector", auditCode, actionDescription);
        }
    }

    private void disconnectInTopicConnector() throws PropertyServerException {
        String methodName = "disconnectInTopicConnector";
        String actionDescription = "Disconnecting the Open Lineage Services in-topic listener";
        if (this.inTopicConnector == null) {
            return;
        }
        try {
            this.inTopicConnector.disconnect();
        }
        catch (Exception e) {
            this.exceptionToPropertyServerException(e, OpenLineageServerErrorCode.ERROR_DISCONNECTING_IN_TOPIC_CONNECTOR, "disconnectInTopicConnector", OpenLineageServerAuditCode.ERROR_DISCONNECTING_IN_TOPIC_CONNECTOR, "Disconnecting the Open Lineage Services in-topic listener");
        }
    }

    private void logRecord(OpenLineageServerAuditCode auditCode, String actionDescription) {
        this.auditLog.logRecord(actionDescription, auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(this.localServerName), null, auditCode.getSystemAction(), auditCode.getUserAction());
        log.info(auditCode.getSystemAction());
    }

    private void logException(OpenLineageServerAuditCode auditCode, String actionDescription, Exception e) {
        this.auditLog.logException(actionDescription, auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(this.localServerName, this.openLineageServerConfig.toString()), null, auditCode.getSystemAction(), auditCode.getUserAction(), e);
        log.error(auditCode.getSystemAction(), (Throwable)e);
    }
}

