/*
 * Decompiled with CFR 0.152.
 */
package org.swisspush.gateleen.kafka;

import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceConsumer;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager;
import org.swisspush.gateleen.core.http.RequestLoggerFactory;
import org.swisspush.gateleen.core.util.ResponseStatusCodeLogUtil;
import org.swisspush.gateleen.core.util.StatusCode;
import org.swisspush.gateleen.kafka.KafkaConfiguration;
import org.swisspush.gateleen.kafka.KafkaConfigurationParser;
import org.swisspush.gateleen.kafka.KafkaMessageSender;
import org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder;
import org.swisspush.gateleen.kafka.KafkaProducerRepository;
import org.swisspush.gateleen.kafka.KafkaTopicExtractor;
import org.swisspush.gateleen.validation.ValidationException;

/**
 * Handles HTTP requests whose URI starts with the configured streaming path by
 * forwarding the request body as message(s) to a kafka topic.
 *
 * <p>The topic is extracted from the request URI and a matching
 * {@link KafkaProducer} is looked up in the {@link KafkaProducerRepository}.
 * The producers in the repository are (re)created from the kafka configuration
 * resource every time that resource changes and are closed when it is removed.
 */
public class KafkaHandler extends ConfigurationResourceConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaHandler.class);

    private final String streamingPath;
    private final KafkaProducerRepository repository;
    private final KafkaTopicExtractor topicExtractor;
    private final KafkaMessageSender kafkaMessageSender;
    private boolean initialized = false;

    /**
     * Creates a new handler.
     *
     * @param configurationResourceManager manager used to look up the kafka configuration resource
     * @param repository                   repository holding the kafka producers
     * @param kafkaMessageSender           sender used to publish the built records
     * @param configResourceUri            uri of the kafka configuration resource
     * @param streamingPath                uri prefix of the requests this handler is responsible for;
     *                                     also handed to the {@link KafkaTopicExtractor}
     */
    public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) {
        super(configurationResourceManager, configResourceUri, "gateleen_kafka_topic_configuration_schema");
        this.repository = repository;
        this.kafkaMessageSender = kafkaMessageSender;
        this.streamingPath = streamingPath;
        this.topicExtractor = new KafkaTopicExtractor(streamingPath);
    }

    /**
     * Loads the registered kafka configuration resource and sets up the kafka producers from it.
     *
     * <p>When the resource cannot be loaded or is not present, a warning is logged and the
     * handler stays uninitialized.
     *
     * @return a future that is completed in both cases, i.e. after the configuration was applied
     *         as well as when no configuration resource was found
     */
    public Future<Void> initialize() {
        Future<Void> future = Future.future();
        configurationResourceManager().getRegisteredResource(configResourceUri()).setHandler(event -> {
            if (event.succeeded() && event.result().isPresent()) {
                initializeKafkaConfiguration(event.result().get()).setHandler(event1 -> future.complete());
            } else {
                log.warn("No kafka configuration resource with uri '{}' found. Unable to setup kafka configuration correctly", configResourceUri());
                future.complete();
            }
        });
        return future;
    }

    /**
     * @return {@code true} once a kafka configuration has been applied, {@code false} before that
     *         and after the configuration resource has been removed
     */
    public boolean isInitialized() {
        return initialized;
    }

    /**
     * Replaces all kafka producers in the repository with producers built from the given
     * configuration.
     *
     * <p>The configuration is parsed before the existing producers are closed. The outcome of
     * closing the existing producers is not inspected; the new producers are added either way.
     *
     * @param configuration raw content of the kafka configuration resource
     * @return a future completed once the new producers have been added
     */
    private Future<Void> initializeKafkaConfiguration(Buffer configuration) {
        Future<Void> future = Future.future();
        List<KafkaConfiguration> kafkaConfigurations = KafkaConfigurationParser.parse(configuration);
        repository.closeAll().setHandler(event -> {
            for (KafkaConfiguration kafkaConfiguration : kafkaConfigurations) {
                repository.addKafkaProducer(kafkaConfiguration);
            }
            initialized = true;
            future.complete();
        });
        return future;
    }

    /**
     * Handles the request when its URI starts with the streaming path.
     *
     * <p>Responses:
     * <ul>
     *   <li>405 when the request method is not POST</li>
     *   <li>400 when no topic can be extracted from the URI or the payload fails validation</li>
     *   <li>404 when no producer matches the topic</li>
     *   <li>200 when the message(s) were sent to kafka</li>
     *   <li>500 when sending to kafka failed</li>
     * </ul>
     *
     * @param request the incoming request
     * @return {@code true} when the request was taken over by this handler (a response is or will
     *         be written), {@code false} when the URI does not start with the streaming path
     */
    public boolean handle(HttpServerRequest request) {
        if (!request.uri().startsWith(streamingPath)) {
            return false;
        }
        RequestLoggerFactory.getLogger(KafkaHandler.class, request).info("Handling {}", request.uri());

        if (HttpMethod.POST != request.method()) {
            respondWith(StatusCode.METHOD_NOT_ALLOWED, StatusCode.METHOD_NOT_ALLOWED.getStatusMessage(), request);
            return true;
        }

        Optional<String> optTopic = topicExtractor.extractTopic(request);
        if (!optTopic.isPresent()) {
            respondWith(StatusCode.BAD_REQUEST, "Could not extract topic from request uri", request);
            return true;
        }
        String topic = optTopic.get();

        Optional<Pair<KafkaProducer<String, String>, Pattern>> optProducer = repository.findMatchingKafkaProducer(topic);
        if (!optProducer.isPresent()) {
            respondWith(StatusCode.NOT_FOUND, "Could not find a matching producer for topic " + topic, request);
            return true;
        }
        KafkaProducer<String, String> producer = optProducer.get().getLeft();

        request.bodyHandler(payload -> {
            try {
                // Converting the whole payload to a string is only worth it when it is actually logged
                if (log.isDebugEnabled()) {
                    log.debug("incoming kafka message payload: {}", payload.toString());
                }
                List<KafkaProducerRecord<String, String>> kafkaProducerRecords = KafkaProducerRecordBuilder.buildRecords(topic, payload);
                kafkaMessageSender.sendMessages(producer, kafkaProducerRecords).setHandler(event -> {
                    if (event.succeeded()) {
                        RequestLoggerFactory.getLogger(KafkaHandler.class, request).info("Successfully sent {} message(s) to kafka topic '{}'", kafkaProducerRecords.size(), topic);
                        respondWith(StatusCode.OK, StatusCode.OK.getStatusMessage(), request);
                    } else {
                        // The cause's message may be null; respondWith substitutes the status message then
                        respondWith(StatusCode.INTERNAL_SERVER_ERROR, event.cause().getMessage(), request);
                    }
                });
            } catch (ValidationException ve) {
                respondWith(StatusCode.BAD_REQUEST, ve.getMessage(), request);
            }
        });
        return true;
    }

    /**
     * Re-initializes the kafka producers when the kafka configuration resource has changed.
     * Changes of other resources are ignored.
     *
     * @param resourceUri uri of the changed resource
     * @param resource    new content of the changed resource
     */
    public void resourceChanged(String resourceUri, Buffer resource) {
        if (configResourceUri() != null && configResourceUri().equals(resourceUri)) {
            log.info("Kafka configuration resource {} was updated. Going to initialize with new configuration", resourceUri);
            initializeKafkaConfiguration(resource);
        }
    }

    /**
     * Closes all kafka producers and marks the handler as uninitialized when the kafka
     * configuration resource has been removed. Removals of other resources are ignored.
     *
     * @param resourceUri uri of the removed resource
     */
    public void resourceRemoved(String resourceUri) {
        if (configResourceUri() != null && configResourceUri().equals(resourceUri)) {
            log.info("Kafka configuration resource {} was removed. Going to close all kafka producers", resourceUri);
            repository.closeAll().setHandler(event -> initialized = false);
        }
    }

    /**
     * Logs the status code and ends the response with the given status code and message.
     *
     * @param statusCode      status code (and status message) to respond with
     * @param responseMessage text written as response body; when {@code null} (e.g. an exception
     *                        without a message) the status message of {@code statusCode} is used,
     *                        so that the response is always ended
     * @param request         the request to respond to
     */
    private void respondWith(StatusCode statusCode, String responseMessage, HttpServerRequest request) {
        String body = responseMessage != null ? responseMessage : statusCode.getStatusMessage();
        ResponseStatusCodeLogUtil.info(request, statusCode, KafkaHandler.class);
        if (statusCode != StatusCode.OK) {
            RequestLoggerFactory.getLogger(KafkaHandler.class, request).info("Response message is: {}", body);
        }
        request.response().setStatusCode(statusCode.getStatusCode());
        request.response().setStatusMessage(statusCode.getStatusMessage());
        request.response().end(body);
    }
}

