/*
 * Decompiled with CFR 0.152.
 */
package org.teamapps.cluster.service;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.ServerSocket;
import java.net.Socket;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.teamapps.cluster.crypto.AesCipher;
import org.teamapps.cluster.dto.FileProvider;
import org.teamapps.cluster.dto.Message;
import org.teamapps.cluster.dto.MessageDecoder;
import org.teamapps.cluster.dto.MessageField;
import org.teamapps.cluster.model.cluster.ClusterFileTransfer;
import org.teamapps.cluster.model.cluster.ClusterFileTransferResponse;
import org.teamapps.cluster.model.cluster.ClusterNodeData;
import org.teamapps.cluster.model.cluster.ClusterNodeInfo;
import org.teamapps.cluster.model.cluster.ClusterSchemaRegistry;
import org.teamapps.cluster.model.cluster.KeepAliveMessage;
import org.teamapps.cluster.model.cluster.ServiceClusterRequest;
import org.teamapps.cluster.model.cluster.ServiceClusterResponse;
import org.teamapps.cluster.network.ClusterNodeMessageHandler;
import org.teamapps.cluster.network.LocalClusterNode;
import org.teamapps.cluster.network.NodeAddress;
import org.teamapps.cluster.network.RemoteClusterNode;
import org.teamapps.cluster.service.AbstractClusterService;
import org.teamapps.cluster.service.ServiceRegistry;
import org.teamapps.cluster.service.Utils;
import org.teamapps.common.util.ExceptionUtil;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

public class TeamAppsCluster
extends Thread
implements ClusterNodeMessageHandler,
FileProvider,
ServiceRegistry {
    private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final String clusterSecret;
    private final LocalClusterNode localNode;
    private final AesCipher aesCipher;
    private int retryMaxAttempts = 3;
    private Duration retryBackoffDuration = Duration.ofSeconds(3L);
    private final ExecutorService executor = new ThreadPoolExecutor(1, 32, 180L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    private volatile boolean running = true;
    private final Map<String, RemoteClusterNode> remoteNodes = new ConcurrentHashMap<String, RemoteClusterNode>();
    private final Map<String, AbstractClusterService> localServices = new ConcurrentHashMap<String, AbstractClusterService>();
    private Map<String, List<RemoteClusterNode>> clusterServices = new HashMap<String, List<RemoteClusterNode>>();
    private final Map<Long, CompletableFuture<ServiceClusterResponse>> serviceResponseFutureMap = new ConcurrentHashMap<Long, CompletableFuture<ServiceClusterResponse>>();
    private final Map<String, CompletableFuture<ClusterFileTransferResponse>> fileTransferFutureMap = new ConcurrentHashMap<String, CompletableFuture<ClusterFileTransferResponse>>();
    private final AtomicLong requestIdGenerator = new AtomicLong();
    private final File fileTransferPath;
    private final Map<String, File> fileTransferMap = Collections.synchronizedMap(new LinkedHashMap<String, File>(){

        @Override
        protected boolean removeEldestEntry(Map.Entry<String, File> eldest) {
            boolean remove;
            File file = eldest.getValue();
            boolean oldFile = System.currentTimeMillis() - file.lastModified() > 86400000L;
            boolean bl = remove = oldFile || this.size() > 100000;
            if (remove) {
                eldest.getValue().delete();
            }
            return remove;
        }
    });

    public TeamAppsCluster(String clusterSecret, int localPort, NodeAddress ... knownNodes) {
        this(clusterSecret, localPort, (File)null, knownNodes);
    }

    public TeamAppsCluster(String clusterSecret, int localPort, File tempDir, NodeAddress ... knownNodes) {
        super("cluster-server-socket");
        this.clusterSecret = clusterSecret;
        this.aesCipher = new AesCipher(clusterSecret);
        this.localNode = new LocalClusterNode(localPort);
        this.fileTransferPath = tempDir != null ? tempDir : Utils.createTempDir();
        this.start();
        this.connectNodes(knownNodes);
    }

    private void connectNodes(NodeAddress[] knownNodes) {
        if (knownNodes == null) {
            return;
        }
        for (NodeAddress nodeAddress : knownNodes) {
            RemoteClusterNode remoteClusterNode = new RemoteClusterNode((ClusterNodeMessageHandler)this, nodeAddress);
        }
    }

    @Override
    public void registerService(AbstractClusterService clusterService) {
        this.localServices.put(clusterService.getServiceName(), clusterService);
        List<RemoteClusterNode> connectedNodes = this.remoteNodes.values().stream().filter(RemoteClusterNode::isConnected).collect(Collectors.toList());
        this.sendNodeUpdate(connectedNodes);
    }

    @Override
    public boolean isServiceAvailable(String serviceName) {
        List nodesWithService = this.clusterServices.getOrDefault(serviceName, Collections.emptyList());
        return !nodesWithService.isEmpty();
    }

    public RemoteClusterNode getRandomServiceProvider(String serviceName) {
        List nodesWithService = this.clusterServices.getOrDefault(serviceName, Collections.emptyList()).stream().filter(RemoteClusterNode::isConnected).collect(Collectors.toList());
        return (RemoteClusterNode)Utils.randomListEntry(nodesWithService);
    }

    @Override
    public void run() {
        try {
            ServerSocket serverSocket = new ServerSocket(this.localNode.getPort());
            while (this.running) {
                try {
                    Socket socket = serverSocket.accept();
                    new RemoteClusterNode((ClusterNodeMessageHandler)this, socket);
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void sendMessage(Message message, RemoteClusterNode clusterNode) throws Exception {
        byte[] messageBytes = this.aesCipher.encrypt(message.toBytes());
        clusterNode.sendMessage(messageBytes);
    }

    private synchronized void handleClusterNodeInfo(ClusterNodeInfo clusterNodeInfo, RemoteClusterNode clusterNode) throws Exception {
        boolean sendClusterInfo;
        clusterNode.setClusterNodeData(clusterNodeInfo.getLocalNode());
        for (ClusterNodeData knownRemoteNode : clusterNodeInfo.getKnownRemoteNodes()) {
            if (this.remoteNodes.containsKey(knownRemoteNode.getNodeId()) || knownRemoteNode.getNodeId().equals(this.localNode.getNodeId())) continue;
            RemoteClusterNode remoteClusterNode = new RemoteClusterNode((ClusterNodeMessageHandler)this, new NodeAddress(knownRemoteNode.getHost(), knownRemoteNode.getPort()));
        }
        boolean bl = sendClusterInfo = !clusterNodeInfo.getResponse();
        if (!this.remoteNodes.containsKey(clusterNode.getNodeId())) {
            clusterNode.setConnected(true);
            sendClusterInfo = true;
            LOGGER.info("New cluster node: {}", (Object)clusterNode);
            this.remoteNodes.put(clusterNode.getNodeId(), clusterNode);
        } else if (!clusterNode.isConnected()) {
            clusterNode.setConnected(true);
            LOGGER.info("Reconnected cluster node: {}", (Object)clusterNode);
        }
        if (sendClusterInfo) {
            clusterNode.sendMessage(this.createInfoMessage(true));
            List<RemoteClusterNode> connectedNodes = this.remoteNodes.values().stream().filter(RemoteClusterNode::isConnected).collect(Collectors.toList());
            this.sendNodeUpdate(connectedNodes);
        }
        this.recreateClusterServiceMap();
    }

    private void recreateClusterServiceMap() {
        HashMap<String, List<RemoteClusterNode>> serviceMap = new HashMap<String, List<RemoteClusterNode>>();
        for (RemoteClusterNode clusterNode : this.remoteNodes.values()) {
            for (String service : clusterNode.getServices()) {
                serviceMap.putIfAbsent(service, new ArrayList());
                ((List)serviceMap.get(service)).add(clusterNode);
            }
        }
        this.clusterServices = serviceMap;
    }

    private void sendNodeUpdate(List<RemoteClusterNode> nodes) {
        byte[] infoMessage = this.createInfoMessage(true);
        for (RemoteClusterNode node : nodes) {
            node.sendMessage(infoMessage);
        }
    }

    @Override
    public void handleMessage(RemoteClusterNode clusterNode, byte[] bytes) {
        this.executor.submit(() -> {
            try {
                byte[] data = this.aesCipher.decrypt(bytes);
                int messageRootFieldId = Message.getMessageFieldId(data);
                MessageField messageField = ClusterSchemaRegistry.SCHEMA.getFieldById(messageRootFieldId);
                LOGGER.debug("Handle message: id: {}, field: {}, size: {}, node: {}", new Object[]{messageRootFieldId, messageField, bytes.length, clusterNode});
                switch (messageRootFieldId) {
                    case 101007: {
                        this.handleClusterNodeInfo(new ClusterNodeInfo(data, (FileProvider)this), clusterNode);
                        break;
                    }
                    case 101021: {
                        this.handleFileTransfer(new ClusterFileTransfer(data, (FileProvider)this), clusterNode);
                        break;
                    }
                    case 101027: {
                        this.handleFileTransferResponse(new ClusterFileTransferResponse(data, (FileProvider)this), clusterNode);
                        break;
                    }
                    case 101011: {
                        this.handleServiceClusterRequest(new ServiceClusterRequest(data, (FileProvider)this), clusterNode);
                        break;
                    }
                    case 101016: {
                        this.handleServiceClusterResponse(new ServiceClusterResponse(data, (FileProvider)this), clusterNode);
                    }
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    @Override
    public <REQUEST extends Message, RESPONSE extends Message> Mono<RESPONSE> createServiceTask(String serviceName, String method, REQUEST request, MessageDecoder<RESPONSE> responseDecoder) {
        long requestId = this.requestIdGenerator.incrementAndGet();
        Mono mono = Mono.create(monoSink -> {
            RemoteClusterNode clusterNode = this.getRandomServiceProvider(serviceName);
            if (clusterNode == null) {
                LOGGER.warn("No cluster member available for service: {}, method: {}, with request: {}", new Object[]{serviceName, method, request});
                monoSink.error((Throwable)new Exception("Error: no cluster member available!"));
                return;
            }
            LOGGER.debug("Create cluster task for member: {}", (Object)clusterNode);
            AtomicBoolean disposed = new AtomicBoolean();
            monoSink.onDispose(() -> disposed.set(true));
            try {
                byte[] messageBytes = request.toBytes(file -> (String)ExceptionUtil.softenExceptions(() -> this.sendFile(file, clusterNode, disposed)));
                if (disposed.get()) {
                    return;
                }
                ServiceClusterRequest serviceClusterRequest = new ServiceClusterRequest().setRequestId(requestId).setServiceName(serviceName).setMethod(method).setRequestData(messageBytes);
                CompletableFuture completableFuture = new CompletableFuture();
                this.serviceResponseFutureMap.put(requestId, completableFuture);
                this.sendMessage(serviceClusterRequest, clusterNode);
                monoSink.success(completableFuture);
            }
            catch (Exception e) {
                monoSink.error((Throwable)e);
            }
        }).flatMap(Mono::fromFuture).map(response -> responseDecoder.decode(response.getResponseData(), (FileProvider)this)).subscribeOn(Schedulers.boundedElastic()).retryWhen((Retry)Retry.backoff((long)this.retryMaxAttempts, (Duration)this.retryBackoffDuration));
        return mono.timeout(Duration.ofMinutes(5L)).doAfterTerminate(() -> this.serviceResponseFutureMap.remove(requestId));
    }

    private void handleServiceClusterRequest(ServiceClusterRequest request, RemoteClusterNode clusterNode) throws Exception {
        LOGGER.debug("Handle cluster request: {}, node: {}", (Object)request, (Object)clusterNode);
        AbstractClusterService clusterService = this.localServices.get(request.getServiceName());
        ServiceClusterResponse response = new ServiceClusterResponse().setRequestId(request.getRequestId());
        if (clusterService != null) {
            byte[] message = clusterService.handleMessage(request.getMethod(), request.getRequestData(), this, file -> (String)ExceptionUtil.softenExceptions(() -> this.sendFile(file, clusterNode, null)));
            response.setResponseData(message);
        } else {
            LOGGER.error("Could not find requested service {}", (Object)request.getServiceName());
            response.setError(true).setErrorMessage("could not find requested service: " + request.getServiceName());
        }
        this.sendMessage(response, clusterNode);
    }

    private void handleServiceClusterResponse(ServiceClusterResponse serviceClusterResponse, RemoteClusterNode clusterNode) {
        LOGGER.debug("Handle cluster response: {}, node: {}", (Object)serviceClusterResponse, (Object)clusterNode);
        CompletableFuture<ServiceClusterResponse> completableFuture = this.serviceResponseFutureMap.get(serviceClusterResponse.getRequestId());
        if (completableFuture != null) {
            completableFuture.complete(serviceClusterResponse);
        }
    }

    @Override
    public byte[] createInitMessage() {
        return this.createInfoMessage(false);
    }

    @Override
    public byte[] getKeepAliveMessage() {
        try {
            return this.aesCipher.encrypt(new KeepAliveMessage().toBytes());
        }
        catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    private byte[] createInfoMessage(boolean response) {
        try {
            ClusterNodeInfo nodeInfo = new ClusterNodeInfo();
            nodeInfo.setResponse(response);
            nodeInfo.setLocalNode(new ClusterNodeData().setNodeId(this.localNode.getNodeId()).setAvailableServices(this.localServices.keySet().toArray(new String[0])));
            for (RemoteClusterNode remoteNode : this.remoteNodes.values()) {
                if (!remoteNode.isOutgoing() || !remoteNode.isConnected()) continue;
                nodeInfo.addKnownRemoteNodes(remoteNode.getClusterNodeData());
            }
            return this.aesCipher.encrypt(nodeInfo.toBytes());
        }
        catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public File getFile(String fileId) {
        return this.fileTransferMap.get(fileId);
    }

    private File getTransferFile(String fileId) {
        return new File(this.fileTransferPath, fileId + ".tmp");
    }

    private String sendFile(File file, RemoteClusterNode clusterNode, AtomicBoolean disposed) throws Exception {
        ClusterFileTransfer fileTransfer;
        int read;
        LOGGER.info("Send file: {}, node: {}", (Object)file.getName(), (Object)clusterNode);
        String fileId = UUID.randomUUID().toString().replace("-", ".");
        BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
        byte[] buf = new byte[10000];
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        boolean initialMessage = true;
        while ((read = bis.read(buf)) >= 0) {
            bos.write(buf, 0, read);
            if (bos.size() < 1000000) continue;
            fileTransfer = new ClusterFileTransfer().setFileId(fileId).setData(bos.toByteArray()).setInitialMessage(initialMessage);
            initialMessage = false;
            bos.reset();
            if (disposed != null && disposed.get()) {
                return null;
            }
            this.sendMessage(fileTransfer, clusterNode);
        }
        fileTransfer = new ClusterFileTransfer().setFileId(fileId).setData(bos.toByteArray()).setLastMessage(true);
        if (disposed != null && disposed.get()) {
            return null;
        }
        CompletableFuture completableFuture = new CompletableFuture();
        this.fileTransferFutureMap.put(fileId, completableFuture);
        this.sendMessage(fileTransfer, clusterNode);
        ClusterFileTransferResponse fileTransferResponse = (ClusterFileTransferResponse)completableFuture.get(60L, TimeUnit.SECONDS);
        if (fileTransferResponse.getReceivedData() == file.length()) {
            return fileId;
        }
        throw new Exception("Error sending file transfer, expected: " + file.length() + ", actual received: " + fileTransferResponse.getReceivedData());
    }

    private void handleFileTransfer(ClusterFileTransfer fileTransfer, RemoteClusterNode clusterNode) throws Exception {
        LOGGER.info("Handle file transfer: {}, node: {}", (Object)fileTransfer.getFileId(), (Object)clusterNode);
        long length = this.appendFileTransferData(fileTransfer.getFileId(), fileTransfer.getData(), fileTransfer.getInitialMessage());
        if (fileTransfer.getLastMessage()) {
            File file = this.getTransferFile(fileTransfer.getFileId());
            this.fileTransferMap.put(fileTransfer.getFileId(), file);
            this.sendMessage(new ClusterFileTransferResponse().setReceivedData(length).setFileId(fileTransfer.getFileId()), clusterNode);
        }
    }

    private long appendFileTransferData(String fileId, byte[] bytes, boolean initialData) throws IOException {
        File file = this.getTransferFile(fileId);
        BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(file, !initialData), 32000);
        bos.write(bytes);
        bos.close();
        return file.length();
    }

    private void handleFileTransferResponse(ClusterFileTransferResponse fileTransferResponse, RemoteClusterNode clusterNode) {
        CompletableFuture<ClusterFileTransferResponse> completableFuture = this.fileTransferFutureMap.remove(fileTransferResponse.getFileId());
        if (completableFuture != null) {
            completableFuture.complete(fileTransferResponse);
        }
    }
}

