/*
 * Decompiled with CFR 0.152.
 */
package org.teamapps.universaldb.cluster;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.teamapps.universaldb.cluster.ClusterConfig;
import org.teamapps.universaldb.cluster.ClusterHandler;
import org.teamapps.universaldb.cluster.ClusterNode;
import org.teamapps.universaldb.cluster.ClusterNodeConfig;
import org.teamapps.universaldb.cluster.ClusterNodeRole;
import org.teamapps.universaldb.cluster.ClusterOperationMode;
import org.teamapps.universaldb.cluster.message.ClusterNodeStatusMessage;
import org.teamapps.universaldb.cluster.message.ConnectToHeadRequest;
import org.teamapps.universaldb.cluster.message.ConnectToHeadSuccessResponse;
import org.teamapps.universaldb.cluster.message.ConnectToHeadWaitResponse;
import org.teamapps.universaldb.cluster.message.HeadElectionProposal;
import org.teamapps.universaldb.cluster.message.InitMessage;
import org.teamapps.universaldb.cluster.message.InitMessageResponse;
import org.teamapps.universaldb.cluster.message.ResolvedTransactionRequest;
import org.teamapps.universaldb.cluster.message.SynchronizeTransactionResponse;
import org.teamapps.universaldb.cluster.message.SynchronizeTransactionsFinished;
import org.teamapps.universaldb.cluster.message.SynchronizeTransactionsRequest;
import org.teamapps.universaldb.cluster.message.SynchronizeTransactionsStatus;
import org.teamapps.universaldb.cluster.message.UnresolvedTransactionRequest;
import org.teamapps.universaldb.cluster.network.ConnectionHandler;
import org.teamapps.universaldb.cluster.network.MessageType;
import org.teamapps.universaldb.cluster.network.NetworkWriter;
import org.teamapps.universaldb.cluster.network.NodeConnection;
import org.teamapps.universaldb.schema.Schema;
import org.teamapps.universaldb.transaction.TransactionHandler;
import org.teamapps.universaldb.transaction.TransactionPacket;
import org.teamapps.universaldb.transaction.TransactionRequest;

public class Cluster
extends Thread
implements ConnectionHandler,
ClusterHandler {
    private static final Logger log = LoggerFactory.getLogger(Cluster.class);
    // Cluster configuration and local transaction engine, both supplied to the constructor.
    private final ClusterConfig config;
    private final TransactionHandler transactionHandler;
    // volatile: assigned when the local node becomes head and read by other callbacks.
    private volatile ClusterOperationMode clusterOperationMode = ClusterOperationMode.BOOTING;
    private int localNodeId;
    private ClusterNodeRole localNodeType;
    private ClusterNodeRole preferredLocalNodeType;
    private int localPort;
    // volatile: assigned in connectToHead() and read by executeTransaction().
    private volatile ClusterNode headNode;
    // Majority of the electoral nodes (electoralClusterNodes / 2 + 1).
    private int clusterQuorum;
    // Local node plus every remote node that is allowed to vote for head selection.
    private int electoralClusterNodes;
    private List<ClusterNode> clusterNodes;
    private Map<Integer, ClusterNode> nodeById;
    // volatile: executeTransaction() polls this field in a sleep loop while the
    // handler callbacks assign it, so every write must be visible to the polling thread.
    private volatile LocalState localState = LocalState.CONNECTING;
    private boolean nonVotingNode;
    private int headElectionProposal;
    // volatile: stored by executeTransaction(), which then blocks, and completed
    // from handleResolvedTransactionRequest().
    private volatile TransactionRequest currentTransactionRequest;
    // Monitor guarding the cluster/synchronization state checks. final so the lock
    // identity can never change.
    // NOTE(review): a boxed Integer is a poor lock object; a plain Object would be safer.
    private final Integer netWorkSyncObject = 10001;

    /**
     * Creates the cluster controller for the local node, registers every configured
     * remote node, computes the election quorum and then starts networking.
     *
     * <p>Note that the listener thread and the reconnect thread are started from
     * within this constructor.
     *
     * @param config             cluster configuration providing the local and remote nodes
     * @param transactionHandler local transaction engine used for execution and synchronization
     */
    public Cluster(ClusterConfig config, TransactionHandler transactionHandler) {
        this.config = config;
        this.transactionHandler = transactionHandler;
        // Typed list instead of the raw ArrayList the original used.
        this.clusterNodes = Collections.synchronizedList(new ArrayList<ClusterNode>());
        this.nodeById = new HashMap<Integer, ClusterNode>();
        ClusterNodeConfig localNode = config.getLocalNode();
        this.localNodeId = localNode.getNodeId();
        this.localPort = localNode.getPort();
        this.localNodeType = localNode.getPreferredNodeType().getBootUpType();
        this.preferredLocalNodeType = localNode.getPreferredNodeType();
        // The local node is always counted towards the electoral total.
        ++this.electoralClusterNodes;
        for (ClusterNodeConfig node : config.getRemoteNodes()) {
            ClusterNode clusterNode = new ClusterNode(node, transactionHandler, this);
            this.clusterNodes.add(clusterNode);
            this.nodeById.put(clusterNode.getNodeId(), clusterNode);
            if (node.getPreferredNodeType().allowedToVoteForHeadSelection()) {
                ++this.electoralClusterNodes;
            }
        }
        this.clusterQuorum = this.electoralClusterNodes / 2 + 1;
        if (!this.localNodeType.allowedToVoteForHeadSelection()) {
            this.nonVotingNode = true;
        }
        this.setName("cluster-socket");
        this.start();
        this.connectNodes();
        this.startNodeConnectionsCheck();
    }

    /**
     * Executes a transaction request once the local node is operational.
     *
     * <p>Blocks (polling every 50 ms) until the node is either RUNNING or
     * RUNNING_AS_HEAD. As head, the request is executed locally and the resolved
     * request is sent to every initialized node. Otherwise the request is forwarded
     * to the head and this method waits until it has been executed.
     *
     * <p>An interrupt received while waiting does not abort the request; the
     * interrupt status is restored before the method returns.
     *
     * @param transactionRequest the request to execute
     */
    public synchronized void executeTransaction(TransactionRequest transactionRequest) {
        boolean interrupted = false;
        try {
            boolean isWaitingLogged = false;
            // Snapshot the state so the branch below acts on exactly the value that ended the wait.
            LocalState state;
            while ((state = this.localState) != LocalState.RUNNING && state != LocalState.RUNNING_AS_HEAD) {
                if (!isWaitingLogged) {
                    log.warn("Wait for transaction execution, node not available, " + this.getNodeStatusAsString());
                    isWaitingLogged = true;
                }
                try {
                    Thread.sleep(50L);
                }
                catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt for the caller.
                    interrupted = true;
                }
            }
            if (state == LocalState.RUNNING_AS_HEAD) {
                this.transactionHandler.executeTransactionRequest(transactionRequest);
                ResolvedTransactionRequest executionRequest = new ResolvedTransactionRequest(transactionRequest);
                for (ClusterNode clusterNode : this.clusterNodes) {
                    if (!clusterNode.isInitialized()) continue;
                    clusterNode.sendMessage(executionRequest);
                }
            } else {
                this.currentTransactionRequest = transactionRequest;
                UnresolvedTransactionRequest executionRequest = new UnresolvedTransactionRequest(transactionRequest);
                this.headNode.sendMessage(executionRequest);
                this.currentTransactionRequest.waitForExecution();
            }
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                interrupted = true;
            }
            log.error("Error executing transaction request, " + this.getNodeStatusAsString(), e);
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Opens outgoing connections to every configured node whose id is lower than
     * the local node id; nodes with an equal or higher id are skipped here.
     */
    private void connectNodes() {
        for (ClusterNode peer : this.clusterNodes) {
            boolean hasLowerId = peer.getNodeId() < this.localNodeId;
            if (hasLowerId) {
                this.connectNode(peer);
            }
        }
    }

    /**
     * Starts an outgoing connection to the given node and sends the initial message
     * describing the local node.
     *
     * <p>If the attempt fails with an I/O error, the node's connecting flag is
     * cleared again so the periodic connection check can retry it.
     *
     * @param clusterNode the remote node to connect to
     */
    private void connectNode(ClusterNode clusterNode) {
        try {
            clusterNode.setConnecting(true);
            NodeConnection connection = new NodeConnection(this.config.getClusterSecret(), clusterNode);
            InitMessage initMessage = new InitMessage(this.localNodeId, this.transactionHandler.getLastTransactionId(), this.transactionHandler.getCurrentTransactionId(), this.transactionHandler.getTransactionCount(), this.transactionHandler.getSchema(), this.clusterOperationMode, this.localNodeType, this.preferredLocalNodeType);
            connection.connect(clusterNode.getConfig().getAddress(), clusterNode.getConfig().getPort(), initMessage);
        }
        catch (IOException e) {
            // Without this reset the reconnect loop would treat the node as "connecting" forever.
            clusterNode.setConnecting(false);
            log.warn("Could not connect to cluster node:" + clusterNode.getNodeId() + ", " + this.getNodeStatusAsString(), e);
        }
    }

    /**
     * Listener loop: accepts incoming connections on the local port and hands
     * each socket to a new {@link NodeConnection} whose handler is this cluster.
     *
     * <p>When the server socket fails it is closed, the failure is logged, and the
     * listener is re-created after a short pause. The loop ends when the thread is
     * interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            // try-with-resources releases the port before the next bind attempt.
            try (ServerSocket serverSocket = new ServerSocket(this.localPort)) {
                while (true) {
                    Socket socket = serverSocket.accept();
                    NodeConnection nodeConnection = new NodeConnection(this.config.getClusterSecret(), this);
                    nodeConnection.setSocket(socket);
                }
            }
            catch (IOException e) {
                log.error("Cluster server socket failed on port " + this.localPort + ", restarting listener", e);
            }
            try {
                // Back off instead of spinning when binding keeps failing.
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Handles the first message of an incoming connection. Only
     * {@code INITIAL_MESSAGE} is processed: the sending node is looked up by id,
     * becomes the connection handler of the writer, and receives the init message.
     * Messages from node ids that are not configured are logged and ignored.
     *
     * @param messageType   type of the received message
     * @param data          serialized message payload
     * @param networkWriter writer bound to the connection the message arrived on
     */
    @Override
    public void handleMessage(MessageType messageType, byte[] data, NetworkWriter networkWriter) {
        if (messageType != MessageType.INITIAL_MESSAGE) {
            return;
        }
        try {
            InitMessage initMessage = new InitMessage(data);
            ClusterNode clusterNode = this.nodeById.get(initMessage.getNodeId());
            if (clusterNode == null) {
                // Unknown sender: do not rebind the writer, otherwise the next call would fail with an NPE.
                log.warn("Ignoring initial message from unknown node:" + initMessage.getNodeId() + ", " + this.getNodeStatusAsString());
                return;
            }
            networkWriter.setConnectionHandler(clusterNode);
            clusterNode.setNetworkWriter(networkWriter);
            clusterNode.handleInitialMessage(initMessage);
        }
        catch (IOException e) {
            log.error("Could not handle initial message, " + this.getNodeStatusAsString(), e);
        }
    }

    /**
     * Connection-error callback for connections whose handler is this cluster.
     * No action is taken here.
     */
    @Override
    public void handleConnectionError() {
    }

    /**
     * @return the operation mode currently recorded for the local node
     */
    @Override
    public ClusterOperationMode getClusterOperationMode() {
        return clusterOperationMode;
    }

    /**
     * Called when a remote node has been initialized: answers with an
     * {@link InitMessageResponse} describing the local node and re-evaluates the
     * cluster state.
     *
     * @param node the node that has just been initialized
     */
    @Override
    public void handleNodeInitialized(ClusterNode node) {
        try {
            InitMessageResponse initMessageResponse = new InitMessageResponse(this.localNodeId, this.transactionHandler.getLastTransactionId(), this.transactionHandler.getCurrentTransactionId(), this.transactionHandler.getTransactionCount(), this.transactionHandler.getSchema(), this.clusterOperationMode, this.localNodeType, this.preferredLocalNodeType);
            node.sendMessage(initMessageResponse);
            log.info("Cluster node connected:" + node.getNodeId() + ", " + this.getNodeStatusAsString());
            this.checkClusterState();
        }
        catch (IOException e) {
            // Route the failure through the logger instead of stderr.
            log.error("Error handling initialization of node:" + node.getNodeId() + ", " + this.getNodeStatusAsString(), e);
        }
    }

    /**
     * Called when a remote node answered the local init message; logs the
     * connection and re-evaluates the cluster state.
     *
     * @param clusterNode the node that answered
     */
    @Override
    public void handleNodeInitializedResponse(ClusterNode clusterNode) {
        String message = "Cluster node connected (bd):" + clusterNode.getNodeId() + ", " + this.getNodeStatusAsString();
        log.info(message);
        checkClusterState();
    }

    /**
     * Applies a schema received from a peer. The update is only applied when the
     * local node is a non-voting node; otherwise the call is ignored.
     *
     * @param schema the schema sent by the peer
     */
    @Override
    public void handleSchemaUpdate(Schema schema) {
        if (!this.nonVotingNode) {
            return;
        }
        try {
            this.transactionHandler.updateSchema(schema);
        }
        catch (IOException e) {
            log.error("Could not apply schema update, " + this.getNodeStatusAsString(), e);
        }
    }

    /**
     * Serves a peer's request for a range of transactions. If the requested end id
     * is not yet available locally an error is logged. Otherwise a new thread
     * streams every transaction of the range to the peer, followed by a
     * {@link SynchronizeTransactionsFinished} message.
     *
     * @param message       the requested transaction range
     * @param networkWriter writer used to answer the requesting peer
     */
    @Override
    public void handleSynchronizeTransactionRequest(SynchronizeTransactionsRequest message, NetworkWriter networkWriter) {
        if (message.getRequestEndId() > this.transactionHandler.getLastTransactionId()) {
            log.error("Wrong transaction synchronization request, requested:" + message.getRequestEndId() + ", available:" + this.transactionHandler.getLastTransactionId() + ", " + this.getNodeStatusAsString());
            return;
        }
        log.info("Transaction synchronization request, req-from:" + message.getRequestStartId() + ", to:" + message.getRequestEndId() + ", " + this.getNodeStatusAsString());
        Runnable sender = () -> {
            Iterator<byte[]> pending = this.transactionHandler.getTransactions(message.getRequestStartId(), message.getRequestEndId());
            while (pending.hasNext()) {
                networkWriter.sendMessage(new SynchronizeTransactionResponse(pending.next()));
            }
            networkWriter.sendMessage(new SynchronizeTransactionsFinished(message.getRequestStartId(), message.getRequestEndId()));
        };
        new Thread(sender).start();
    }

    /**
     * Applies one synchronized transaction received from a peer by decoding it
     * into a {@link TransactionPacket} and passing it to the transaction handler.
     *
     * @param synchronizeTransactionResponse message carrying the serialized transaction
     * @param clusterNode                    the peer that sent the transaction
     */
    @Override
    public void handleSynchronizeTransactionResponse(SynchronizeTransactionResponse synchronizeTransactionResponse, ClusterNode clusterNode) {
        try {
            TransactionPacket packet = new TransactionPacket(synchronizeTransactionResponse.getData());
            this.transactionHandler.handleTransactionSynchronizationPacket(packet);
        }
        catch (IOException e) {
            log.error("Could not apply synchronized transaction, " + this.getNodeStatusAsString(), e);
        }
    }

    /**
     * Called when a peer finished sending the requested transactions. Publishes
     * the local transaction status to all initialized nodes, switches to
     * WAITING_FOR_PEER_SYNCHRONISATION and re-checks the synchronization state.
     *
     * @param synchronizeTransactionsFinished the end-of-range marker sent by the peer
     * @param node                            the peer that served the transactions
     */
    @Override
    public void handleSynchronizeTransactionsFinished(SynchronizeTransactionsFinished synchronizeTransactionsFinished, ClusterNode node) {
        SynchronizeTransactionsStatus status = new SynchronizeTransactionsStatus(this.transactionHandler.getLastTransactionId(), this.transactionHandler.getCurrentTransactionId(), this.transactionHandler.getTransactionCount());
        for (ClusterNode peer : this.clusterNodes) {
            if (peer.isInitialized()) {
                peer.sendMessage(status);
            }
        }
        this.localState = LocalState.WAITING_FOR_PEER_SYNCHRONISATION;
        String summary = "Finished local transactions syncing:" + this.transactionHandler.getLastTransactionId() + " (" + synchronizeTransactionsFinished.getRequestEndId() + "), " + this.getNodeStatusAsString();
        log.info(summary);
        checkSynchronizationState();
    }

    /**
     * Called when a peer reports its transaction status. The synchronization
     * state is only re-checked while the local node waits for peer synchronization.
     *
     * @param synchronizeTransactionsStatus the status reported by the peer
     * @param clusterNode                   the reporting peer
     */
    @Override
    public void handleSynchronizeTransactionsStatus(SynchronizeTransactionsStatus synchronizeTransactionsStatus, ClusterNode clusterNode) {
        if (this.localState != LocalState.WAITING_FOR_PEER_SYNCHRONISATION) {
            return;
        }
        checkSynchronizationState();
    }

    /**
     * Called when a peer announced its head proposal. The election result is only
     * evaluated while the local node is electing a head; the proposal value itself
     * is not used by this method.
     *
     * @param headIdProposal node id proposed as head by the peer
     * @param clusterNode    the proposing peer
     */
    @Override
    public void handleHeadElectionProposal(int headIdProposal, ClusterNode clusterNode) {
        if (this.localState != LocalState.ELECTING_HEAD) {
            return;
        }
        checkHeadElectionResult();
    }

    /**
     * Handles a node's request to connect to this node as its head.
     *
     * <ul>
     *   <li>RUNNING_AS_HEAD: answers with a {@link ConnectToHeadSuccessResponse}
     *       carrying the local transaction status.</li>
     *   <li>ELECTING_HEAD: answers with a {@link ConnectToHeadWaitResponse}.</li>
     *   <li>any other state: logs a warning, no answer is sent.</li>
     * </ul>
     *
     * @param connectToHeadRequest the request with the peer's transaction status
     * @param clusterNode          the requesting peer
     */
    @Override
    public void handleConnectToHeadRequest(ConnectToHeadRequest connectToHeadRequest, ClusterNode clusterNode) {
        LocalState state = this.localState;
        if (state == LocalState.RUNNING_AS_HEAD) {
            long requestedLastTransactionId = connectToHeadRequest.getLastTransactionId();
            long requestedTransactionCount = connectToHeadRequest.getTransactionCount();
            long lastTransactionId = this.transactionHandler.getLastTransactionId();
            long currentTransactionId = this.transactionHandler.getCurrentTransactionId();
            long transactionCount = this.transactionHandler.getTransactionCount();
            log.info("HEAD: Connected cluster node " + clusterNode.getNodeId() + " as " + clusterNode.getClusterNodeRole());
            if (requestedLastTransactionId < lastTransactionId) {
                // NOTE(review): the original started a thread with an empty body here, so no
                // missing transactions are actually sent. The no-op thread has been removed;
                // the catch-up transfer itself still has to be implemented.
                log.info("HEAD: send missing transactions, requested:" + requestedLastTransactionId + ", current:" + lastTransactionId + ", missing transactions:" + (transactionCount - requestedTransactionCount));
            }
            clusterNode.sendMessage(new ConnectToHeadSuccessResponse(lastTransactionId, currentTransactionId, transactionCount));
        } else if (state == LocalState.ELECTING_HEAD) {
            log.info("Cannot yet connect node to this HEAD:" + clusterNode.getNodeId() + ", " + this.getNodeStatusAsString());
            clusterNode.sendMessage(new ConnectToHeadWaitResponse(100));
        } else {
            log.warn("Error: node wants to connect to head, but this is no head!" + clusterNode.getNodeId() + ", this node:" + this.getNodeStatusAsString());
        }
    }

    /**
     * Handles a transaction request forwarded by a peer by running it through
     * {@link #executeTransaction(TransactionRequest)}.
     *
     * @param transactionRequest the forwarded request
     */
    @Override
    public void handleUnresolvedTransactionRequest(TransactionRequest transactionRequest) {
        executeTransaction(transactionRequest);
    }

    /**
     * Executes a resolved transaction locally. If it carries the same transaction
     * request id as the request this node is currently waiting on, that pending
     * request is marked as executed.
     *
     * @param transactionRequest the resolved request to apply
     */
    @Override
    public void handleResolvedTransactionRequest(TransactionRequest transactionRequest) {
        try {
            this.transactionHandler.executeTransactionRequest(transactionRequest);
            // Read the shared field once so the null check and the calls see the same request.
            TransactionRequest pending = this.currentTransactionRequest;
            if (pending != null && transactionRequest.getTransaction().getTransactionRequestId() == pending.getTransaction().getTransactionRequestId()) {
                pending.setExecuted();
            }
        }
        catch (IOException e) {
            log.error("Could not execute resolved transaction request, " + this.getNodeStatusAsString(), e);
        }
    }

    /**
     * Called when the head asked this node to wait before connecting. The event
     * is only logged; no retry is scheduled here.
     *
     * @param connectToHeadWaitResponse the wait response sent by the head
     * @param clusterNode               the node that answered
     */
    @Override
    public void handleConnectToHeadWaitResponse(ConnectToHeadWaitResponse connectToHeadWaitResponse, ClusterNode clusterNode) {
        String message = "Must wait to connect to head:" + clusterNode.getNodeId() + ", " + this.getNodeStatusAsString();
        log.info(message);
    }

    /**
     * Called when the head accepted the connection request: logs the event and
     * switches the local state to RUNNING.
     *
     * @param connectToHeadSuccessResponse the success response sent by the head
     * @param clusterNode                  the head node that answered
     */
    @Override
    public void handleConnectToHeadSuccessResponse(ConnectToHeadSuccessResponse connectToHeadSuccessResponse, ClusterNode clusterNode) {
        // The status string is built before the state changes, so it shows the previous state.
        String message = "Successfully connected to head:" + clusterNode.getNodeId() + ", " + this.getNodeStatusAsString();
        log.info(message);
        this.localState = LocalState.RUNNING;
    }

    /**
     * Callback for a status message received from a peer. No action is taken here.
     *
     * @param clusterNodeStatusMessage the status reported by the peer
     * @param clusterNode              the reporting peer
     */
    @Override
    public void handleClusterNodeStatusUpdate(ClusterNodeStatusMessage clusterNodeStatusMessage, ClusterNode clusterNode) {
    }

    /**
     * Called when the connection to a node was lost. Unless the local node is in
     * state RUNNING, the cluster state is evaluated and, if the quorum is missing,
     * the local node falls back to CONNECTING.
     *
     * @param node the node whose connection was lost
     */
    @Override
    public void handleLostConnection(ClusterNode node) {
        log.info("Lost connection to node:" + node);
        if (this.localState == LocalState.RUNNING) {
            return;
        }
        if (getClusterState() == ClusterState.MISSING_QUORUM) {
            this.localState = LocalState.CONNECTING;
        }
    }

    /**
     * @return {@code true} if every initialized node reports exactly the same last
     *         transaction id and transaction count as the local node
     */
    private boolean isAllClusterNodesSynced() {
        long localLastId = this.transactionHandler.getLastTransactionId();
        long localCount = this.transactionHandler.getTransactionCount();
        for (ClusterNode peer : this.clusterNodes) {
            if (!peer.isInitialized()) {
                continue;
            }
            if (localLastId != peer.getLastTransactionId() || localCount != peer.getTransactionCount()) {
                return false;
            }
        }
        return true;
    }

    /**
     * @return {@code true} if no initialized node reports a higher last
     *         transaction id or a higher transaction count than the local node
     */
    private boolean isLocalNodeSynced() {
        long localLastId = this.transactionHandler.getLastTransactionId();
        long localCount = this.transactionHandler.getTransactionCount();
        for (ClusterNode peer : this.clusterNodes) {
            if (!peer.isInitialized()) {
                continue;
            }
            if (localLastId < peer.getLastTransactionId() || localCount < peer.getTransactionCount()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Sends a {@link ClusterNodeStatusMessage} describing the local node (ids,
     * transaction status, roles, number of initialized peers and current head id,
     * 0 if none) to every initialized node.
     */
    private void sendNodeStatus() {
        int initializedPeers = 0;
        for (ClusterNode peer : this.clusterNodes) {
            if (peer.isInitialized()) {
                ++initializedPeers;
            }
        }
        int headId = this.headNode != null ? this.headNode.getNodeId() : 0;
        ClusterNodeStatusMessage status = new ClusterNodeStatusMessage(this.localNodeId, this.transactionHandler.getLastTransactionId(), this.transactionHandler.getCurrentTransactionId(), this.transactionHandler.getTransactionCount(), 0, this.clusterOperationMode, this.localNodeType, this.preferredLocalNodeType, initializedPeers, headId);
        for (ClusterNode peer : this.clusterNodes) {
            if (peer.isInitialized()) {
                peer.sendMessage(status);
            }
        }
    }

    /**
     * Builds a one-line summary of the local node (id, operation mode, role, local
     * state, last transaction id and transaction count) for log messages.
     *
     * @return the status summary
     */
    private String getNodeStatusAsString() {
        // The original computed the head id into a local that was never used; that dead code is removed.
        return "nodeId:" + this.localNodeId + ", mode:" + this.clusterOperationMode + ", type:" + this.localNodeType + ", state:" + this.localState + ", transId:" + this.transactionHandler.getLastTransactionId() + ", trans-count:" + this.transactionHandler.getTransactionCount();
    }

    /**
     * Derives the cluster state from the local node plus all initialized peers that
     * are allowed to vote.
     *
     * <p>With fewer voting nodes than the quorum the result is MISSING_QUORUM.
     * Otherwise: RUNNING if at least a quorum of peers is in mode RUNNING, else
     * WAITING if any peer is in mode WAITING, else ALL_NODES_READY_TO_SYNCHRONIZE
     * if all electoral nodes are present, else MISSING_QUORUM.
     *
     * @return the derived cluster state
     */
    private ClusterState getClusterState() {
        log.info("Checking cluster state, " + this.getNodeStatusAsString());
        // Starts at one: the local node is counted as present.
        int votingNodes = 1;
        int runningNodes = 0;
        boolean anyWaiting = false;
        for (ClusterNode peer : this.clusterNodes) {
            if (peer.isInitialized() && peer.getClusterNodeRole().allowedToVoteForHeadSelection()) {
                ++votingNodes;
                if (peer.getClusterOperationMode() == ClusterOperationMode.RUNNING) {
                    ++runningNodes;
                } else if (peer.getClusterOperationMode() == ClusterOperationMode.WAITING) {
                    anyWaiting = true;
                }
            }
        }
        log.info("Cluster state: all nodes:" + votingNodes + ", running:" + runningNodes + ", waiting:" + anyWaiting + ", quorum:" + this.clusterQuorum + ", electoral:" + this.electoralClusterNodes);
        if (votingNodes < this.clusterQuorum) {
            log.info("Cluster state: missing quorum");
            return ClusterState.MISSING_QUORUM;
        }
        if (runningNodes >= this.clusterQuorum) {
            log.info("Cluster state: running");
            return ClusterState.RUNNING;
        }
        if (anyWaiting) {
            log.info("Cluster state: waiting");
            return ClusterState.WAITING;
        }
        if (votingNodes >= this.electoralClusterNodes) {
            log.info("Cluster state: ready to sync");
            return ClusterState.ALL_NODES_READY_TO_SYNCHRONIZE;
        }
        log.info("Cluster state: missing quorum");
        return ClusterState.MISSING_QUORUM;
    }

    /**
     * Starts synchronization once enough nodes are connected. Runs under the
     * network sync monitor and only acts while the local state is CONNECTING,
     * the local operation mode is not RUNNING and the quorum is not missing.
     */
    private void checkClusterState() {
        synchronized (this.netWorkSyncObject) {
            log.info("pre check cluster state, " + this.getNodeStatusAsString());
            if (this.localState != LocalState.CONNECTING) {
                log.info("not checking cluster state as wrong local state:" + this.localState + ", " + this.getNodeStatusAsString());
                return;
            }
            if (this.clusterOperationMode == ClusterOperationMode.RUNNING) {
                return;
            }
            if (getClusterState() != ClusterState.MISSING_QUORUM) {
                startSyncing();
            }
        }
    }

    /**
     * Decides the next step after local transaction synchronization. Runs under
     * the network sync monitor and only acts in state
     * WAITING_FOR_PEER_SYNCHRONISATION.
     *
     * <ul>
     *   <li>If a peer is ahead of the local node, synchronization is started again.</li>
     *   <li>If the cluster is already running, the local node connects to the
     *       initialized peer that has the HEAD role. If no such peer is found a
     *       warning is logged and nothing else happens.</li>
     *   <li>Otherwise, once all initialized nodes are in sync, a head election starts.</li>
     * </ul>
     *
     * NOTE(review): this method and synchronizeTransactions() call each other; if a
     * peer reports a higher transaction count but no higher last transaction id,
     * that looks like it could recurse without end. Confirm whether that can occur.
     */
    private void checkSynchronizationState() {
        synchronized (this.netWorkSyncObject) {
            if (this.localState != LocalState.WAITING_FOR_PEER_SYNCHRONISATION) {
                return;
            }
            if (!this.isLocalNodeSynced()) {
                log.info("Synchronize transactions again..., " + this.getNodeStatusAsString());
                this.synchronizeTransactions();
                return;
            }
            if (this.getClusterState() == ClusterState.RUNNING) {
                ClusterNode runningHead = null;
                for (ClusterNode clusterNode : this.clusterNodes) {
                    if (clusterNode.isInitialized() && clusterNode.getClusterNodeRole() == ClusterNodeRole.HEAD) {
                        runningHead = clusterNode;
                        break;
                    }
                }
                if (runningHead == null) {
                    // The original passed id 0 on to connectToHead(), which fails on the unknown id.
                    log.warn("Cluster is running but no connected node has the HEAD role, " + this.getNodeStatusAsString());
                    return;
                }
                this.connectToHead(runningHead.getNodeId());
            } else if (this.isAllClusterNodesSynced()) {
                this.startHeadElection();
            }
        }
    }

    /**
     * Switches to ELECTING_HEAD, selects the local head proposal and sends it to
     * every initialized node.
     */
    private void startHeadElection() {
        this.localState = LocalState.ELECTING_HEAD;
        int proposal = selectBestHead();
        this.headElectionProposal = proposal;
        log.info("Start electing head, proposal is:" + proposal + ", " + this.getNodeStatusAsString());
        HeadElectionProposal ballot = new HeadElectionProposal(proposal);
        for (ClusterNode peer : this.clusterNodes) {
            if (peer.isInitialized()) {
                peer.sendMessage(ballot);
            }
        }
    }

    /**
     * Evaluates the head election. The election is decided only when every
     * initialized node has proposed the same head as the local node and the
     * agreeing nodes, including the local node itself, reach the quorum.
     *
     * <p>If the local node is the elected head it switches to RUNNING_AS_HEAD and
     * broadcasts its status; otherwise it connects to the elected head. Without a
     * quorum an error is logged and no head is established.
     */
    private void checkHeadElectionResult() {
        if (this.headElectionProposal <= 0) {
            return;
        }
        // The local proposal counts as one vote, matching getClusterState(), which
        // also counts the local node towards the quorum.
        int countResults = 1;
        for (ClusterNode clusterNode : this.clusterNodes) {
            if (!clusterNode.isInitialized()) continue;
            if (this.headElectionProposal != clusterNode.getHeadIdProposal()) {
                // At least one peer has not (yet) proposed the same head.
                return;
            }
            ++countResults;
        }
        if (countResults < this.clusterQuorum) {
            log.error("Error: head election with all results but less than quorum:" + countResults + ", quorum:" + this.clusterQuorum);
            // Do not establish a head that is not backed by a majority.
            return;
        }
        if (this.localNodeId == this.headElectionProposal) {
            this.localState = LocalState.RUNNING_AS_HEAD;
            this.clusterOperationMode = ClusterOperationMode.RUNNING;
            this.sendNodeStatus();
            log.info("Start as HEAD node, " + this.getNodeStatusAsString());
        } else {
            log.info("Connect to HEAD:" + this.headElectionProposal + ", " + this.getNodeStatusAsString());
            this.connectToHead(this.headElectionProposal);
        }
    }

    /**
     * Registers the node with the given id as head, switches to WAIT_FOR_HEAD and
     * sends it a {@link ConnectToHeadRequest} with the local transaction status.
     * If no node with that id is known, an error is logged and the local state is
     * left unchanged.
     *
     * @param headId node id of the head to connect to
     */
    private void connectToHead(int headId) {
        ClusterNode head = this.nodeById.get(headId);
        if (head == null) {
            log.error("Cannot connect to head, unknown node id:" + headId + ", " + this.getNodeStatusAsString());
            return;
        }
        this.localState = LocalState.WAIT_FOR_HEAD;
        this.headNode = head;
        head.sendMessage(new ConnectToHeadRequest(this.transactionHandler.getLastTransactionId(), this.transactionHandler.getCurrentTransactionId(), this.transactionHandler.getTransactionCount()));
    }

    /**
     * Selects the node id to propose as head. Candidates are the local node and
     * every configured node that is currently initialized. Candidates are ranked
     * by preferred role in the order HEAD, AUTO, WORKER, SNAPSHOT_PROVIDER; within
     * a role the first candidate in configuration order wins, so the result is
     * deterministic for a given configuration order.
     *
     * @return the proposed head node id, or 0 if no candidate has one of these roles
     */
    private int selectBestHead() {
        List<ClusterNodeConfig> connectedNodes = new ArrayList<ClusterNodeConfig>();
        for (ClusterNodeConfig nodeConfig : this.config.getClusterNodes()) {
            int nodeId = nodeConfig.getNodeId();
            if (this.localNodeId == nodeId) {
                connectedNodes.add(nodeConfig);
                continue;
            }
            ClusterNode remote = this.nodeById.get(nodeId);
            // Guard against configured ids that were never registered as remote nodes.
            if (remote != null && remote.isInitialized()) {
                connectedNodes.add(nodeConfig);
            }
        }
        ClusterNodeRole[] rolePreference = {ClusterNodeRole.HEAD, ClusterNodeRole.AUTO, ClusterNodeRole.WORKER, ClusterNodeRole.SNAPSHOT_PROVIDER};
        for (ClusterNodeRole role : rolePreference) {
            // Plain iteration instead of Stream.findAny(), whose choice is unspecified.
            for (ClusterNodeConfig candidate : connectedNodes) {
                if (candidate.getPreferredNodeType() == role) {
                    return candidate.getNodeId();
                }
            }
        }
        log.warn("Could not find any node that can act as head of cluster! " + this.getNodeStatusAsString());
        return 0;
    }

    /**
     * Runs the boot synchronization: first the schema, then the transactions.
     * If schema synchronization leaves the local node in state ERROR, transaction
     * synchronization is skipped so the error state is not overwritten.
     */
    private void startSyncing() {
        try {
            this.synchronizeSchema();
            if (this.localState == LocalState.ERROR) {
                log.error("Skipping transaction synchronization because schema synchronization failed, " + this.getNodeStatusAsString());
                return;
            }
            this.synchronizeTransactions();
        }
        catch (IOException e) {
            log.error("Error during cluster synchronization, " + this.getNodeStatusAsString(), e);
        }
    }

    /**
     * Reconciles the local schema with the schemas of all initialized voting
     * nodes. Compatible remote schemas are merged into the local schema; an
     * incompatible schema puts the local node into state ERROR and aborts. If the
     * local schema was modified it is stored and, unless the local node is
     * non-voting, sent to every initialized node.
     *
     * @throws IOException if storing the merged schema fails
     */
    private void synchronizeSchema() throws IOException {
        log.info("Start synchronizing schema, " + this.getNodeStatusAsString());
        this.localState = LocalState.SYNCHRONIZE_SCHEMA;
        Schema localSchema = this.transactionHandler.getSchema();
        boolean modifiedSchema = false;
        for (ClusterNode clusterNode : this.clusterNodes) {
            // Check isInitialized() before touching the role, as getClusterState() does,
            // so nodes that never connected are skipped before their role is dereferenced.
            if (!clusterNode.isInitialized() || !clusterNode.getClusterNodeRole().allowedToVoteForHeadSelection()) {
                continue;
            }
            Schema schema = clusterNode.getSchema();
            if (localSchema.isSameSchema(schema)) {
                continue;
            }
            if (localSchema.isCompatibleWith(schema)) {
                localSchema.merge(schema);
                modifiedSchema = true;
                continue;
            }
            this.localState = LocalState.ERROR;
            log.error("Cluster node boot: incompatible schemas of node:" + clusterNode.getNodeId() + ", with schema:" + schema + ", local-schema:" + localSchema + ", " + this.getNodeStatusAsString());
            return;
        }
        if (modifiedSchema) {
            log.info("Cluster needs to update schema - new compatible schema:" + localSchema + ", " + this.getNodeStatusAsString());
            this.transactionHandler.updateSchema(localSchema);
            if (!this.nonVotingNode) {
                for (ClusterNode clusterNode : this.clusterNodes) {
                    if (!clusterNode.isInitialized()) continue;
                    clusterNode.sendSchemaUpdate(localSchema);
                }
            }
        }
    }

    /**
     * Switches to SYNCHRONIZE_TRANSACTIONS and looks for initialized nodes whose
     * last transaction id is higher than the local one. From the nodes sharing the
     * highest id, one is chosen and asked for the missing transactions. If no node
     * is ahead, the local node moves to WAITING_FOR_PEER_SYNCHRONISATION and the
     * synchronization state is re-checked.
     */
    private void synchronizeTransactions() {
        log.info("Start synchronizing transactions, " + this.getNodeStatusAsString());
        this.localState = LocalState.SYNCHRONIZE_TRANSACTIONS;
        long localLastId = this.transactionHandler.getLastTransactionId();
        long bestLastId = localLastId;
        ArrayList<ClusterNode> aheadNodes = new ArrayList<ClusterNode>();
        for (ClusterNode peer : this.clusterNodes) {
            if (!peer.isInitialized()) {
                continue;
            }
            long peerLastId = peer.getLastTransactionId();
            if (peerLastId <= localLastId) {
                continue;
            }
            if (peerLastId > bestLastId) {
                // New maximum: earlier candidates are outdated.
                aheadNodes.clear();
                aheadNodes.add(peer);
                bestLastId = peerLastId;
            } else if (peerLastId == bestLastId) {
                aheadNodes.add(peer);
            }
        }
        if (aheadNodes.isEmpty()) {
            this.localState = LocalState.WAITING_FOR_PEER_SYNCHRONISATION;
            checkSynchronizationState();
            return;
        }
        ClusterNode source = getBestSynchronizationNode(aheadNodes);
        log.info("Cluster node needs to synchronize " + (source.getTransactionCount() - this.transactionHandler.getTransactionCount()) + ", last local transaction:" + this.transactionHandler.getLastTransactionId() + ", current transaction:" + source.getLastTransactionId());
        source.sendMessage(new SynchronizeTransactionsRequest(this.transactionHandler.getLastTransactionId(), source.getLastTransactionId(), this.transactionHandler.getLastTransactionId(), this.transactionHandler.getCurrentTransactionId(), this.transactionHandler.getTransactionCount()));
    }

    /**
     * Picks the node to synchronize from. Roles are tried in the order
     * SNAPSHOT_PROVIDER, TRANSACTION_LOGGER, WORKER, AUTO, HEAD; if no node has
     * any of these roles the first node of the list is returned.
     *
     * @param clusterNodes non-empty list of candidate nodes
     * @return the chosen synchronization source
     */
    private ClusterNode getBestSynchronizationNode(List<ClusterNode> clusterNodes) {
        ClusterNodeRole[] rolePreference = {ClusterNodeRole.SNAPSHOT_PROVIDER, ClusterNodeRole.TRANSACTION_LOGGER, ClusterNodeRole.WORKER, ClusterNodeRole.AUTO, ClusterNodeRole.HEAD};
        for (ClusterNodeRole role : rolePreference) {
            ClusterNode match = clusterNodes.stream().filter(candidate -> candidate.getClusterNodeRole() == role).findAny().orElse(null);
            if (match != null) {
                return match;
            }
        }
        return clusterNodes.get(0);
    }

    /**
     * Starts the daemon thread "cluster-connect-loop". After an initial delay of
     * three seconds it checks once per second for nodes with a lower id than the
     * local node that are neither connected nor connecting, and tries to connect
     * them. Failures are logged and do not stop the loop; the thread ends when it
     * is interrupted.
     */
    private void startNodeConnectionsCheck() {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(3000L);
                while (true) {
                    Thread.sleep(1000L);
                    try {
                        for (ClusterNode clusterNode : this.clusterNodes) {
                            if (clusterNode.isConnected() || clusterNode.isConnecting() || clusterNode.getNodeId() >= this.localNodeId) {
                                continue;
                            }
                            try {
                                this.connectNode(clusterNode);
                            }
                            catch (RuntimeException e) {
                                // One failing node must not prevent the others from being retried.
                                log.warn("Reconnect attempt failed for node:" + clusterNode.getNodeId(), e);
                            }
                        }
                    }
                    catch (RuntimeException e) {
                        log.warn("Node connection check failed, " + this.getNodeStatusAsString(), e);
                    }
                }
            }
            catch (InterruptedException e) {
                // Restore the flag and let the daemon thread end.
                Thread.currentThread().interrupt();
            }
        }, "cluster-connect-loop");
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Starts the daemon thread "cluster-node-updater". The thread currently only
     * sleeps in one-second steps forever and ignores any exception.
     */
    private void startClusterNodeUpdater() {
        Runnable idleLoop = () -> {
            for (;;) {
                try {
                    Thread.sleep(1000L);
                }
                catch (Exception ignored) {
                    // Deliberately ignored: the loop simply continues.
                }
            }
        };
        Thread updater = new Thread(idleLoop, "cluster-node-updater");
        updater.setDaemon(true);
        updater.start();
    }

    /**
     * Life-cycle state of the local node as tracked by {@code Cluster}.
     */
    static enum LocalState {
        // Initial state; also restored when a connection is lost and the quorum is missing.
        CONNECTING,
        // Set while the local schema is compared and merged with the peers' schemas.
        SYNCHRONIZE_SCHEMA,
        // Set while the local node looks for and requests missing transactions.
        SYNCHRONIZE_TRANSACTIONS,
        // Set when no peer is ahead or a transaction transfer has finished.
        WAITING_FOR_PEER_SYNCHRONISATION,
        // Set when the local node has sent its head proposal.
        ELECTING_HEAD,
        // Set when a connect request has been sent to the head.
        WAIT_FOR_HEAD,
        // Set when the head confirmed the connection.
        RUNNING,
        // Set when the local node itself was elected head.
        RUNNING_AS_HEAD,
        // Set when a peer's schema is incompatible with the local schema.
        ERROR;

    }

    /**
     * State of the cluster as derived by {@code getClusterState()} from the local
     * node and the initialized voting peers.
     */
    static enum ClusterState {
        // Fewer voting nodes than the quorum are present, or no other state applies.
        MISSING_QUORUM,
        // At least a quorum of peers reports operation mode RUNNING.
        RUNNING,
        // The quorum is present and at least one peer reports operation mode WAITING.
        WAITING,
        // All electoral nodes are present and none of the states above applies.
        ALL_NODES_READY_TO_SYNCHRONIZE;

    }
}

