/*
 * Decompiled with CFR 0.152.
 */
package org.teamapps.cluster.network;

import java.lang.invoke.MethodHandles;
import java.net.Socket;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.teamapps.cluster.model.cluster.ClusterNodeData;
import org.teamapps.cluster.network.ClusterNode;
import org.teamapps.cluster.network.ClusterNodeMessageHandler;
import org.teamapps.cluster.network.Connection;
import org.teamapps.cluster.network.ConnectionHandler;
import org.teamapps.cluster.network.NodeAddress;

public class RemoteClusterNode
extends ClusterNode
implements ConnectionHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
    private final ClusterNodeMessageHandler clusterNodeMessageHandler;
    private final boolean outgoing;
    private final NodeAddress nodeAddress;
    private Connection connection;
    private volatile boolean connected;
    private int retries;
    private long lastMessageTimestamp;
    private byte[] keepAliveMessage;
    private boolean running = true;
    private ArrayBlockingQueue<byte[]> sendMessageQueue = new ArrayBlockingQueue(100000);

    public RemoteClusterNode(ClusterNodeMessageHandler clusterNodeMessageHandler, Socket socket) {
        this.clusterNodeMessageHandler = clusterNodeMessageHandler;
        this.outgoing = false;
        this.nodeAddress = new NodeAddress(socket.getInetAddress().getHostAddress(), socket.getPort());
        this.connection = new Connection(this, socket, this.nodeAddress);
        this.init();
    }

    public RemoteClusterNode(ClusterNodeMessageHandler clusterNodeMessageHandler, NodeAddress nodeAddress) {
        this.clusterNodeMessageHandler = clusterNodeMessageHandler;
        this.outgoing = true;
        this.nodeAddress = nodeAddress;
        this.createOutgoingConnection();
        this.init();
    }

    public void merge(RemoteClusterNode clusterNode) {
        if (!this.connected) {
            if (this.connection != null) {
                LOGGER.error("Error connection still exists!");
            }
            clusterNode.getConnection().setConnectionHandler(this);
            this.connection = clusterNode.getConnection();
        } else {
            LOGGER.error("Cannot merge cluster node, existing node is still connected:" + this.nodeAddress);
        }
    }

    private void init() {
        this.keepAliveMessage = this.clusterNodeMessageHandler.getKeepAliveMessage();
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            if (this.connected && System.currentTimeMillis() - this.lastMessageTimestamp > 60000L) {
                this.sendKeepAlive();
            }
        }, 90L, 90L, TimeUnit.SECONDS);
        this.startSendQueueThread();
    }

    private void startSendQueueThread() {
        Thread thread = new Thread(() -> {
            byte[] unsentMessageBytes = null;
            while (this.running) {
                try {
                    if (this.connected) {
                        byte[] bytes = unsentMessageBytes != null ? unsentMessageBytes : this.sendMessageQueue.take();
                        boolean success = this.connection.writeMessage(bytes);
                        LOGGER.debug("Write async message to " + this.getNodeId() + ", len:" + bytes + ", success:" + success);
                        if (success) {
                            unsentMessageBytes = null;
                            this.lastMessageTimestamp = System.currentTimeMillis();
                            continue;
                        }
                        unsentMessageBytes = bytes;
                        Thread.sleep(1000L);
                        LOGGER.error("Could not write message async");
                        continue;
                    }
                    Thread.sleep(500L);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        thread.setDaemon(true);
        thread.setName("connection-async-writer-" + this.nodeAddress.getHost());
        thread.start();
    }

    private void sendKeepAlive() {
        this.sendMessage(this.keepAliveMessage);
    }

    private void createOutgoingConnection() {
        this.connection = new Connection(this, this.nodeAddress);
        this.connection.writeMessage(this.clusterNodeMessageHandler.createInitMessage());
    }

    public void sendMessage(byte[] bytes) {
        if (bytes != null && this.connection != null) {
            this.lastMessageTimestamp = System.currentTimeMillis();
            this.connection.writeMessage(bytes);
        }
    }

    public boolean sendMessageAsync(byte[] bytes) {
        return bytes != null && this.sendMessageQueue.offer(bytes);
    }

    @Override
    public void handleMessage(byte[] bytes) {
        this.lastMessageTimestamp = System.currentTimeMillis();
        this.clusterNodeMessageHandler.handleMessage(this, bytes);
    }

    @Override
    public void handleConnectionClosed() {
        LOGGER.info("Remote connection closed: {}, {}", (Object)this.outgoing, (Object)this.nodeAddress);
        this.connected = false;
        this.connection = null;
        ++this.retries;
        if (this.outgoing) {
            scheduledExecutorService.schedule(this::createOutgoingConnection, this.retries < 10 ? 3L : 15L, TimeUnit.SECONDS);
        }
    }

    public boolean isOutgoing() {
        return this.outgoing;
    }

    public boolean isConnected() {
        return this.connected;
    }

    public void setConnected(boolean connected) {
        this.connected = connected;
        this.retries = 0;
    }

    public NodeAddress getNodeAddress() {
        return this.nodeAddress;
    }

    public Connection getConnection() {
        return this.connection;
    }

    public void setClusterNodeData(ClusterNodeData nodeData) {
        if (this.getNodeId() == null) {
            this.setNodeId(nodeData.getNodeId());
        }
        this.setServices(nodeData.getAvailableServices() != null ? Arrays.asList(nodeData.getAvailableServices()) : Collections.emptyList());
    }

    public ClusterNodeData getClusterNodeData() {
        return new ClusterNodeData().setNodeId(this.getNodeId()).setHost(this.getNodeAddress().getHost()).setPort(this.getNodeAddress().getPort()).setAvailableServices(this.getServices().toArray(new String[0]));
    }

    public void shutDown() {
        try {
            if (this.connection != null) {
                this.connection.closeConnection();
            }
            this.running = false;
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String toString() {
        return "RemoteClusterNode{ " + this.getNodeId() + ", nodeAddress=" + this.nodeAddress + ", outgoing=" + this.outgoing + ", connected=" + this.connected + ", retries=" + this.retries + ", availableServices=" + this.getServices() + "}";
    }
}

