/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.egeria.connectors.apache.atlas.eventmapper;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.instance.AtlasRelationshipHeader;
import org.apache.atlas.model.notification.EntityNotification;
import org.apache.atlas.notification.entity.EntityMessageDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.odpi.egeria.connectors.apache.atlas.auditlog.ApacheAtlasOMRSAuditCode;
import org.odpi.egeria.connectors.apache.atlas.auditlog.ApacheAtlasOMRSErrorCode;
import org.odpi.egeria.connectors.apache.atlas.repositoryconnector.ApacheAtlasOMRSMetadataCollection;
import org.odpi.egeria.connectors.apache.atlas.repositoryconnector.ApacheAtlasOMRSRepositoryConnector;
import org.odpi.egeria.connectors.apache.atlas.repositoryconnector.mapping.EntityMappingAtlas2OMRS;
import org.odpi.egeria.connectors.apache.atlas.repositoryconnector.mapping.RelationshipMapping;
import org.odpi.egeria.connectors.apache.atlas.repositoryconnector.model.AtlasGuid;
import org.odpi.egeria.connectors.apache.atlas.repositoryconnector.stores.TypeDefStore;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicListener;
import org.odpi.openmetadata.repositoryservices.connectors.stores.metadatacollectionstore.properties.instances.EntityDetail;
import org.odpi.openmetadata.repositoryservices.connectors.stores.metadatacollectionstore.properties.instances.Relationship;
import org.odpi.openmetadata.repositoryservices.connectors.stores.metadatacollectionstore.properties.typedefs.AttributeTypeDef;
import org.odpi.openmetadata.repositoryservices.connectors.stores.metadatacollectionstore.properties.typedefs.TypeDef;
import org.odpi.openmetadata.repositoryservices.connectors.stores.metadatacollectionstore.repositoryeventmapper.OMRSRepositoryEventMapperBase;
import org.odpi.openmetadata.repositoryservices.ffdc.exception.RepositoryErrorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApacheAtlasOMRSRepositoryEventMapper
extends OMRSRepositoryEventMapperBase
implements OpenMetadataTopicListener {
    private static final Logger log = LoggerFactory.getLogger(ApacheAtlasOMRSRepositoryEventMapper.class);
    private static final Duration pollDuration = Duration.ofMillis(100L);
    private String sourceName = "ApacheAtlasOMRSRepositoryEventMapper";
    private ApacheAtlasOMRSRepositoryConnector atlasRepositoryConnector;
    private ApacheAtlasOMRSMetadataCollection atlasMetadataCollection;
    private TypeDefStore typeDefStore;
    private String metadataCollectionId;
    private String originatorServerName;
    private String originatorServerType;
    private Properties atlasKafkaProperties;
    private String atlasKafkaTopic;
    private KafkaConsumerThread kafkaConsumer;
    private EntityMessageDeserializer deserializer;

    public void start() throws ConnectorCheckedException {
        super.start();
        String methodName = "start";
        ApacheAtlasOMRSAuditCode auditCode = ApacheAtlasOMRSAuditCode.EVENT_MAPPER_STARTING;
        this.auditLog.logRecord("start", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(new String[0]), null, auditCode.getSystemAction(), auditCode.getUserAction());
        if (!(this.repositoryConnector instanceof ApacheAtlasOMRSRepositoryConnector)) {
            this.raiseConnectorCheckedException(ApacheAtlasOMRSErrorCode.EVENT_MAPPER_IMPROPERLY_INITIALIZED, "start", null, this.repositoryConnector.getServerName());
        }
        this.atlasRepositoryConnector = (ApacheAtlasOMRSRepositoryConnector)this.repositoryConnector;
        this.atlasKafkaTopic = "ATLAS_ENTITIES";
        String atlasKafkaBootstrap = this.connectionBean.getEndpoint().getAddress();
        this.atlasKafkaProperties = new Properties();
        this.atlasKafkaProperties.put("bootstrap.servers", atlasKafkaBootstrap);
        this.atlasKafkaProperties.put("group.id", "ApacheAtlasOMRSRepositoryEventMapper_consumer");
        this.atlasKafkaProperties.put("key.deserializer", StringDeserializer.class.getName());
        this.atlasKafkaProperties.put("value.deserializer", StringDeserializer.class.getName());
        this.deserializer = new EntityMessageDeserializer();
        this.kafkaConsumer = new KafkaConsumerThread();
        try {
            this.atlasMetadataCollection = (ApacheAtlasOMRSMetadataCollection)this.atlasRepositoryConnector.getMetadataCollection();
        }
        catch (RepositoryErrorException e) {
            this.raiseConnectorCheckedException(ApacheAtlasOMRSErrorCode.REST_CLIENT_FAILURE, "start", (Exception)((Object)e), this.atlasRepositoryConnector.getServerName());
        }
        this.typeDefStore = this.atlasMetadataCollection.getTypeDefStore();
        this.atlasMetadataCollection.setEventMapper(this);
        this.metadataCollectionId = this.atlasRepositoryConnector.getMetadataCollectionId();
        this.originatorServerName = this.atlasRepositoryConnector.getServerName();
        this.originatorServerType = this.atlasRepositoryConnector.getServerType();
        this.kafkaConsumer.start();
    }

    public void processEvent(String event) {
        log.info("Processing event: {}", (Object)event);
        EntityNotification atlasEvent = this.deserializer.deserialize(event);
        EntityNotification.EntityNotificationV2 entityNotification = (EntityNotification.EntityNotificationV2)atlasEvent;
        if (entityNotification != null) {
            switch (entityNotification.getOperationType()) {
                case ENTITY_CREATE: {
                    this.processNewEntity(entityNotification.getEntity());
                    break;
                }
                case ENTITY_UPDATE: {
                    this.processUpdatedEntity(entityNotification.getEntity());
                    break;
                }
                case RELATIONSHIP_CREATE: {
                    this.processNewRelationship(entityNotification.getRelationship());
                    break;
                }
                default: {
                    log.warn("Unrecognized operation type from Apache Atlas: {}", (Object)event);
                    break;
                }
            }
        } else {
            log.error("Unrecognized event type from Apache Atlas: {}", (Object)event);
        }
    }

    private void processNewEntity(AtlasEntityHeader atlasEntityHeader) {
        String atlasTypeName = atlasEntityHeader.getTypeName();
        Map<String, String> omrsTypesByPrefix = this.typeDefStore.getAllMappedOMRSTypeDefNames(atlasTypeName);
        for (String prefix : omrsTypesByPrefix.keySet()) {
            EntityDetail entityDetail = this.getMappedEntity(atlasEntityHeader, prefix);
            if (entityDetail == null) continue;
            this.repositoryEventProcessor.processNewEntityEvent(this.sourceName, this.metadataCollectionId, this.originatorServerName, this.originatorServerType, this.localOrganizationName, entityDetail);
            if (prefix == null) continue;
            List<Relationship> generatedRelationships = this.getGeneratedRelationshipsForEntity(atlasEntityHeader, entityDetail);
            for (Relationship generatedRelationship : generatedRelationships) {
                this.repositoryEventProcessor.processNewRelationshipEvent(this.sourceName, this.metadataCollectionId, this.originatorServerName, this.originatorServerType, this.localOrganizationName, generatedRelationship);
            }
        }
    }

    private void processUpdatedEntity(AtlasEntityHeader atlasEntityHeader) {
        Map<String, String> omrsTypesByPrefix = this.typeDefStore.getAllMappedOMRSTypeDefNames(atlasEntityHeader.getTypeName());
        for (String prefix : omrsTypesByPrefix.keySet()) {
            EntityDetail entityDetail = this.getMappedEntity(atlasEntityHeader, prefix);
            if (entityDetail == null) continue;
            this.repositoryEventProcessor.processUpdatedEntityEvent(this.sourceName, this.metadataCollectionId, this.originatorServerName, this.originatorServerType, this.localOrganizationName, null, entityDetail);
            if (prefix == null) continue;
            List<Relationship> generatedRelationships = this.getGeneratedRelationshipsForEntity(atlasEntityHeader, entityDetail);
            for (Relationship generatedRelationship : generatedRelationships) {
                this.repositoryEventProcessor.processUpdatedRelationshipEvent(this.sourceName, this.metadataCollectionId, this.originatorServerName, this.originatorServerType, this.localOrganizationName, null, generatedRelationship);
            }
        }
    }

    private List<Relationship> getGeneratedRelationshipsForEntity(AtlasEntityHeader atlasEntityHeader, EntityDetail entityDetail) {
        String atlasTypeName = atlasEntityHeader.getTypeName();
        ArrayList<Relationship> generatedRelationships = new ArrayList<Relationship>();
        Map<String, TypeDefStore.EndpointMapping> mappings = this.typeDefStore.getAllEndpointMappingsFromAtlasName(atlasTypeName);
        for (Map.Entry<String, TypeDefStore.EndpointMapping> entry : mappings.entrySet()) {
            String relationshipPrefix = entry.getKey();
            if (relationshipPrefix == null) continue;
            AtlasGuid atlasGuid = new AtlasGuid(atlasEntityHeader.getGuid(), relationshipPrefix);
            try {
                Relationship generatedRelationship = RelationshipMapping.getSelfReferencingRelationship(this.atlasRepositoryConnector, this.typeDefStore, atlasGuid, new AtlasEntity(atlasEntityHeader));
                if (generatedRelationship != null) {
                    generatedRelationships.add(generatedRelationship);
                    continue;
                }
                log.warn("Unable to create generated relationship with prefix {}, for entity: {}", (Object)relationshipPrefix, (Object)entityDetail.getGUID());
            }
            catch (RepositoryErrorException e) {
                log.error("Unable to create generated relationship with prefix {}, for entity: {}", new Object[]{relationshipPrefix, entityDetail.getGUID(), e});
            }
        }
        return generatedRelationships;
    }

    private EntityDetail getMappedEntity(AtlasEntityHeader atlasEntityHeader, String prefix) {
        EntityDetail result = null;
        AtlasEntity.AtlasEntityWithExtInfo atlasEntity = null;
        try {
            atlasEntity = this.atlasRepositoryConnector.getEntityByGUID(atlasEntityHeader.getGuid(), false, true);
        }
        catch (AtlasServiceException e) {
            log.error("Unable to retrieve entity from Atlas: {}", (Object)atlasEntityHeader, (Object)e);
        }
        EntityMappingAtlas2OMRS mapping = new EntityMappingAtlas2OMRS(this.atlasRepositoryConnector, this.atlasMetadataCollection.getTypeDefStore(), this.atlasMetadataCollection.getAttributeTypeDefStore(), atlasEntity, prefix, null);
        try {
            result = mapping.getEntityDetail();
        }
        catch (RepositoryErrorException e) {
            log.error("Unable to map entity to OMRS EntityDetail: {}", (Object)atlasEntity, (Object)e);
        }
        return result;
    }

    private void processNewRelationship(AtlasRelationshipHeader atlasRelationshipHeader) {
        Relationship relationship = this.getMappedRelationship(atlasRelationshipHeader);
        if (relationship != null) {
            this.repositoryEventProcessor.processNewRelationshipEvent(this.sourceName, this.metadataCollectionId, this.originatorServerName, this.originatorServerType, this.localOrganizationName, relationship);
        }
    }

    private Relationship getMappedRelationship(AtlasRelationshipHeader atlasRelationshipHeader) {
        Relationship result = null;
        AtlasRelationship.AtlasRelationshipWithExtInfo atlasRelationship = null;
        try {
            atlasRelationship = this.atlasRepositoryConnector.getRelationshipByGUID(atlasRelationshipHeader.getGuid(), true);
        }
        catch (AtlasServiceException e) {
            log.error("Unable to retrieve relationship from Atlas: {}", (Object)atlasRelationshipHeader, (Object)e);
        }
        RelationshipMapping mapping = new RelationshipMapping(this.atlasRepositoryConnector, this.atlasMetadataCollection.getTypeDefStore(), this.atlasMetadataCollection.getAttributeTypeDefStore(), new AtlasGuid(atlasRelationshipHeader.getGuid(), null), atlasRelationship, null);
        try {
            result = mapping.getRelationship();
        }
        catch (RepositoryErrorException e) {
            log.error("Unable to map relationship to OMRS Relationship: {}", (Object)atlasRelationship, (Object)e);
        }
        return result;
    }

    public void disconnect() throws ConnectorCheckedException {
        super.disconnect();
        this.kafkaConsumer.stop();
        ApacheAtlasOMRSAuditCode auditCode = ApacheAtlasOMRSAuditCode.EVENT_MAPPER_SHUTDOWN;
        this.auditLog.logRecord("disconnect", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(this.atlasRepositoryConnector.getServerName()), null, auditCode.getSystemAction(), auditCode.getUserAction());
    }

    public void sendNewTypeDefEvent(TypeDef newTypeDef) {
        this.repositoryEventProcessor.processNewTypeDefEvent(this.sourceName, this.metadataCollectionId, this.localServerName, this.localServerType, this.localOrganizationName, newTypeDef);
    }

    public void sendNewAttributeTypeDefEvent(AttributeTypeDef newAttributeTypeDef) {
        this.repositoryEventProcessor.processNewAttributeTypeDefEvent(this.sourceName, this.metadataCollectionId, this.localServerName, this.localServerType, this.localOrganizationName, newAttributeTypeDef);
    }

    public void sendRefreshEntityRequest(String typeDefGUID, String typeDefName, String entityGUID, String homeMetadataCollectionId) {
        this.repositoryEventProcessor.processRefreshEntityRequested(this.sourceName, this.metadataCollectionId, this.localServerName, this.localServerType, this.localOrganizationName, typeDefGUID, typeDefName, entityGUID, homeMetadataCollectionId);
    }

    public void sendRefreshRelationshipRequest(String typeDefGUID, String typeDefName, String relationshipGUID, String homeMetadataCollectionId) {
        this.repositoryEventProcessor.processRefreshRelationshipRequest(this.sourceName, this.metadataCollectionId, this.localServerName, this.localServerType, this.localOrganizationName, typeDefGUID, typeDefName, relationshipGUID, homeMetadataCollectionId);
    }

    private void raiseConnectorCheckedException(ApacheAtlasOMRSErrorCode errorCode, String methodName, Exception cause, String ... params) throws ConnectorCheckedException {
        String errorMessage = errorCode.getErrorMessageId() + errorCode.getFormattedErrorMessage(params);
        throw new ConnectorCheckedException(errorCode.getHTTPErrorCode(), ((Object)((Object)this)).getClass().getName(), methodName, errorMessage, errorCode.getSystemAction(), errorCode.getUserAction(), (Throwable)cause);
    }

    private class KafkaConsumerThread
    implements Runnable {
        private final AtomicBoolean running = new AtomicBoolean(false);

        private KafkaConsumerThread() {
        }

        void start() {
            Thread worker = new Thread(this);
            worker.start();
        }

        void stop() {
            this.running.set(false);
        }

        @Override
        public void run() {
            this.running.set(true);
            try (KafkaConsumer consumer = new KafkaConsumer(ApacheAtlasOMRSRepositoryEventMapper.this.atlasKafkaProperties);){
                consumer.subscribe(Collections.singletonList(ApacheAtlasOMRSRepositoryEventMapper.this.atlasKafkaTopic));
                ApacheAtlasOMRSAuditCode auditCode = ApacheAtlasOMRSAuditCode.EVENT_MAPPER_RUNNING;
                ApacheAtlasOMRSRepositoryEventMapper.this.auditLog.logRecord("run", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(ApacheAtlasOMRSRepositoryEventMapper.this.atlasRepositoryConnector.getServerName()), null, auditCode.getSystemAction(), auditCode.getUserAction());
                while (this.running.get()) {
                    try {
                        ConsumerRecords events = consumer.poll(pollDuration);
                        for (ConsumerRecord event : events) {
                            ApacheAtlasOMRSRepositoryEventMapper.this.processEvent((String)event.value());
                        }
                    }
                    catch (Exception e) {
                        auditCode = ApacheAtlasOMRSAuditCode.EVENT_MAPPER_CONSUMER_FAILURE;
                        ApacheAtlasOMRSRepositoryEventMapper.this.auditLog.logException("consumer failure", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(new String[0]), null, auditCode.getSystemAction(), auditCode.getUserAction(), (Throwable)e);
                    }
                }
            }
        }
    }
}

