/*
 * Decompiled with CFR 0.152.
 */
package org.qubership.integration.platform.engine.service.deployment.processing.actions.context.before;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.camel.spring.SpringCamelContext;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.qubership.integration.platform.engine.configuration.PredeployCheckKafkaConfiguration;
import org.qubership.integration.platform.engine.errorhandling.DeploymentRetriableException;
import org.qubership.integration.platform.engine.model.ChainElementType;
import org.qubership.integration.platform.engine.model.deployment.update.DeploymentInfo;
import org.qubership.integration.platform.engine.model.deployment.update.ElementProperties;
import org.qubership.integration.platform.engine.service.VariablesService;
import org.qubership.integration.platform.engine.service.deployment.processing.ElementProcessingAction;
import org.qubership.integration.platform.engine.service.deployment.processing.qualifiers.OnBeforeDeploymentContextCreated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

/**
 * Pre-deployment check for Kafka chain elements: verifies that the configured brokers can be
 * reached with the element's connection settings and that the configured topic exists.
 *
 * <p>The bean is registered unless the property
 * {@code qip.camel.component.kafka.predeploy-check-enabled} is set to something other than
 * {@code true}.
 */
@Component
@ConditionalOnProperty(name={"qip.camel.component.kafka.predeploy-check-enabled"}, havingValue="true", matchIfMissing=true)
@OnBeforeDeploymentContextCreated
public class KafkaTopicAndConnectionCheckAction
extends ElementProcessingAction {
    private static final Logger log = LoggerFactory.getLogger(KafkaTopicAndConnectionCheckAction.class);
    private static final String CHECK_FAILED_MESSAGE = "Kafka predeploy check is failed. Connection configuration is invalid, topics not found or broker is unavailable";
    private final VariablesService variablesService;
    private final PredeployCheckKafkaConfiguration predeployCheckKafkaConfiguration;

    @Autowired
    public KafkaTopicAndConnectionCheckAction(VariablesService variablesService, PredeployCheckKafkaConfiguration predeployCheckKafkaConfiguration) {
        this.variablesService = variablesService;
        this.predeployCheckKafkaConfiguration = predeployCheckKafkaConfiguration;
    }

    /**
     * Tells whether this action handles the given element.
     *
     * @param properties element properties; the {@code elementType} entry selects the element type
     * @return {@code true} when {@link ChainElementType#isKafkaAsyncElement} accepts the element type
     */
    @Override
    public boolean applicableTo(ElementProperties properties) {
        ChainElementType chainElementType = ChainElementType.fromString(properties.getProperties().get("elementType"));
        return ChainElementType.isKafkaAsyncElement(chainElementType);
    }

    /**
     * Lists the topics of the configured Kafka cluster and checks that the element's topic is
     * among them.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>no {@code brokers} property: the check is skipped;</li>
     *   <li>authorization failure while talking to Kafka: a warning is logged and the deployment
     *       is not blocked;</li>
     *   <li>any other Kafka/connection failure, an empty topic property or a missing topic:
     *       {@link DeploymentRetriableException} is thrown;</li>
     *   <li>any other unexpected error: a warning is logged and the deployment is not blocked.</li>
     * </ul>
     *
     * @param context camel context of the deployment (not used by this check)
     * @param elementProperties properties of the element being checked
     * @param deploymentInfo deployment the element belongs to, used for logging
     * @throws DeploymentRetriableException when the connection or topic check fails
     */
    @Override
    public void apply(SpringCamelContext context, ElementProperties elementProperties, DeploymentInfo deploymentInfo) {
        Map<String, String> props = elementProperties.getProperties();
        try {
            String brokers = this.getProp(props, "brokers");
            String securityProtocol = this.getProp(props, "securityProtocol");
            String saslMechanism = this.getProp(props, "saslMechanism");
            String saslJaasConfig = this.getProp(props, "saslJaasConfig");
            String topicsString = this.getProp(props, "topics");
            if (brokers == null) {
                log.debug("Element with id {} not contains kafka connection params, skipping", elementProperties.getElementId());
                return;
            }
            Map<String, Object> validationKafkaAdminConfig = this.predeployCheckKafkaConfiguration.createValidationKafkaAdminConfig(brokers, securityProtocol, saslMechanism, saslJaasConfig);
            try (AdminClient client = AdminClient.create(validationKafkaAdminConfig)) {
                // Listing topics doubles as the connectivity check: it fails when the brokers
                // are unreachable or the connection settings are rejected.
                Set<String> kafkaTopics = client.listTopics().names().get();
                if (topicsString == null) {
                    // Previously this case surfaced as a NullPointerException that was only logged;
                    // keep the "do not block the deployment" outcome but make it explicit.
                    log.warn("Element with id {} has no kafka topics property, topic check skipped", elementProperties.getElementId());
                    return;
                }
                String[] topicsArray = topicsString.split(",");
                // "".split(",") returns [""], so the first entry has to be inspected as well
                // for the emptiness check to have any effect.
                if (topicsArray.length == 0 || topicsArray[0].trim().isEmpty()) {
                    throw new KafkaException("Topic property can't be empty");
                }
                // Only the first entry of the comma-separated list is verified.
                // NOTE(review): confirm whether the remaining topics should be checked too.
                Set<String> missingTopics = new HashSet<>();
                missingTopics.add(topicsArray[0]);
                missingTopics.removeAll(kafkaTopics);
                if (!missingTopics.isEmpty()) {
                    String topicString = String.join(", ", missingTopics);
                    throw new DeploymentRetriableException("Kafka topics (" + topicString + ") not found, check if this topics exists in kafka");
                }
            }
        }
        catch (ExecutionException | KafkaException e) {
            if (isAuthorizationFailure(e)) {
                // The checking client may lack permissions the element itself does not need,
                // so an authorization failure must not block the deployment.
                log.warn("Kafka predeploy check is failed with AuthorizationException. Exception not thrown", e);
                return;
            }
            log.warn(CHECK_FAILED_MESSAGE, e);
            throw new DeploymentRetriableException(CHECK_FAILED_MESSAGE, e);
        }
        catch (DeploymentRetriableException e) {
            log.warn("Kafka predeploy check is failed with retriable exception", e);
            throw e;
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("Failed to check kafka topic(s) or connection for deployment: {}, element: {}", deploymentInfo.getDeploymentId(), elementProperties.getElementId(), e);
        }
        catch (Exception e) {
            // Best effort: unexpected errors of the check itself must not block the deployment.
            log.warn("Failed to check kafka topic(s) or connection for deployment: {}, element: {}", deploymentInfo.getDeploymentId(), elementProperties.getElementId(), e);
        }
    }

    /** Tells whether the exception itself or its direct cause is a Kafka authorization error. */
    private static boolean isAuthorizationFailure(Exception e) {
        return e instanceof AuthorizationException || e.getCause() instanceof AuthorizationException;
    }

    /** Reads the named property and passes its value through {@link VariablesService#injectVariables}. */
    private String getProp(Map<String, String> properties, String name) {
        return this.variablesService.injectVariables(properties.get(name));
    }
}

