package com.github.sonus21.rqueue.core;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueSchedulerConfig;
import com.github.sonus21.rqueue.core.RedisScriptFactory;
import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent;
import com.github.sonus21.rqueue.utils.ThreadUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.Subscribe;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import org.slf4j.Logger;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultScriptExecutor;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/github/sonus21/rqueue/core/MessageScheduler.class */
public abstract class MessageScheduler implements DisposableBean {
    // Lock guarding the bootstrap-event handler and destroy(), so start/stop sequences do not interleave.
    private final Object monitor = new Object();
    // Scheduler settings read throughout this class (polling interval, back-off delays, auto-start, redis flag).
    protected final RqueueSchedulerConfig rqueueSchedulerConfig;
    // Global configuration; consulted here only to detect producer mode.
    protected final RqueueConfig rqueueConfig;
    // Handed to the RedisScheduleTriggerHandler created in initialize().
    private final RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory;
    // Template backing the script executor built in initialize().
    private final RedisTemplate<String, Long> redisTemplate;
    // MOVE_EXPIRED_MESSAGE script, loaded in initialize().
    private RedisScript<Long> redisScript;
    // Executes redisScript; created in initialize().
    private DefaultScriptExecutor<String> defaultScriptExecutor;
    // Queue name -> whether this scheduler currently treats the queue as running.
    private Map<String, Boolean> queueRunningState;
    // Queue name -> future of the periodic mover task registered by schedule().
    private Map<String, ScheduledFuture<?>> queueNameToScheduledTask;
    // QueueDetail#getQueueName() value -> earliest time (epoch millis) at which a mover task may run again.
    private Map<String, Long> queueNameToNextRunTime;

    // Created in initialize() only when redis-based triggering is enabled; otherwise left unset.
    @VisibleForTesting
    protected RedisScheduleTriggerHandler redisScheduleTriggerHandler;
    // Task scheduler running the mover tasks; created in initialize().
    private ThreadPoolTaskScheduler scheduler;
    // Queue name -> failure counter used for back-off; written back as 0 when a run is scheduled without an error.
    private Map<String, Integer> errorCount;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/sonus21/rqueue/core/MessageScheduler$MessageMoverTask.class */
    public class MessageMoverTask implements Runnable {
        private final String id = UUID.randomUUID().toString();
        private final String name;
        private final String queueName;
        private final String zsetName;
        private final boolean processingQueue;
        private final boolean periodic;

        MessageMoverTask(QueueDetail queueDetail, String str, boolean z) {
            this.name = queueDetail.getName();
            this.queueName = queueDetail.getQueueName();
            this.zsetName = str;
            this.periodic = z;
            this.processingQueue = MessageScheduler.this.isProcessingQueue();
        }

        private long getNextScheduleTimeInternal(Long l, long j, Exception exc) {
            long nextScheduleTime;
            int i = 0;
            if (null != exc) {
                i = ((Integer) MessageScheduler.this.errorCount.getOrDefault(this.name, 0)).intValue() + 1;
                if (i % 3 == 0) {
                    MessageScheduler.this.getLogger().error("Message mover task is failing continuously queue: {}", this.name, exc);
                }
                nextScheduleTime = j + Math.min((long) (MessageScheduler.this.rqueueSchedulerConfig.minMessageMoveDelay() * Math.pow(1.5d, i)), MessageScheduler.this.rqueueSchedulerConfig.getMaxMessageMoverDelay());
            } else {
                nextScheduleTime = MessageScheduler.this.getNextScheduleTime(this.name, j, l);
            }
            MessageScheduler.this.errorCount.put(this.name, Integer.valueOf(i));
            return nextScheduleTime;
        }

        public String toString() {
            return String.format("MessageMoverTask(id=%s, queue=%s, periodic=%s)", this.id, this.name, Boolean.valueOf(this.periodic));
        }

        private long getMessageCount() {
            return MessageScheduler.this.rqueueSchedulerConfig.getMaxMessageCount();
        }

        private List<String> scriptKeys() {
            return Arrays.asList(this.queueName, this.zsetName);
        }

        private Object[] scriptArgs() {
            Object[] objArr = new Object[3];
            objArr[0] = Long.valueOf(System.currentTimeMillis());
            objArr[1] = Long.valueOf(getMessageCount());
            objArr[2] = Integer.valueOf(this.processingQueue ? 1 : 0);
            return objArr;
        }

        private boolean shouldSkip(long j) {
            Long l = (Long) MessageScheduler.this.queueNameToNextRunTime.get(this.queueName);
            return l != null && l.longValue() > j;
        }

        @Override // java.lang.Runnable
        public void run() {
            long currentTimeMillis = System.currentTimeMillis();
            if (shouldSkip(currentTimeMillis)) {
                MessageScheduler.this.getLogger().debug("Skipped {}", this);
                return;
            }
            MessageScheduler.this.getLogger().debug("Running {}", this);
            Long l = null;
            try {
                try {
                    if (MessageScheduler.this.isQueueActive(this.name)) {
                        l = (Long) MessageScheduler.this.defaultScriptExecutor.execute(MessageScheduler.this.redisScript, scriptKeys(), scriptArgs());
                    }
                } catch (RedisSystemException e) {
                    MessageScheduler.this.queueNameToNextRunTime.put(this.queueName, Long.valueOf(getNextScheduleTimeInternal(l, currentTimeMillis, e)));
                } catch (Exception e2) {
                    MessageScheduler.this.getLogger().warn("Task execution failed for the queue: {}", getName(), e2);
                    MessageScheduler.this.queueNameToNextRunTime.put(this.queueName, Long.valueOf(getNextScheduleTimeInternal(l, currentTimeMillis, e2)));
                }
            } finally {
                MessageScheduler.this.queueNameToNextRunTime.put(this.queueName, Long.valueOf(getNextScheduleTimeInternal(l, currentTimeMillis, null)));
            }
        }

        public String getName() {
            return this.name;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stores the collaborators and registers this scheduler on the event bus so that
     * {@code @Subscribe} handlers of this instance receive events.
     *
     * @param rqueueSchedulerConfig scheduler settings
     * @param rqueueConfig global configuration
     * @param rqueueEventBus bus this instance is registered on
     * @param rqueueRedisListenerContainerFactory factory kept for the redis trigger handler
     * @param redisTemplate template kept for building the script executor
     */
    public MessageScheduler(
            RqueueSchedulerConfig rqueueSchedulerConfig,
            RqueueConfig rqueueConfig,
            RqueueEventBus rqueueEventBus,
            RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory,
            RedisTemplate<String, Long> redisTemplate) {
        this.redisTemplate = redisTemplate;
        this.rqueueRedisListenerContainerFactory = rqueueRedisListenerContainerFactory;
        this.rqueueConfig = rqueueConfig;
        this.rqueueSchedulerConfig = rqueueSchedulerConfig;
        // Registration stays last so all fields are assigned before the bus sees this instance.
        rqueueEventBus.register(this);
    }

    /** Returns the logger used for all messages emitted by this scheduler and its mover tasks. */
    protected abstract Logger getLogger();

    /**
     * Computes the next run time of a queue after a mover run that raised no exception.
     *
     * @param str queue name
     * @param j time (epoch millis) at which the run started
     * @param l value returned by the mover script, or {@code null} when the script was not executed
     * @return time (epoch millis) before which the queue's mover task is skipped
     */
    protected abstract long getNextScheduleTime(String str, long j, Long l);

    /**
     * Maps a queue name to a channel name; handed to the redis trigger handler as a method
     * reference. NOTE(review): presumably the pub/sub channel of the queue - confirm in subclasses.
     */
    protected abstract String getChannelName(String str);

    /** Maps a queue name to the zset name that is passed as the second key of the mover script. */
    protected abstract String getZsetName(String str);

    /** Returns the thread name prefix used when the task scheduler is created. */
    protected abstract String getThreadNamePrefix();

    /** Returns the upper bound of the scheduler pool size; the pool never exceeds the queue count. */
    protected abstract int getThreadPoolSize();

    /**
     * Returns the fixed-rate period of the periodic mover tasks.
     *
     * @return the configured interval, or 100 ms when the configured value is zero or negative
     */
    protected Duration getPeriod() {
        long configuredMillis = this.rqueueSchedulerConfig.getScheduledMessageTimeIntervalInMilli();
        long effectiveMillis = configuredMillis > 0 ? configuredMillis : 100;
        return Duration.ofMillis(effectiveMillis);
    }

    /**
     * Tells whether this scheduler works on processing queues; the value is captured by each mover
     * task and sent to the mover script as its third argument (1 for true, 0 for false).
     */
    protected abstract boolean isProcessingQueue();

    /** Starts every queue that is registered in the running-state map. */
    private void doStart() {
        for (String queueName : this.queueRunningState.keySet()) {
            startQueue(queueName);
        }
    }

    /**
     * Builds a mover task for the given queue.
     *
     * @param str queue name, resolved through the endpoint registry
     * @param z whether the task is meant for periodic execution
     */
    private MessageMoverTask task(String str, boolean z) {
        QueueDetail queueDetail = EndpointRegistry.get(str);
        String zsetName = getZsetName(str);
        return new MessageMoverTask(queueDetail, zsetName, z);
    }

    /**
     * Registers a periodic mover task for the queue at the rate given by {@link #getPeriod()} and
     * remembers its future under the queue name.
     *
     * @param str queue name
     */
    protected void schedule(String str) {
        MessageMoverTask periodicTask = task(str, true);
        ScheduledFuture<?> future = this.scheduler.scheduleAtFixedRate(periodicTask, getPeriod());
        this.queueNameToScheduledTask.put(str, future);
    }

    /**
     * Marks the queue as running, unless it already is. Then schedules the periodic task when
     * auto-start is configured, and starts the redis trigger for the queue when redis is enabled.
     *
     * @param str queue name
     */
    private void startQueue(String str) {
        boolean alreadyRunning = Boolean.TRUE.equals(this.queueRunningState.get(str));
        if (!alreadyRunning) {
            this.queueRunningState.put(str, Boolean.TRUE);
            if (this.rqueueSchedulerConfig.isAutoStart()) {
                schedule(str);
            }
            if (isRedisEnabled()) {
                this.redisScheduleTriggerHandler.startQueue(str);
            }
        }
    }

    /**
     * Stops all running queues, waits for their scheduled tasks, forgets the task futures and
     * stops the redis trigger handler when redis is enabled. Does nothing when no queue has been
     * registered yet.
     */
    private void doStop() {
        if (!CollectionUtils.isEmpty(this.queueRunningState)) {
            this.queueRunningState.forEach((queueName, running) -> {
                if (Boolean.TRUE.equals(running)) {
                    stopQueue(queueName);
                }
            });
            waitForRunningQueuesToStop();
            this.queueNameToScheduledTask.clear();
            if (isRedisEnabled()) {
                this.redisScheduleTriggerHandler.stop();
            }
        }
    }

    /**
     * Waits, per registered queue, for the termination of its scheduled task using the configured
     * termination wait time.
     */
    private void waitForRunningQueuesToStop() {
        for (String queueName : this.queueRunningState.keySet()) {
            ThreadUtils.waitForTermination(
                    getLogger(),
                    this.queueNameToScheduledTask.get(queueName),
                    this.rqueueSchedulerConfig.getTerminationWaitTime(),
                    "An exception occurred while stopping scheduler queue '{}'",
                    queueName);
        }
    }

    /**
     * Marks a registered queue as not running.
     *
     * @param str queue name
     * @throws IllegalArgumentException when the queue is not registered
     */
    private void stopQueue(String str) {
        boolean registered = this.queueRunningState.containsKey(str);
        Assert.isTrue(registered, "Queue with name '" + str + "' does not exist");
        this.queueRunningState.put(str, Boolean.FALSE);
    }

    /** Tells whether the scheduler configuration enables the redis trigger handler. */
    private boolean isRedisEnabled() {
        final boolean redisEnabled = this.rqueueSchedulerConfig.isRedisEnabled();
        return redisEnabled;
    }

    /**
     * Stops all queues and destroys the task scheduler, if one was created, while holding the
     * monitor; threads waiting on the monitor are notified afterwards.
     */
    public void destroy() throws Exception {
        synchronized (this.monitor) {
            doStop();
            ThreadPoolTaskScheduler activeScheduler = this.scheduler;
            if (activeScheduler != null) {
                activeScheduler.destroy();
            }
            this.monitor.notifyAll();
        }
    }

    /**
     * Creates the task scheduler with a pool of at most one thread per queue, bounded by
     * {@link #getThreadPoolSize()}. Nothing is created when there are no queues.
     *
     * @param i number of queues
     */
    private void createScheduler(int i) {
        if (i != 0) {
            int poolSize = Math.min(getThreadPoolSize(), i);
            String threadNamePrefix = getThreadNamePrefix();
            this.scheduler = ThreadUtils.createTaskScheduler(poolSize, threadNamePrefix, 60);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tells whether the queue is currently marked as running.
     *
     * @param str queue name
     * @return {@code true} only when the queue is registered and its state is true
     */
    public boolean isQueueActive(String str) {
        return Boolean.TRUE.equals(this.queueRunningState.get(str));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Prepares the scheduler for the currently active queues: builds the script executor, loads
     * the MOVE_EXPIRED_MESSAGE script, (re)creates the per-queue state maps and the task
     * scheduler, registers every active queue as not running, and sets up the redis trigger
     * handler when redis is enabled.
     */
    public void initialize() {
        List<String> activeQueues = EndpointRegistry.getActiveQueues();
        int queueCount = activeQueues.size();
        this.defaultScriptExecutor = new DefaultScriptExecutor<>(this.redisTemplate);
        this.redisScript = RedisScriptFactory.getScript(RedisScriptFactory.ScriptType.MOVE_EXPIRED_MESSAGE);
        // Diamond instead of raw types keeps the assignments free of unchecked conversions.
        this.queueRunningState = new ConcurrentHashMap<>(queueCount);
        this.queueNameToScheduledTask = new ConcurrentHashMap<>(queueCount);
        this.queueNameToNextRunTime = new ConcurrentHashMap<>(queueCount);
        this.errorCount = new ConcurrentHashMap<>(queueCount);
        createScheduler(queueCount);
        for (String queueName : activeQueues) {
            initQueue(queueName);
        }
        if (isRedisEnabled()) {
            // The handler submits one-off mover tasks through addTask.
            this.redisScheduleTriggerHandler = new RedisScheduleTriggerHandler(
                    getLogger(),
                    this.rqueueRedisListenerContainerFactory,
                    this.rqueueSchedulerConfig,
                    activeQueues,
                    this::addTask,
                    this::getChannelName);
            this.redisScheduleTriggerHandler.initialize();
        }
    }

    /**
     * Registers the queue in the running-state map as not running.
     *
     * @param str queue name
     */
    private void initQueue(String str) {
        this.queueRunningState.put(str, Boolean.FALSE);
    }

    /**
     * Handles a bootstrap event: always stops the current queues first, then, for a startup event,
     * re-initializes and starts them. Nothing is started when the scheduler is disabled, when the
     * application runs in producer mode, or when no queue is active.
     *
     * @param rqueueBootstrapEvent event published on the event bus
     */
    @Subscribe
    public void onApplicationEvent(RqueueBootstrapEvent rqueueBootstrapEvent) {
        getLogger().info("{} Event received", rqueueBootstrapEvent);
        synchronized (this.monitor) {
            doStop();
            if (!this.rqueueSchedulerConfig.isEnabled()) {
                getLogger().debug("Scheduler is not enabled");
                return;
            }
            if (this.rqueueConfig.isProducer()) {
                getLogger().debug("Producer mode");
                return;
            }
            if (rqueueBootstrapEvent.isStartup()) {
                if (EndpointRegistry.getActiveQueueCount() == 0) {
                    getLogger().warn("No queues are configured");
                    return;
                }
                initialize();
                doStart();
            }
            this.monitor.notifyAll();
        }
    }

    /**
     * Submits a one-off (non-periodic) mover task for the queue to the task scheduler.
     *
     * @param str queue name
     * @return future of the submitted task
     */
    protected Future<?> addTask(String str) {
        MessageMoverTask oneOffTask = task(str, false);
        return this.scheduler.submit(oneOffTask);
    }
}
