/*
 * Decompiled with CFR 0.152.
 */
package org.teamapps.cluster.core;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.teamapps.cluster.core.ClusterConnection;
import org.teamapps.cluster.core.ClusterNode;
import org.teamapps.cluster.core.ClusterTask;
import org.teamapps.cluster.message.protocol.ClusterAvailableServicesUpdate;
import org.teamapps.cluster.message.protocol.ClusterConfig;
import org.teamapps.cluster.message.protocol.ClusterConnectionRequest;
import org.teamapps.cluster.message.protocol.ClusterConnectionResult;
import org.teamapps.cluster.message.protocol.ClusterNewLeaderInfo;
import org.teamapps.cluster.message.protocol.ClusterNewPeerInfo;
import org.teamapps.cluster.message.protocol.ClusterNodeData;
import org.teamapps.cluster.message.protocol.ClusterNodeShutDownInfo;
import org.teamapps.cluster.message.protocol.ClusterServiceBroadcastMessage;
import org.teamapps.cluster.message.protocol.ClusterServiceMethodErrorType;
import org.teamapps.cluster.message.protocol.ClusterServiceMethodRequest;
import org.teamapps.cluster.message.protocol.ClusterServiceMethodResult;
import org.teamapps.commons.event.Event;
import org.teamapps.commons.util.collections.ByKeyComparisonResult;
import org.teamapps.commons.util.collections.CollectionUtil;
import org.teamapps.configuration.Configuration;
import org.teamapps.message.protocol.message.Message;
import org.teamapps.message.protocol.model.ModelCollection;
import org.teamapps.message.protocol.model.PojoObjectDecoder;
import org.teamapps.message.protocol.service.AbstractClusterService;
import org.teamapps.message.protocol.service.ClusterServiceRegistry;

/**
 * A single member of a peer-to-peer cluster.
 *
 * <p>An instance optionally opens a server socket (when the configured port is positive), dials the
 * peers listed in its {@link ClusterConfig}, keeps track of which services every connected peer
 * offers, and routes service method calls and broadcasts either to a locally registered
 * {@link AbstractClusterService} or to a remote peer.
 *
 * <p>Thread-safety: {@code nodesByServiceName} and {@code servicesByNode} are plain hash maps and are
 * only touched while holding this instance's monitor. The remaining maps are concurrent maps.
 * {@code active}, {@code leaderNode} and {@code serverSocket} are {@code volatile} because they are
 * written and read on different threads (server thread, connection threads, shutdown hook).
 */
public class Cluster implements ClusterServiceRegistry {
    private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    public static final String CLUSTER_SERVICE = "clusterService";
    /** Fired with the leader's node data whenever this node learns who the leader is. */
    public final Event<ClusterNodeData> onLeaderAvailable = new Event<>();
    /** Fired with the currently connected peers after a connection is established or lost. */
    public final Event<List<ClusterNodeData>> onAvailableNodesChange = new Event<>();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
    private final ClusterNodeData localNode;
    private final Map<String, ClusterNode> clusterNodeMap = new ConcurrentHashMap<>();
    private final Map<String, AbstractClusterService> localServices = new ConcurrentHashMap<>();
    // Guarded by "this".
    private final Map<String, List<ClusterNode>> nodesByServiceName = new HashMap<>();
    // Guarded by "this".
    private final Map<ClusterNode, List<String>> servicesByNode = new HashMap<>();
    private final Map<Long, ClusterTask> pendingServiceRequestsMap = new ConcurrentHashMap<>();
    private final File tempDir;
    private final ClusterConfig clusterConfig;
    // Read by the accept loop on the server thread, cleared by shutDown() on another thread.
    private volatile boolean active = true;
    // Written under the monitor, read without it by getLeaderNode()/isLeaderNode().
    private volatile ClusterNodeData leaderNode;
    // Unbounded pool with direct hand-off: every submitted task gets a thread immediately.
    private final ThreadPoolExecutor taskExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    // Assigned on the server thread, closed by shutDown().
    private volatile ServerSocket serverSocket;

    /**
     * Starts a cluster member from the {@value #CLUSTER_SERVICE} section of the global
     * {@link Configuration} and registers a listener for updates of that section.
     *
     * @return the started cluster member
     */
    public static Cluster start() {
        Configuration configuration = Configuration.getConfiguration();
        Cluster cluster = Cluster.start((ClusterConfig) configuration.getConfig(CLUSTER_SERVICE, ClusterConfig.getMessageDecoder()));
        configuration.addConfigUpdateListener(cluster::handleConfigUpdate, CLUSTER_SERVICE, ClusterConfig.getMessageDecoder());
        return cluster;
    }

    /**
     * Starts a member that listens for incoming peer connections.
     *
     * @param clusterSecret the cluster secret stored in the configuration
     * @param port          the port to accept connections on
     * @return the started cluster member
     */
    public static Cluster startServerMember(String clusterSecret, int port) {
        ClusterConfig config = new ClusterConfig().setClusterSecret(clusterSecret).setPort(port);
        return Cluster.start(config);
    }

    /**
     * Starts a member that dials a single known peer and does not configure a local port.
     *
     * @param clusterSecret the cluster secret stored in the configuration
     * @param host          host of the peer to connect to
     * @param port          port of the peer to connect to
     * @return the started cluster member
     */
    public static Cluster startClientMember(String clusterSecret, String host, int port) {
        ClusterNodeData peer = new ClusterNodeData().setHost(host).setPort(port);
        return Cluster.start(new ClusterConfig().setClusterSecret(clusterSecret).addPeerNodes(peer));
    }

    /**
     * Starts a member from an explicit configuration.
     *
     * @param clusterConfig the configuration to use
     * @return the started cluster member
     */
    public static Cluster start(ClusterConfig clusterConfig) {
        return new Cluster(clusterConfig);
    }

    /**
     * Builds the local node description, opens the server socket, dials the configured peers and
     * registers a JVM shutdown hook that calls {@link #shutDown()}.
     */
    private Cluster(ClusterConfig clusterConfig) {
        this.clusterConfig = clusterConfig;
        String configuredId = clusterConfig.getNodeId();
        // Fall back to a random id when none (or only whitespace) is configured.
        String nodeId = configuredId != null && !configuredId.isBlank() ? configuredId : UUID.randomUUID().toString();
        this.localNode = new ClusterNodeData()
                .setNodeId(nodeId)
                .setHost(clusterConfig.getHost())
                .setPort(clusterConfig.getPort())
                .setLeaderNode(clusterConfig.isLeaderNode());
        if (clusterConfig.isLeaderNode()) {
            this.leaderNode = this.localNode;
            this.onLeaderAvailable.fire(this.leaderNode);
        }
        this.tempDir = Cluster.createTempDirSave();
        LOGGER.info("Cluster node [{}]: started {}", this.localNode.getNodeId(), clusterConfig.isLeaderNode() ? "as leader node" : "");
        this.startServerSocket(this.localNode);
        if (clusterConfig.getPeerNodes() != null) {
            clusterConfig.getPeerNodes().stream()
                    .filter(node -> node.getPort() > 0)
                    .filter(node -> node.getHost() != null)
                    .forEach(this::connectNode);
        }
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutDown));
    }

    /**
     * Starts a non-daemon thread that accepts peer connections on the local node's port. Does
     * nothing when the port is not positive.
     */
    private void startServerSocket(ClusterNodeData localNode) {
        if (localNode.getPort() <= 0) {
            return;
        }
        Thread thread = new Thread(() -> {
            try {
                ServerSocket listener = new ServerSocket(localNode.getPort(), 50);
                this.serverSocket = listener;
                if (!this.active) {
                    // shutDown() ran before the socket existed and could not close it.
                    listener.close();
                    return;
                }
                // Also stop once the listener is closed, otherwise accept() would fail in a tight loop.
                while (this.active && !listener.isClosed()) {
                    try {
                        Socket socket = listener.accept();
                        new ClusterConnection(this, socket);
                    }
                    catch (IOException e) {
                        // Closing the listener during shutdown makes accept() throw; that is expected.
                        if (this.active) {
                            LOGGER.info("Cluster node [{}]: error on server socket: {}", localNode.getNodeId(), e.getMessage());
                        }
                    }
                }
            }
            catch (IOException e) {
                LOGGER.error("Cluster node [{}]: error opening server socket: {}", localNode.getNodeId(), e.getMessage(), e);
            }
        });
        thread.setName("server-socket-" + localNode.getHost() + "-" + localNode.getPort());
        thread.setDaemon(false);
        thread.start();
        LOGGER.info("Cluster node [{}]: network started, accepting connections on port: {}", localNode.getNodeId(), localNode.getPort());
    }

    /**
     * Handles a connection request received from a remote peer.
     *
     * <p>The request is rejected when it names a leader different from the one known locally, or
     * when the requesting node is already registered and connected. Otherwise the node is
     * registered (or its connection replaced), its services are recorded, the other peers are told
     * about it, and any peers it knows that are unknown here are dialed.
     *
     * @param request    the request sent by the remote peer
     * @param connection the connection the request arrived on
     * @return the result to send back; {@code accepted} tells the peer whether it was admitted
     */
    protected synchronized ClusterConnectionResult handleConnectionRequest(ClusterConnectionRequest request, ClusterConnection connection) {
        ClusterNodeData remoteNode = request.getLocalNode();
        LOGGER.info("Cluster node [{}]: connection requested from: {}, {}", this.localNode.getNodeId(), remoteNode.getNodeId(), remoteNode.getHost());
        String[] nodeServices = request.getLocalServices();
        // NOTE(review): the result never carries a leader although handleConnectionResult reads
        // result.getLeaderNode() - confirm whether the leader should be set here.
        ClusterConnectionResult connectionResult = new ClusterConnectionResult().setLocalNode(this.localNode);

        ClusterNodeData announcedLeader = request.getLeaderNode();
        if (announcedLeader != null) {
            ClusterNodeData currentLeader = this.leaderNode;
            if (currentLeader != null && !currentLeader.getNodeId().equals(announcedLeader.getNodeId())) {
                LOGGER.error("Cluster node [{}]: error: connection requested denied from {}, {} - different leader node: {} vs {}", this.localNode.getNodeId(), remoteNode.getNodeId(), remoteNode.getHost(), currentLeader.getNodeId(), announcedLeader.getNodeId());
                return connectionResult.setAccepted(false);
            }
            if (currentLeader == null) {
                // Adopt the leader named in the request, not the requesting node itself.
                this.adoptLeader(announcedLeader);
            }
        }

        ClusterNode clusterNode = this.clusterNodeMap.get(remoteNode.getNodeId());
        if (clusterNode != null && clusterNode.isConnected()) {
            // A live connection to this node already exists.
            return connectionResult.setAccepted(false);
        }
        // Taken before the requester is registered.
        List<ClusterNodeData> knownPeers = this.snapshotKnownPeers();
        if (clusterNode == null) {
            clusterNode = new ClusterNode(this, remoteNode, connection);
            this.clusterNodeMap.put(remoteNode.getNodeId(), clusterNode);
        } else {
            clusterNode.handleConnectionUpdate(connection);
        }
        if (nodeServices != null) {
            this.updateClusterNodeServices(clusterNode, nodeServices);
        }
        // Only announce nodes that other peers could actually dial.
        if (remoteNode.getHost() != null && remoteNode.getPort() > 0) {
            this.sendMessageToPeerNodes(new ClusterNewPeerInfo().setNewPeer(remoteNode), remoteNode);
        }
        this.connectUnknownPeers(request.getKnownPeers());
        this.handleAvailableClusterNodesChanged();
        String[] offeredServices = this.localServices.isEmpty() ? null : this.localServices.keySet().toArray(new String[0]);
        return connectionResult.setAccepted(true)
                .setKnownPeers(knownPeers)
                .setLocalServices(offeredServices)
                .setKnownServices(nodeServices);
    }

    /**
     * Handles the answer to a connection request this node sent.
     *
     * <p>On acceptance the remote node is registered (or its connection replaced when it was
     * disconnected), unknown peers reported by it are dialed, its services are recorded, and the
     * local service list is re-broadcast when the peer's view of it looks incomplete.
     *
     * @param result     the answer received from the remote peer
     * @param remoteNode the node the request was sent to
     * @param connection the connection the answer arrived on
     */
    protected synchronized void handleConnectionResult(ClusterConnectionResult result, ClusterNodeData remoteNode, ClusterConnection connection) {
        if (!result.isAccepted()) {
            LOGGER.info("Cluster node [{}]: connection request denied from: {}, {}", this.localNode.getNodeId(), result.getLocalNode().getNodeId(), result.getLocalNode().getHost());
            return;
        }
        LOGGER.info("Cluster node [{}]: connection request accepted from: {}, {}", this.localNode.getNodeId(), result.getLocalNode().getNodeId(), result.getLocalNode().getHost());

        ClusterNodeData reportedLeader = result.getLeaderNode();
        if (reportedLeader != null) {
            ClusterNodeData currentLeader = this.leaderNode;
            if (currentLeader == null) {
                this.adoptLeader(reportedLeader);
            } else if (!currentLeader.getNodeId().equals(reportedLeader.getNodeId())) {
                // Compare against the leader the peer reports, not against the peer's own id.
                LOGGER.error("Cluster node [{}]: error: connection result denied from {}, {} - different leader node: {} vs {}", this.localNode.getNodeId(), result.getLocalNode().getNodeId(), result.getLocalNode().getHost(), currentLeader.getNodeId(), reportedLeader.getNodeId());
                return;
            }
        }

        ClusterNode clusterNode = this.clusterNodeMap.get(remoteNode.getNodeId());
        if (clusterNode == null) {
            clusterNode = new ClusterNode(this, remoteNode, connection);
            this.clusterNodeMap.put(remoteNode.getNodeId(), clusterNode);
            this.sendMessageToPeerNodes(new ClusterNewPeerInfo().setNewPeer(remoteNode), remoteNode);
        } else if (!clusterNode.isConnected()) {
            clusterNode.handleConnectionUpdate(connection);
        }
        this.connectUnknownPeers(result.getKnownPeers());

        String[] nodeServices = result.getLocalServices();
        if (nodeServices != null) {
            this.updateClusterNodeServices(clusterNode, nodeServices);
        }
        // The peer echoes the services it received; a missing or differently sized echo triggers a re-broadcast.
        String[] echoedServices = result.getKnownServices();
        boolean peerKnowsAllServices = echoedServices != null && echoedServices.length == this.localServices.size();
        if (!this.localServices.isEmpty() && !peerKnowsAllServices) {
            String[] services = this.localServices.keySet().toArray(new String[0]);
            this.sendMessageToPeerNodes(new ClusterAvailableServicesUpdate().setServices(services));
        }
        this.handleAvailableClusterNodesChanged();
    }

    /**
     * Executes a service method requested by a remote node on the task executor and writes the
     * result (or an error description) back to that node. When the service is not registered
     * locally an error result is written immediately.
     *
     * @param methodRequest the request received from the remote node
     * @param clusterNode   the node that sent the request and receives the result
     */
    protected void handleServiceMethodExecutionRequest(ClusterServiceMethodRequest methodRequest, ClusterNode clusterNode) {
        LOGGER.info("Cluster node [{}]: handle service method request {}/{} from {}", this.localNode.getNodeId(), methodRequest.getServiceName(), methodRequest.getMethodName(), clusterNode.getNodeData().getNodeId());
        AbstractClusterService localService = this.localServices.get(methodRequest.getServiceName());
        ClusterServiceMethodResult methodResult = new ClusterServiceMethodResult()
                .setClusterTaskId(methodRequest.getClusterTaskId())
                .setServiceName(methodRequest.getServiceName())
                .setMethodName(methodRequest.getMethodName());
        if (localService == null) {
            // Report the missing service by its service name, not by the requested method name.
            methodResult.setError(true)
                    .setErrorType(ClusterServiceMethodErrorType.SERVICE_EXCEPTION)
                    .setErrorMessage("Error: missing service:" + methodRequest.getServiceName());
            clusterNode.writeMessage(methodResult);
            return;
        }
        this.taskExecutor.execute(() -> {
            LOGGER.info("Cluster node [{}]: execute task", this.localNode.getNodeId());
            try {
                Message message = localService.handleMessage(methodRequest.getMethodName(), methodRequest.getRequestMessage());
                methodResult.setResultMessage(message);
                clusterNode.writeMessage(methodResult);
            }
            catch (Throwable e) {
                // Service boundary: every failure is reported back to the caller instead of killing the worker.
                LOGGER.warn("Cluster node [{}]: service method {}/{} failed", this.localNode.getNodeId(), methodRequest.getServiceName(), methodRequest.getMethodName(), e);
                methodResult.setError(true)
                        .setErrorType(ClusterServiceMethodErrorType.SERVICE_EXCEPTION)
                        .setErrorMessage(e.getMessage())
                        .setErrorStackTrace(ExceptionUtils.getStackTrace(e));
                clusterNode.writeMessage(methodResult);
            }
        });
    }

    /**
     * Copies a remote execution result into the matching pending task. Results for unknown task
     * ids are ignored.
     *
     * @param methodResult the result received from the remote node
     * @param clusterNode  the node that sent the result
     */
    protected void handleServiceMethodExecutionResult(ClusterServiceMethodResult methodResult, ClusterNode clusterNode) {
        LOGGER.info("Cluster node [{}]: handle service method result {}/{} from {}", this.localNode.getNodeId(), methodResult.getServiceName(), methodResult.getMethodName(), clusterNode.getNodeData().getNodeId());
        ClusterTask clusterTask = this.pendingServiceRequestsMap.get(methodResult.getClusterTaskId());
        if (clusterTask == null) {
            return;
        }
        clusterTask.setError(methodResult.isError());
        clusterTask.setErrorType(methodResult.getErrorType());
        clusterTask.setErrorMessage(methodResult.getErrorMessage());
        clusterTask.setErrorStackTrace(methodResult.getErrorStackTrace());
        // Only an error-free result that carries a message counts as finished.
        if (!methodResult.isError() && methodResult.getResultMessage() != null) {
            clusterTask.setFinished(true);
        }
        // Set the result last, after the status fields above.
        clusterTask.setResult(methodResult.getResultMessage());
    }

    /**
     * Dials a peer announced by another node unless it is already registered.
     *
     * @param newPeerInfo the announcement
     * @param clusterNode the node that sent the announcement
     */
    protected void handleClusterNewPeerInfo(ClusterNewPeerInfo newPeerInfo, ClusterNode clusterNode) {
        ClusterNodeData newPeer = newPeerInfo.getNewPeer();
        if (!this.clusterNodeMap.containsKey(newPeer.getNodeId())) {
            this.connectNode(newPeer);
        }
    }

    /**
     * Adopts the announced leader when no leader is known yet; otherwise the announcement is
     * ignored. Synchronized so that the check and the assignment cannot interleave with the other
     * leader updates.
     *
     * @param newLeaderInfo the announcement
     * @param clusterNode   the node that sent the announcement
     */
    protected synchronized void handleClusterNewLeaderInfo(ClusterNewLeaderInfo newLeaderInfo, ClusterNode clusterNode) {
        if (this.leaderNode == null) {
            this.adoptLeader(newLeaderInfo.getLeaderNode());
        }
    }

    /**
     * Replaces the recorded service list of a peer with the one it announced.
     *
     * @param availableServicesUpdate the announcement
     * @param clusterNode             the node that sent the announcement
     */
    protected void handleClusterAvailableServicesUpdate(ClusterAvailableServicesUpdate availableServicesUpdate, ClusterNode clusterNode) {
        this.updateClusterNodeServices(clusterNode, availableServicesUpdate.getServices());
    }

    /**
     * Re-dispatches every pending task that was being processed by the disconnected node and
     * notifies listeners about the changed set of available nodes.
     *
     * @param clusterNode the node whose connection was lost
     */
    protected synchronized void handleDisconnect(ClusterNode clusterNode) {
        String lostNodeId = clusterNode.getNodeData().getNodeId();
        this.pendingServiceRequestsMap.values().stream()
                .filter(task -> Objects.equals(task.getProcessingNodeId(), lostNodeId))
                .forEach(this::executeClusterTask);
        this.handleAvailableClusterNodesChanged();
    }

    /** Fires {@link #onAvailableNodesChange} with the currently connected peers. */
    private void handleAvailableClusterNodesChanged() {
        this.onAvailableNodesChange.fire(this.getAvailablePeerNodes());
    }

    /** Returns the node data of all registered peers that are currently connected. */
    private List<ClusterNodeData> getAvailablePeerNodes() {
        return this.clusterNodeMap.values().stream()
                .filter(ClusterNode::isConnected)
                .map(ClusterNode::getNodeData)
                .toList();
    }

    /** Configuration update listener; updates are currently ignored. */
    private void handleConfigUpdate(ClusterConfig config) {
    }

    /**
     * Stores the leader, forwards it to all registered peers and fires {@link #onLeaderAvailable}.
     * Callers hold this instance's monitor.
     */
    private void adoptLeader(ClusterNodeData newLeader) {
        this.leaderNode = newLeader;
        this.sendLeaderNodeUpdateToPeers();
        this.onLeaderAvailable.fire(newLeader);
        LOGGER.info("Cluster node [{}]: new leader node: {}", this.localNode.getNodeId(), newLeader.getNodeId());
    }

    /** Returns the node data of all registered peers (other than this node) that have a positive port. */
    private List<ClusterNodeData> snapshotKnownPeers() {
        return new ArrayList<>(this.clusterNodeMap.values()).stream()
                .map(ClusterNode::getNodeData)
                .filter(nodeData -> !nodeData.getNodeId().equals(this.localNode.getNodeId()))
                .filter(nodeData -> nodeData.getPort() > 0)
                .toList();
    }

    /** Dials every reachable peer from the given list that is neither registered nor this node; {@code null} is treated as empty. */
    private void connectUnknownPeers(Iterable<? extends ClusterNodeData> peers) {
        if (peers == null) {
            return;
        }
        for (ClusterNodeData peer : peers) {
            boolean dialable = peer.getHost() != null && peer.getPort() > 0;
            if (dialable
                    && !this.clusterNodeMap.containsKey(peer.getNodeId())
                    && !this.localNode.getNodeId().equals(peer.getNodeId())) {
                this.connectNode(peer);
            }
        }
    }

    /**
     * Replaces the recorded services of a node and keeps the service-to-nodes index in step.
     * A {@code null} array is treated as "no services".
     */
    private synchronized void updateClusterNodeServices(ClusterNode clusterNode, String[] servicesArray) {
        List<String> services = servicesArray == null ? Collections.emptyList() : Arrays.stream(servicesArray).toList();
        LOGGER.info("Cluster node [{}]: update peer node services for {} with services: {}", this.localNode.getNodeId(), clusterNode.getNodeData().getNodeId(), String.join(", ", services));
        List<String> previousServices = this.servicesByNode.get(clusterNode);
        if (previousServices != null) {
            // Only touch the index for services that were dropped or added.
            ByKeyComparisonResult keyComparator = CollectionUtil.compareByKey(previousServices, services, o -> o, o -> o);
            keyComparator.getAEntriesNotInB().forEach(service -> {
                List<ClusterNode> nodes = this.nodesByServiceName.get(service);
                if (nodes != null) {
                    nodes.remove(clusterNode);
                }
            });
            keyComparator.getBEntriesNotInA().forEach(service -> this.nodesByServiceName.computeIfAbsent((String) service, s -> new ArrayList<>()).add(clusterNode));
        } else {
            services.forEach(service -> this.nodesByServiceName.computeIfAbsent(service, s -> new ArrayList<>()).add(clusterNode));
        }
        this.servicesByNode.put(clusterNode, services);
    }

    /**
     * Opens an outgoing connection to a peer, sending this node's data, its local services, the
     * known leader and the known peers as the connection request.
     *
     * @param peerNode the peer to dial
     */
    protected void connectNode(ClusterNodeData peerNode) {
        String[] services = this.localServices.keySet().toArray(new String[0]);
        ClusterConnectionRequest clusterConnectionRequest = new ClusterConnectionRequest()
                .setLocalNode(this.localNode)
                .setLocalServices(services)
                .setLeaderNode(this.leaderNode)
                .setKnownPeers(this.snapshotKnownPeers());
        new ClusterConnection(this, peerNode, clusterConnectionRequest);
    }

    /** Writes a message to every connected peer whose node id is not among the excluded nodes. */
    private synchronized void sendMessageToPeerNodes(Message message, ClusterNodeData ... excludingNodes) {
        HashSet<String> excludedIds = new HashSet<>();
        if (excludingNodes != null) {
            for (ClusterNodeData excluded : excludingNodes) {
                excludedIds.add(excluded.getNodeId());
            }
        }
        List<ClusterNode> peerNodes = this.clusterNodeMap.values().stream()
                .filter(node -> node.isConnected() && !excludedIds.contains(node.getNodeData().getNodeId()))
                .toList();
        LOGGER.info("Cluster node [{}]: send to peer nodes: {}, message: {}", this.localNode.getNodeId(), peerNodes.size(), message.getMessageDefUuid());
        peerNodes.forEach(node -> node.writeMessage(message));
    }

    /** Writes the current leader to every registered peer, connected or not. */
    private synchronized void sendLeaderNodeUpdateToPeers() {
        ClusterNewLeaderInfo clusterNewLeaderInfo = new ClusterNewLeaderInfo().setLeaderNode(this.leaderNode);
        List<ClusterNode> peerNodes = this.clusterNodeMap.values().stream().toList();
        peerNodes.forEach(node -> node.writeMessage(clusterNewLeaderInfo));
    }

    /**
     * Writes a message to the peer with the given id; unknown ids are ignored.
     *
     * @param nodeId  id of the target node
     * @param message the message to write
     */
    public void sendMessage(String nodeId, Message message) {
        ClusterNode clusterNode = this.clusterNodeMap.get(nodeId);
        if (clusterNode != null) {
            clusterNode.writeMessage(message);
        }
    }

    /**
     * Writes a message to each of the given peers; unknown ids are ignored.
     *
     * @param nodeIds ids of the target nodes
     * @param message the message to write
     */
    public void sendMessage(List<String> nodeIds, Message message) {
        for (String nodeId : nodeIds) {
            this.sendMessage(nodeId, message);
        }
    }

    /**
     * Registers a service on this node under its service name and announces the updated local
     * service list to all connected peers.
     *
     * @param clusterService the service to register
     */
    public void registerService(AbstractClusterService clusterService) {
        String serviceName = clusterService.getServiceName();
        LOGGER.info("Cluster node [{}]: register local service: {}", this.localNode.getNodeId(), serviceName);
        this.localServices.put(serviceName, clusterService);
        String[] services = this.localServices.keySet().toArray(new String[0]);
        this.sendMessageToPeerNodes(new ClusterAvailableServicesUpdate().setServices(services));
    }

    /** Does nothing in this implementation. */
    public void registerModelCollection(ModelCollection modelCollection) {
    }

    /**
     * Tells whether a service with the given name is registered on this node.
     *
     * <p>NOTE(review): services offered only by remote peers are not considered here although
     * {@link #executeServiceMethod} can reach them - confirm this is intended.
     *
     * @param serviceName the service name to look up
     * @return {@code true} when the service is registered locally
     */
    public boolean isServiceAvailable(String serviceName) {
        return this.localServices.containsKey(serviceName);
    }

    /**
     * Executes a service method on whichever node is selected for the service.
     *
     * @see #executeServiceMethod(String, String, String, Message, PojoObjectDecoder)
     */
    public <REQUEST extends Message, RESPONSE extends Message> RESPONSE executeServiceMethod(String serviceName, String method, REQUEST request, PojoObjectDecoder<RESPONSE> responseDecoder) {
        return this.executeServiceMethod(null, serviceName, method, request, responseDecoder);
    }

    /**
     * Executes a service method and blocks until a result is available or the task's retry limit
     * is reached.
     *
     * @param clusterNodeId   id of the node to prefer, or {@code null} for automatic selection
     * @param serviceName     name of the service
     * @param method          name of the method
     * @param request         the request message
     * @param responseDecoder decoder used to remap the raw result message
     * @return the remapped response
     * @throws RuntimeException when the retry limit is reached without a successful result
     */
    @SuppressWarnings("unchecked")
    public <REQUEST extends Message, RESPONSE extends Message> RESPONSE executeServiceMethod(String clusterNodeId, String serviceName, String method, REQUEST request, PojoObjectDecoder<RESPONSE> responseDecoder) {
        LOGGER.info("Cluster node: {} - execute service method {}/{}" + (clusterNodeId != null ? ", on node {}" : ""), this.localNode.getNodeId(), serviceName, method, clusterNodeId);
        ClusterTask clusterTask = new ClusterTask(serviceName, method, request, clusterNodeId);
        this.pendingServiceRequestsMap.put(clusterTask.getTaskId(), clusterTask);
        try {
            while (!clusterTask.isFinished()) {
                clusterTask.startProcessing();
                this.executeClusterTask(clusterTask);
                clusterTask.waitForResult();
                if (clusterTask.isFinished()) {
                    Message clusterTaskResult = clusterTask.getResult();
                    return (RESPONSE) responseDecoder.remap(clusterTaskResult);
                }
                if (clusterTask.isRetryLimitReached()) {
                    LOGGER.warn("Cluster node [{}]: method execution {}/{} caused error '{}' with execution attempts: {}, retry limit reached - giving up!", this.localNode.getNodeId(), serviceName, method, clusterTask.getErrorMessage(), clusterTask.getExecutionAttempts());
                    throw new RuntimeException("Error: execute cluster service method failed:" + serviceName + ", " + method);
                }
                LOGGER.warn("Cluster node [{}]: method execution {}/{} caused error '{}' with execution attempts: {}, will retry...", this.localNode.getNodeId(), serviceName, method, clusterTask.getErrorMessage(), clusterTask.getExecutionAttempts());
            }
            throw new RuntimeException("Error: execute cluster service method failed:" + serviceName + ", " + method);
        }
        finally {
            // Always drop the bookkeeping entry, including when dispatching or remapping throws.
            this.pendingServiceRequestsMap.remove(clusterTask.getTaskId());
        }
    }

    /**
     * Writes a broadcast message to every node recorded as offering the given service. Nothing is
     * sent when no node is recorded for the service.
     *
     * @param serviceName name of the service
     * @param method      name of the method
     * @param message     the payload
     */
    public <MESSAGE extends Message> void executeServiceBroadcast(String serviceName, String method, MESSAGE message) {
        LOGGER.info("Cluster node: {} - execute service broadcast method {}/{}", this.localNode.getNodeId(), serviceName, method);
        List<ClusterNode> targets;
        // The index is a plain HashMap mutated under the monitor; copy the list before iterating.
        synchronized (this) {
            List<ClusterNode> recorded = this.nodesByServiceName.get(serviceName);
            targets = recorded == null ? null : new ArrayList<>(recorded);
        }
        if (targets != null) {
            ClusterServiceBroadcastMessage broadcastMessage = new ClusterServiceBroadcastMessage()
                    .setServiceName(serviceName)
                    .setMethodName(method)
                    .setMessage(message);
            targets.forEach(node -> node.writeMessage(broadcastMessage));
        }
    }

    /**
     * Hands a received broadcast to the matching local service on the task executor. Broadcasts
     * for services not registered locally are ignored.
     *
     * @param broadcastMessage the received broadcast
     * @param clusterNode      the node that sent it
     */
    public void handleServiceBroadcastMessage(ClusterServiceBroadcastMessage broadcastMessage, ClusterNode clusterNode) {
        String serviceName = broadcastMessage.getServiceName();
        String method = broadcastMessage.getMethodName();
        LOGGER.info("Cluster node [{}]: handle broadcast message from {}: {}/{}", this.localNode.getNodeId(), clusterNode.getNodeData().getNodeId(), serviceName, method);
        AbstractClusterService clusterService = this.localServices.get(serviceName);
        if (clusterService == null) {
            return;
        }
        this.taskExecutor.execute(() -> {
            try {
                clusterService.handleMessage(method, broadcastMessage.getMessage());
            }
            catch (Throwable e) {
                // There is no caller to report to; log instead of losing the failure in the worker thread.
                LOGGER.warn("Cluster node [{}]: broadcast handler {}/{} failed", this.localNode.getNodeId(), serviceName, method, e);
            }
        });
    }

    /**
     * Dispatches one execution attempt of a task, locally or to a remote node. Marks the task as
     * failed when the retry limit is reached or no provider is available.
     */
    private void executeClusterTask(ClusterTask clusterTask) {
        if (clusterTask.isRetryLimitReached()) {
            LOGGER.warn("Cluster node [{}]: Error: stop cluster task, too many retries; service: {}, method: {}", this.localNode.getNodeId(), clusterTask.getServiceName(), clusterTask.getMethod());
            clusterTask.setError(true);
            clusterTask.setErrorMessage("Error: too many retries");
            clusterTask.setResult(null);
            return;
        }
        clusterTask.addExecutionAttempt();
        AbstractClusterService localService = this.localServices.get(clusterTask.getServiceName());
        ClusterNode remoteNode;
        if (clusterTask.isFixedServiceNode()) {
            // A fixed node equal to this node means "no remote candidate".
            remoteNode = this.localNode.getNodeId().equals(clusterTask.getFixedServiceNodeId())
                    ? null
                    : this.clusterNodeMap.get(clusterTask.getFixedServiceNodeId());
        } else {
            remoteNode = this.getBestServiceNode(clusterTask.getServiceName());
        }
        // NOTE(review): a task with a fixed remote node can still run locally when the service is
        // also registered here and the local load is not higher - confirm that this is intended.
        if (localService != null && remoteNode != null) {
            if (this.getActiveTasks() <= remoteNode.getActiveTasks()) {
                this.runLocalClusterTask(localService, clusterTask);
            } else {
                this.runRemoteClusterTask(remoteNode, clusterTask);
            }
        } else if (localService != null) {
            this.runLocalClusterTask(localService, clusterTask);
        } else if (remoteNode != null) {
            this.runRemoteClusterTask(remoteNode, clusterTask);
        } else {
            LOGGER.warn("Cluster node [{}]: Error: no service available for cluster task; service: {}, method: {}", this.localNode.getNodeId(), clusterTask.getServiceName(), clusterTask.getMethod());
            clusterTask.setError(true);
            clusterTask.setErrorMessage("Error: no service available");
            clusterTask.setResult(null);
        }
    }

    /** Runs the task against the local service on the task executor and stores the outcome in the task. */
    private void runLocalClusterTask(AbstractClusterService localService, ClusterTask clusterTask) {
        this.taskExecutor.execute(() -> {
            try {
                Message message = localService.handleMessage(clusterTask.getMethod(), clusterTask.getRequest());
                clusterTask.setFinished(true);
                clusterTask.setResult(message);
            }
            catch (Throwable e) {
                LOGGER.warn("Cluster node [{}]: local execution of {}/{} failed", this.localNode.getNodeId(), clusterTask.getServiceName(), clusterTask.getMethod(), e);
                clusterTask.setError(true);
                clusterTask.setErrorMessage(e.getMessage());
                clusterTask.setErrorStackTrace(ExceptionUtils.getStackTrace(e));
                clusterTask.setResult(null);
            }
        });
    }

    /** Records the processing node on the task and writes the method request to that node. */
    private void runRemoteClusterTask(ClusterNode clusterNode, ClusterTask clusterTask) {
        clusterNode.addTask();
        clusterTask.setProcessingNodeId(clusterNode.getNodeData().getNodeId());
        ClusterServiceMethodRequest clusterServiceMethodRequest = new ClusterServiceMethodRequest()
                .setServiceName(clusterTask.getServiceName())
                .setMethodName(clusterTask.getMethod())
                .setClusterTaskId(clusterTask.getTaskId())
                .setRequestMessage(clusterTask.getRequest());
        clusterNode.writeMessage(clusterServiceMethodRequest);
    }

    /**
     * Returns the connected node offering the service that reports the fewest active tasks, or
     * {@code null} when there is none. On ties the node recorded first wins.
     */
    private synchronized ClusterNode getBestServiceNode(String serviceName) {
        List<ClusterNode> clusterNodes = this.nodesByServiceName.get(serviceName);
        if (clusterNodes == null) {
            return null;
        }
        // A single-pass minimum instead of a sort: task counters may change while we look at them.
        return clusterNodes.stream()
                .filter(ClusterNode::isConnected)
                .min(Comparator.comparingInt(ClusterNode::getActiveTasks))
                .orElse(null);
    }

    /** Returns the scheduler created by this cluster instance. */
    protected ScheduledExecutorService getScheduledExecutorService() {
        return this.scheduledExecutorService;
    }

    /**
     * Shuts this member down: tells the connected peers, closes all peer connections, stops the
     * executors and closes the server socket. Each step runs even when an earlier one fails, and
     * only the first call has any effect.
     */
    public void shutDown() {
        synchronized (this) {
            if (!this.active) {
                return;
            }
            this.active = false;
        }
        LOGGER.info("Cluster node [{}]: shutdown cluster node", this.localNode.getNodeId());
        try {
            this.sendMessageToPeerNodes(new ClusterNodeShutDownInfo());
        }
        catch (Exception e) {
            LOGGER.warn("Cluster node [{}]: could not notify peers about shutdown", this.localNode.getNodeId(), e);
        }
        for (ClusterNode node : this.clusterNodeMap.values()) {
            try {
                node.closeConnection();
            }
            catch (Exception e) {
                LOGGER.warn("Cluster node [{}]: error closing connection to {}", this.localNode.getNodeId(), node.getNodeData().getNodeId(), e);
            }
        }
        this.scheduledExecutorService.shutdownNow();
        // Let running service tasks finish, but accept no new ones.
        this.taskExecutor.shutdown();
        ServerSocket listener = this.serverSocket;
        if (listener != null) {
            try {
                listener.close();
            }
            catch (IOException e) {
                LOGGER.warn("Cluster node [{}]: error closing server socket", this.localNode.getNodeId(), e);
            }
        }
    }

    /** Creates a temporary directory with the prefix {@code temp}, rethrowing I/O failures unchecked. */
    private static File createTempDirSave() {
        try {
            return Files.createTempDirectory("temp").toFile();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns the description of this node. */
    public ClusterNodeData getLocalNode() {
        return this.localNode;
    }

    /** Returns the configuration this member was started with. */
    public ClusterConfig getClusterConfig() {
        return this.clusterConfig;
    }

    /** Returns the temporary directory created for this member. */
    public File getTempDir() {
        return this.tempDir;
    }

    /** Returns the task executor's active thread count plus its queue size. */
    private int getActiveTasks() {
        return this.taskExecutor.getActiveCount() + this.taskExecutor.getQueue().size();
    }

    /** Returns the task executor's completed task count. */
    private long getCompletedTaskCount() {
        return this.taskExecutor.getCompletedTaskCount();
    }

    /**
     * Returns the node data of the registered peers.
     *
     * @param connectedOnly when {@code true}, only currently connected peers are returned
     * @return a new list of peer node data
     */
    public List<ClusterNodeData> getPeerNodes(boolean connectedOnly) {
        return this.clusterNodeMap.values().stream()
                .filter(node -> !connectedOnly || node.isConnected())
                .map(ClusterNode::getNodeData)
                .collect(Collectors.toList());
    }

    /** Returns a copy of the registered peer nodes. */
    public List<ClusterNode> getClusterNodes() {
        return new ArrayList<>(this.clusterNodeMap.values());
    }

    /**
     * Tells whether the given node is registered and currently connected.
     *
     * @param clusterNodeData the node to look up by its id
     * @return {@code true} when the node is registered and connected
     */
    public boolean isConnected(ClusterNodeData clusterNodeData) {
        ClusterNode clusterNode = this.clusterNodeMap.get(clusterNodeData.getNodeId());
        return clusterNode != null && clusterNode.isConnected();
    }

    /**
     * Returns a copy of the services recorded for the given node.
     *
     * @param clusterNode the node to look up
     * @return a new list; empty when nothing is recorded for the node
     */
    public synchronized List<String> getClusterNodeServices(ClusterNode clusterNode) {
        List<String> services = this.servicesByNode.get(clusterNode);
        return services == null ? new ArrayList<>() : new ArrayList<>(services);
    }

    /** Returns the known leader, or {@code null} when no leader is known yet. */
    public ClusterNodeData getLeaderNode() {
        return this.leaderNode;
    }

    /** Tells whether the known leader has the same node id as this node. */
    public boolean isLeaderNode() {
        // Read the volatile field once so the null check and the dereference see the same value.
        ClusterNodeData leader = this.leaderNode;
        return leader != null && leader.getNodeId().equals(this.localNode.getNodeId());
    }
}

