/*
 * Decompiled with CFR 0.152.
 */
package org.qubership.automation.itf.trigger.kafka;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.camel.Component;
import org.apache.camel.Endpoint;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.apache.camel.model.RouteDefinition;
import org.apache.commons.lang3.StringUtils;
import org.qubership.automation.itf.JvmSettings;
import org.qubership.automation.itf.communication.TriggerExecutionMessageSender;
import org.qubership.automation.itf.core.model.communication.TransportType;
import org.qubership.automation.itf.core.model.communication.message.CommonTriggerExecutionMessage;
import org.qubership.automation.itf.core.model.communication.message.TriggerExecutionMessage;
import org.qubership.automation.itf.core.model.jpa.message.Message;
import org.qubership.automation.itf.core.model.transport.ConnectionProperties;
import org.qubership.automation.itf.core.util.descriptor.StorableDescriptor;
import org.qubership.automation.itf.core.util.exception.TriggerException;
import org.qubership.automation.itf.monitoring.metrics.MetricsAggregateService;
import org.qubership.automation.itf.trigger.camel.Helper;
import org.qubership.automation.itf.trigger.camel.inbound.AbstractCamelTrigger;
import org.qubership.automation.itf.trigger.camel.route.ItfAbstractRouteBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaTrigger
extends AbstractCamelTrigger {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTrigger.class);
    private static final String KAFKA_INBOUND_TRANSPORT_CLASS_NAME = "org.qubership.automation.itf.transport.kafka.inbound.KafkaInboundTransport";
    private static final Integer RECONNECT_BACKOFF_MS_MAX_DEFAULT = 300000;
    private static final Integer RECONNECT_BACKOFF_MS_MAX_EXTRA_MIN = 10000;
    private static final Integer RECONNECT_BACKOFF_MS_DEFAULT = 500;
    private static final Integer RECONNECT_BACKOFF_MS_EXTRA_MIN = 500;
    private static final List<String> AUTHORIZATION_PARAMS = Arrays.asList("securityProtocol", "saslMechanism", "saslModule", "saslUsername", "saslPassword");

    public KafkaTrigger(StorableDescriptor triggerConfigurationDescriptor, ConnectionProperties connectionProperties) {
        super(triggerConfigurationDescriptor, connectionProperties);
    }

    protected void activateSpecificTrigger() throws Exception {
        this.forceDeactivateInCaseOfComponentExistence();
        final KafkaComponent kafkaComponent = new KafkaComponent(CAMEL_CONTEXT);
        CAMEL_CONTEXT.addComponent(this.getId(), (Component)kafkaComponent);
        CAMEL_CONTEXT.addRoutes((RoutesBuilder)new RouteBuilder(){

            public void configure() {
                Endpoint kafkaEndpoint = KafkaTrigger.this.createEndPoint(kafkaComponent);
                UUID projectUuid = KafkaTrigger.this.getTriggerConfigurationDescriptor().getProjectUuid();
                String brokerMessageSelectorValue = Helper.getBrokerMessageSelectorValue();
                ((RouteDefinition)this.from(kafkaEndpoint).process(exchange -> {
                    if (exchange.getIn() != null) {
                        String sessionId = UUID.randomUUID().toString();
                        MetricsAggregateService.putCommonMetrics((UUID)projectUuid, (String)sessionId);
                        LOGGER.info("Project: {}. SessionId: {}. Request is received by endpoint: {}", new Object[]{projectUuid, sessionId, kafkaEndpoint.getEndpointUri()});
                        String data = (String)exchange.getIn().getBody(String.class);
                        if (!StringUtils.isEmpty((CharSequence)data)) {
                            Message message = new Message(data);
                            message.convertAndSetHeaders(exchange.getIn().getHeaders());
                            MetricsAggregateService.checkIncomingMessageSize((UUID)projectUuid, (String)message.getText());
                            ItfAbstractRouteBuilder.logExtendedInfo((UUID)projectUuid, (String)sessionId, (String)brokerMessageSelectorValue, (String)KafkaTrigger.KAFKA_INBOUND_TRANSPORT_CLASS_NAME, (long)message.getText().getBytes(JvmSettings.CHARSET).length);
                            TriggerExecutionMessageSender.send((TriggerExecutionMessage)new CommonTriggerExecutionMessage(KafkaTrigger.KAFKA_INBOUND_TRANSPORT_CLASS_NAME, message, KafkaTrigger.this.getTriggerConfigurationDescriptor(), sessionId, brokerMessageSelectorValue), (Object)KafkaTrigger.this.getTriggerConfigurationDescriptor().getProjectUuid());
                            MetricsAggregateService.incrementIncomingRequestToProject((UUID)projectUuid, (TransportType)TransportType.KAFKA_INBOUND, (boolean)true);
                            LOGGER.debug("Project: {}, SessionId: {}, transport: '{}' - message to executor is sent.", new Object[]{projectUuid, sessionId, KafkaTrigger.KAFKA_INBOUND_TRANSPORT_CLASS_NAME});
                        }
                    }
                })).routeId(KafkaTrigger.this.getId());
            }
        });
        LOGGER.info("CAMEL_CONTEXT [{}] is activated successfully", (Object)this.getId());
    }

    protected void deactivateSpecificTrigger() throws Exception {
        CAMEL_CONTEXT.stopRoute(this.getId());
        CAMEL_CONTEXT.removeRoute(this.getId());
        CAMEL_CONTEXT.removeComponent(this.getId());
        LOGGER.info("CAMEL_CONTEXT [{}] is deactivated successfully", (Object)this.getId());
    }

    protected void applyTriggerProperties(ConnectionProperties connectionProperties) throws TriggerException {
        this.setConnectionProperties(connectionProperties);
    }

    private void forceDeactivateInCaseOfComponentExistence() {
        Component prevAdded = CAMEL_CONTEXT.hasComponent(this.getId());
        if (prevAdded != null) {
            try {
                LOGGER.info("Before activation: {} - CAMEL_CONTEXT already has component with the same ID {}. It will be deactivated.", this.getTriggerConfigurationDescriptor().getId(), (Object)this.getId());
                this.deactivateSpecificTrigger();
            }
            catch (Exception ex) {
                LOGGER.debug("Trigger {} deactivation before its activation - ignoring the exception: {}", (Object)this.getId(), (Object)ex);
            }
        }
    }

    private Endpoint createEndPoint(KafkaComponent kafkaComponent) {
        String topic = ((String)this.getConnectionProperties().get((Object)"topic")).trim();
        String brokers = ((String)this.getConnectionProperties().get((Object)"brokers")).trim();
        String group = (String)this.getConnectionProperties().get((Object)"group");
        Map extraProps = Helper.setExtraPropertiesMap((Map)((Map)this.getConnectionProperties().obtain("endpointProperties")));
        boolean isAuthParametersValid = this.checkAuthParameters(extraProps);
        StringBuilder builder = new StringBuilder("kafka:").append(topic).append("?brokers=").append(brokers);
        if (!StringUtils.isBlank((CharSequence)group)) {
            builder.append("&groupId=").append(group.trim());
        }
        Map<String, String> authProps = isAuthParametersValid ? this.fillAuthParameters(extraProps) : null;
        String endpointUri = builder.append(Helper.setExtraProperties((Map)extraProps)).toString();
        KafkaEndpoint endpoint = new KafkaEndpoint(endpointUri, kafkaComponent);
        endpoint.getConfiguration().setTopic(topic);
        endpoint.getConfiguration().setBrokers(brokers);
        endpoint.getConfiguration().setClientId(this.getTriggerConfigurationDescriptor().getProjectUuid().toString());
        endpoint.getConfiguration().setReconnectBackoffMs(RECONNECT_BACKOFF_MS_DEFAULT);
        endpoint.getConfiguration().setReconnectBackoffMaxMs(RECONNECT_BACKOFF_MS_MAX_DEFAULT);
        if (isAuthParametersValid) {
            this.setAuthParameters(endpoint, authProps);
        }
        if (!StringUtils.isBlank((CharSequence)group)) {
            endpoint.getConfiguration().setGroupId(group.trim());
        }
        this.setKafkaConfigurationProperties(endpoint.getConfiguration(), extraProps);
        return endpoint;
    }

    private void setKafkaConfigurationProperties(KafkaConfiguration configuration, Map<String, Object> extraProps) {
        for (Map.Entry<String, Object> prop : extraProps.entrySet()) {
            switch (prop.getKey()) {
                case "autoOffsetReset": 
                case "auto.offset.reset": {
                    configuration.setAutoOffsetReset(prop.getValue().toString());
                    break;
                }
                case "maxPollRecords": 
                case "max.poll.records": {
                    configuration.setMaxPollRecords(Integer.valueOf(Integer.parseInt(prop.getValue().toString())));
                    break;
                }
                case "clientId": 
                case "client.id": {
                    configuration.setClientId(prop.getValue().toString());
                    break;
                }
                case "reconnectBackoffMaxMs": 
                case "reconnect.backoff.max.ms": {
                    int intMaxMsValue = Integer.parseInt(prop.getValue().toString());
                    if (intMaxMsValue < RECONNECT_BACKOFF_MS_MAX_EXTRA_MIN) {
                        LOGGER.warn("reconnectBackoffMaxMs parameter less than minimum allowed value {}. Default value {} will be used.", (Object)RECONNECT_BACKOFF_MS_MAX_EXTRA_MIN, (Object)RECONNECT_BACKOFF_MS_MAX_DEFAULT);
                        break;
                    }
                    configuration.setReconnectBackoffMaxMs(Integer.valueOf(intMaxMsValue));
                    break;
                }
                case "reconnectBackoffMs": 
                case "reconnect.backoff.ms": {
                    int intMsValue = Integer.parseInt(prop.getValue().toString());
                    if (intMsValue < RECONNECT_BACKOFF_MS_EXTRA_MIN) {
                        LOGGER.warn("reconnectBackoffMs parameter less than minimum allowed value {}. Default value {} will be used.", (Object)RECONNECT_BACKOFF_MS_EXTRA_MIN, (Object)RECONNECT_BACKOFF_MS_DEFAULT);
                        break;
                    }
                    configuration.setReconnectBackoffMs(Integer.valueOf(intMsValue));
                    break;
                }
                case "autoCommitEnable": 
                case "enable.auto.commit": {
                    configuration.setAutoCommitEnable(Boolean.valueOf(Boolean.parseBoolean(prop.getValue().toString())));
                    break;
                }
                case "allowManualCommit": {
                    configuration.setAllowManualCommit(Boolean.parseBoolean(prop.getValue().toString()));
                    break;
                }
                case "autoCommitIntervalMs": 
                case "auto.commit.interval.ms": {
                    configuration.setAutoCommitIntervalMs(Integer.valueOf(Integer.parseInt(prop.getValue().toString())));
                    break;
                }
                case "autoCommitOnStop": {
                    configuration.setAutoCommitOnStop(prop.getValue().toString());
                    break;
                }
                case "breakOnFirstError": {
                    configuration.setBreakOnFirstError(Boolean.parseBoolean(prop.getValue().toString()));
                    break;
                }
                case "checkCrcs": 
                case "check.crcs": {
                    configuration.setCheckCrcs(Boolean.valueOf(Boolean.parseBoolean(prop.getValue().toString())));
                    break;
                }
                case "consumerRequestTimeoutMs": 
                case "request.timeout.ms": {
                    configuration.setRequestTimeoutMs(Integer.valueOf(Integer.parseInt(prop.getValue().toString())));
                    break;
                }
                case "consumersCount": {
                    configuration.setConsumersCount(Integer.parseInt(prop.getValue().toString()));
                    break;
                }
                case "consumerStreams": {
                    configuration.setConsumerStreams(Integer.parseInt(prop.getValue().toString()));
                    break;
                }
                case "fetchMaxBytes": 
                case "fetch.max.bytes": {
                    configuration.setFetchMaxBytes(Integer.valueOf(Integer.parseInt(prop.getValue().toString())));
                    break;
                }
                case "fetchMinBytes": 
                case "fetch.min.bytes": {
                    configuration.setFetchMinBytes(Integer.valueOf(Integer.parseInt(prop.getValue().toString())));
                    break;
                }
                case "fetchWaitMaxMs": 
                case "fetch.max.wait.ms": {
                    configuration.setFetchWaitMaxMs(Integer.valueOf(Integer.parseInt(prop.getValue().toString())));
                    break;
                }
                case "heartbeatIntervalMs": 
                case "heartbeat.interval.ms": {
                    configuration.setHeartbeatIntervalMs(Integer.valueOf(Integer.parseInt(prop.getValue().toString())));
                    break;
                }
                case "keyDeserializer": 
                case "key.deserializer": {
                    configuration.setKeyDeserializer(prop.getValue().toString());
                    break;
                }
                case "maxPartitionFetchBytes": 
                case "max.partition.fetch.bytes": {
                    configuration.setMaxPartitionFetchBytes(Integer.valueOf(Integer.parseInt(prop.getValue().toString())));
                    break;
                }
                case "partitionAssignor": 
                case "partition.assignment.strategy": {
                    configuration.setPartitionAssignor(prop.getValue().toString());
                    break;
                }
                case "pollTimeoutMs": {
                    configuration.setPollTimeoutMs(Long.valueOf(Long.parseLong(prop.getValue().toString())));
                    break;
                }
                case "maxPollIntervalMs": 
                case "max.poll.interval.ms": {
                    configuration.setMaxPollIntervalMs(Long.valueOf(Long.parseLong(prop.getValue().toString())));
                    break;
                }
                case "seekTo": {
                    configuration.setSeekTo(prop.getValue().toString());
                    break;
                }
                case "sessionTimeoutMs": 
                case "session.timeout.ms": {
                    configuration.setSessionTimeoutMs(Integer.valueOf(Integer.parseInt(prop.getValue().toString())));
                    break;
                }
                case "topicIsPattern": {
                    configuration.setTopicIsPattern(Boolean.parseBoolean(prop.getValue().toString()));
                    break;
                }
                case "valueDeserializer": 
                case "value.deserializer": {
                    configuration.setValueDeserializer(prop.getValue().toString());
                    break;
                }
            }
        }
    }

    private void setAuthParameters(KafkaEndpoint endpoint, Map<String, String> authProps) {
        if (!authProps.isEmpty()) {
            String saslJaasConfig = String.format("%s required username=\"%s\" password=\"%s\";", authProps.get("saslModule"), authProps.get("saslUsername"), authProps.get("saslPassword"));
            endpoint.getConfiguration().setSaslJaasConfig(saslJaasConfig);
            endpoint.getConfiguration().setSaslMechanism(authProps.get("saslMechanism"));
            endpoint.getConfiguration().setSecurityProtocol(authProps.get("securityProtocol"));
        }
    }

    private Map<String, String> fillAuthParameters(Map<String, Object> extraProps) {
        HashMap<String, String> authProps = new HashMap<String, String>();
        for (String param : AUTHORIZATION_PARAMS) {
            authProps.put(param, extraProps.get(param).toString());
            extraProps.remove(param);
        }
        return authProps;
    }

    private boolean checkAuthParameters(Map<String, Object> extraProps) {
        for (String param : AUTHORIZATION_PARAMS) {
            if (extraProps.containsKey(param)) continue;
            return false;
        }
        return true;
    }
}

