/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.governanceservers.openlineage.listeners;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.odpi.openmetadata.accessservices.assetlineage.event.AssetLineageEventHeader;
import org.odpi.openmetadata.accessservices.assetlineage.event.LineageEntityEvent;
import org.odpi.openmetadata.accessservices.assetlineage.event.LineageEvent;
import org.odpi.openmetadata.accessservices.assetlineage.event.LineageRelationshipEvent;
import org.odpi.openmetadata.governanceservers.openlineage.auditlog.OpenLineageServerAuditCode;
import org.odpi.openmetadata.governanceservers.openlineage.services.StoringServices;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLog;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLogRecordSeverity;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OpenLineageInTopicListener
implements OpenMetadataTopicListener {
    private static final Logger log = LoggerFactory.getLogger(OpenLineageInTopicListener.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final OMRSAuditLog auditLog;
    private StoringServices storingServices;

    public OpenLineageInTopicListener(StoringServices storingServices, OMRSAuditLog auditLog) {
        this.storingServices = storingServices;
        this.auditLog = auditLog;
    }

    public void processEvent(String assetLineageEvent) {
        try {
            log.debug("Started processing OpenLineageEvent {}", (Object)assetLineageEvent);
            if (!assetLineageEvent.isEmpty()) {
                this.processEventBasedOnType(assetLineageEvent);
            }
        }
        catch (JsonProcessingException e) {
            this.logException(assetLineageEvent, (Exception)((Object)e));
        }
        catch (Throwable e) {
            log.error("Exception processing the in topic event", e);
            OpenLineageServerAuditCode auditCode = OpenLineageServerAuditCode.PROCESS_EVENT_EXCEPTION;
            this.auditLog.logException("processEvent", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(assetLineageEvent, e.getMessage()), e.getMessage(), auditCode.getSystemAction(), auditCode.getUserAction(), e);
        }
    }

    private void processEventBasedOnType(String assetLineageEvent) throws JsonProcessingException {
        AssetLineageEventHeader assetLineageEventHeader = (AssetLineageEventHeader)OBJECT_MAPPER.readValue(assetLineageEvent, AssetLineageEventHeader.class);
        if (assetLineageEventHeader != null) {
            switch (assetLineageEventHeader.getAssetLineageEventType()) {
                case PROCESS_CONTEXT_EVENT: 
                case GLOSSARY_TERM_CONTEXT_EVENT: 
                case CLASSIFICATION_CONTEXT_EVENT: {
                    LineageEvent lineageEvent = (LineageEvent)OBJECT_MAPPER.readValue(assetLineageEvent, LineageEvent.class);
                    this.storingServices.addEntity(lineageEvent);
                    break;
                }
                case NEW_RELATIONSHIP_EVENT: {
                    LineageRelationshipEvent lineageRelationshipEvent = (LineageRelationshipEvent)OBJECT_MAPPER.readValue(assetLineageEvent, LineageRelationshipEvent.class);
                    this.storingServices.upsertRelationship(lineageRelationshipEvent);
                    break;
                }
                case UPDATE_ENTITY_EVENT: {
                    LineageEntityEvent lineageEntityEvent = (LineageEntityEvent)OBJECT_MAPPER.readValue(assetLineageEvent, LineageEntityEvent.class);
                    this.storingServices.updateEntity(lineageEntityEvent);
                    break;
                }
                case UPDATE_RELATIONSHIP_EVENT: {
                    LineageRelationshipEvent lineageRelationshipEvent = (LineageRelationshipEvent)OBJECT_MAPPER.readValue(assetLineageEvent, LineageRelationshipEvent.class);
                    this.storingServices.updateRelationship(lineageRelationshipEvent);
                    break;
                }
                case RECLASSIFIED_ENTITY_EVENT: {
                    LineageEvent lineageEvent = (LineageEvent)OBJECT_MAPPER.readValue(assetLineageEvent, LineageEvent.class);
                    this.storingServices.updateClassification(lineageEvent);
                    break;
                }
                case DELETE_ENTITY_EVENT: {
                    LineageEntityEvent lineageEntityEvent = (LineageEntityEvent)OBJECT_MAPPER.readValue(assetLineageEvent, LineageEntityEvent.class);
                    this.storingServices.deleteEntity(lineageEntityEvent);
                    break;
                }
                case DELETE_RELATIONSHIP_EVENT: {
                    LineageRelationshipEvent lineageRelationshipEvent = (LineageRelationshipEvent)OBJECT_MAPPER.readValue(assetLineageEvent, LineageRelationshipEvent.class);
                    this.storingServices.deleteRelationship(lineageRelationshipEvent);
                    break;
                }
                case DECLASSIFIED_ENTITY_EVENT: {
                    LineageEvent lineageEvent = (LineageEvent)OBJECT_MAPPER.readValue(assetLineageEvent, LineageEvent.class);
                    this.storingServices.deleteClassification(lineageEvent);
                    break;
                }
            }
        }
    }

    private void logException(String assetLineageEvent, Exception e) {
        log.debug("Exception parsing event from AssetLineage out Topic", (Throwable)e);
        OpenLineageServerAuditCode errorCode = OpenLineageServerAuditCode.PROCESS_EVENT_EXCEPTION;
        this.auditLog.logException("parsing Asset Lineage event exception", errorCode.getLogMessageId(), OMRSAuditLogRecordSeverity.EXCEPTION, errorCode.getFormattedLogMessage(assetLineageEvent, e.getMessage()), e.getMessage(), errorCode.getSystemAction(), errorCode.getUserAction(), (Throwable)e);
    }
}

