/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.governanceservers.dataengineproxy.processor;

import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.odpi.openmetadata.accessservices.dataengine.client.DataEngineImpl;
import org.odpi.openmetadata.accessservices.dataengine.model.PortAlias;
import org.odpi.openmetadata.accessservices.dataengine.model.PortImplementation;
import org.odpi.openmetadata.accessservices.dataengine.model.Process;
import org.odpi.openmetadata.accessservices.dataengine.model.SchemaType;
import org.odpi.openmetadata.accessservices.dataengine.model.SoftwareServerCapability;
import org.odpi.openmetadata.adminservices.configuration.properties.DataEngineProxyConfig;
import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.PropertyServerException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.UserNotAuthorizedException;
import org.odpi.openmetadata.governanceservers.dataengineproxy.auditlog.DataEngineProxyAuditCode;
import org.odpi.openmetadata.governanceservers.dataengineproxy.connectors.DataEngineConnectorBase;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataEngineProxyChangePoller
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(DataEngineProxyChangePoller.class);
    private OMRSAuditLog auditLog;
    private DataEngineProxyConfig dataEngineProxyConfig;
    private DataEngineImpl dataEngineOMASClient;
    private DataEngineConnectorBase connector;
    private String userId;
    private final AtomicBoolean running = new AtomicBoolean(false);

    public void start() {
        Thread worker = new Thread(this);
        worker.start();
    }

    public void stop() {
        this.running.set(false);
    }

    public DataEngineProxyChangePoller(DataEngineConnectorBase connector, String userId, DataEngineProxyConfig dataEngineProxyConfig, DataEngineImpl dataEngineOMASClient, OMRSAuditLog auditLog) {
        String methodName = "DataEngineProxyChangePoller";
        this.connector = connector;
        this.userId = userId;
        this.dataEngineProxyConfig = dataEngineProxyConfig;
        this.dataEngineOMASClient = dataEngineOMASClient;
        this.auditLog = auditLog;
        if (connector != null) {
            try {
                SoftwareServerCapability dataEngineDetails = connector.getDataEngineDetails();
                dataEngineOMASClient.createExternalDataEngine(userId, dataEngineDetails);
                dataEngineOMASClient.setExternalSourceName(dataEngineDetails.getQualifiedName());
            }
            catch (InvalidParameterException | PropertyServerException e) {
                this.auditLog.logException("DataEngineProxyChangePoller", DataEngineProxyAuditCode.OMAS_CONNECTION_ERROR.getMessageDefinition(), e);
            }
            catch (UserNotAuthorizedException e) {
                this.auditLog.logMessage("DataEngineProxyChangePoller", DataEngineProxyAuditCode.USER_NOT_AUTHORIZED.getMessageDefinition("setup external data engine"));
            }
        }
    }

    @Override
    public void run() {
        String methodName = "ProcessPollThread::run";
        this.running.set(true);
        while (this.running.get()) {
            try {
                Date changesLastSynced = this.connector.getChangesLastSynced();
                Date changesCutoff = new Date();
                this.ensureSourceNameIsSet();
                this.auditLog.logMessage("ProcessPollThread::run", DataEngineProxyAuditCode.POLLING.getMessageDefinition(changesLastSynced == null ? "(all changes)" : changesLastSynced.toString()));
                this.upsertSchemaTypes(changesLastSynced, changesCutoff);
                this.upsertPortImplementations(changesLastSynced, changesCutoff);
                this.upsertPortAliases(changesLastSynced, changesCutoff);
                this.upsertProcesses(changesLastSynced, changesCutoff);
                this.upsertLineageMappings(changesLastSynced, changesCutoff);
                this.connector.setChangesLastSynced(changesCutoff);
                Thread.sleep((long)this.dataEngineProxyConfig.getPollIntervalInSeconds() * 1000L);
            }
            catch (InvalidParameterException | PropertyServerException e) {
                this.auditLog.logException("ProcessPollThread::run", DataEngineProxyAuditCode.OMAS_CONNECTION_ERROR.getMessageDefinition(), e);
            }
            catch (UserNotAuthorizedException e) {
                this.auditLog.logMessage("ProcessPollThread::run", DataEngineProxyAuditCode.USER_NOT_AUTHORIZED.getMessageDefinition("send changes"));
            }
            catch (Exception e) {
                this.auditLog.logException("ProcessPollThread::run", DataEngineProxyAuditCode.UNKNOWN_ERROR.getMessageDefinition(), (Throwable)e);
            }
        }
    }

    private void ensureSourceNameIsSet() {
        if (this.dataEngineOMASClient.getExternalSourceName() == null) {
            this.dataEngineOMASClient.setExternalSourceName(this.connector.getDataEngineDetails().getQualifiedName());
        }
    }

    private void upsertSchemaTypes(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException {
        log.info(" ... getting changed schema types.");
        List changedSchemaTypes = this.connector.getChangedSchemaTypes(changesLastSynced, changesCutoff);
        if (changedSchemaTypes != null) {
            for (SchemaType changedSchemaType : changedSchemaTypes) {
                this.dataEngineOMASClient.createOrUpdateSchemaType(this.userId, changedSchemaType);
            }
            log.info(" ... completing schema type changes.");
        }
    }

    private void upsertPortImplementations(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException {
        log.info(" ... getting changed port implementations.");
        List changedPortImplementations = this.connector.getChangedPortImplementations(changesLastSynced, changesCutoff);
        if (changedPortImplementations != null) {
            for (PortImplementation changedPortImplementation : changedPortImplementations) {
                this.dataEngineOMASClient.createOrUpdatePortImplementation(this.userId, changedPortImplementation);
            }
            log.info(" ... completing port implementation changes.");
        }
    }

    private void upsertPortAliases(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException {
        log.info(" ... getting changed port aliases.");
        List changedPortAliases = this.connector.getChangedPortAliases(changesLastSynced, changesCutoff);
        if (changedPortAliases != null) {
            for (PortAlias changedPortAlias : changedPortAliases) {
                this.dataEngineOMASClient.createOrUpdatePortAlias(this.userId, changedPortAlias);
            }
            log.info(" ... completing port alias changes.");
        }
    }

    private void upsertProcesses(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException {
        log.info(" ... getting changed processes.");
        List changedProcesses = this.connector.getChangedProcesses(changesLastSynced, changesCutoff);
        if (changedProcesses != null) {
            for (Process changedProcess : changedProcesses) {
                this.dataEngineOMASClient.createOrUpdateProcess(this.userId, changedProcess);
            }
            log.info(" ... completing process changes.");
        }
    }

    private void upsertLineageMappings(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException {
        log.info(" ... getting changed lineage mappings.");
        List changedLineageMappings = this.connector.getChangedLineageMappings(changesLastSynced, changesCutoff);
        if (changedLineageMappings != null) {
            this.dataEngineOMASClient.addLineageMappings(this.userId, changedLineageMappings);
            log.info(" ... completing lineage mapping changes.");
        }
    }
}

