/*
 * Decompiled with CFR 0.152.
 */
package org.swisspush.gateleen.hook.reducedpropagation;

import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.JsonObject;
import io.vertx.redis.client.Response;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.http.HttpRequest;
import org.swisspush.gateleen.core.lock.Lock;
import org.swisspush.gateleen.core.util.Address;
import org.swisspush.gateleen.core.util.HttpRequestHeader;
import org.swisspush.gateleen.core.util.LockUtil;
import org.swisspush.gateleen.core.util.StringUtils;
import org.swisspush.gateleen.hook.reducedpropagation.ReducedPropagationStorage;
import org.swisspush.gateleen.queue.queuing.RequestQueue;

/**
 * Coordinates the "reduced propagation" handling of queued requests.
 *
 * <p>Incoming requests are enqueued (locked) into their original queue while an expiry
 * timestamp for that queue is registered in the {@link ReducedPropagationStorage}. A periodic
 * timer, guarded by a {@link Lock}, removes expired queues from the storage and notifies the
 * consumer registered on {@link #PROCESSOR_ADDRESS}. That consumer replaces the content of the
 * queue named {@code MANAGER_QUEUE_PREFIX + queue} with the stored (payload-less) request and
 * finally deletes all items of the original queue.
 *
 * <p>NOTE(review): the retry bookkeeping uses a plain {@link HashMap}; this presumably relies on
 * all callbacks running on the same Vert.x context - confirm before sharing an instance across
 * threads.
 */
public class ReducedPropagationManager {
    public static final String PROCESS_EXPIRED_QUEUES_LOCK = "reducedPropagationProcExpQueuesLock";
    public static final String LOCK_REQUESTER = "ReducedPropagationManager";
    public static final String PROCESSOR_ADDRESS = "gateleen.hook-expired-queues-processor";
    public static final String MANAGER_QUEUE_PREFIX = "manager_";

    /** Maximum number of times a failed expired queue is re-added to the storage for a retry. */
    private static final int MAX_QUEUE_RETRY_COUNT = 50;
    /** Lower bound (inclusive) of the random retry delay in milliseconds. */
    private static final int MIN_RETRY_DELAY_MS = 2000;
    /** Size of the random range added to {@link #MIN_RETRY_DELAY_MS} (upper bound exclusive). */
    private static final int RETRY_DELAY_JITTER_MS = 28000;

    /** Keys and values of the JSON reply exchanged over the event bus. */
    private static final String STATUS = "status";
    private static final String MESSAGE = "message";
    private static final String OK = "ok";
    private static final String ERROR = "error";

    private static final Logger log = LoggerFactory.getLogger(ReducedPropagationManager.class);

    private final Vertx vertx;
    private final ReducedPropagationStorage storage;
    private final RequestQueue requestQueue;
    private final Lock lock;
    /** Number of retries already scheduled per expired queue name. */
    private final Map<String, Integer> failedQueueRetries = new HashMap<>();
    private final Random random = new Random();
    /** Id of the periodic timer; {@code -1} while no timer has been started. */
    private long processExpiredQueuesTimerId = -1L;

    /**
     * Creates the manager and registers the event bus consumer on {@link #PROCESSOR_ADDRESS}.
     *
     * @param vertx        the Vert.x instance used for timers and the event bus
     * @param storage      the storage holding queue expiry timestamps and queue requests
     * @param requestQueue the queue used to enqueue requests and delete queue items
     * @param lock         the lock guarding the periodic processing of expired queues
     */
    public ReducedPropagationManager(Vertx vertx, ReducedPropagationStorage storage, RequestQueue requestQueue, Lock lock) {
        this.vertx = vertx;
        this.storage = storage;
        this.requestQueue = requestQueue;
        this.lock = lock;
        registerExpiredQueueProcessor();
    }

    /**
     * (Re)starts the periodic processing of expired queues. A previously started timer is
     * cancelled first. On every tick the lock {@link #PROCESS_EXPIRED_QUEUES_LOCK} is requested
     * with an expiry of half the interval; expired queues are only processed when the lock was
     * acquired.
     *
     * @param intervalMs the timer interval in milliseconds
     */
    public void startExpiredQueueProcessing(long intervalMs) {
        log.info("About to start periodic processing of expired queues with an interval of {} ms", intervalMs);
        vertx.cancelTimer(processExpiredQueuesTimerId);
        processExpiredQueuesTimerId = vertx.setPeriodic(intervalMs, timerId -> {
            String token = createToken("reducedpropagation_expired_queue_processing");
            LockUtil.acquireLock(lock, PROCESS_EXPIRED_QUEUES_LOCK, token, getLockExpiry(intervalMs), log).onComplete(lockEvent -> {
                if (lockEvent.failed()) {
                    log.error("Could not acquire lock '{}'. Message: {}", PROCESS_EXPIRED_QUEUES_LOCK, lockEvent.cause().getMessage());
                    return;
                }
                // Null-safe: a missing result is treated like "lock not acquired".
                if (Boolean.TRUE.equals(lockEvent.result())) {
                    processExpiredQueues(token);
                }
            });
        });
    }

    /**
     * Enqueues the request (locked, requested by {@link #LOCK_REQUESTER}) into {@code queue} and
     * registers an expiry of {@code now + propagationIntervalMs} for that queue in the storage.
     * When the storage reports that a new entry was added, the request is additionally stored
     * without its payload so that it can be enqueued into the manager queue on expiry.
     *
     * <p>The returned promise reflects only the storage operations; the outcome of the enqueue
     * is reported through {@code doneHandler}.
     *
     * @param method                the http method of the request
     * @param targetUri             the target uri of the request
     * @param queueHeaders          the headers of the request
     * @param payload               the payload of the request; must not be {@code null}
     * @param queue                 the name of the (original) queue
     * @param propagationIntervalMs the interval in milliseconds after which the queue expires
     * @param doneHandler           handler passed on to the locked enqueue
     * @return a promise completed when the storage was updated, failed when the storage failed
     */
    public Promise<Void> processIncomingRequest(HttpMethod method, String targetUri, MultiMap queueHeaders, Buffer payload, String queue, long propagationIntervalMs, Handler<Void> doneHandler) {
        Promise<Void> promise = Promise.promise();
        long expireTS = System.currentTimeMillis() + propagationIntervalMs;
        log.debug("Going to perform a lockedEnqueue for (original) queue '{}' and eventually starting a new timer", queue);
        requestQueue.lockedEnqueue(new HttpRequest(method, targetUri, queueHeaders, payload.getBytes()), queue, LOCK_REQUESTER, doneHandler);
        storage.addQueue(queue, expireTS).onComplete(event -> {
            if (event.failed()) {
                log.error("starting a new timer for queue '{}' and propagationIntervalMs '{}' failed. Cause: {}", queue, propagationIntervalMs, event.cause());
                promise.fail(event.cause());
                return;
            }
            if (!Boolean.TRUE.equals(event.result())) {
                log.debug("Timer for queue '{}' is already running.", queue);
                promise.complete();
                return;
            }
            log.debug("Timer for queue '{}' with expiration at '{}' started.", queue, expireTS);
            storeQueueRequest(queue, method, targetUri, queueHeaders).future().onComplete(storeResult -> {
                if (storeResult.failed()) {
                    promise.fail(storeResult.cause());
                } else {
                    promise.complete();
                }
            });
        });
        return promise;
    }

    /**
     * Writes a payload-less copy of the request to the storage. The headers are copied and an
     * existing content-length header is set to {@code "0"} because the stored request carries
     * no payload.
     *
     * @return a promise completed on success and failed with the storage's cause otherwise
     */
    private Promise<Void> storeQueueRequest(String queue, HttpMethod method, String targetUri, MultiMap queueHeaders) {
        log.debug("Going to write the queue request for queue '{}' to the storage", queue);
        Promise<Void> promise = Promise.promise();
        MultiMap queueHeadersCopy = MultiMap.caseInsensitiveMultiMap().addAll(queueHeaders);
        if (HttpRequestHeader.containsHeader(queueHeadersCopy, HttpRequestHeader.CONTENT_LENGTH)) {
            queueHeadersCopy.set(HttpRequestHeader.CONTENT_LENGTH.getName(), "0");
        }
        HttpRequest request = new HttpRequest(method, targetUri, queueHeadersCopy, null);
        storage.storeQueueRequest(queue, request.toJsonObject()).onComplete(storeResult -> {
            if (storeResult.failed()) {
                log.error("Storing the queue request for queue '{}' failed. Cause: {}", queue, storeResult.cause());
                promise.fail(storeResult.cause());
            } else {
                log.debug("Successfully stored the queue request for queue '{}'", queue);
                promise.complete();
            }
        });
        return promise;
    }

    /**
     * Removes all queues expired at the current time from the storage and notifies the
     * processor for each of them.
     *
     * <p>NOTE(review): the lock is released explicitly only when the removal fails; otherwise it
     * is left to expire - presumably intentional so that only one tick per interval is processed.
     *
     * @param lockToken the token the lock was acquired with
     */
    private void processExpiredQueues(String lockToken) {
        log.debug("Going to process expired queues");
        storage.removeExpiredQueues(System.currentTimeMillis()).onComplete(event -> {
            if (event.failed()) {
                log.error("Going to release lock because process expired queues failed. Cause: " + event.cause());
                LockUtil.releaseLock(lock, PROCESS_EXPIRED_QUEUES_LOCK, lockToken, log);
                return;
            }
            Response expiredQueues = event.result();
            if (expiredQueues == null) {
                // Defensive: avoid a NullPointerException when the storage yields no response.
                log.debug("Got no expired queues to process");
                return;
            }
            log.debug("Got {} expired queues to process", expiredQueues.size());
            for (Response expiredQueue : expiredQueues) {
                notifyExpiredQueueProcessor(expiredQueue.toString());
            }
        });
    }

    /**
     * Sends the queue name to {@link #PROCESSOR_ADDRESS} and evaluates the reply. A failed
     * request schedules a retry; any received reply resets the retry counter of the queue.
     */
    private void notifyExpiredQueueProcessor(String expiredQueue) {
        log.debug("About to notify a consumer to process expired queue '{}'", expiredQueue);
        vertx.eventBus().<JsonObject>request(PROCESSOR_ADDRESS, expiredQueue, reply -> {
            if (reply.failed()) {
                log.error("Failed to process expired queue '{}'. Cause: {}", expiredQueue, reply.cause());
                handleFailedQueueRetry(expiredQueue);
                return;
            }
            failedQueueRetries.remove(expiredQueue);
            JsonObject body = reply.result().body();
            if (!OK.equals(body.getString(STATUS))) {
                log.error("Failed to process expired queue '{}'. Message: {}", expiredQueue, body.getString(MESSAGE));
            } else if (log.isDebugEnabled()) {
                log.debug(body.getString(MESSAGE));
            }
        });
    }

    /**
     * Re-adds the queue to the storage with a random expiry between {@code 2000} and
     * {@code 29999} ms from now, as long as fewer than {@link #MAX_QUEUE_RETRY_COUNT} retries
     * were scheduled for it. Afterwards the queue is given up and its counter is dropped.
     */
    private void handleFailedQueueRetry(String expiredQueue) {
        int failedQueueRetryCount = failedQueueRetries.getOrDefault(expiredQueue, 0);
        if (failedQueueRetryCount >= MAX_QUEUE_RETRY_COUNT) {
            log.warn("Too many retries for expired queue '{}'. Not going to retry again", expiredQueue);
            failedQueueRetries.remove(expiredQueue);
            return;
        }
        int updatedRetryCount = failedQueueRetryCount + 1;
        failedQueueRetries.put(expiredQueue, updatedRetryCount);
        int randomExpiry = random.nextInt(RETRY_DELAY_JITTER_MS) + MIN_RETRY_DELAY_MS;
        log.info("Retry attempt #{} to process expired queue '{}' again in {}ms", updatedRetryCount, expiredQueue, randomExpiry);
        storage.addQueue(expiredQueue, System.currentTimeMillis() + (long) randomExpiry).onComplete(addQueueReply -> {
            if (addQueueReply.failed()) {
                log.error("attempt #{} failed to add queue '{}' again for a later retry. Cause: {}", updatedRetryCount, expiredQueue, addQueueReply.cause());
            }
        });
    }

    /** Returns half of the interval as lock expiry, but never less than {@code 1}. */
    private long getLockExpiry(long interval) {
        return interval <= 1L ? 1L : interval / 2L;
    }

    /** Builds a lock token from the instance address, the current time and the appendix. */
    private String createToken(String appendix) {
        return Address.instanceAddress() + "_" + System.currentTimeMillis() + "_" + appendix;
    }

    /** Registers the event bus consumer which processes a single expired queue per message. */
    private void registerExpiredQueueProcessor() {
        // The explicit type witness gives the handler a Message<String>; no casts are needed.
        vertx.eventBus().<String>consumer(PROCESSOR_ADDRESS, event -> processExpiredQueue(event.body(), event));
    }

    /**
     * Processes one expired queue and replies to {@code event} with a JSON object holding a
     * {@code status} ({@code ok} or {@code error}) and a {@code message}.
     *
     * @param queue the name of the expired queue
     * @param event the event bus message to reply to
     */
    private void processExpiredQueue(String queue, Message<String> event) {
        log.debug("about to process expired queue '{}'", queue);
        if (StringUtils.isEmpty(queue)) {
            replyError(event, "Tried to process an expired queue without a valid queue name. Going to stop here");
            return;
        }
        log.debug("get queue request for queue '{}'", queue);
        storage.getQueueRequest(queue).onComplete(getQueuedRequestResult -> {
            if (getQueuedRequestResult.failed()) {
                replyError(event, getQueuedRequestResult.cause().getMessage());
                return;
            }
            JsonObject storedRequest = getQueuedRequestResult.result();
            if (storedRequest == null) {
                replyError(event, "stored queue request for queue '" + queue + "' is null");
                return;
            }
            replaceManagerQueueContent(queue, storedRequest, event);
        });
    }

    /**
     * Deletes all items of the manager queue, then enqueues the stored request into it. On
     * success the processing continues with the clean up of the original queue.
     */
    private void replaceManagerQueueContent(String queue, JsonObject storedRequest, Message<String> event) {
        String managerQueue = MANAGER_QUEUE_PREFIX + queue;
        log.debug("going to delete all queue items of manager queue '{}'", managerQueue);
        requestQueue.deleteAllQueueItems(managerQueue, false).onComplete(managerQueueDeleteAllResult -> {
            if (managerQueueDeleteAllResult.failed()) {
                replyError(event, managerQueueDeleteAllResult.cause().getMessage());
                return;
            }
            HttpRequest request;
            try {
                request = new HttpRequest(storedRequest);
            } catch (Exception ex) {
                // An unusable stored request is reported to the requester instead of thrown.
                replyError(event, ex.getMessage());
                return;
            }
            log.debug("going to enqueue into manager queue '{}'", managerQueue);
            requestQueue.enqueueFuture(request, managerQueue).onComplete(enqueueResult -> {
                if (enqueueResult.failed()) {
                    replyError(event, enqueueResult.cause().getMessage());
                } else {
                    cleanUpProcessedQueue(queue, event);
                }
            });
        });
    }

    /**
     * Removes the stored queue request and afterwards deletes all items of the original queue
     * (per the log and reply messages, the {@code true} flag also removes the queue lock). A
     * failed removal of the stored request is only logged and does not stop the clean up.
     */
    private void cleanUpProcessedQueue(String queue, Message<String> event) {
        log.debug("going to remove queue request from storage of queue '{}'", queue);
        storage.removeQueueRequest(queue).onComplete(removeQueueRequestResult -> {
            if (removeQueueRequestResult.failed()) {
                // The trailing throwable is logged as exception in addition to the message.
                log.error("Failed to remove request for queue '{}'. Remove it manually to remove expired data from storage", queue, removeQueueRequestResult.cause());
            }
            log.debug("going to unlock and delete all queue items of queue '{}'", queue);
            requestQueue.deleteAllQueueItems(queue, true).onComplete(deleteAllQueueItemsResult -> {
                if (deleteAllQueueItemsResult.succeeded()) {
                    event.reply(new JsonObject().put(STATUS, OK).put(MESSAGE, "Successfully deleted lock and all queue items of queue " + queue));
                } else {
                    replyError(event, deleteAllQueueItemsResult.cause().getMessage());
                }
            });
        });
    }

    /** Replies to the event with status {@code error} and the given message. */
    private static void replyError(Message<String> event, String message) {
        event.reply(new JsonObject().put(STATUS, ERROR).put(MESSAGE, message));
    }
}

