/*
 * Decompiled with CFR 0.152.
 */
package org.swisspush.gateleen.hook.reducedpropagation;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.CaseInsensitiveHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.JsonObject;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.http.HttpRequest;
import org.swisspush.gateleen.core.util.HttpRequestHeader;
import org.swisspush.gateleen.core.util.StringUtils;
import org.swisspush.gateleen.hook.reducedpropagation.ReducedPropagationStorage;
import org.swisspush.gateleen.queue.queuing.RequestQueue;

/**
 * Manages the reduced propagation of queued requests.
 *
 * <p>Incoming requests are enqueued (locked) into their original queue and the queue is registered
 * in the {@link ReducedPropagationStorage} together with an expiration timestamp. When the storage
 * reports that the queue was newly registered, a payload-less copy of the request is stored as
 * well. A periodic timer (see {@link #startExpiredQueueProcessing(long)}) asks the storage for
 * expired queues and sends every expired queue name to {@link #PROCESSOR_ADDRESS} on the event bus.
 * The consumer registered by this class then enqueues the stored request into the corresponding
 * manager queue ({@link #MANAGER_QUEUE_PREFIX} + queue name) and cleans up the original queue.
 */
public class ReducedPropagationManager {
    public static final String LOCK_REQUESTER = "ReducedPropagationManager";
    public static final String PROCESSOR_ADDRESS = "gateleen.hook-expired-queues-processor";
    public static final String MANAGER_QUEUE_PREFIX = "manager_";

    /** Keys and values of the JSON reply exchanged between the timer and the queue processor. */
    private static final String STATUS = "status";
    private static final String MESSAGE = "message";
    private static final String OK = "ok";
    private static final String ERROR = "error";

    private static final Logger log = LoggerFactory.getLogger(ReducedPropagationManager.class);

    private final Vertx vertx;
    private final ReducedPropagationStorage storage;
    private final RequestQueue requestQueue;

    /** Id of the currently running periodic timer, or {@code -1} when none has been started yet. */
    private long processExpiredQueuesTimerId = -1L;

    /**
     * Creates the manager and registers the event bus consumer for {@link #PROCESSOR_ADDRESS}.
     * The periodic processing is not started here; call {@link #startExpiredQueueProcessing(long)}.
     *
     * @param vertx        the Vert.x instance used for timers and the event bus
     * @param storage      the storage holding queue timers and stored queue requests
     * @param requestQueue the queue facade used to enqueue requests and delete queue items
     */
    public ReducedPropagationManager(Vertx vertx, ReducedPropagationStorage storage, RequestQueue requestQueue) {
        this.vertx = vertx;
        this.storage = storage;
        this.requestQueue = requestQueue;
        this.registerExpiredQueueProcessor();
    }

    /**
     * (Re-)starts the periodic processing of expired queues. A previously started timer is
     * cancelled before the new one is set up.
     *
     * @param intervalMs the interval between two processing runs in milliseconds
     */
    public void startExpiredQueueProcessing(long intervalMs) {
        log.info("About to start periodic processing of expired queues with an interval of {} ms", intervalMs);
        this.vertx.cancelTimer(this.processExpiredQueuesTimerId);
        this.processExpiredQueuesTimerId = this.vertx.setPeriodic(intervalMs, event -> this.processExpiredQueues());
    }

    /**
     * Enqueues the request (locked) into the given queue and registers the queue in the storage
     * with an expiration of "now + {@code propagationIntervalMs}". If the storage reports that the
     * queue has been newly registered, a payload-less copy of the request is stored too.
     *
     * @param method                the http method of the request
     * @param targetUri             the target uri of the request
     * @param queueHeaders          the headers of the request
     * @param payload               the payload of the request
     * @param queue                 the name of the (original) queue
     * @param propagationIntervalMs the time in milliseconds until the queue expires
     * @param doneHandler           handler passed on to the locked enqueue
     * @return a future which completes once the queue registration (and, where applicable, the
     *         storing of the queue request) is done and fails with the cause reported by the storage
     */
    public Future<Void> processIncomingRequest(HttpMethod method, String targetUri, MultiMap queueHeaders, Buffer payload, String queue, long propagationIntervalMs, Handler<Void> doneHandler) {
        Future<Void> future = Future.future();
        long expireTS = System.currentTimeMillis() + propagationIntervalMs;
        log.info("Going to perform a lockedEnqueue for (original) queue '{}' and eventually starting a new timer", queue);
        this.requestQueue.lockedEnqueue(new HttpRequest(method, targetUri, queueHeaders, payload.getBytes()), queue, LOCK_REQUESTER, doneHandler);
        this.storage.addQueue(queue, expireTS).setHandler(event -> {
            if (event.failed()) {
                // String.valueOf keeps the cause inside the rendered message instead of having
                // SLF4J treat a trailing Throwable argument as the exception to attach.
                log.error("starting a new timer for queue '{}' and propagationIntervalMs '{}' failed. Cause: {}",
                        queue, propagationIntervalMs, String.valueOf(event.cause()));
                future.fail(event.cause());
                return;
            }
            // Null-safe: a missing result is handled like "timer already running" instead of an NPE.
            if (!Boolean.TRUE.equals(event.result())) {
                log.info("Timer for queue '{}' is already running.", queue);
                future.complete();
                return;
            }
            log.info("Timer for queue '{}' with expiration at '{}' started.", queue, expireTS);
            this.storeQueueRequest(queue, method, targetUri, queueHeaders).setHandler(storeResult -> {
                if (storeResult.failed()) {
                    future.fail(storeResult.cause());
                } else {
                    future.complete();
                }
            });
        });
        return future;
    }

    /**
     * Writes a payload-less copy of the request to the storage. The headers are copied and an
     * existing content-length header is set to "0" because no payload is stored.
     *
     * @return a future reflecting the outcome of the storage operation
     */
    private Future<Void> storeQueueRequest(String queue, HttpMethod method, String targetUri, MultiMap queueHeaders) {
        log.info("Going to write the queue request for queue '{}' to the storage", queue);
        Future<Void> future = Future.future();
        // Work on a copy so the headers handed in by the caller are left untouched.
        MultiMap queueHeadersCopy = new CaseInsensitiveHeaders().addAll(queueHeaders);
        if (HttpRequestHeader.containsHeader(queueHeadersCopy, HttpRequestHeader.CONTENT_LENGTH)) {
            queueHeadersCopy.set(HttpRequestHeader.CONTENT_LENGTH.getName(), "0");
        }
        HttpRequest request = new HttpRequest(method, targetUri, queueHeadersCopy, null);
        this.storage.storeQueueRequest(queue, request.toJsonObject()).setHandler(storeResult -> {
            if (storeResult.failed()) {
                log.error("Storing the queue request for queue '{}' failed. Cause: {}", queue, String.valueOf(storeResult.cause()));
                future.fail(storeResult.cause());
            } else {
                log.info("Successfully stored the queue request for queue '{}'", queue);
                future.complete();
            }
        });
        return future;
    }

    /**
     * Removes all queues expired at the current time from the storage and sends each of them to
     * {@link #PROCESSOR_ADDRESS}. The outcome reported by the processor is only logged.
     */
    private void processExpiredQueues() {
        log.debug("Going to process expired queues");
        this.storage.removeExpiredQueues(System.currentTimeMillis()).setHandler(event -> {
            if (event.failed()) {
                log.error("Failed to process expired queues. Cause: {}", String.valueOf(event.cause()));
                return;
            }
            List<String> expiredQueues = event.result();
            log.debug("Got {} expired queues to process", expiredQueues.size());
            for (String expiredQueue : expiredQueues) {
                log.info("About to notify a consumer to process expired queue '{}'", expiredQueue);
                this.vertx.eventBus().<JsonObject>send(PROCESSOR_ADDRESS, expiredQueue, reply -> {
                    // Without this check a timeout or a missing consumer leaves result() null
                    // and the handler would die with a NullPointerException.
                    if (reply.failed()) {
                        log.error("Failed to process expired queue '{}'. Cause: {}", expiredQueue, String.valueOf(reply.cause()));
                        return;
                    }
                    JsonObject body = reply.result().body();
                    String message = body.getString(MESSAGE);
                    if (OK.equals(body.getString(STATUS))) {
                        log.info(message);
                    } else {
                        log.error("Failed to process expired queue. Message: {}", message);
                    }
                });
            }
        });
    }

    /** Registers the event bus consumer which processes a single expired queue per message. */
    private void registerExpiredQueueProcessor() {
        this.vertx.eventBus().<String>consumer(PROCESSOR_ADDRESS, event -> this.processExpiredQueue(event.body(), event));
    }

    /**
     * Processes a single expired queue: loads the stored queue request and hands it over to
     * {@link #propagateStoredRequest(String, JsonObject, Message)}. Every outcome is reported to
     * the sender through a reply on {@code event}.
     *
     * @param queue the name of the expired queue
     * @param event the event bus message to reply to
     */
    private void processExpiredQueue(String queue, Message<String> event) {
        if (StringUtils.isEmpty((CharSequence) queue)) {
            replyError(event, "Tried to process an expired queue without a valid queue name. Going to stop here");
            return;
        }
        this.storage.getQueueRequest(queue).setHandler(getQueuedRequestResult -> {
            if (getQueuedRequestResult.failed()) {
                replyError(event, getQueuedRequestResult.cause().getMessage());
                return;
            }
            JsonObject storedRequest = getQueuedRequestResult.result();
            if (storedRequest == null) {
                replyError(event, "stored queue request for queue '" + queue + "' is null");
                return;
            }
            this.propagateStoredRequest(queue, storedRequest, event);
        });
    }

    /**
     * Empties the manager queue of the given queue, enqueues the stored request into it and
     * triggers the clean up of the original queue afterwards.
     */
    private void propagateStoredRequest(String queue, JsonObject storedRequest, Message<String> event) {
        String managerQueue = MANAGER_QUEUE_PREFIX + queue;
        this.requestQueue.deleteAllQueueItems(managerQueue, false).setHandler(managerQueueDeleteAllResult -> {
            if (managerQueueDeleteAllResult.failed()) {
                replyError(event, managerQueueDeleteAllResult.cause().getMessage());
                return;
            }
            HttpRequest request;
            try {
                request = new HttpRequest(storedRequest);
            } catch (Exception ex) {
                // The stored JSON could not be turned into a request; report it instead of failing silently.
                replyError(event, ex.getMessage());
                return;
            }
            this.requestQueue.enqueueFuture(request, managerQueue).setHandler(enqueueResult -> {
                if (enqueueResult.failed()) {
                    replyError(event, enqueueResult.cause().getMessage());
                    return;
                }
                this.cleanUpExpiredQueue(queue, event);
            });
        });
    }

    /**
     * Removes the stored queue request and deletes all items of the original queue. A failing
     * removal of the stored request is only logged (best effort); the reply reflects the outcome
     * of deleting the queue items.
     */
    private void cleanUpExpiredQueue(String queue, Message<String> event) {
        this.storage.removeQueueRequest(queue).setHandler(removeQueueRequestResult -> {
            if (removeQueueRequestResult.failed()) {
                log.error("Failed to remove request for queue '{}'. Remove it manually to remove expired data from storage", queue);
            }
            this.requestQueue.deleteAllQueueItems(queue, true).setHandler(deleteAllQueueItemsResult -> {
                if (deleteAllQueueItemsResult.succeeded()) {
                    replyOk(event, "Successfully deleted lock and all queue items of queue " + queue);
                } else {
                    replyError(event, deleteAllQueueItemsResult.cause().getMessage());
                }
            });
        });
    }

    /** Replies to the message with status "ok" and the given message text. */
    private static void replyOk(Message<String> event, String message) {
        event.reply(new JsonObject().put(STATUS, OK).put(MESSAGE, message));
    }

    /** Replies to the message with status "error" and the given message text. */
    private static void replyError(Message<String> event, String message) {
        event.reply(new JsonObject().put(STATUS, ERROR).put(MESSAGE, message));
    }
}

