/*
 * Decompiled with CFR 0.152.
 */
package org.swisspush.gateleen.queue.queuing;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.http.HttpRequest;
import org.swisspush.gateleen.core.http.RequestLoggerFactory;
import org.swisspush.gateleen.core.util.Address;
import org.swisspush.gateleen.core.util.StatusCode;
import org.swisspush.gateleen.monitoring.MonitoringHandler;
import org.swisspush.gateleen.queue.expiry.ExpiryCheckHandler;
import org.swisspush.redisques.util.RedisquesAPI;

/**
 * Event bus consumer that replays queued HTTP requests against a backend.
 *
 * <p>For every message received on {@link Address#queueProcessorAddress()} the processor:
 * <ol>
 *   <li>rebuilds the {@link HttpRequest} from the message's {@code payload} string,</li>
 *   <li>asks redisques whether the message's {@code queue} is locked,</li>
 *   <li>if it is not locked and the request is not expired, sends the request with the
 *       configured {@link HttpClient},</li>
 *   <li>replies to the message with a JSON object whose {@code status} is {@code ok} or
 *       {@code error} (the latter with an additional {@code message} field).</li>
 * </ol>
 */
public class QueueProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(QueueProcessor.class);

    /** Timeout set on every replayed backend request, in milliseconds. */
    private static final long REQUEST_TIMEOUT_MS = 120000L;

    /** Status value redisques returns for a queue that has no lock. */
    private static final String NO_SUCH_LOCK = "No such lock";

    private final HttpClient httpClient;

    /**
     * Creates the processor and registers its local consumer on the event bus.
     *
     * @param vertx             used to register the consumer and to query redisques for locks
     * @param httpClient        client used to send the dequeued requests
     * @param monitoringHandler notified via {@code updateDequeue()} after each accepted request
     */
    public QueueProcessor(final Vertx vertx, final HttpClient httpClient, final MonitoringHandler monitoringHandler) {
        this.httpClient = httpClient;
        final Handler<Message<JsonObject>> consumer = message -> processMessage(vertx, monitoringHandler, message);
        vertx.eventBus().localConsumer(Address.queueProcessorAddress(), consumer);
    }

    /**
     * Returns the HTTP client used to send the dequeued requests.
     *
     * @return the client passed to the constructor
     */
    public HttpClient getHttpClient() {
        return this.httpClient;
    }

    /** Parses the queued request, checks the queue lock and, if unlocked, performs the request. */
    private void processMessage(final Vertx vertx, final MonitoringHandler monitoringHandler, final Message<JsonObject> message) {
        final HttpRequest queuedRequest;
        try {
            queuedRequest = new HttpRequest(new JsonObject(message.body().getString("payload")));
        } catch (Exception exception) {
            // Parameterized logging keeps this path null-safe (a null body is one way to get here)
            // and the trailing throwable preserves the stack trace.
            LOG.error("QUEUE_ERROR: Could not build request: {} error is {}", message.body(), exception.getMessage(), exception);
            message.reply(errorReply(exception.getMessage()));
            return;
        }

        final Logger logger = RequestLoggerFactory.getLogger(QueueProcessor.class, queuedRequest.getHeaders());
        if (logger.isTraceEnabled()) {
            logger.trace("process message: " + message);
        }

        final String queue = message.body().getString("queue");
        final Handler<AsyncResult<Message<JsonObject>>> lockReplyHandler = reply -> {
            // A failed or empty lock lookup must still be answered; otherwise the sender of the
            // queue message never receives a reply for this item.
            if (reply.failed() || reply.result() == null || reply.result().body() == null) {
                final String reason = reply.cause() != null ? reply.cause().getMessage() : "no reply from redisques";
                logger.warn("QUEUE_ERROR: Could not check lock of queue {}: {}", queue, reason);
                message.reply(errorReply(reason));
                return;
            }
            if (NO_SUCH_LOCK.equals(reply.result().body().getString("status"))) {
                performRequest(queuedRequest, message, logger, monitoringHandler);
            } else {
                logger.warn("Queue {} is locked!", queue);
                message.reply(errorReply("queue locked"));
            }
        };
        vertx.eventBus().send(Address.redisquesAddress(), RedisquesAPI.buildGetLockOperation(queue), lockReplyHandler);
    }

    /** Sends the queued request to the backend and replies to the queue message with the outcome. */
    private void performRequest(final HttpRequest queuedRequest, final Message<JsonObject> message, final Logger logger, final MonitoringHandler monitoringHandler) {
        logger.debug("performing request {} {}", queuedRequest.getMethod(), queuedRequest.getUri());
        if (ExpiryCheckHandler.isExpired(queuedRequest)) {
            // Expired requests are acknowledged with "ok" without being sent.
            logger.debug("request expired to {}", queuedRequest.getUri());
            message.reply(okReply());
            return;
        }

        final HttpClientRequest request = httpClient.request(queuedRequest.getMethod(), queuedRequest.getUri(), response -> {
            final int status = response.statusCode();
            if (logger.isTraceEnabled()) {
                logger.trace("response: " + status);
            }
            // 2xx and 409 are both answered with "ok"; 409 is only logged as a warning.
            if ((status >= 200 && status < 300) || status == 409) {
                if (status != StatusCode.CONFLICT.getStatusCode()) {
                    logger.debug("Successful request to {}", queuedRequest.getUri());
                } else {
                    logger.warn("Ignoring request conflict to {}: {} {}", queuedRequest.getUri(), status, response.statusMessage());
                }
                message.reply(okReply());
                monitoringHandler.updateDequeue();
            } else {
                logger.error("QUEUE_ERROR: Failed request to {}: {} {}", queuedRequest.getUri(), status, response.statusMessage());
                message.reply(errorReply(status + " " + response.statusMessage()));
            }
            // The backend body is not used; handlers are registered only to log its arrival and end.
            response.bodyHandler(event -> logger.debug("Discarding backend body"));
            response.endHandler(event -> logger.debug("Backend response end"));
            response.exceptionHandler(exception -> {
                logger.warn("QUEUE_ERROR: Exception on response from {}: {}", queuedRequest.getUri(), exception.getMessage());
                message.reply(errorReply(exception.getMessage()));
            });
        });

        if (queuedRequest.getHeaders() != null && !queuedRequest.getHeaders().isEmpty()) {
            request.headers().setAll(queuedRequest.getHeaders());
        }
        request.exceptionHandler(exception -> {
            logger.warn("QUEUE_ERROR: Failed request to {}: {}", queuedRequest.getUri(), exception.getMessage());
            message.reply(errorReply(exception.getMessage()));
        });
        request.setTimeout(REQUEST_TIMEOUT_MS);

        if (queuedRequest.getPayload() != null) {
            request.end(Buffer.buffer(queuedRequest.getPayload()));
        } else {
            request.end();
        }
    }

    /** Builds the reply body {@code {"status":"ok"}}. */
    private static JsonObject okReply() {
        return new JsonObject().put("status", "ok");
    }

    /** Builds the reply body {@code {"status":"error","message":<errorMessage>}}. */
    private static JsonObject errorReply(final String errorMessage) {
        return new JsonObject().put("status", "error").put("message", errorMessage);
    }
}

