/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.governanceservers.dataengineproxy.processor;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.odpi.openmetadata.accessservices.dataengine.client.DataEngineImpl;
import org.odpi.openmetadata.adminservices.configuration.properties.DataEngineProxyConfig;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.OCFRuntimeException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.PropertyServerException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.UserNotAuthorizedException;
import org.odpi.openmetadata.governanceservers.dataengineproxy.auditlog.DataEngineProxyAuditCode;
import org.odpi.openmetadata.openconnectors.governancedaemonconnectors.dataengineproxy.DataEngineConnectorBase;
import org.odpi.openmetadata.openconnectors.governancedaemonconnectors.dataengineproxy.DataEngineConnectorErrorCode;
import org.odpi.openmetadata.openconnectors.governancedaemonconnectors.dataengineproxy.model.DataEngineLineageMappings;
import org.odpi.openmetadata.openconnectors.governancedaemonconnectors.dataengineproxy.model.DataEnginePortImplementation;
import org.odpi.openmetadata.openconnectors.governancedaemonconnectors.dataengineproxy.model.DataEngineProcess;
import org.odpi.openmetadata.openconnectors.governancedaemonconnectors.dataengineproxy.model.DataEngineSoftwareServerCapability;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runnable that repeatedly polls a Data Engine connector for changes and forwards each change
 * to the Data Engine OMAS client.
 *
 * <p>Every polling cycle asks the connector for the schema types, port implementations, port
 * aliases, processes and lineage mappings changed between the connector's last-synced date and
 * "now", sends them to the OMAS client in that order, records "now" as the new last-synced date
 * and then sleeps for the configured poll interval.
 */
public class DataEngineProxyChangePoller
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(DataEngineProxyChangePoller.class);
    private final OMRSAuditLog auditLog;
    private final DataEngineProxyConfig dataEngineProxyConfig;
    private final DataEngineImpl dataEngineOMASClient;
    private final DataEngineConnectorBase connector;
    // Value returned when registering the data engine; it is not read anywhere else in this class.
    private String engineGuid;

    /**
     * Starts the connector, registers the data engine it describes with the OMAS client and
     * writes a "service initialized" audit log record.
     *
     * @param connector             connector to poll for changes; must not be {@code null}
     * @param dataEngineProxyConfig configuration supplying the poll interval (in seconds)
     * @param dataEngineOMASClient  client that receives the detected changes
     * @param auditLog              audit log that receives the initialization record
     * @throws OCFRuntimeException if {@code connector} is {@code null}, if the connector is not
     *                             active after the start attempt, or if registering the data
     *                             engine with the OMAS client fails
     */
    public DataEngineProxyChangePoller(DataEngineConnectorBase connector, DataEngineProxyConfig dataEngineProxyConfig, DataEngineImpl dataEngineOMASClient, OMRSAuditLog auditLog) {
        final String methodName = "DataEngineProxyChangePoller";
        this.connector = connector;
        this.dataEngineProxyConfig = dataEngineProxyConfig;
        this.dataEngineOMASClient = dataEngineOMASClient;
        this.auditLog = auditLog;

        if (connector == null) {
            throw this.buildRuntimeException(DataEngineConnectorErrorCode.NO_CONFIG, methodName, null);
        }

        try {
            connector.start();
            DataEngineSoftwareServerCapability dataEngineDetails = connector.getDataEngineDetails();
            this.engineGuid = dataEngineOMASClient.createSoftwareServerCapability(dataEngineDetails.getUserId(), dataEngineDetails.getSoftwareServerCapability());
        }
        catch (InvalidParameterException | PropertyServerException e) {
            throw this.buildRuntimeException(DataEngineConnectorErrorCode.OMAS_CONNECTION_ERROR, methodName, e);
        }
        catch (UserNotAuthorizedException e) {
            throw this.buildRuntimeException(DataEngineConnectorErrorCode.USER_NOT_AUTHORIZED, methodName, e);
        }
        catch (ConnectorCheckedException e) {
            // A failed start is only logged here; the isActive() check below decides whether
            // construction can continue.
            log.error("Error in starting the Data Engine Proxy connector.", e);
        }

        if (!connector.isActive()) {
            throw this.buildRuntimeException(DataEngineConnectorErrorCode.NO_CONFIG, methodName, null);
        }

        DataEngineProxyAuditCode auditCode = DataEngineProxyAuditCode.SERVICE_INITIALIZED;
        this.auditLog.logRecord("Initializing", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(connector.getConnection().getConnectorType().getConnectorProviderClassName()), null, auditCode.getSystemAction(), auditCode.getUserAction());
    }

    /**
     * Polls the connector for changes until the thread is interrupted or an OMAS error occurs.
     *
     * <p>When the thread is interrupted, the interrupt status is restored and the method returns.
     * Any other exception not listed below is logged and the next polling cycle starts
     * immediately; the last-synced date is not advanced for the failed cycle.
     *
     * @throws OCFRuntimeException if the OMAS client reports an invalid parameter, a property
     *                             server problem or an authorization failure
     */
    @Override
    public void run() {
        final String methodName = "ProcessPollThread::run";
        while (true) {
            try {
                Date changesLastSynced = this.connector.getChangesLastSynced();
                Date changesCutoff = new Date();
                log.info("Polling for changes since: {}", changesLastSynced);

                // NOTE(review): the element type of this list is not visible in this decompiled
                // source (the loop variable was emitted as Object). Restore the connector's
                // schema-type model class here so getUserId()/getSchemaType() resolve.
                List<?> changedSchemaTypes = this.connector.getChangedSchemaTypes(changesLastSynced, changesCutoff);
                if (changedSchemaTypes != null) {
                    for (Object changedSchemaType : changedSchemaTypes) {
                        this.dataEngineOMASClient.createSchemaType(changedSchemaType.getUserId(), changedSchemaType.getSchemaType());
                    }
                }

                List<DataEnginePortImplementation> changedPortImplementations = this.connector.getChangedPortImplementations(changesLastSynced, changesCutoff);
                if (changedPortImplementations != null) {
                    for (DataEnginePortImplementation changedPortImplementation : changedPortImplementations) {
                        this.dataEngineOMASClient.createPortImplementation(changedPortImplementation.getUserId(), changedPortImplementation.getPortImplementation());
                    }
                }

                // NOTE(review): same as for schema types above; restore the connector's
                // port-alias model class so getUserId()/getPortAlias() resolve.
                List<?> changedPortAliases = this.connector.getChangedPortAliases(changesLastSynced, changesCutoff);
                if (changedPortAliases != null) {
                    for (Object changedPortAlias : changedPortAliases) {
                        this.dataEngineOMASClient.createPortAlias(changedPortAlias.getUserId(), changedPortAlias.getPortAlias());
                    }
                }

                log.info(" ... getting changed processes.");
                List<DataEngineProcess> changedProcesses = this.connector.getChangedProcesses(changesLastSynced, changesCutoff);
                if (changedProcesses != null) {
                    for (DataEngineProcess changedProcess : changedProcesses) {
                        this.dataEngineOMASClient.createProcess(changedProcess.getUserId(), changedProcess.getProcess());
                    }
                    log.info(" ... completing process changes.");
                }

                log.info(" ... getting changed lineage mappings.");
                List<DataEngineLineageMappings> changedLineageMappings = this.connector.getChangedLineageMappings(changesLastSynced, changesCutoff);
                if (changedLineageMappings != null) {
                    for (DataEngineLineageMappings changedLineageMapping : changedLineageMappings) {
                        this.dataEngineOMASClient.addLineageMappings(changedLineageMapping.getUserId(), new ArrayList<>(changedLineageMapping.getLineageMappings()));
                    }
                }

                // Only advance the sync point once every change in the window has been sent.
                this.connector.setChangesLastSynced(changesCutoff);

                // Multiply as long so a large interval cannot overflow int before the sleep.
                Thread.sleep(this.dataEngineProxyConfig.getPollIntervalInSeconds() * 1000L);
            }
            catch (InterruptedException e) {
                log.error("Thread was interrupted.", e);
                // Restore the interrupt status so whoever owns this thread can see the request.
                Thread.currentThread().interrupt();
                return;
            }
            catch (InvalidParameterException | PropertyServerException e) {
                log.error("Exception caught!", e);
                throw this.buildRuntimeException(DataEngineConnectorErrorCode.OMAS_CONNECTION_ERROR, methodName, e);
            }
            catch (UserNotAuthorizedException e) {
                log.error("Exception caught!", e);
                throw this.buildRuntimeException(DataEngineConnectorErrorCode.USER_NOT_AUTHORIZED, methodName, e);
            }
            catch (Exception e) {
                // NOTE(review): the loop retries straight away with no delay, so a persistent
                // failure will spin; consider whether a back-off is wanted here.
                log.error("Fatal error occurred during processing.", e);
            }
        }
    }

    /**
     * Builds the runtime exception reported for the given connector error code.
     *
     * @param errorCode  error code supplying the HTTP code, message, system action and user action
     * @param methodName name of the reporting method, recorded in the exception
     * @param cause      underlying exception, or {@code null} when there is none
     * @return the exception, ready to be thrown by the caller
     */
    private OCFRuntimeException buildRuntimeException(DataEngineConnectorErrorCode errorCode, String methodName, Throwable cause) {
        String errorMessage = errorCode.getErrorMessageId() + errorCode.getFormattedErrorMessage(new String[0]);
        if (cause == null) {
            return new OCFRuntimeException(errorCode.getHTTPErrorCode(), this.getClass().getName(), methodName, errorMessage, errorCode.getSystemAction(), errorCode.getUserAction());
        }
        return new OCFRuntimeException(errorCode.getHTTPErrorCode(), this.getClass().getName(), methodName, errorMessage, errorCode.getSystemAction(), errorCode.getUserAction(), cause);
    }
}

