/*
 * Decompiled with CFR 0.152.
 */
package org.qubership.automation.itf.transport.kafka.outbound;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.qubership.automation.itf.core.model.jpa.message.Message;
import org.qubership.automation.itf.core.model.transport.ConnectionProperties;
import org.qubership.automation.itf.core.util.annotation.Parameter;
import org.qubership.automation.itf.core.util.annotation.UserName;
import org.qubership.automation.itf.core.util.constants.Mep;
import org.qubership.automation.itf.transport.camel.Helper;
import org.qubership.automation.itf.transport.camel.outbound.AbstractCamelOutboundTransport;

@UserName(value="Outbound Kafka Asynchronous")
public class KafkaOutboundTransport
extends AbstractCamelOutboundTransport {
    private static final List<String> AUTHORIZATION_PARAMS = Arrays.asList("securityProtocol", "saslMechanism", "saslModule", "saslUsername", "saslPassword");
    @Parameter(shortName="brokers", longName="Brokers", description="Brokers to consume (comma-separated list in format host:port)", isDynamic=true)
    protected String brokers;
    @Parameter(shortName="topic", longName="Topic", description="Topics to consume (comma-separated list)", forTemplate=true, isDynamic=true)
    protected String topic;
    @Parameter(shortName="headers", longName="Kafka Message Headers", description="SomeHeader=SomeValue\nContent-Type=text/html", optional=true, forTemplate=true, isDynamic=true)
    protected Map<String, String> headers = new HashMap<String, String>();
    @Parameter(shortName="key", longName="Kafka Message Key", description="Message Key", optional=true, forTemplate=true, isDynamic=true)
    protected String key;
    @Parameter(shortName="endpointProperties", longName="Extra Endpoint Properties", description="Extra Endpoint Properties (name=value pairs delimited by newlines)", forServer=true, forTemplate=false, isDynamic=true, optional=true)
    protected Map<String, Object> properties;

    public String viewEndpoint(ConnectionProperties connectionProperties) {
        return null;
    }

    public Mep getMep() {
        return Mep.OUTBOUND_REQUEST_ASYNCHRONOUS;
    }

    public String getEndpointPrefix() {
        return "/mockingbird-transport-kafka";
    }

    public String getShortName() {
        return "Kafka Outbound";
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public String send(Message message, String sessionId, UUID projectUuid) throws Exception {
        Thread.currentThread().setContextClassLoader(((Object)((Object)this)).getClass().getClassLoader());
        Endpoint endpoint = this.createKafkaEndpoint(message, projectUuid);
        Producer producer = endpoint.createProducer();
        try {
            producer.start();
            producer.process(this.createExchange(endpoint, message));
        }
        finally {
            producer.stop();
        }
        return null;
    }

    private Endpoint createKafkaEndpoint(Message message, UUID projectUuid) {
        ConnectionProperties properties = new ConnectionProperties(message.getConnectionProperties());
        String topic = properties.get((Object)"topic").toString().trim();
        String brokers = properties.get((Object)"brokers").toString().trim();
        Map extraProps = Helper.setExtraPropertiesMap((Map)((Map)properties.obtain("endpointProperties")));
        boolean isAuthParametersValid = this.checkAuthParameters(extraProps);
        StringBuilder builder = new StringBuilder("kafka:").append(topic).append("?brokers=").append(brokers);
        Map<String, String> authProps = isAuthParametersValid ? this.fillAuthParameters(extraProps) : null;
        String endpointUri = builder.append(Helper.setExtraProperties((Map)extraProps)).toString();
        KafkaEndpoint endpoint = new KafkaEndpoint(endpointUri, new KafkaComponent(CAMEL_CONTEXT));
        endpoint.getConfiguration().setTopic(topic);
        endpoint.getConfiguration().setBrokers(brokers);
        endpoint.getConfiguration().setProducerBatchSize(Integer.valueOf(0));
        endpoint.getConfiguration().setClientId(projectUuid.toString());
        if (isAuthParametersValid) {
            this.setAuthParameters(endpoint, authProps);
        }
        return endpoint;
    }

    private Exchange createExchange(Endpoint endpoint, Message message) {
        Exchange exchange = endpoint.createExchange();
        exchange.getIn().setHeaders(message.getHeaders());
        exchange.getIn().setHeader("kafka.KEY", message.getConnectionProperties().get("key"));
        exchange.getIn().setBody((Object)message.getText());
        return exchange;
    }

    private Map<String, String> fillAuthParameters(Map<String, Object> extraProps) {
        HashMap<String, String> authProps = new HashMap<String, String>();
        for (String param : AUTHORIZATION_PARAMS) {
            authProps.put(param, extraProps.get(param).toString());
            extraProps.remove(param);
        }
        return authProps;
    }

    private void setAuthParameters(KafkaEndpoint endpoint, Map<String, String> authProps) {
        if (!authProps.isEmpty()) {
            String saslJaasConfig = String.format("%s required username=\"%s\" password=\"%s\";", authProps.get("saslModule"), authProps.get("saslUsername"), authProps.get("saslPassword"));
            endpoint.getConfiguration().setSaslJaasConfig(saslJaasConfig);
            endpoint.getConfiguration().setSaslMechanism(authProps.get("saslMechanism"));
            endpoint.getConfiguration().setSecurityProtocol(authProps.get("securityProtocol"));
        }
    }

    private boolean checkAuthParameters(Map<String, Object> extraProps) {
        for (String param : AUTHORIZATION_PARAMS) {
            if (extraProps.containsKey(param)) continue;
            return false;
        }
        return true;
    }
}

