/*
 * Decompiled with CFR 0.152.
 */
package org.swisspush.gateleen.queue.queuing;

import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.http.HttpRequest;
import org.swisspush.gateleen.core.http.RequestLoggerFactory;
import org.swisspush.gateleen.core.util.Address;
import org.swisspush.gateleen.core.util.StatusCode;
import org.swisspush.gateleen.monitoring.MonitoringHandler;
import org.swisspush.gateleen.queue.expiry.ExpiryCheckHandler;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.QueueCircuitBreaker;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueCircuitState;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueResponseType;

public class QueueProcessor {
    private Vertx vertx;
    private HttpClient httpClient;
    private MonitoringHandler monitoringHandler;
    private QueueCircuitBreaker queueCircuitBreaker;
    private MessageConsumer<JsonObject> consumer;
    private Logger log = LoggerFactory.getLogger(QueueProcessor.class);

    public QueueProcessor(Vertx vertx, HttpClient httpClient, MonitoringHandler monitoringHandler) {
        this(vertx, httpClient, monitoringHandler, null);
    }

    public QueueProcessor(Vertx vertx, HttpClient httpClient, MonitoringHandler monitoringHandler, QueueCircuitBreaker queueCircuitBreaker) {
        this(vertx, httpClient, monitoringHandler, queueCircuitBreaker, true);
    }

    public QueueProcessor(Vertx vertx, HttpClient httpClient, MonitoringHandler monitoringHandler, QueueCircuitBreaker queueCircuitBreaker, boolean immediatelyStartQueueProcessing) {
        this.vertx = vertx;
        this.httpClient = httpClient;
        this.monitoringHandler = monitoringHandler;
        this.queueCircuitBreaker = queueCircuitBreaker;
        if (immediatelyStartQueueProcessing) {
            this.startQueueProcessing();
        } else {
            this.log.info("initialized QueueProcessor but queue processing has disabled");
        }
    }

    public void startQueueProcessing() {
        if (this.consumer == null || !this.consumer.isRegistered()) {
            this.log.info("about to register consumer to start queue processing");
            this.consumer = this.vertx.eventBus().consumer(this.getQueueProcessorAddress(), message -> {
                HttpRequest queuedRequestTry = null;
                JsonObject jsonRequest = new JsonObject(((JsonObject)message.body()).getString("payload"));
                try {
                    queuedRequestTry = new HttpRequest(jsonRequest);
                }
                catch (Exception exception) {
                    this.log.error("Could not build request: " + ((JsonObject)message.body()).toString() + " error is " + exception.getMessage());
                    message.reply((Object)new JsonObject().put("status", "error").put("message", exception.getMessage()));
                    return;
                }
                HttpRequest queuedRequest = queuedRequestTry;
                Logger logger = RequestLoggerFactory.getLogger(QueueProcessor.class, (MultiMap)queuedRequest.getHeaders());
                if (logger.isTraceEnabled()) {
                    logger.trace("process message: " + message);
                }
                String queueName = ((JsonObject)message.body()).getString("queue");
                if (!this.isCircuitCheckEnabled()) {
                    this.executeQueuedRequest((Message<JsonObject>)message, logger, queuedRequest, jsonRequest, queueName, null);
                } else {
                    this.queueCircuitBreaker.handleQueuedRequest(queueName, queuedRequest).setHandler(event -> {
                        if (event.failed()) {
                            String msg = "Error in QueueCircuitBreaker occurred for queue " + queueName + ". Reply with status ERROR. Message is: " + event.cause().getMessage();
                            logger.error(msg);
                            message.reply((Object)new JsonObject().put("status", "error").put("message", msg));
                            return;
                        }
                        QueueCircuitState state = (QueueCircuitState)((Object)((Object)((Object)event.result())));
                        if (QueueCircuitState.OPEN == state) {
                            message.reply((Object)new JsonObject().put("status", "error").put("message", "Circuit for queue " + queueName + " is " + (Object)((Object)state) + ". Queues using this endpoint are not allowed to be executed right now"));
                        } else {
                            this.executeQueuedRequest((Message<JsonObject>)message, logger, queuedRequest, jsonRequest, queueName, state);
                        }
                    });
                }
            });
            this.log.info("registered queue processing consumer on address: " + this.consumer.address());
        } else {
            this.log.info("queue processing is already started");
        }
    }

    public void stopQueueProcessing() {
        if (this.consumer != null && this.consumer.isRegistered()) {
            this.log.info("about to unregister consumer to stop queue processing");
            this.consumer.unregister();
        } else {
            this.log.info("queue processing is already stopped");
        }
    }

    public boolean isQueueProcessingStarted() {
        return this.consumer != null && this.consumer.isRegistered();
    }

    public String getQueueProcessorAddress() {
        return Address.queueProcessorAddress();
    }

    private boolean isCircuitCheckEnabled() {
        return this.queueCircuitBreaker != null && this.queueCircuitBreaker.isCircuitCheckEnabled();
    }

    private boolean isStatisticsUpdateEnabled() {
        return this.queueCircuitBreaker != null && this.queueCircuitBreaker.isStatisticsUpdateEnabled();
    }

    public static boolean httpMethodIsQueueable(HttpMethod method) {
        boolean result;
        switch (method) {
            case GET: 
            case HEAD: 
            case PUT: 
            case POST: 
            case DELETE: 
            case OPTIONS: 
            case PATCH: {
                result = true;
                break;
            }
            default: {
                result = false;
            }
        }
        return result;
    }

    private void performCircuitBreakerActions(String queueName, HttpRequest queuedRequest, QueueResponseType queueResponseType, QueueCircuitState state) {
        this.updateCircuitBreakerStatistics(queueName, queuedRequest, queueResponseType, state);
        if (QueueCircuitState.HALF_OPEN == state) {
            if (QueueResponseType.SUCCESS == queueResponseType) {
                this.closeCircuit(queuedRequest);
            } else if (QueueResponseType.FAILURE == queueResponseType) {
                this.reOpenCircuit(queuedRequest);
            }
        }
    }

    private void updateCircuitBreakerStatistics(String queueName, HttpRequest queuedRequest, QueueResponseType queueResponseType, QueueCircuitState state) {
        if (this.isStatisticsUpdateEnabled() && QueueCircuitState.OPEN != state) {
            this.queueCircuitBreaker.updateStatistics(queueName, queuedRequest, queueResponseType).setHandler(event -> {
                if (event.failed()) {
                    String message = "failed to update statistics for queue '" + queueName + "' to uri " + queuedRequest.getUri() + ". Message is: " + event.cause().getMessage();
                    RequestLoggerFactory.getLogger(QueueProcessor.class, (MultiMap)queuedRequest.getHeaders()).warn(message);
                }
            });
        }
    }

    private void closeCircuit(HttpRequest queuedRequest) {
        if (this.queueCircuitBreaker != null) {
            this.queueCircuitBreaker.closeCircuit(queuedRequest).setHandler(event -> {
                if (event.failed()) {
                    String message = "failed to close circuit " + queuedRequest.getUri() + ". Message is: " + event.cause().getMessage();
                    RequestLoggerFactory.getLogger(QueueProcessor.class, (MultiMap)queuedRequest.getHeaders()).error(message);
                }
            });
        }
    }

    private void reOpenCircuit(HttpRequest queuedRequest) {
        if (this.queueCircuitBreaker != null) {
            this.queueCircuitBreaker.reOpenCircuit(queuedRequest).setHandler(event -> {
                if (event.failed()) {
                    String message = "failed to re-open circuit " + queuedRequest.getUri() + ". Message is: " + event.cause().getMessage();
                    RequestLoggerFactory.getLogger(QueueProcessor.class, (MultiMap)queuedRequest.getHeaders()).error(message);
                }
            });
        }
    }

    private void executeQueuedRequest(Message<JsonObject> message, Logger logger, HttpRequest queuedRequest, JsonObject jsonRequest, String queueName, QueueCircuitState state) {
        logger.debug("performing request " + queuedRequest.getMethod() + " " + queuedRequest.getUri());
        if (ExpiryCheckHandler.isExpired(queuedRequest.getHeaders(), jsonRequest.getLong("queueTimestamp"))) {
            logger.debug("request expired to " + queuedRequest.getUri());
            message.reply((Object)new JsonObject().put("status", "ok"));
            return;
        }
        HttpClientRequest request = this.httpClient.request(queuedRequest.getMethod(), queuedRequest.getUri(), response -> {
            if (logger.isTraceEnabled()) {
                logger.trace("response: " + response.statusCode());
            }
            if (response.statusCode() >= 200 && response.statusCode() < 300 || response.statusCode() == 409) {
                if (response.statusCode() != StatusCode.CONFLICT.getStatusCode()) {
                    logger.debug("Successful request to " + queuedRequest.getUri());
                } else {
                    logger.warn("Ignoring request conflict to " + queuedRequest.getUri() + ": " + response.statusCode() + " " + response.statusMessage());
                }
                message.reply((Object)new JsonObject().put("status", "ok"));
                this.performCircuitBreakerActions(queueName, queuedRequest, QueueResponseType.SUCCESS, state);
                this.monitoringHandler.updateDequeue();
            } else {
                logger.info("Failed queued request to " + queuedRequest.getUri() + ": " + response.statusCode() + " " + response.statusMessage());
                message.reply((Object)new JsonObject().put("status", "error").put("message", response.statusCode() + " " + response.statusMessage()));
                this.performCircuitBreakerActions(queueName, queuedRequest, QueueResponseType.FAILURE, state);
            }
            response.bodyHandler(event -> logger.debug("Discarding backend body"));
            response.endHandler(event -> logger.debug("Backend response end"));
            response.exceptionHandler(exception -> {
                logger.warn("Exception on response from " + queuedRequest.getUri() + ": " + exception.getMessage());
                message.reply((Object)new JsonObject().put("status", "error").put("message", exception.getMessage()));
                this.performCircuitBreakerActions(queueName, queuedRequest, QueueResponseType.FAILURE, state);
            });
        });
        if (queuedRequest.getHeaders() != null && !queuedRequest.getHeaders().isEmpty()) {
            request.headers().setAll(queuedRequest.getHeaders());
        }
        request.exceptionHandler(exception -> {
            logger.warn("Failed request to " + queuedRequest.getUri() + ": " + exception.getMessage());
            message.reply((Object)new JsonObject().put("status", "error").put("message", exception.getMessage()));
            this.performCircuitBreakerActions(queueName, queuedRequest, QueueResponseType.FAILURE, state);
        });
        request.setTimeout(120000L);
        if (queuedRequest.getPayload() != null) {
            request.end(Buffer.buffer((byte[])queuedRequest.getPayload()));
        } else {
            request.end();
        }
    }

    public HttpClient getHttpClient() {
        return this.httpClient;
    }
}

