/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.scheduler.worker.base;

import cn.ponfee.scheduler.common.base.Startable;
import cn.ponfee.scheduler.common.base.TimingWheel;
import cn.ponfee.scheduler.common.concurrent.NamedThreadFactory;
import cn.ponfee.scheduler.common.concurrent.ThreadPoolExecutors;
import cn.ponfee.scheduler.common.exception.Throwables;
import cn.ponfee.scheduler.common.util.Jsons;
import cn.ponfee.scheduler.core.base.Supervisor;
import cn.ponfee.scheduler.core.base.SupervisorService;
import cn.ponfee.scheduler.core.base.Worker;
import cn.ponfee.scheduler.core.enums.RouteStrategy;
import cn.ponfee.scheduler.core.param.ExecuteTaskParam;
import cn.ponfee.scheduler.core.param.TaskWorkerParam;
import cn.ponfee.scheduler.registry.Discovery;
import cn.ponfee.scheduler.worker.base.WorkerThreadPool;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RotatingTimingWheel
implements Startable {
    private static final Logger LOG = LoggerFactory.getLogger(RotatingTimingWheel.class);
    private static final int LOG_ROUND_COUNT = 1000;
    private final Worker currentWorker;
    private final SupervisorService supervisorServiceClient;
    private final Discovery<Supervisor> discoverySupervisor;
    private final TimingWheel<ExecuteTaskParam> timingWheel;
    private final WorkerThreadPool workerThreadPool;
    private final ScheduledExecutorService scheduledExecutor;
    private final ExecutorService updateTaskWorkerExecutor;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private int round = 0;

    public RotatingTimingWheel(Worker currentWorker, SupervisorService supervisorServiceClient, Discovery<Supervisor> discoverySupervisor, TimingWheel<ExecuteTaskParam> timingWheel, WorkerThreadPool threadPool, int updateTaskWorkerThreadPoolSize) {
        this.currentWorker = currentWorker;
        this.supervisorServiceClient = supervisorServiceClient;
        this.discoverySupervisor = discoverySupervisor;
        this.timingWheel = timingWheel;
        this.workerThreadPool = threadPool;
        this.scheduledExecutor = new ScheduledThreadPoolExecutor(1, r -> {
            Thread thread = new Thread(r, "rotating_timing_wheel");
            thread.setDaemon(true);
            thread.setPriority(10);
            return thread;
        });
        int poolSize = Math.max(1, updateTaskWorkerThreadPoolSize);
        this.updateTaskWorkerExecutor = ThreadPoolExecutors.builder().corePoolSize(poolSize).maximumPoolSize(poolSize).workQueue(new LinkedBlockingQueue(Integer.MAX_VALUE)).keepAliveTimeSeconds(300L).threadFactory((ThreadFactory)NamedThreadFactory.builder().prefix("update_task_worker").build()).prestartCoreThreadType(ThreadPoolExecutors.PrestartCoreThreadType.ONE).build();
    }

    public void start() {
        if (!this.started.compareAndSet(false, true)) {
            LOG.warn("Repeat do start rotating timing wheel.");
            return;
        }
        this.scheduledExecutor.scheduleAtFixedRate(this::process, this.timingWheel.getTickMs(), this.timingWheel.getTickMs(), TimeUnit.MILLISECONDS);
    }

    private void process() {
        if (++this.round == 1000) {
            this.round = 0;
            LOG.info("worker-thread-pool: {}, jvm-active-count: {}", (Object)this.workerThreadPool, (Object)Thread.activeCount());
        }
        if (!this.discoverySupervisor.hasDiscoveredServers()) {
            if ((this.round & 0x1F) == 0) {
                LOG.warn("Not found available supervisor.");
            }
            return;
        }
        List ringTriggers = this.timingWheel.poll();
        if (ringTriggers.isEmpty()) {
            return;
        }
        List matchedTriggers = ringTriggers.stream().filter(e -> {
            if (this.currentWorker.equalsGroup(e.getWorker())) {
                return true;
            }
            LOG.error("The current worker '{}' cannot match expect worker '{}'", (Object)this.currentWorker, (Object)e.getWorker());
            return false;
        }).collect(Collectors.toList());
        if (matchedTriggers.isEmpty()) {
            return;
        }
        this.updateTaskWorkerExecutor.execute(() -> {
            List partition = Lists.partition((List)matchedTriggers, (int)200);
            for (List batchTriggers : partition) {
                List list = batchTriggers.stream().filter(e -> e.getRouteStrategy() != RouteStrategy.BROADCAST).map(e -> new TaskWorkerParam(Long.valueOf(e.getTaskId()), e.getWorker().serialize())).collect(Collectors.toList());
                try {
                    this.supervisorServiceClient.updateTaskWorker(list);
                }
                catch (Exception e2) {
                    LOG.error("Update task worker error: " + Jsons.toJson(list), (Throwable)e2);
                }
                batchTriggers.forEach(this.workerThreadPool::submit);
            }
        });
    }

    public void stop() {
        Throwables.caught(() -> ThreadPoolExecutors.shutdown((ExecutorService)this.scheduledExecutor, (int)3));
        Throwables.caught(() -> ThreadPoolExecutors.shutdown((ExecutorService)this.updateTaskWorkerExecutor, (int)3));
    }
}

