/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.governanceservers.dataengineproxy.processor;

import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.odpi.openmetadata.accessservices.dataengine.client.DataEngineClient;
import org.odpi.openmetadata.accessservices.dataengine.model.DataFile;
import org.odpi.openmetadata.accessservices.dataengine.model.Database;
import org.odpi.openmetadata.accessservices.dataengine.model.Engine;
import org.odpi.openmetadata.accessservices.dataengine.model.LineageMapping;
import org.odpi.openmetadata.accessservices.dataengine.model.Process;
import org.odpi.openmetadata.accessservices.dataengine.model.ProcessHierarchy;
import org.odpi.openmetadata.accessservices.dataengine.model.RelationalTable;
import org.odpi.openmetadata.accessservices.dataengine.model.SchemaType;
import org.odpi.openmetadata.adminservices.configuration.properties.DataEngineProxyConfig;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.OCFRuntimeException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.PropertyServerException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.UserNotAuthorizedException;
import org.odpi.openmetadata.governanceservers.dataengineproxy.auditlog.DataEngineProxyAuditCode;
import org.odpi.openmetadata.governanceservers.dataengineproxy.auditlog.DataEngineProxyErrorCode;
import org.odpi.openmetadata.governanceservers.dataengineproxy.connectors.DataEngineConnectorBase;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLog;

/**
 * Pushes changes reported by a data engine connector to the Data Engine OMAS client.
 *
 * <p>{@link #initialize()} registers the external data engine and, when the connector
 * requires polling, starts a worker thread that executes {@link #run()}. {@link #load()}
 * performs a single pass without starting a thread.
 */
public class DataEngineProxyService implements Runnable {
    private final OMRSAuditLog auditLog;
    private final DataEngineProxyConfig dataEngineProxyConfig;
    private final DataEngineClient dataEngineOMASClient;
    private final DataEngineConnectorBase connector;
    private final String userId;
    /** Loop flag for {@link #run()}; cleared by {@link #stop()}, on interruption, or on a fatal error. */
    private final AtomicBoolean running = new AtomicBoolean(false);

    /**
     * Creates the service around the supplied collaborators. Nothing is contacted here.
     *
     * @param connector             connector to the data engine; may be {@code null}, in which case
     *                              {@link #initialize()} only writes its audit message
     * @param userId                user id passed on every call to the Data Engine OMAS client
     * @param dataEngineProxyConfig source of the poll interval, batch window and events-client flag
     * @param dataEngineOMASClient  client that receives the upserts
     * @param auditLog              destination for audit messages and exceptions
     */
    public DataEngineProxyService(DataEngineConnectorBase connector, String userId, DataEngineProxyConfig dataEngineProxyConfig, DataEngineClient dataEngineOMASClient, OMRSAuditLog auditLog) {
        this.connector = connector;
        this.userId = userId;
        this.dataEngineProxyConfig = dataEngineProxyConfig;
        this.dataEngineOMASClient = dataEngineOMASClient;
        this.auditLog = auditLog;
    }

    /**
     * Registers the connector's data engine with the OMAS client, records its qualified name as
     * the external source name and, if the connector requires polling, starts the polling thread.
     * Does nothing beyond the audit message when no connector was supplied.
     *
     * @throws ConnectorCheckedException  declared for callers; raised by the underlying calls
     * @throws UserNotAuthorizedException declared for callers; raised by the underlying calls
     * @throws InvalidParameterException  declared for callers; raised by the underlying calls
     * @throws PropertyServerException    declared for callers; raised by the underlying calls
     */
    public void initialize() throws ConnectorCheckedException, UserNotAuthorizedException, InvalidParameterException, PropertyServerException {
        // The audit log has always recorded this step under the name "start"; keep it stable.
        final String methodName = "start";
        this.auditLog.logMessage(methodName, DataEngineProxyAuditCode.INIT_POLLING.getMessageDefinition());
        if (this.connector == null) {
            return;
        }
        Engine dataEngineDetails = this.connector.getDataEngineDetails();
        this.dataEngineOMASClient.createExternalDataEngine(this.userId, dataEngineDetails);
        this.dataEngineOMASClient.setExternalSourceName(dataEngineDetails.getQualifiedName());
        if (this.connector.requiresPolling()) {
            Thread worker = new Thread(this);
            worker.setName(DataEngineProxyService.class.getName());
            worker.start();
        }
    }

    /**
     * Asks the polling loop to finish. The loop observes the flag at the top of its next
     * iteration, i.e. after the current pass and its sleep have completed.
     */
    public void stop() {
        this.running.set(false);
    }

    /**
     * Polling loop. Each iteration computes a change window, upserts every changed element type
     * for that window, records the window end as the new sync point and then sleeps.
     *
     * <p>A {@link PropertyServerException} is logged and the loop retries after sleeping. Any other
     * checked exception from the pass is logged and rethrown as an {@link OCFRuntimeException},
     * which ends the loop.
     */
    @Override
    public void run() {
        final String methodName = "ProcessPollThread::run";
        this.running.set(true);
        while (this.running.get()) {
            try {
                Date changesLastSynced = this.connector.getChangesLastSynced();
                Date oldestSinceSync = this.connector.getOldestChangeSince(changesLastSynced);
                Date changesCutoff = new Date();
                if (oldestSinceSync == null) {
                    // Nothing reported since the last sync: window is [last sync, now].
                    oldestSinceSync = changesLastSynced;
                } else {
                    // Cap the window at one batch length past the oldest change, never beyond now.
                    long window = oldestSinceSync.getTime() + (long) this.dataEngineProxyConfig.getBatchWindowInSeconds() * 1000L;
                    long now = changesCutoff.getTime();
                    changesCutoff = new Date(Math.min(window, now));
                }
                this.ensureSourceNameIsSet();
                // oldestSinceSync can still be null here when the connector has no last-sync date.
                String windowStart = oldestSinceSync == null ? "0" : oldestSinceSync.toString();
                this.auditLog.logMessage(methodName, DataEngineProxyAuditCode.POLLING.getMessageDefinition(windowStart, changesCutoff.toString()));
                this.upsertSchemaTypes(oldestSinceSync, changesCutoff);
                this.upsertDataStores(oldestSinceSync, changesCutoff);
                this.upsertProcesses(oldestSinceSync, changesCutoff);
                this.upsertProcessHierarchies(oldestSinceSync, changesCutoff);
                this.upsertLineageMappings(oldestSinceSync, changesCutoff);
                this.connector.setChangesLastSynced(changesCutoff);
                this.sleep();
            } catch (PropertyServerException e) {
                this.auditLog.logException(methodName, DataEngineProxyAuditCode.RUNTIME_EXCEPTION.getMessageDefinition(), e);
                this.sleep();
            } catch (ConnectorCheckedException | InvalidParameterException | UserNotAuthorizedException e) {
                this.auditLog.logException(methodName, DataEngineProxyAuditCode.RUNTIME_EXCEPTION.getMessageDefinition(), e);
                // The loop is about to end; do not leave the flag claiming it is still running.
                this.running.set(false);
                throw new OCFRuntimeException(DataEngineProxyErrorCode.UNKNOWN_ERROR.getMessageDefinition(), this.getClass().getName(), methodName, e);
            }
        }
    }

    /**
     * Runs one upsert pass using the current time as both ends of the change window, then stores
     * that time as the processing state. Checked exceptions are logged, not propagated.
     */
    public void load() {
        final String methodName = "load";
        Date now = Date.from(Instant.now());
        try {
            this.ensureSourceNameIsSet();
            this.upsertSchemaTypes(now, now);
            this.upsertDataStores(now, now);
            this.upsertProcesses(now, now);
            this.upsertProcessHierarchies(now, now);
            this.upsertLineageMappings(now, now);
            this.upsertProcessingState(now);
        } catch (ConnectorCheckedException | InvalidParameterException | PropertyServerException | UserNotAuthorizedException e) {
            this.auditLog.logException(methodName, DataEngineProxyAuditCode.RUNTIME_EXCEPTION.getMessageDefinition(), e);
        }
    }

    /**
     * Currently a no-op: the body is empty.
     *
     * @param processId unused
     */
    public void pollProcessChanges(String processId) {
    }

    /**
     * Sends the cutoff time (epoch milliseconds) to the OMAS client, keyed by the connector's
     * processing-state sync key.
     *
     * @param changesCutoff time to record
     */
    private void upsertProcessingState(Date changesCutoff) throws PropertyServerException, InvalidParameterException, UserNotAuthorizedException, ConnectorCheckedException {
        String processingStateKey = this.connector.getProcessingStateSyncKey();
        Map<String, Long> properties = Collections.singletonMap(processingStateKey, changesCutoff.getTime());
        this.dataEngineOMASClient.upsertProcessingState(this.userId, properties);
    }

    /**
     * Reads the processing state from the OMAS client and converts the entry stored under the
     * connector's sync key to a {@link Date}.
     *
     * @return the stored time, or {@code null} when the state is empty or has no entry for the key
     */
    private Date getProcessingState() throws PropertyServerException {
        String processingStateKey = this.connector.getProcessingStateSyncKey();
        Map<String, Long> processingState = this.dataEngineOMASClient.getProcessingState(this.userId);
        if (MapUtils.isEmpty(processingState)) {
            return null;
        }
        // A non-empty map may still lack this key; unboxing null would throw.
        Long lastSync = processingState.get(processingStateKey);
        return lastSync == null ? null : new Date(lastSync);
    }

    /**
     * Sleeps for the configured poll interval. If the thread is interrupted, the exception is
     * logged, the interrupt flag is restored and the polling loop is told to stop; otherwise every
     * later sleep would fail immediately and the loop would poll without pausing.
     */
    private void sleep() {
        try {
            Thread.sleep((long) this.dataEngineProxyConfig.getPollIntervalInSeconds() * 1000L);
        } catch (InterruptedException e) {
            this.auditLog.logException("sleep", DataEngineProxyAuditCode.RUNTIME_EXCEPTION.getMessageDefinition(), e);
            Thread.currentThread().interrupt();
            this.running.set(false);
        }
    }

    /** Sets the client's external source name from the connector's engine details if it is unset. */
    private void ensureSourceNameIsSet() {
        if (this.dataEngineOMASClient.getExternalSourceName() == null) {
            this.dataEngineOMASClient.setExternalSourceName(this.connector.getDataEngineDetails().getQualifiedName());
        }
    }

    /**
     * Creates or updates every schema type the connector reports as changed within the window.
     *
     * @param changesLastSynced start of the window, passed through to the connector
     * @param changesCutoff     end of the window, passed through to the connector
     */
    private void upsertSchemaTypes(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException, ConnectorCheckedException {
        final String methodName = "upsertSchemaTypes";
        final String type = "SchemaTypes";
        this.auditLog.logMessage(methodName, DataEngineProxyAuditCode.POLLING_TYPE_START.getMessageDefinition(type));
        List<SchemaType> changedSchemaTypes = this.connector.getChangedSchemaTypes(changesLastSynced, changesCutoff);
        if (changedSchemaTypes != null) {
            for (SchemaType changedSchemaType : changedSchemaTypes) {
                this.dataEngineOMASClient.createOrUpdateSchemaType(this.userId, changedSchemaType);
            }
        }
        this.auditLog.logMessage(methodName, DataEngineProxyAuditCode.POLLING_TYPE_FINISH.getMessageDefinition(type));
    }

    /**
     * Upserts every data store the connector reports as changed within the window. Entries that
     * are {@link DataFile}s are sent as data files; entries that are {@link Database}s are handled
     * by {@link #upsertDatabaseChange(Database)}. Other entries are ignored.
     *
     * @param changesLastSynced start of the window, passed through to the connector
     * @param changesCutoff     end of the window, passed through to the connector
     */
    private void upsertDataStores(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException, ConnectorCheckedException {
        final String methodName = "upsertDataStores";
        final String type = "DataStores";
        this.auditLog.logMessage(methodName, DataEngineProxyAuditCode.POLLING_TYPE_START.getMessageDefinition(type));
        List<?> changedDataStores = this.connector.getChangedDataStores(changesLastSynced, changesCutoff);
        if (CollectionUtils.isNotEmpty(changedDataStores)) {
            for (Object changedDataStore : changedDataStores) {
                if (changedDataStore instanceof DataFile) {
                    this.dataEngineOMASClient.upsertDataFile(this.userId, (DataFile) changedDataStore);
                }
                if (changedDataStore instanceof Database) {
                    this.upsertDatabaseChange((Database) changedDataStore);
                }
            }
        }
        this.auditLog.logMessage(methodName, DataEngineProxyAuditCode.POLLING_TYPE_FINISH.getMessageDefinition(type));
    }

    /**
     * Sends one changed database. A database flagged incomplete is upserted as a database only.
     * Otherwise its schema is upserted when the schema is flagged incomplete, and then its first
     * table is upserted under that schema's qualified name.
     *
     * @param database the changed database entry
     */
    private void upsertDatabaseChange(Database database) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException, ConnectorCheckedException {
        if (database.getIncomplete()) {
            this.dataEngineOMASClient.upsertDatabase(this.userId, database);
            return;
        }
        if (database.getDatabaseSchema().getIncomplete()) {
            this.dataEngineOMASClient.upsertDatabaseSchema(this.userId, database.getDatabaseSchema(), database.getQualifiedName());
        }
        List<RelationalTable> tables = database.getTables();
        // Guard: an absent/empty table list would otherwise raise an unchecked exception that
        // run() does not catch, terminating the polling thread.
        // NOTE(review): only the first table is sent; presumably the connector reports one table
        // per Database entry - confirm against the connector contract.
        if (CollectionUtils.isNotEmpty(tables)) {
            this.dataEngineOMASClient.upsertRelationalTable(this.userId, tables.get(0), database.getDatabaseSchema().getQualifiedName());
        }
    }

    /**
     * Creates or updates every process the connector reports as changed within the window and,
     * for each process carrying lineage mappings, adds those mappings.
     *
     * @param changesLastSynced start of the window, passed through to the connector
     * @param changesCutoff     end of the window, passed through to the connector
     */
    private void upsertProcesses(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException, ConnectorCheckedException {
        final String methodName = "upsertProcesses";
        final String type = "Processes";
        this.auditLog.logMessage(methodName, DataEngineProxyAuditCode.POLLING_TYPE_START.getMessageDefinition(type));
        List<Process> changedProcesses = this.connector.getChangedProcesses(changesLastSynced, changesCutoff);
        if (changedProcesses != null) {
            for (Process changedProcess : changedProcesses) {
                this.dataEngineOMASClient.createOrUpdateProcess(this.userId, changedProcess);
                List<LineageMapping> lineageMappings = changedProcess.getLineageMappings();
                if (lineageMappings != null) {
                    this.dataEngineOMASClient.addLineageMappings(this.userId, lineageMappings);
                }
            }
        }
        this.auditLog.logMessage(methodName, DataEngineProxyAuditCode.POLLING_TYPE_FINISH.getMessageDefinition(type));
    }

    /**
     * Adds every process hierarchy the connector reports as changed within the window.
     *
     * @param changesLastSynced start of the window, passed through to the connector
     * @param changesCutoff     end of the window, passed through to the connector
     */
    private void upsertProcessHierarchies(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException, ConnectorCheckedException {
        final String methodName = "upsertProcessHierarchies";
        final String type = "ProcessHierarchies";
        this.auditLog.logMessage(methodName, DataEngineProxyAuditCode.POLLING_TYPE_START.getMessageDefinition(type));
        List<ProcessHierarchy> changedProcessHierarchies = this.connector.getChangedProcessHierarchies(changesLastSynced, changesCutoff);
        if (changedProcessHierarchies != null) {
            for (ProcessHierarchy changedProcessHierarchy : changedProcessHierarchies) {
                this.dataEngineOMASClient.addProcessHierarchy(this.userId, changedProcessHierarchy);
            }
        }
        this.auditLog.logMessage(methodName, DataEngineProxyAuditCode.POLLING_TYPE_FINISH.getMessageDefinition(type));
    }

    /**
     * Adds the lineage mappings the connector reports as changed within the window. When the
     * events client is enabled in the configuration each mapping is sent in its own call;
     * otherwise the whole list is sent in one call.
     *
     * @param changesLastSynced start of the window, passed through to the connector
     * @param changesCutoff     end of the window, passed through to the connector
     */
    private void upsertLineageMappings(Date changesLastSynced, Date changesCutoff) throws InvalidParameterException, PropertyServerException, UserNotAuthorizedException, ConnectorCheckedException {
        final String methodName = "upsertLineageMappings";
        final String type = "LineageMappings";
        this.auditLog.logMessage(methodName, DataEngineProxyAuditCode.POLLING_TYPE_START.getMessageDefinition(type));
        List<LineageMapping> changedLineageMappings = this.connector.getChangedLineageMappings(changesLastSynced, changesCutoff);
        if (CollectionUtils.isNotEmpty(changedLineageMappings)) {
            if (this.dataEngineProxyConfig.isEventsClientEnabled()) {
                for (LineageMapping changedLineageMapping : changedLineageMappings) {
                    this.dataEngineOMASClient.addLineageMappings(this.userId, Collections.singletonList(changedLineageMapping));
                }
            } else {
                this.dataEngineOMASClient.addLineageMappings(this.userId, changedLineageMappings);
            }
        }
        this.auditLog.logMessage(methodName, DataEngineProxyAuditCode.POLLING_TYPE_FINISH.getMessageDefinition(type));
    }
}

