/*
 * Decompiled with CFR 0.152.
 */
package org.swisspush.gateleen.kafka;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceConsumer;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager;
import org.swisspush.gateleen.core.exception.GateleenExceptionFactory;
import org.swisspush.gateleen.core.http.RequestLoggerFactory;
import org.swisspush.gateleen.core.util.ResponseStatusCodeLogUtil;
import org.swisspush.gateleen.core.util.StatusCode;
import org.swisspush.gateleen.core.validation.ValidationResult;
import org.swisspush.gateleen.core.validation.ValidationStatus;
import org.swisspush.gateleen.kafka.KafkaConfiguration;
import org.swisspush.gateleen.kafka.KafkaConfigurationParser;
import org.swisspush.gateleen.kafka.KafkaHandlerBuilder;
import org.swisspush.gateleen.kafka.KafkaMessageSender;
import org.swisspush.gateleen.kafka.KafkaMessageValidator;
import org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder;
import org.swisspush.gateleen.kafka.KafkaProducerRepository;
import org.swisspush.gateleen.kafka.KafkaTopicExtractor;
import org.swisspush.gateleen.validation.ValidationException;

public class KafkaHandler
extends ConfigurationResourceConsumer {
    private final Logger log = LoggerFactory.getLogger(KafkaHandler.class);
    private final String streamingPath;
    private final GateleenExceptionFactory exceptionFactory;
    private final KafkaProducerRepository repository;
    private final KafkaTopicExtractor topicExtractor;
    private final KafkaMessageSender kafkaMessageSender;
    private final Map<String, Object> properties;
    private final KafkaProducerRecordBuilder kafkaProducerRecordBuilder;
    private final KafkaMessageValidator kafkaMessageValidator;
    private boolean initialized = false;

    public KafkaHandler(Vertx vertx, GateleenExceptionFactory exceptionFactory, ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {
        super(configurationResourceManager, configResourceUri, "gateleen_kafka_topic_configuration_schema");
        this.exceptionFactory = exceptionFactory;
        this.repository = repository;
        this.kafkaMessageValidator = kafkaMessageValidator;
        this.kafkaMessageSender = kafkaMessageSender;
        this.streamingPath = streamingPath;
        this.properties = properties;
        this.topicExtractor = new KafkaTopicExtractor(streamingPath);
        this.kafkaProducerRecordBuilder = new KafkaProducerRecordBuilder(vertx, exceptionFactory);
    }

    public static KafkaHandlerBuilder builder() {
        return new KafkaHandlerBuilder();
    }

    public Future<Void> initialize() {
        Promise promise = Promise.promise();
        this.configurationResourceManager().getRegisteredResource(this.configResourceUri()).onComplete(event -> {
            if (event.succeeded() && ((Optional)event.result()).isPresent()) {
                this.initializeKafkaConfiguration((Buffer)((Optional)event.result()).get()).onComplete(event1 -> promise.complete());
            } else {
                this.log.warn("No kafka configuration resource with uri '{}' found. Unable to setup kafka configuration correctly", (Object)this.configResourceUri());
                promise.complete();
            }
        });
        return promise.future();
    }

    public boolean isInitialized() {
        return this.initialized;
    }

    private Future<Void> initializeKafkaConfiguration(Buffer configuration) {
        Promise promise = Promise.promise();
        List<KafkaConfiguration> kafkaConfigurations = KafkaConfigurationParser.parse(configuration, this.properties);
        this.repository.closeAll().future().onComplete(event -> {
            for (KafkaConfiguration kafkaConfiguration : kafkaConfigurations) {
                this.repository.addKafkaProducer(kafkaConfiguration);
            }
            this.initialized = true;
            promise.complete();
        });
        return promise.future();
    }

    public boolean handle(HttpServerRequest request) {
        if (request.uri().startsWith(this.streamingPath)) {
            RequestLoggerFactory.getLogger(KafkaHandler.class, (HttpServerRequest)request).info("Handling {}", (Object)request.uri());
            if (HttpMethod.POST != request.method()) {
                this.respondWith(StatusCode.METHOD_NOT_ALLOWED, StatusCode.METHOD_NOT_ALLOWED.getStatusMessage(), request);
                return true;
            }
            Optional<String> optTopic = this.topicExtractor.extractTopic(request);
            if (optTopic.isEmpty()) {
                this.respondWith(StatusCode.BAD_REQUEST, "Could not extract topic from request uri", request);
                return true;
            }
            String topic = optTopic.get();
            Optional<Pair<KafkaProducer<String, String>, Pattern>> optProducer = this.repository.findMatchingKafkaProducer(topic);
            if (optProducer.isEmpty()) {
                this.respondWith(StatusCode.NOT_FOUND, "Could not find a matching producer for topic " + topic, request);
                return true;
            }
            request.bodyHandler(payload -> {
                this.log.debug("incoming kafka message payload: {}", payload);
                boolean[] isResponseSent = new boolean[]{false};
                this.kafkaProducerRecordBuilder.buildRecordsAsync(topic, (Buffer)payload).compose(kafkaProducerRecords -> {
                    Future fut = this.maybeValidate(request, (List<KafkaProducerRecord<String, String>>)kafkaProducerRecords).compose(validationEvent -> {
                        if (validationEvent.isSuccess()) {
                            this.kafkaMessageSender.sendMessages((KafkaProducer<String, String>)((KafkaProducer)((Pair)optProducer.get()).getLeft()), (List<KafkaProducerRecord<String, String>>)kafkaProducerRecords).onComplete(event -> {
                                if (event.succeeded()) {
                                    RequestLoggerFactory.getLogger(KafkaHandler.class, (HttpServerRequest)request).info("Successfully sent {} message(s) to kafka topic '{}'", (Object)kafkaProducerRecords.size(), (Object)topic);
                                    isResponseSent[0] = true;
                                    this.respondWith(StatusCode.OK, StatusCode.OK.getStatusMessage(), request);
                                } else {
                                    isResponseSent[0] = true;
                                    this.respondWith(StatusCode.INTERNAL_SERVER_ERROR, event.cause().getMessage(), request);
                                }
                            });
                        } else {
                            isResponseSent[0] = true;
                            this.respondWith(StatusCode.BAD_REQUEST, validationEvent.getMessage(), request);
                        }
                        return Future.succeededFuture();
                    });
                    assert (fut != null);
                    return fut;
                }).onFailure(ex -> {
                    if (ex instanceof ValidationException && !isResponseSent[0]) {
                        this.respondWith(StatusCode.BAD_REQUEST, ex.getMessage(), request);
                        return;
                    }
                    this.log.error("TODO error handling", (Throwable)this.exceptionFactory.newException(ex));
                    if (!isResponseSent[0]) {
                        this.respondWith(StatusCode.INTERNAL_SERVER_ERROR, ex.getMessage(), request);
                    }
                });
            });
            return true;
        }
        return false;
    }

    public void resourceChanged(String resourceUri, Buffer resource) {
        if (this.configResourceUri() != null && this.configResourceUri().equals(resourceUri)) {
            this.log.info("Kafka configuration resource {} was updated. Going to initialize with new configuration", (Object)resourceUri);
            this.initializeKafkaConfiguration(resource);
        }
    }

    public void resourceRemoved(String resourceUri) {
        if (this.configResourceUri() != null && this.configResourceUri().equals(resourceUri)) {
            this.log.info("Kafka configuration resource {} was removed. Going to close all kafka producers", (Object)resourceUri);
            this.repository.closeAll().future().onComplete(event -> {
                this.initialized = false;
            });
        }
    }

    private Future<ValidationResult> maybeValidate(HttpServerRequest request, List<KafkaProducerRecord<String, String>> kafkaProducerRecords) {
        if (this.kafkaMessageValidator != null) {
            Future<ValidationResult> fut = this.kafkaMessageValidator.validateMessages(request, kafkaProducerRecords);
            assert (fut != null);
            return fut;
        }
        Future fut = Future.succeededFuture((Object)new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
        assert (fut != null);
        return fut;
    }

    private void respondWith(StatusCode statusCode, String responseMessage, HttpServerRequest request) {
        ResponseStatusCodeLogUtil.info((HttpServerRequest)request, (StatusCode)statusCode, KafkaHandler.class);
        if (statusCode != StatusCode.OK) {
            RequestLoggerFactory.getLogger(KafkaHandler.class, (HttpServerRequest)request).info("Response message is: {}", (Object)responseMessage);
        }
        request.response().setStatusCode(statusCode.getStatusCode());
        request.response().setStatusMessage(statusCode.getStatusMessage());
        request.response().end(responseMessage);
    }
}

