/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.adapters.connectors.integration.kafka;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.odpi.openmetadata.accessservices.datamanager.metadataelements.TopicElement;
import org.odpi.openmetadata.accessservices.datamanager.properties.TemplateProperties;
import org.odpi.openmetadata.accessservices.datamanager.properties.TopicProperties;
import org.odpi.openmetadata.adapters.connectors.integration.kafka.ffdc.KafkaIntegrationConnectorAuditCode;
import org.odpi.openmetadata.adapters.connectors.integration.kafka.ffdc.KafkaIntegrationConnectorErrorCode;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.frameworks.connectors.properties.EndpointProperties;
import org.odpi.openmetadata.integrationservices.topic.connector.TopicIntegratorConnector;
import org.odpi.openmetadata.integrationservices.topic.connector.TopicIntegratorContext;

public class KafkaMonitorIntegrationConnector
extends TopicIntegratorConnector {
    private String templateQualifiedName = null;
    private String templateGUID = null;
    private String targetRootURL = "localhost:9092";
    private TopicIntegratorContext myContext = null;

    public void start() throws ConnectorCheckedException {
        block8: {
            Map configurationProperties;
            super.start();
            String methodName = "start";
            this.myContext = super.getContext();
            EndpointProperties endpoint = this.connectionProperties.getEndpoint();
            if (endpoint != null) {
                this.targetRootURL = endpoint.getAddress();
            }
            if ((configurationProperties = this.connectionProperties.getConfigurationProperties()) != null) {
                this.templateQualifiedName = configurationProperties.get("templateQualifiedName").toString();
            }
            if (this.auditLog != null) {
                this.auditLog.logMessage("start", KafkaIntegrationConnectorAuditCode.CONNECTOR_CONFIGURATION.getMessageDefinition(this.connectorName, this.targetRootURL, this.templateQualifiedName));
            }
            if (this.templateQualifiedName != null) {
                try {
                    List templateElements = this.myContext.getTopicsByName(this.templateQualifiedName, 0, 0);
                    if (templateElements != null) {
                        for (TopicElement templateElement : templateElements) {
                            String qualifiedName = templateElement.getProperties().getQualifiedName();
                            if (!this.templateQualifiedName.equals(qualifiedName)) continue;
                            this.templateGUID = templateElement.getElementHeader().getGUID();
                        }
                    }
                }
                catch (Exception error) {
                    if (this.auditLog == null) break block8;
                    this.auditLog.logException("start", KafkaIntegrationConnectorAuditCode.MISSING_TEMPLATE.getMessageDefinition(this.connectorName, this.templateQualifiedName), (Throwable)error);
                }
            }
        }
    }

    public void refresh() throws ConnectorCheckedException {
        String methodName = "refresh";
        try {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", this.targetRootURL);
            Admin admin = Admin.create((Properties)properties);
            Set activeTopicNames = (Set)admin.listTopics().names().get();
            admin.close();
            if (activeTopicNames != null) {
                if (this.auditLog != null) {
                    this.auditLog.logMessage("refresh", KafkaIntegrationConnectorAuditCode.RETRIEVED_TOPICS.getMessageDefinition(this.connectorName, "localhost:9092", Integer.toString(activeTopicNames.size())));
                }
                int startFrom = 0;
                List cataloguedTopics = this.myContext.getMyTopics(startFrom, this.myContext.getMaxPageSize());
                while (cataloguedTopics != null) {
                    startFrom += this.myContext.getMaxPageSize();
                    for (TopicElement topicElement : cataloguedTopics) {
                        String topicName = topicElement.getProperties().getQualifiedName();
                        String topicGUID = topicElement.getElementHeader().getGUID();
                        if (!activeTopicNames.contains(topicName)) {
                            this.myContext.removeTopic(topicGUID, topicName);
                            if (this.auditLog == null) continue;
                            this.auditLog.logMessage("refresh", KafkaIntegrationConnectorAuditCode.TOPIC_DELETED.getMessageDefinition(this.connectorName, topicName, topicGUID));
                            continue;
                        }
                        activeTopicNames.remove(topicName);
                    }
                    cataloguedTopics = this.myContext.getMyTopics(startFrom, this.myContext.getMaxPageSize());
                }
                for (String topicName : activeTopicNames) {
                    String topicGUID;
                    if (this.templateGUID == null) {
                        TopicProperties topicProperties = new TopicProperties();
                        topicProperties.setQualifiedName(topicName);
                        topicProperties.setTypeName("KafkaTopic");
                        topicGUID = this.myContext.createTopic(topicProperties);
                        if (topicGUID == null || this.auditLog == null) continue;
                        this.auditLog.logMessage("refresh", KafkaIntegrationConnectorAuditCode.TOPIC_CREATED.getMessageDefinition(this.connectorName, topicName, topicGUID));
                        continue;
                    }
                    TemplateProperties templateProperties = new TemplateProperties();
                    templateProperties.setQualifiedName(topicName);
                    topicGUID = this.myContext.createTopicFromTemplate(this.templateGUID, templateProperties);
                    if (topicGUID == null || this.auditLog == null) continue;
                    this.auditLog.logMessage("refresh", KafkaIntegrationConnectorAuditCode.TOPIC_CREATED_FROM_TEMPLATE.getMessageDefinition(this.connectorName, topicName, topicGUID, this.templateQualifiedName, this.templateGUID));
                }
            }
        }
        catch (Exception error) {
            if (this.auditLog != null) {
                this.auditLog.logException("refresh", KafkaIntegrationConnectorAuditCode.UNABLE_TO_RETRIEVE_TOPICS.getMessageDefinition(this.connectorName, "localhost:9092", error.getClass().getName(), error.getMessage()), (Throwable)error);
            }
            throw new ConnectorCheckedException(KafkaIntegrationConnectorErrorCode.UNEXPECTED_EXCEPTION.getMessageDefinition(this.connectorName, error.getClass().getName(), error.getMessage()), ((Object)((Object)this)).getClass().getName(), "refresh", (Throwable)error);
        }
    }

    public void disconnect() throws ConnectorCheckedException {
        String methodName = "disconnect";
        if (this.auditLog != null) {
            this.auditLog.logMessage("disconnect", KafkaIntegrationConnectorAuditCode.CONNECTOR_STOPPING.getMessageDefinition(this.connectorName));
        }
        super.disconnect();
    }
}

