/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.accessservices.dataengine.server.intopic;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.odpi.openmetadata.accessservices.dataengine.event.DataEngineEventHeader;
import org.odpi.openmetadata.accessservices.dataengine.event.DataEngineRegistrationEvent;
import org.odpi.openmetadata.accessservices.dataengine.event.LineageMappingsEvent;
import org.odpi.openmetadata.accessservices.dataengine.event.PortAliasEvent;
import org.odpi.openmetadata.accessservices.dataengine.event.PortImplementationEvent;
import org.odpi.openmetadata.accessservices.dataengine.event.ProcessToPortListEvent;
import org.odpi.openmetadata.accessservices.dataengine.event.ProcessesEvent;
import org.odpi.openmetadata.accessservices.dataengine.ffdc.DataEngineErrorCode;
import org.odpi.openmetadata.accessservices.dataengine.model.LineageMapping;
import org.odpi.openmetadata.accessservices.dataengine.rest.ProcessListResponse;
import org.odpi.openmetadata.accessservices.dataengine.server.admin.DataEngineServicesInstance;
import org.odpi.openmetadata.accessservices.dataengine.server.handlers.DataEngineSchemaTypeHandler;
import org.odpi.openmetadata.accessservices.dataengine.server.service.DataEngineRESTServices;
import org.odpi.openmetadata.commonservices.ffdc.rest.GUIDResponse;
import org.odpi.openmetadata.commonservices.multitenant.ffdc.exceptions.NewInstanceException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.PropertyServerException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.UserNotAuthorizedException;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLog;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLogRecordSeverity;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicListener;
import org.odpi.openmetadata.repositoryservices.connectors.stores.metadatacollectionstore.properties.instances.InstanceStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;

public class DataEngineInTopicProcessor
implements OpenMetadataTopicListener {
    private static final Logger log = LoggerFactory.getLogger(DataEngineServicesInstance.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final OMRSAuditLog auditLog;
    private DataEngineServicesInstance instance;
    private String serverName;
    private DataEngineRESTServices dataEngineRESTServices = new DataEngineRESTServices();

    public DataEngineInTopicProcessor(DataEngineServicesInstance instance, OMRSAuditLog auditLog) {
        this.auditLog = auditLog;
        this.instance = instance;
    }

    public void processEvent(String dataEngineEvent) {
        log.debug("Processing instance event", (Object)dataEngineEvent);
        if (dataEngineEvent == null) {
            log.debug("Null instance event - ignoring event");
        } else {
            try {
                this.serverName = this.instance.getServerName();
                DataEngineEventHeader dataEngineEventHeader = (DataEngineEventHeader)OBJECT_MAPPER.readValue(dataEngineEvent, DataEngineEventHeader.class);
                if (dataEngineEventHeader != null) {
                    switch (dataEngineEventHeader.getEventType()) {
                        case DATA_ENGINE_REGISTRATION_EVENT: {
                            this.processDataEngineRegistrationEvent(dataEngineEvent);
                            break;
                        }
                        case LINEAGE_MAPPINGS_EVENT: {
                            this.processLineageMappingsEvent(dataEngineEvent);
                            break;
                        }
                        case PORT_ALIAS_EVENT: {
                            this.processPortAliasEvent(dataEngineEvent);
                            break;
                        }
                        case PORT_IMPLEMENTATION_EVENT: {
                            this.processPortImplementationEvent(dataEngineEvent);
                            break;
                        }
                        case PROCESS_TO_PORT_LIST_EVENT: {
                            this.processProcessToPortListEvent(dataEngineEvent);
                            break;
                        }
                        case PROCESSES_EVENT: {
                            this.processProcessesEvent(dataEngineEvent);
                        }
                    }
                } else {
                    log.debug("Ignored instance event - null Data Engine event type");
                }
            }
            catch (JsonProcessingException | NewInstanceException e) {
                log.debug("Exception processing event from in Data Engine In Topic", e);
                DataEngineErrorCode errorCode = DataEngineErrorCode.PROCESS_EVENT_EXCEPTION;
                this.auditLog.logException("process Data Engine inTopic Event", errorCode.getErrorMessageId(), OMRSAuditLogRecordSeverity.EXCEPTION, errorCode.getFormattedErrorMessage(new String[]{dataEngineEvent, e.getMessage()}), e.getMessage(), errorCode.getSystemAction(), errorCode.getUserAction(), e);
            }
        }
    }

    private void processDataEngineRegistrationEvent(String dataEngineEvent) {
        String methodName = "processDataEngineRegistrationEvent";
        try {
            DataEngineRegistrationEvent dataEngineRegistrationEvent = (DataEngineRegistrationEvent)OBJECT_MAPPER.readValue(dataEngineEvent, DataEngineRegistrationEvent.class);
            this.instance.getDataEngineRegistrationHandler().createExternalDataEngine(dataEngineRegistrationEvent.getUserId(), dataEngineRegistrationEvent.getSoftwareServerCapability());
        }
        catch (JsonProcessingException | InvalidParameterException | PropertyServerException | UserNotAuthorizedException e) {
            log.debug("Exception in parsing DataEngineRegistrationEvent from in Data Engine In Topic", e);
            DataEngineErrorCode errorCode = DataEngineErrorCode.PARSE_EVENT_EXCEPTION;
            this.auditLog.logException("processDataEngineRegistrationEvent", errorCode.getErrorMessageId(), OMRSAuditLogRecordSeverity.EXCEPTION, errorCode.getFormattedErrorMessage(new String[]{dataEngineEvent, e.getMessage()}), e.getMessage(), errorCode.getSystemAction(), errorCode.getUserAction(), e);
        }
    }

    private void processLineageMappingsEvent(String dataEngineEvent) {
        String methodName = "processLineageMappingsEvent";
        try {
            LineageMappingsEvent lineageMappingsEvent = (LineageMappingsEvent)OBJECT_MAPPER.readValue(dataEngineEvent, LineageMappingsEvent.class);
            log.debug("Calling method: {}", (Object)"processLineageMappingsEvent");
            if (CollectionUtils.isEmpty((Collection)lineageMappingsEvent.getLineageMappings())) {
                return;
            }
            this.addLineageMappings(lineageMappingsEvent.getUserId(), lineageMappingsEvent.getLineageMappings(), lineageMappingsEvent.getExternalSourceName());
        }
        catch (JsonProcessingException e) {
            log.debug("Exception in parsing LineageMappingsEvent from in Data Engine In Topic", (Throwable)e);
            DataEngineErrorCode errorCode = DataEngineErrorCode.PARSE_EVENT_EXCEPTION;
            this.auditLog.logException("processLineageMappingsEvent", errorCode.getErrorMessageId(), OMRSAuditLogRecordSeverity.EXCEPTION, errorCode.getFormattedErrorMessage(new String[]{dataEngineEvent, e.getMessage()}), e.getMessage(), errorCode.getSystemAction(), errorCode.getUserAction(), (Throwable)e);
        }
    }

    private void processPortAliasEvent(String dataEngineEvent) {
        String methodName = "processPortAliasEvent";
        try {
            PortAliasEvent portAliasEvent = (PortAliasEvent)OBJECT_MAPPER.readValue(dataEngineEvent, PortAliasEvent.class);
            log.debug("Calling method: {}", (Object)"processPortAliasEvent");
            this.dataEngineRESTServices.createOrUpdatePortAliasWithDelegation(portAliasEvent.getUserId(), this.serverName, portAliasEvent.getPort(), portAliasEvent.getExternalSourceName());
        }
        catch (JsonProcessingException | InvalidParameterException | PropertyServerException | UserNotAuthorizedException e) {
            log.debug("Exception in parsing PortAliasEvent from in Data Engine In Topic", e);
            DataEngineErrorCode errorCode = DataEngineErrorCode.PARSE_EVENT_EXCEPTION;
            this.auditLog.logException("processPortAliasEvent", errorCode.getErrorMessageId(), OMRSAuditLogRecordSeverity.EXCEPTION, errorCode.getFormattedErrorMessage(new String[]{dataEngineEvent, e.getMessage()}), e.getMessage(), errorCode.getSystemAction(), errorCode.getUserAction(), e);
        }
    }

    private void processPortImplementationEvent(String dataEngineEvent) {
        String methodName = "processPortImplementationEvent";
        try {
            PortImplementationEvent portImplementationEvent = (PortImplementationEvent)OBJECT_MAPPER.readValue(dataEngineEvent, PortImplementationEvent.class);
            log.debug("Calling method: {}", (Object)"processPortImplementationEvent");
            this.dataEngineRESTServices.createOrUpdatePortImplementationWithSchemaType(portImplementationEvent.getUserId(), this.serverName, portImplementationEvent.getPortImplementation(), portImplementationEvent.getExternalSourceName());
        }
        catch (JsonProcessingException | InvalidParameterException | PropertyServerException | UserNotAuthorizedException e) {
            log.debug("Exception in parsing PortImplementationEvent from in Data Engine In Topic", e);
            DataEngineErrorCode errorCode = DataEngineErrorCode.PARSE_EVENT_EXCEPTION;
            this.auditLog.logException("processPortImplementationEvent", errorCode.getErrorMessageId(), OMRSAuditLogRecordSeverity.EXCEPTION, errorCode.getFormattedErrorMessage(new String[]{dataEngineEvent, e.getMessage()}), e.getMessage(), errorCode.getSystemAction(), errorCode.getUserAction(), e);
        }
    }

    private void processProcessToPortListEvent(String dataEngineEvent) {
        String methodName = "processProcessToPortListEvent";
        try {
            ProcessToPortListEvent processToPortListEvent = (ProcessToPortListEvent)OBJECT_MAPPER.readValue(dataEngineEvent, ProcessToPortListEvent.class);
            log.debug("Calling method: {}", (Object)"processProcessToPortListEvent");
            for (String portGUID : processToPortListEvent.getPorts()) {
                this.instance.getProcessHandler().addProcessPortRelationship(processToPortListEvent.getUserId(), processToPortListEvent.getProcessGUID(), portGUID, processToPortListEvent.getExternalSourceName());
            }
        }
        catch (JsonProcessingException | InvalidParameterException | PropertyServerException | UserNotAuthorizedException e) {
            log.debug("Exception in parsing ProcessToPortListEvent from in Data Engine In Topic", e);
            DataEngineErrorCode errorCode = DataEngineErrorCode.PARSE_EVENT_EXCEPTION;
            this.auditLog.logException("processProcessToPortListEvent", errorCode.getErrorMessageId(), OMRSAuditLogRecordSeverity.EXCEPTION, errorCode.getFormattedErrorMessage(new String[]{dataEngineEvent, e.getMessage()}), e.getMessage(), errorCode.getSystemAction(), errorCode.getUserAction(), e);
        }
    }

    private void processProcessesEvent(String dataEngineEvent) {
        String methodName = "processProcessesEvent";
        try {
            ProcessesEvent processesEvent = (ProcessesEvent)OBJECT_MAPPER.readValue(dataEngineEvent, ProcessesEvent.class);
            this.createOrUpdateProcesses(processesEvent);
            log.debug("Calling method: {}", (Object)"processProcessesEvent");
        }
        catch (JsonProcessingException e) {
            log.debug("Exception in parsing event from in Data Engine In Topic", (Throwable)e);
            DataEngineErrorCode errorCode = DataEngineErrorCode.PARSE_EVENT_EXCEPTION;
            this.auditLog.logException("processProcessesEvent", errorCode.getErrorMessageId(), OMRSAuditLogRecordSeverity.EXCEPTION, errorCode.getFormattedErrorMessage(new String[]{dataEngineEvent, e.getMessage()}), e.getMessage(), errorCode.getSystemAction(), errorCode.getUserAction(), (Throwable)e);
        }
    }

    private void addLineageMappings(String userId, List<LineageMapping> lineageMappings, String externalSouceName) {
        DataEngineSchemaTypeHandler dataEngineSchemaTypeHandler = this.instance.getDataEngineSchemaTypeHandler();
        lineageMappings.parallelStream().forEach(lineageMapping -> {
            try {
                dataEngineSchemaTypeHandler.addLineageMappingRelationship(userId, lineageMapping.getSourceAttribute(), lineageMapping.getTargetAttribute(), externalSouceName);
            }
            catch (InvalidParameterException | PropertyServerException | UserNotAuthorizedException e) {
                log.debug("Exception in parsing event from in Data Engine In Topic", e);
                DataEngineErrorCode errorCode = DataEngineErrorCode.PROCESS_EVENT_EXCEPTION;
                this.auditLog.logException("add lineage mappings from LineageMappingsEvent", errorCode.getErrorMessageId(), OMRSAuditLogRecordSeverity.EXCEPTION, errorCode.getFormattedErrorMessage(new String[]{lineageMappings.toString(), e.getMessage()}), e.getMessage(), errorCode.getSystemAction(), errorCode.getUserAction(), e);
            }
        });
    }

    private void createOrUpdateProcesses(ProcessesEvent processesEvent) {
        String methodName = "createOrUpdateProcesses";
        log.debug("Calling method: {}", (Object)"createOrUpdateProcesses");
        ProcessListResponse response = new ProcessListResponse();
        if (processesEvent == null || CollectionUtils.isEmpty((Collection)processesEvent.getProcesses())) {
            // empty if block
        }
        List<GUIDResponse> guidResponses = this.dataEngineRESTServices.createOrUpdateProcesses(processesEvent.getUserId(), this.serverName, processesEvent.getProcesses(), processesEvent.getExternalSourceName());
        Predicate<GUIDResponse> processStatusPredicate = guidResponse -> guidResponse.getRelatedHTTPCode() == HttpStatus.OK.value();
        Map<Boolean, List<GUIDResponse>> mappedResponses = guidResponses.parallelStream().collect(Collectors.partitioningBy(processStatusPredicate));
        List<GUIDResponse> createdProcesses = this.dataEngineRESTServices.getGuidResponses(response, mappedResponses.get(Boolean.TRUE));
        this.dataEngineRESTServices.handleFailedProcesses(response, mappedResponses.get(Boolean.FALSE));
        createdProcesses.parallelStream().forEach(guidResponse -> this.dataEngineRESTServices.updateProcessStatus(processesEvent.getUserId(), this.serverName, (GUIDResponse)guidResponse, InstanceStatus.ACTIVE));
    }
}

