/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.disjob.supervisor.application;

import cn.ponfee.disjob.common.base.RetryTemplate;
import cn.ponfee.disjob.common.base.SingletonClassConstraint;
import cn.ponfee.disjob.common.collect.Collects;
import cn.ponfee.disjob.common.concurrent.MultithreadExecutors;
import cn.ponfee.disjob.common.concurrent.ThreadPoolExecutors;
import cn.ponfee.disjob.common.spring.RestTemplateUtils;
import cn.ponfee.disjob.common.util.Numbers;
import cn.ponfee.disjob.core.base.HttpProperties;
import cn.ponfee.disjob.core.base.RetryProperties;
import cn.ponfee.disjob.core.base.Server;
import cn.ponfee.disjob.core.base.Supervisor;
import cn.ponfee.disjob.core.base.SupervisorMetrics;
import cn.ponfee.disjob.core.base.SupervisorRpcService;
import cn.ponfee.disjob.core.base.Worker;
import cn.ponfee.disjob.core.base.WorkerMetrics;
import cn.ponfee.disjob.core.base.WorkerRpcService;
import cn.ponfee.disjob.core.exception.AuthenticationException;
import cn.ponfee.disjob.core.exception.KeyExistsException;
import cn.ponfee.disjob.core.exception.KeyNotExistsException;
import cn.ponfee.disjob.core.param.supervisor.EventParam;
import cn.ponfee.disjob.core.param.worker.ConfigureWorkerParam;
import cn.ponfee.disjob.core.param.worker.GetMetricsParam;
import cn.ponfee.disjob.registry.SupervisorRegistry;
import cn.ponfee.disjob.registry.rpc.DestinationServerRestProxy;
import cn.ponfee.disjob.supervisor.application.SchedGroupService;
import cn.ponfee.disjob.supervisor.application.converter.ServerMetricsConverter;
import cn.ponfee.disjob.supervisor.application.request.ConfigureAllWorkerRequest;
import cn.ponfee.disjob.supervisor.application.request.ConfigureOneWorkerRequest;
import cn.ponfee.disjob.supervisor.application.response.SupervisorMetricsResponse;
import cn.ponfee.disjob.supervisor.application.response.WorkerMetricsResponse;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
public class ServerInvokeService
extends SingletonClassConstraint {
    private static final Logger LOG = LoggerFactory.getLogger(ServerInvokeService.class);
    private final SupervisorRegistry supervisorRegistry;
    private final Supervisor.Current currentSupervisor;
    private final DestinationServerRestProxy.DestinationServerInvoker<SupervisorRpcService, Supervisor> supervisorRpcServiceClient;
    private final DestinationServerRestProxy.DestinationServerInvoker<WorkerRpcService, Worker> workerRpcServiceClient;

    public ServerInvokeService(@Value(value="${server.servlet.context-path:/}") String contextPath, SupervisorRegistry supervisorRegistry, Supervisor.Current currentSupervisor, SupervisorRpcService supervisorProvider, HttpProperties http, @Nullable WorkerRpcService workerProvider, @Nullable ObjectMapper objectMapper) {
        http.check();
        Function<Supervisor, String> supervisorContextPath = supervisor -> contextPath;
        Function<Worker, String> workerContextPath = worker -> Supervisor.current().getWorkerContextPath(worker.getGroup());
        RestTemplate restTemplate = RestTemplateUtils.create((int)http.getConnectTimeout(), (int)http.getReadTimeout(), (ObjectMapper)objectMapper);
        RetryProperties retry = RetryProperties.of((int)0, (int)0);
        this.supervisorRegistry = supervisorRegistry;
        this.currentSupervisor = currentSupervisor;
        this.supervisorRpcServiceClient = DestinationServerRestProxy.create(SupervisorRpcService.class, (Object)supervisorProvider, (Server)currentSupervisor, supervisorContextPath, (RestTemplate)restTemplate, (RetryProperties)retry);
        this.workerRpcServiceClient = DestinationServerRestProxy.create(WorkerRpcService.class, (Object)workerProvider, (Server)Worker.current(), workerContextPath, (RestTemplate)restTemplate, (RetryProperties)retry);
    }

    public List<SupervisorMetricsResponse> supervisors() throws Exception {
        List list = this.supervisorRegistry.getRegisteredServers();
        list = Collects.sorted((List)list, Comparator.comparing(e -> e.equals((Object)this.currentSupervisor) ? 0 : 1));
        return MultithreadExecutors.call((Collection)list, this::getSupervisorMetrics, (Executor)ThreadPoolExecutors.commonThreadPool());
    }

    public List<WorkerMetricsResponse> workers(String group, String worker) {
        if (StringUtils.isNotBlank((CharSequence)worker)) {
            int port;
            String[] array = worker.trim().split(":");
            String host = array[0].trim();
            WorkerMetricsResponse metrics = this.getWorkerMetrics(new Worker(group, "", host, port = Numbers.toInt((Object)array[1].trim(), (int)-1)));
            return StringUtils.isBlank((CharSequence)metrics.getWorkerId()) ? Collections.emptyList() : Collections.singletonList(metrics);
        }
        List list = this.supervisorRegistry.getDiscoveredServers(group);
        list = Collects.sorted((List)list, Comparator.comparing(e -> e.equals((Object)Worker.current()) ? 0 : 1));
        return MultithreadExecutors.call((Collection)list, this::getWorkerMetrics, (Executor)ThreadPoolExecutors.commonThreadPool());
    }

    public void configureOneWorker(ConfigureOneWorkerRequest req) {
        Worker worker = req.toWorker();
        if (req.getAction() == ConfigureWorkerParam.Action.ADD_WORKER) {
            List workers = this.supervisorRegistry.getDiscoveredServers(req.getGroup());
            if (workers != null) {
                if (workers.stream().anyMatch(arg_0 -> ((Worker)worker).sameWorker(arg_0))) {
                    throw new KeyExistsException("Worker already registered: " + worker);
                }
            }
            this.verifyWorkerSignature(worker);
            req.setData(req.getGroup());
        } else {
            List<Worker> workers = this.getDiscoveredWorkers(req.getGroup());
            if (!workers.contains(worker)) {
                throw new KeyNotExistsException("Not found worker: " + worker);
            }
        }
        this.configureWorker(worker, req.getAction(), req.getData());
    }

    public void configureAllWorker(ConfigureAllWorkerRequest req) {
        List<Worker> workers = this.getDiscoveredWorkers(req.getGroup());
        MultithreadExecutors.run(workers, worker -> this.configureWorker((Worker)worker, req.getAction(), req.getData()), (Executor)ThreadPoolExecutors.commonThreadPool());
    }

    public void publishOtherSupervisors(EventParam eventParam) {
        try {
            List supervisors = this.supervisorRegistry.getRegisteredServers().stream().filter(e -> !this.currentSupervisor.sameSupervisor(e)).collect(Collectors.toList());
            MultithreadExecutors.run(supervisors, supervisor -> this.publishSupervisor((Supervisor)supervisor, eventParam), (Executor)ThreadPoolExecutors.commonThreadPool());
        }
        catch (Exception e2) {
            LOG.error("Publish all supervisor error.", (Throwable)e2);
        }
    }

    private SupervisorMetricsResponse getSupervisorMetrics(Supervisor supervisor) {
        SupervisorMetrics metrics = null;
        Long pingTime = null;
        try {
            long start = System.currentTimeMillis();
            metrics = (SupervisorMetrics)this.supervisorRpcServiceClient.invoke((Server)supervisor, SupervisorRpcService::metrics);
            pingTime = System.currentTimeMillis() - start;
        }
        catch (Throwable e) {
            LOG.warn("Ping supervisor occur error: {} {}", (Object)supervisor, (Object)e.getMessage());
        }
        SupervisorMetricsResponse response = metrics == null ? new SupervisorMetricsResponse() : ServerMetricsConverter.INSTANCE.convert(metrics);
        response.setHost(supervisor.getHost());
        response.setPort(supervisor.getPort());
        response.setPingTime(pingTime);
        return response;
    }

    private WorkerMetricsResponse getWorkerMetrics(Worker worker) {
        WorkerMetrics metrics = null;
        Long pingTime = null;
        String group = worker.getGroup();
        GetMetricsParam param = this.buildGetMetricsParam(group);
        try {
            long start = System.currentTimeMillis();
            metrics = (WorkerMetrics)this.workerRpcServiceClient.invoke((Server)worker, client -> client.metrics(param));
            pingTime = System.currentTimeMillis() - start;
        }
        catch (Throwable e) {
            LOG.warn("Ping worker occur error: {} {}", (Object)worker, (Object)e.getMessage());
        }
        WorkerMetricsResponse response = metrics == null || !SchedGroupService.verifyWorkerSignatureToken(metrics.getSignature(), group) ? new WorkerMetricsResponse(worker.getWorkerId()) : ServerMetricsConverter.INSTANCE.convert(metrics);
        response.setHost(worker.getHost());
        response.setPort(worker.getPort());
        response.setPingTime(pingTime);
        return response;
    }

    private List<Worker> getDiscoveredWorkers(String group) {
        List list = this.supervisorRegistry.getDiscoveredServers(group);
        if (CollectionUtils.isEmpty((Collection)list)) {
            throw new KeyNotExistsException("Group '" + group + "' not exists workers.");
        }
        return list;
    }

    private void verifyWorkerSignature(Worker worker) {
        String group = worker.getGroup();
        GetMetricsParam param = this.buildGetMetricsParam(group);
        WorkerMetrics metrics = (WorkerMetrics)this.workerRpcServiceClient.invoke((Server)worker, client -> client.metrics(param));
        if (!SchedGroupService.verifyWorkerSignatureToken(metrics.getSignature(), group)) {
            throw new AuthenticationException("Worker authenticated failed: " + worker);
        }
    }

    private void configureWorker(Worker worker, ConfigureWorkerParam.Action action, String data) {
        ConfigureWorkerParam param = new ConfigureWorkerParam(SchedGroupService.createSupervisorAuthenticationToken(worker.getGroup()));
        param.setAction(action);
        param.setData(data);
        this.workerRpcServiceClient.invokeWithoutResult((Server)worker, client -> client.configureWorker(param));
    }

    private void publishSupervisor(Supervisor supervisor, EventParam param) {
        RetryTemplate.executeQuietly(() -> this.supervisorRpcServiceClient.invokeWithoutResult((Server)supervisor, client -> client.publish(param)), (int)1, (long)2000L);
    }

    private GetMetricsParam buildGetMetricsParam(String group) {
        return new GetMetricsParam(SchedGroupService.createSupervisorAuthenticationToken(group), group);
    }
}

