/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.scheduler.worker.thread;

import cn.ponfee.scheduler.common.base.TimingWheel;
import cn.ponfee.scheduler.common.util.Jsons;
import cn.ponfee.scheduler.core.base.AbstractHeartbeatThread;
import cn.ponfee.scheduler.core.base.Supervisor;
import cn.ponfee.scheduler.core.base.SupervisorService;
import cn.ponfee.scheduler.core.base.Worker;
import cn.ponfee.scheduler.core.param.ExecuteParam;
import cn.ponfee.scheduler.registry.Discovery;
import cn.ponfee.scheduler.worker.base.WorkerThreadPool;
import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;

public class RotatingTimingWheelThread
extends AbstractHeartbeatThread {
    private static final int LOG_ROUND = 1000;
    private final Worker currentWorker;
    private final SupervisorService supervisorServiceClient;
    private final Discovery<Supervisor> discoverySupervisor;
    private final TimingWheel<ExecuteParam> timingWheel;
    private final WorkerThreadPool workerThreadPool;
    private int round = 0;

    public RotatingTimingWheelThread(Worker currentWorker, SupervisorService supervisorServiceClient, Discovery<Supervisor> discoverySupervisor, TimingWheel<ExecuteParam> timingWheel, WorkerThreadPool threadPool) {
        super(timingWheel.getTickMs());
        this.currentWorker = currentWorker;
        this.supervisorServiceClient = supervisorServiceClient;
        this.discoverySupervisor = discoverySupervisor;
        this.timingWheel = timingWheel;
        this.workerThreadPool = threadPool;
    }

    protected boolean heartbeat() {
        if (++this.round == 1000) {
            this.round = 0;
            this.log.warn("worker-thread-pool: {}, jvm-active-count: {}", (Object)this.workerThreadPool, (Object)Thread.activeCount());
        }
        this.process();
        return true;
    }

    private void process() {
        if (CollectionUtils.isEmpty((Collection)this.discoverySupervisor.getDiscoveredServers())) {
            if ((this.round & 0x1F) == 0) {
                this.log.warn("Not found available supervisor.");
            }
            return;
        }
        List ringTriggers = this.timingWheel.poll();
        if (ringTriggers.isEmpty()) {
            return;
        }
        List matchedTriggers = ringTriggers.stream().filter(e -> {
            if (this.currentWorker.equals((Object)e.getWorker())) {
                return true;
            }
            this.log.error("The current worker '{}' cannot match expect worker '{}'", (Object)this.currentWorker, (Object)e.getWorker());
            return false;
        }).collect(Collectors.toList());
        if (matchedTriggers.isEmpty()) {
            return;
        }
        for (List batchTriggers : Lists.partition(matchedTriggers, (int)200)) {
            boolean status;
            List batchTaskIds = batchTriggers.stream().map(ExecuteParam::getTaskId).collect(Collectors.toList());
            try {
                status = this.supervisorServiceClient.updateTaskWorker(batchTaskIds, this.currentWorker.serialize());
            }
            catch (Exception e2) {
                status = true;
                this.log.error("Update waiting sched_task.worker column failed: " + Jsons.toJson(batchTaskIds), (Throwable)e2);
            }
            if (!status) continue;
            batchTriggers.forEach(this.workerThreadPool::submit);
        }
    }
}

