/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.disjob.supervisor.component;

import cn.ponfee.disjob.common.base.IdGenerator;
import cn.ponfee.disjob.common.spring.TransactionUtils;
import cn.ponfee.disjob.core.base.CoreUtils;
import cn.ponfee.disjob.core.base.RetryProperties;
import cn.ponfee.disjob.core.base.Server;
import cn.ponfee.disjob.core.base.Worker;
import cn.ponfee.disjob.core.base.WorkerRpcService;
import cn.ponfee.disjob.core.dto.worker.ExistsTaskParam;
import cn.ponfee.disjob.core.dto.worker.SplitJobParam;
import cn.ponfee.disjob.core.dto.worker.SplitJobResult;
import cn.ponfee.disjob.core.dto.worker.VerifyJobParam;
import cn.ponfee.disjob.core.enums.JobType;
import cn.ponfee.disjob.core.enums.RouteStrategy;
import cn.ponfee.disjob.core.exception.JobException;
import cn.ponfee.disjob.dispatch.ExecuteTaskParam;
import cn.ponfee.disjob.dispatch.TaskDispatcher;
import cn.ponfee.disjob.registry.Discovery;
import cn.ponfee.disjob.registry.rpc.DestinationServerRestProxy;
import cn.ponfee.disjob.registry.rpc.DiscoveryGroupedServerRestProxy;
import cn.ponfee.disjob.supervisor.base.ModelConverter;
import cn.ponfee.disjob.supervisor.model.SchedJob;
import cn.ponfee.disjob.supervisor.model.SchedTask;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.web.client.RestTemplate;

/**
 * Supervisor-side client for talking to workers: discovers alive workers,
 * invokes worker RPC services (job verification, job splitting, task
 * existence checks) and hands tasks over to the {@link TaskDispatcher}.
 */
@Component
public class WorkerClient {
    private static final Logger LOG = LoggerFactory.getLogger(WorkerClient.class);

    private final Discovery<Worker> discoverWorker;
    private final TaskDispatcher taskDispatcher;
    /** RPC proxy addressed by worker group. */
    private final DiscoveryGroupedServerRestProxy<WorkerRpcService> groupedProxy;
    /** RPC proxy addressed by one specific destination worker. */
    private final DestinationServerRestProxy<WorkerRpcService, Worker> destinationProxy;

    /**
     * Creates the client and builds both RPC proxies.
     *
     * @param discoverWorker    discovery used to look up alive workers
     * @param taskDispatcher    dispatcher used to send tasks to workers
     * @param retry             retry settings for the grouped proxy; validated via {@code check()}
     * @param restTemplate      rest template shared by both proxies
     * @param workerRpcProvider optional worker RPC provider, passed to both proxies as-is
     * @param localWorker       optional local worker; when absent no group is treated as local
     */
    public WorkerClient(Discovery<Worker> discoverWorker, TaskDispatcher taskDispatcher, RetryProperties retry, @Qualifier(value="disjob.bean.rest-template") RestTemplate restTemplate, @Nullable WorkerRpcService workerRpcProvider, @Nullable Worker.Local localWorker) {
        this.discoverWorker = discoverWorker;
        this.taskDispatcher = taskDispatcher;
        retry.check();
        // Without a local worker, no group can ever match the local one.
        Predicate<String> localGroupMatcher = localWorker != null ? localWorker::equalsGroup : group -> false;
        this.groupedProxy = DiscoveryGroupedServerRestProxy.of(WorkerRpcService.class, workerRpcProvider, localGroupMatcher, discoverWorker, restTemplate, retry);
        // NOTE(review): the destination proxy uses RetryProperties.none() -- presumably calls
        // to one specific worker must not be retried; confirm against RetryProperties.
        this.destinationProxy = DestinationServerRestProxy.of(WorkerRpcService.class, workerRpcProvider, localWorker, restTemplate, RetryProperties.none());
    }

    /**
     * Returns the alive workers of a group, as reported by the discovery.
     *
     * @param group the worker group
     * @return the alive workers of that group
     */
    public List<Worker> getAliveWorkers(String group) {
        return this.discoverWorker.getAliveServers(group);
    }

    /**
     * Tells whether the group has at least one alive worker.
     *
     * @param group the worker group
     * @return {@code true} if the group's alive worker list is not empty
     */
    public boolean hasAliveWorker(String group) {
        return CollectionUtils.isNotEmpty(this.getAliveWorkers(group));
    }

    /**
     * Tells whether the discovery reports any alive worker at all.
     *
     * @return the result of {@code Discovery#hasAliveServer()}
     */
    public boolean hasAliveWorker() {
        return this.discoverWorker.hasAliveServer();
    }

    /**
     * Tells whether any of the tasks is executing on a worker that is still alive.
     *
     * @param tasks the tasks to inspect, may be {@code null} or empty
     * @return {@code true} if at least one task is executing and its worker is alive
     */
    public boolean hasAliveTask(List<SchedTask> tasks) {
        return CollectionUtils.isNotEmpty(tasks)
            && tasks.stream().anyMatch(task -> task.isExecuting() && this.isAliveWorker(task.worker()));
    }

    /**
     * Decides whether a task should be dispatched again.
     *
     * <p>A task that is not waiting is never redispatched. A waiting task is
     * redispatched when its worker is absent or not alive, or when the alive
     * worker reports that it does not hold the task.
     *
     * @param task the task to check
     * @return {@code true} if the task should be redispatched
     */
    public boolean shouldRedispatch(SchedTask task) {
        if (!task.isWaiting()) {
            return false;
        }
        Worker worker = task.worker();
        if (!this.isAliveWorker(worker)) {
            return true;
        }
        ExistsTaskParam param = ExistsTaskParam.of(worker.getGroup(), task.getTaskId());
        try {
            return !this.destination(worker).existsTask(param);
        } catch (Throwable e) {
            // Best effort: when the worker cannot be queried the task is not redispatched
            // (presumably to avoid a duplicate dispatch while the worker state is unknown).
            LOG.error("Invoke worker exists task error: {}", worker, e);
            return false;
        }
    }

    /**
     * Returns the RPC service bound to one specific worker.
     *
     * @param destinationWorker the worker to address
     * @return the worker RPC service for that destination
     */
    public WorkerRpcService destination(Worker destinationWorker) {
        return this.destinationProxy.destination(destinationWorker);
    }

    /**
     * Tells whether the worker is non-null and reported alive by the discovery.
     *
     * @param worker the worker, may be {@code null}
     * @return {@code true} if the worker is alive
     */
    boolean isAliveWorker(Worker worker) {
        return worker != null && this.discoverWorker.isAliveServer(worker);
    }

    /**
     * Validates a job locally, then asks a worker of the job's group to verify it.
     *
     * @param job the job to verify
     * @throws JobException declared by the remote verification call
     */
    void verifyJob(SchedJob job) throws JobException {
        Assert.hasText(job.getJobExecutor(), "Job executor cannot be blank.");
        CoreUtils.checkClobMaximumLength(job.getJobExecutor(), "Job executor");
        CoreUtils.checkClobMaximumLength(job.getJobParam(), "Job param");
        // Return values are ignored: the lookups are invoked only for validation
        // (presumably they reject unknown codes -- confirm in JobType/RouteStrategy).
        JobType.of(job.getJobType());
        RouteStrategy.of(job.getRouteStrategy());
        VerifyJobParam param = ModelConverter.toVerifyJobParam(job);
        this.groupedProxy.group(job.getGroup()).verifyJob(param);
    }

    /**
     * Asks a worker of the group to split a job and converts the returned task
     * params into {@link SchedTask}s.
     *
     * <p>For a broadcast route strategy the number of task params must equal the
     * number of alive workers, and the i-th task is bound to the i-th worker.
     * Otherwise the number of task params must be within
     * {@code (0, maximumSplitTaskSize]} and tasks carry no worker.
     *
     * @param group                the worker group
     * @param instanceId           the instance id the tasks belong to
     * @param param                the split param; its worker count is set by this method
     * @param idGenerator          generator for the task ids
     * @param maximumSplitTaskSize upper bound of the task count for non-broadcast jobs
     * @return the created tasks, in the order of the returned task params
     * @throws JobException             declared by the remote split call
     * @throws IllegalArgumentException if the group has no alive worker
     * @throws IllegalStateException    if the split result is missing or its size is illegal
     */
    List<SchedTask> splitJob(String group, long instanceId, SplitJobParam param, IdGenerator idGenerator, int maximumSplitTaskSize) throws JobException {
        List<Worker> workers = this.getAliveWorkers(group);
        Assert.notEmpty(workers, () -> "None alive worker for split job: " + group);
        int wCount = workers.size();
        param.setWorkerCount(wCount);

        SplitJobResult result = this.groupedProxy.group(group).splitJob(param);
        List<String> taskParams = (result != null) ? result.getTaskParams() : null;
        // Fail with a descriptive error instead of a bare NullPointerException.
        Assert.state(taskParams != null, () -> "Split job returned no task params: " + group);
        taskParams.forEach(taskParam -> CoreUtils.checkClobMaximumLength(taskParam, "Split task param"));

        int tCount = taskParams.size();
        boolean isBroadcast = param.getRouteStrategy().isBroadcast();
        if (isBroadcast) {
            Assert.state(tCount == wCount, () -> "Illegal broadcast split task size: " + tCount + "!=" + wCount);
        } else {
            Assert.state(0 < tCount && tCount <= maximumSplitTaskSize, () -> "Illegal split task size: " + tCount);
        }

        List<SchedTask> tasks = new ArrayList<>(tCount);
        for (int i = 0; i < tCount; ++i) {
            // Broadcast tasks are pinned to a worker up front; others are left unassigned.
            String worker = isBroadcast ? workers.get(i).serialize() : null;
            tasks.add(SchedTask.of(taskParams.get(i), idGenerator.generateId(), instanceId, i + 1, tCount, worker));
        }
        return tasks;
    }

    /**
     * Dispatches tasks through the task dispatcher; asserts first that no
     * transaction is active.
     *
     * @param tasks the tasks to dispatch
     */
    void dispatch(List<ExecuteTaskParam> tasks) {
        TransactionUtils.assertWithoutTransaction();
        this.taskDispatcher.dispatch(tasks);
    }

    /**
     * Dispatches tasks for a group through the task dispatcher; asserts first
     * that no transaction is active.
     *
     * @param group the worker group
     * @param tasks the tasks to dispatch
     */
    void dispatch(String group, List<ExecuteTaskParam> tasks) {
        TransactionUtils.assertWithoutTransaction();
        this.taskDispatcher.dispatch(group, tasks);
    }
}

