/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.governanceservers.dataengineproxy.processor;

import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.odpi.openmetadata.accessservices.dataengine.client.DataEngineClient;
import org.odpi.openmetadata.accessservices.dataengine.model.LineageMapping;
import org.odpi.openmetadata.accessservices.dataengine.model.Process;
import org.odpi.openmetadata.accessservices.dataengine.model.ProcessHierarchy;
import org.odpi.openmetadata.accessservices.dataengine.model.SchemaType;
import org.odpi.openmetadata.accessservices.dataengine.model.SoftwareServerCapability;
import org.odpi.openmetadata.adminservices.configuration.properties.DataEngineProxyConfig;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.OCFRuntimeException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.PropertyServerException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.UserNotAuthorizedException;
import org.odpi.openmetadata.governanceservers.dataengineproxy.auditlog.DataEngineProxyAuditCode;
import org.odpi.openmetadata.governanceservers.dataengineproxy.auditlog.DataEngineProxyErrorCode;
import org.odpi.openmetadata.governanceservers.dataengineproxy.connectors.DataEngineConnectorBase;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataEngineProxyChangePoller
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(DataEngineProxyChangePoller.class);
    private OMRSAuditLog auditLog;
    private DataEngineProxyConfig dataEngineProxyConfig;
    private DataEngineClient dataEngineOMASClient;
    private DataEngineConnectorBase connector;
    private String userId;
    private final AtomicBoolean running = new AtomicBoolean(false);

    public void start() {
        Thread worker = new Thread(this);
        worker.start();
    }

    public void stop() {
        this.running.set(false);
    }

    public DataEngineProxyChangePoller(DataEngineConnectorBase connector, String userId, DataEngineProxyConfig dataEngineProxyConfig, DataEngineClient dataEngineOMASClient, OMRSAuditLog auditLog) {
        String methodName = "DataEngineProxyChangePoller";
        this.connector = connector;
        this.userId = userId;
        this.dataEngineProxyConfig = dataEngineProxyConfig;
        this.dataEngineOMASClient = dataEngineOMASClient;
        this.auditLog = auditLog;
        this.auditLog.logMessage("DataEngineProxyChangePoller", DataEngineProxyAuditCode.INIT_POLLING.getMessageDefinition());
        if (connector != null) {
            try {
                SoftwareServerCapability dataEngineDetails = connector.getDataEngineDetails();
                dataEngineOMASClient.createExternalDataEngine(userId, dataEngineDetails);
                dataEngineOMASClient.setExternalSourceName(dataEngineDetails.getQualifiedName());
            }
            catch (ConnectorCheckedException | InvalidParameterException | PropertyServerException e) {
                this.auditLog.logException("DataEngineProxyChangePoller", DataEngineProxyAuditCode.OMAS_CONNECTION_ERROR.getMessageDefinition(), e);
            }
            catch (UserNotAuthorizedException e) {
                this.auditLog.logMessage("DataEngineProxyChangePoller", DataEngineProxyAuditCode.USER_NOT_AUTHORIZED.getMessageDefinition("setup external data engine"));
            }
        }
    }

    @Override
    public void run() {
        String methodName = "ProcessPollThread::run";
        this.running.set(true);
        while (this.running.get()) {
            try {
                Date changesLastSynced = this.connector.getChangesLastSynced();
                Date oldestSinceSync = this.connector.getOldestChangeSince(changesLastSynced);
                Date changesCutoff = new Date();
                if (oldestSinceSync == null) {
                    oldestSinceSync = changesLastSynced;
                } else {
                    long window = oldestSinceSync.getTime() + (long)this.dataEngineProxyConfig.getBatchWindowInSeconds() * 1000L;
                    long now = changesCutoff.getTime();
                    changesCutoff = new Date(Math.min(window, now));
                }
                this.ensureSourceNameIsSet();
                this.auditLog.logMessage("ProcessPollThread::run", DataEngineProxyAuditCode.POLLING.getMessageDefinition(oldestSinceSync == null ? "0" : oldestSinceSync.toString(), changesCutoff.toString()));
                this.upsertSchemaTypes(oldestSinceSync, changesCutoff);
                this.upsertProcesses(oldestSinceSync, changesCutoff);
                this.upsertProcessHierarchies(oldestSinceSync, changesCutoff);
                this.upsertLineageMappings(oldestSinceSync, changesCutoff);
                this.connector.setChangesLastSynced(changesCutoff);
                Thread.sleep((long)this.dataEngineProxyConfig.getPollIntervalInSeconds() * 1000L);
            }
            catch (ConnectorCheckedException | InvalidParameterException | PropertyServerException e) {
                this.auditLog.logException("ProcessPollThread::run", DataEngineProxyAuditCode.OMAS_CONNECTION_ERROR.getMessageDefinition(), e);
            }
            catch (UserNotAuthorizedException e) {
                this.auditLog.logMessage("ProcessPollThread::run", DataEngineProxyAuditCode.USER_NOT_AUTHORIZED.getMessageDefinition("send changes"));
            }
            catch (Exception e) {
                throw new OCFRuntimeException(DataEngineProxyErrorCode.UNKNOWN_ERROR.getMessageDefinition(), this.getClass().getName(), "ProcessPollThread::run", (Throwable)e);
            }
        }
    }

    private void ensureSourceNameIsSet() {
        if (this.dataEngineOMASClient.getExternalSourceName() == null) {
            this.dataEngineOMASClient.setExternalSourceName(this.connector.getDataEngineDetails().getQualifiedName());
        }
    }

    private void upsertSchemaTypes(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException, ConnectorCheckedException {
        String methodName = "upsertSchemaTypes";
        String type = "SchemaTypes";
        this.auditLog.logMessage("upsertSchemaTypes", DataEngineProxyAuditCode.POLLING_TYPE_START.getMessageDefinition("SchemaTypes"));
        List changedSchemaTypes = this.connector.getChangedSchemaTypes(changesLastSynced, changesCutoff);
        if (changedSchemaTypes != null) {
            for (SchemaType changedSchemaType : changedSchemaTypes) {
                this.dataEngineOMASClient.createOrUpdateSchemaType(this.userId, changedSchemaType);
            }
        }
        this.auditLog.logMessage("upsertSchemaTypes", DataEngineProxyAuditCode.POLLING_TYPE_FINISH.getMessageDefinition("SchemaTypes"));
    }

    private void upsertProcesses(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException, ConnectorCheckedException {
        String methodName = "upsertProcesses";
        String type = "Processes";
        this.auditLog.logMessage("upsertProcesses", DataEngineProxyAuditCode.POLLING_TYPE_START.getMessageDefinition("Processes"));
        List changedProcesses = this.connector.getChangedProcesses(changesLastSynced, changesCutoff);
        if (changedProcesses != null && !changedProcesses.isEmpty()) {
            for (Process changedProcess : changedProcesses) {
                this.dataEngineOMASClient.createOrUpdateProcess(this.userId, changedProcess);
                List lineageMappings = changedProcess.getLineageMappings();
                if (lineageMappings == null) continue;
                this.dataEngineOMASClient.addLineageMappings(this.userId, lineageMappings);
            }
        }
        this.auditLog.logMessage("upsertProcesses", DataEngineProxyAuditCode.POLLING_TYPE_FINISH.getMessageDefinition("Processes"));
    }

    private void upsertProcessHierarchies(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException, ConnectorCheckedException {
        String methodName = "upsertProcessHierarchies";
        String type = "ProcessHierarchies";
        this.auditLog.logMessage("upsertProcessHierarchies", DataEngineProxyAuditCode.POLLING_TYPE_START.getMessageDefinition("ProcessHierarchies"));
        List changedProcessHierarchies = this.connector.getChangedProcessHierarchies(changesLastSynced, changesCutoff);
        if (changedProcessHierarchies != null) {
            for (ProcessHierarchy changedProcessHierarchy : changedProcessHierarchies) {
                this.dataEngineOMASClient.addProcessHierarchy(this.userId, changedProcessHierarchy);
            }
        }
        this.auditLog.logMessage("upsertProcessHierarchies", DataEngineProxyAuditCode.POLLING_TYPE_FINISH.getMessageDefinition("ProcessHierarchies"));
    }

    private void upsertLineageMappings(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException, ConnectorCheckedException {
        String methodName = "upsertLineageMappings";
        String type = "LineageMappings";
        this.auditLog.logMessage("upsertLineageMappings", DataEngineProxyAuditCode.POLLING_TYPE_START.getMessageDefinition("LineageMappings"));
        List changedLineageMappings = this.connector.getChangedLineageMappings(changesLastSynced, changesCutoff);
        if (changedLineageMappings != null && changedLineageMappings.size() > 0) {
            if (this.dataEngineProxyConfig.isEventsClientEnabled()) {
                for (LineageMapping changedLineageMapping : changedLineageMappings) {
                    this.dataEngineOMASClient.addLineageMappings(this.userId, Collections.singletonList(changedLineageMapping));
                }
            } else {
                this.dataEngineOMASClient.addLineageMappings(this.userId, changedLineageMappings);
            }
        }
        this.auditLog.logMessage("upsertLineageMappings", DataEngineProxyAuditCode.POLLING_TYPE_FINISH.getMessageDefinition("LineageMappings"));
    }
}

