/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.governanceservers.dataengineproxy.processor;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.odpi.openmetadata.accessservices.dataengine.client.DataEngineImpl;
import org.odpi.openmetadata.adminservices.configuration.properties.DataEngineProxyConfig;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.OCFRuntimeException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.PropertyServerException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.UserNotAuthorizedException;
import org.odpi.openmetadata.governanceservers.dataengineproxy.auditlog.DataEngineProxyAuditCode;
import org.odpi.openmetadata.openconnectors.governancedaemonconnectors.dataengineproxy.DataEngineConnectorBase;
import org.odpi.openmetadata.openconnectors.governancedaemonconnectors.dataengineproxy.DataEngineConnectorErrorCode;
import org.odpi.openmetadata.openconnectors.governancedaemonconnectors.dataengineproxy.model.DataEngineLineageMappings;
import org.odpi.openmetadata.openconnectors.governancedaemonconnectors.dataengineproxy.model.DataEnginePortAlias;
import org.odpi.openmetadata.openconnectors.governancedaemonconnectors.dataengineproxy.model.DataEnginePortImplementation;
import org.odpi.openmetadata.openconnectors.governancedaemonconnectors.dataengineproxy.model.DataEngineProcess;
import org.odpi.openmetadata.openconnectors.governancedaemonconnectors.dataengineproxy.model.DataEngineSchemaType;
import org.odpi.openmetadata.openconnectors.governancedaemonconnectors.dataengineproxy.model.DataEngineSoftwareServerCapability;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polls a Data Engine connector for changes at a configured interval and forwards
 * every change to the Data Engine OMAS client.
 *
 * <p>An instance is a {@link Runnable}: {@link #start()} runs it on a new thread and
 * {@link #stop()} asks the polling loop to finish after its current cycle.
 */
public class DataEngineProxyChangePoller implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(DataEngineProxyChangePoller.class);

    private final OMRSAuditLog auditLog;
    private final DataEngineProxyConfig dataEngineProxyConfig;
    private final DataEngineImpl dataEngineOMASClient;
    private final DataEngineConnectorBase connector;
    private final AtomicBoolean running = new AtomicBoolean(false);

    /**
     * Starts the polling loop on a newly created thread.
     */
    public void start() {
        Thread worker = new Thread(this);
        worker.start();
    }

    /**
     * Requests that the polling loop stop.
     *
     * <p>The flag is only checked at the top of each cycle, so a cycle that is already
     * in progress (including its sleep) completes first. NOTE(review): {@link #run()}
     * sets the flag to {@code true} when it begins, so a {@code stop()} issued before the
     * worker thread has reached that point is overwritten.
     */
    public void stop() {
        this.running.set(false);
    }

    /**
     * Starts the supplied connector, registers its data engine with the OMAS client and
     * writes the "service initialized" audit record.
     *
     * @param connector             the Data Engine connector to poll; must not be {@code null}
     * @param dataEngineProxyConfig configuration supplying the poll interval
     * @param dataEngineOMASClient  client that receives the changes
     * @param auditLog              audit log used to record initialization and polling
     * @throws OCFRuntimeException if the connector is {@code null}, fails to start, is not
     *                             active after starting, or the OMAS client rejects the
     *                             data engine registration
     */
    public DataEngineProxyChangePoller(DataEngineConnectorBase connector, DataEngineProxyConfig dataEngineProxyConfig, DataEngineImpl dataEngineOMASClient, OMRSAuditLog auditLog) {
        final String methodName = "DataEngineProxyChangePoller";
        this.connector = connector;
        this.dataEngineProxyConfig = dataEngineProxyConfig;
        this.dataEngineOMASClient = dataEngineOMASClient;
        this.auditLog = auditLog;

        if (connector == null) {
            throw this.buildRuntimeException(DataEngineConnectorErrorCode.NO_CONFIG, methodName, null);
        }

        try {
            connector.start();
            DataEngineSoftwareServerCapability dataEngineDetails = connector.getDataEngineDetails();
            dataEngineOMASClient.createExternalDataEngine(dataEngineDetails.getUserId(), dataEngineDetails.getSoftwareServerCapability());
            dataEngineOMASClient.setExternalSourceName(dataEngineDetails.getSoftwareServerCapability().getQualifiedName());
        }
        catch (InvalidParameterException | PropertyServerException e) {
            throw this.buildRuntimeException(DataEngineConnectorErrorCode.OMAS_CONNECTION_ERROR, methodName, e);
        }
        catch (UserNotAuthorizedException e) {
            throw this.buildRuntimeException(DataEngineConnectorErrorCode.USER_NOT_AUTHORIZED, methodName, e);
        }
        catch (ConnectorCheckedException e) {
            throw this.buildRuntimeException(DataEngineConnectorErrorCode.UNKNOWN_ERROR, methodName, e);
        }

        if (!connector.isActive()) {
            throw this.buildRuntimeException(DataEngineConnectorErrorCode.NO_CONFIG, methodName, null);
        }

        DataEngineProxyAuditCode auditCode = DataEngineProxyAuditCode.SERVICE_INITIALIZED;
        this.auditLog.logRecord("Initializing", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(connector.getConnection().getConnectorType().getConnectorProviderClassName()), null, auditCode.getSystemAction(), auditCode.getUserAction());
    }

    /**
     * Runs the polling loop until {@link #stop()} is called, the thread is interrupted,
     * or a cycle fails.
     *
     * <p>Each cycle reads the connector's last-synced timestamp, pushes schema types, port
     * implementations, port aliases, processes and lineage mappings (in that order) to the
     * OMAS client, records the cutoff as the new last-synced timestamp and then sleeps for
     * the configured poll interval.
     *
     * @throws OCFRuntimeException if any step of a cycle fails for a reason other than
     *                             thread interruption
     */
    @Override
    public void run() {
        final String methodName = "ProcessPollThread::run";
        this.running.set(true);
        while (this.running.get()) {
            try {
                Date changesLastSynced = this.connector.getChangesLastSynced();
                // Fix the upper bound before querying so every category uses the same window.
                Date changesCutoff = new Date();
                this.ensureSourceNameIsSet();
                log.info("Polling for changes since: {}", changesLastSynced);
                DataEngineProxyAuditCode auditCode = DataEngineProxyAuditCode.POLLING;
                this.auditLog.logRecord("Polling", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(changesLastSynced == null ? "(all changes)" : changesLastSynced.toString()), null, auditCode.getSystemAction(), auditCode.getUserAction());
                this.upsertSchemaTypes(changesLastSynced, changesCutoff);
                this.upsertPortImplementations(changesLastSynced, changesCutoff);
                this.upsertPortAliases(changesLastSynced, changesCutoff);
                this.upsertProcesses(changesLastSynced, changesCutoff);
                this.upsertLineageMappings(changesLastSynced, changesCutoff);
                this.connector.setChangesLastSynced(changesCutoff);
                Thread.sleep((long)this.dataEngineProxyConfig.getPollIntervalInSeconds() * 1000L);
            }
            catch (InterruptedException e) {
                // An interrupt is a request to shut down, not an unknown failure: restore the
                // flag for whoever owns this thread and leave the loop quietly.
                Thread.currentThread().interrupt();
                this.running.set(false);
                log.info("Change polling was interrupted; stopping.");
            }
            catch (InvalidParameterException | PropertyServerException e) {
                throw this.buildRuntimeException(DataEngineConnectorErrorCode.OMAS_CONNECTION_ERROR, methodName, e);
            }
            catch (UserNotAuthorizedException e) {
                throw this.buildRuntimeException(DataEngineConnectorErrorCode.USER_NOT_AUTHORIZED, methodName, e);
            }
            catch (Exception e) {
                throw this.buildRuntimeException(DataEngineConnectorErrorCode.UNKNOWN_ERROR, methodName, e);
            }
        }
    }

    /**
     * Builds the runtime exception reported for the given error code.
     *
     * @param errorCode  error code supplying the HTTP code, message, system action and user action
     * @param methodName name of the reporting method, recorded in the exception
     * @param cause      underlying exception, or {@code null} when there is none
     * @return the exception, ready to be thrown by the caller
     */
    private OCFRuntimeException buildRuntimeException(DataEngineConnectorErrorCode errorCode, String methodName, Throwable cause) {
        String errorMessage = errorCode.getErrorMessageId() + errorCode.getFormattedErrorMessage(new String[0]);
        if (cause == null) {
            return new OCFRuntimeException(errorCode.getHTTPErrorCode(), this.getClass().getName(), methodName, errorMessage, errorCode.getSystemAction(), errorCode.getUserAction());
        }
        return new OCFRuntimeException(errorCode.getHTTPErrorCode(), this.getClass().getName(), methodName, errorMessage, errorCode.getSystemAction(), errorCode.getUserAction(), cause);
    }

    /**
     * Sets the OMAS client's external source name from the connector's data engine details
     * when the client does not currently have one.
     */
    private void ensureSourceNameIsSet() {
        if (this.dataEngineOMASClient.getExternalSourceName() == null) {
            this.dataEngineOMASClient.setExternalSourceName(this.connector.getDataEngineDetails().getSoftwareServerCapability().getQualifiedName());
        }
    }

    /**
     * Sends every schema type the connector reports as changed in the window to the OMAS client.
     *
     * @param changesLastSynced start of the change window
     * @param changesCutoff     end of the change window
     * @throws InvalidParameterException  propagated from the OMAS client
     * @throws PropertyServerException    propagated from the OMAS client
     * @throws UserNotAuthorizedException propagated from the OMAS client
     */
    private void upsertSchemaTypes(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException {
        log.info(" ... getting changed schema types.");
        List<DataEngineSchemaType> changedSchemaTypes = this.connector.getChangedSchemaTypes(changesLastSynced, changesCutoff);
        if (changedSchemaTypes != null) {
            for (DataEngineSchemaType changedSchemaType : changedSchemaTypes) {
                this.dataEngineOMASClient.createOrUpdateSchemaType(changedSchemaType.getUserId(), changedSchemaType.getSchemaType());
            }
            log.info(" ... completing schema type changes.");
        }
    }

    /**
     * Sends every port implementation the connector reports as changed in the window to the OMAS client.
     *
     * @param changesLastSynced start of the change window
     * @param changesCutoff     end of the change window
     * @throws InvalidParameterException  propagated from the OMAS client
     * @throws PropertyServerException    propagated from the OMAS client
     * @throws UserNotAuthorizedException propagated from the OMAS client
     */
    private void upsertPortImplementations(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException {
        log.info(" ... getting changed port implementations.");
        List<DataEnginePortImplementation> changedPortImplementations = this.connector.getChangedPortImplementations(changesLastSynced, changesCutoff);
        if (changedPortImplementations != null) {
            for (DataEnginePortImplementation changedPortImplementation : changedPortImplementations) {
                this.dataEngineOMASClient.createOrUpdatePortImplementation(changedPortImplementation.getUserId(), changedPortImplementation.getPortImplementation());
            }
            log.info(" ... completing port implementation changes.");
        }
    }

    /**
     * Sends every port alias the connector reports as changed in the window to the OMAS client.
     *
     * @param changesLastSynced start of the change window
     * @param changesCutoff     end of the change window
     * @throws InvalidParameterException  propagated from the OMAS client
     * @throws PropertyServerException    propagated from the OMAS client
     * @throws UserNotAuthorizedException propagated from the OMAS client
     */
    private void upsertPortAliases(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException {
        log.info(" ... getting changed port aliases.");
        List<DataEnginePortAlias> changedPortAliases = this.connector.getChangedPortAliases(changesLastSynced, changesCutoff);
        if (changedPortAliases != null) {
            for (DataEnginePortAlias changedPortAlias : changedPortAliases) {
                this.dataEngineOMASClient.createOrUpdatePortAlias(changedPortAlias.getUserId(), changedPortAlias.getPortAlias());
            }
            log.info(" ... completing port alias changes.");
        }
    }

    /**
     * Sends every process the connector reports as changed in the window to the OMAS client.
     *
     * @param changesLastSynced start of the change window
     * @param changesCutoff     end of the change window
     * @throws InvalidParameterException  propagated from the OMAS client
     * @throws PropertyServerException    propagated from the OMAS client
     * @throws UserNotAuthorizedException propagated from the OMAS client
     */
    private void upsertProcesses(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException {
        log.info(" ... getting changed processes.");
        List<DataEngineProcess> changedProcesses = this.connector.getChangedProcesses(changesLastSynced, changesCutoff);
        if (changedProcesses != null) {
            for (DataEngineProcess changedProcess : changedProcesses) {
                this.dataEngineOMASClient.createOrUpdateProcess(changedProcess.getUserId(), changedProcess.getProcess());
            }
            log.info(" ... completing process changes.");
        }
    }

    /**
     * Sends every set of lineage mappings the connector reports as changed in the window to the OMAS client.
     *
     * @param changesLastSynced start of the change window
     * @param changesCutoff     end of the change window
     * @throws InvalidParameterException  propagated from the OMAS client
     * @throws PropertyServerException    propagated from the OMAS client
     * @throws UserNotAuthorizedException propagated from the OMAS client
     */
    private void upsertLineageMappings(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException {
        log.info(" ... getting changed lineage mappings.");
        List<DataEngineLineageMappings> changedLineageMappings = this.connector.getChangedLineageMappings(changesLastSynced, changesCutoff);
        if (changedLineageMappings != null) {
            for (DataEngineLineageMappings changedLineageMapping : changedLineageMappings) {
                // The client is handed a fresh list copy of the connector's mappings.
                this.dataEngineOMASClient.addLineageMappings(changedLineageMapping.getUserId(), new ArrayList<>(changedLineageMapping.getLineageMappings()));
            }
            log.info(" ... completing lineage mapping changes.");
        }
    }
}

