/*
 * Decompiled with CFR 0.152.
 */
package org.teamapps.cluster.service;

import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.Node;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.core.Atomix;
import io.atomix.primitive.partition.ManagedPartitionGroup;
import io.atomix.primitive.partition.MemberGroupStrategy;
import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup;
import io.atomix.protocols.raft.partition.RaftPartitionGroup;
import io.atomix.utils.event.EventListener;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.teamapps.cluster.crypto.AesCipher;
import org.teamapps.cluster.crypto.ShaHash;
import org.teamapps.cluster.dto.FileProvider;
import org.teamapps.cluster.dto.Message;
import org.teamapps.cluster.dto.MessageDecoder;
import org.teamapps.cluster.model.atomix.ClusterMessage;
import org.teamapps.cluster.model.atomix.FileTransfer;
import org.teamapps.cluster.model.atomix.FileTransferResponse;
import org.teamapps.cluster.service.AbstractClusterService;
import org.teamapps.cluster.service.ServiceRegistry;
import org.teamapps.cluster.service.Utils;
import org.teamapps.common.util.ExceptionUtil;
import org.teamapps.event.Event;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

public class AtomixCluster
implements FileProvider,
ServiceRegistry {
    private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final String CLUSTER_REQUEST_CHANNEL = "cr";
    private static final String FILE_TRANSFER_CHANNEL = "ft";
    private static final String SERVICES_PROPERTY = "services";
    private final String clusterId;
    private final AesCipher aesCipher;
    private final File fileTransferPath;
    private int retryMaxAttempts = 3;
    private Duration retryBackoffDuration = Duration.ofSeconds(3L);
    private final Map<String, File> fileTransferMap = Collections.synchronizedMap(new LinkedHashMap<String, File>(){

        @Override
        protected boolean removeEldestEntry(Map.Entry<String, File> eldest) {
            return this.size() > 25000;
        }
    });
    public final Event<Member> onMemberAdded = new Event();
    public final Event<Member> onMemberRemoved = new Event();
    private String localId;
    private Atomix atomix;
    private final List<Member> members = Collections.synchronizedList(new ArrayList());
    private final Map<String, AbstractClusterService> localServices = new ConcurrentHashMap<String, AbstractClusterService>();
    private final Map<String, List<MemberId>> clusterServices = new ConcurrentHashMap<String, List<MemberId>>();
    private ExecutorService executor = new ThreadPoolExecutor(1, 32, 180L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    private ClusterCommunicationService communicationService;
    private Member localMember;

    public AtomixCluster(String clusterKey) throws IOException {
        this(clusterKey, null);
    }

    public AtomixCluster(String clusterKey, File tempDir) throws IOException {
        this.clusterId = ShaHash.createHash("ID-" + clusterKey);
        this.aesCipher = new AesCipher(clusterKey);
        this.fileTransferPath = tempDir != null ? tempDir : Files.createTempFile("temp", "temp", new FileAttribute[0]).getParent().toFile();
    }

    private byte[] handleFileMessage(byte[] data) {
        try {
            byte[] decryptedMessage = this.aesCipher.decrypt(data);
            FileTransfer transferMessage = new FileTransfer(decryptedMessage);
            String fileId = transferMessage.getFileId();
            File file = new File(this.fileTransferPath, fileId + ".tmp");
            BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(file, true), 32000);
            bos.write(transferMessage.getData());
            bos.close();
            FileTransferResponse fileTransferResponse = new FileTransferResponse().setReceivedData(file.length());
            if (transferMessage.getFinished()) {
                this.fileTransferMap.put(fileId, file);
                fileTransferResponse.setFinished(true);
            }
            return this.aesCipher.encrypt(fileTransferResponse.toBytes());
        }
        catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    private byte[] handleClusterRequest(byte[] data) {
        try {
            byte[] decryptedMessage = this.aesCipher.decrypt(data);
            ClusterMessage clusterMessage = new ClusterMessage(decryptedMessage);
            byte[] messageData = clusterMessage.getMessageData();
            MemberId memberId = MemberId.from((String)clusterMessage.getMemberId());
            AbstractClusterService clusterService = this.localServices.get(clusterMessage.getClusterService());
            if (clusterService != null) {
                byte[] bytes = clusterService.handleMessage(clusterMessage.getClusterMethod(), messageData, this, file -> (String)ExceptionUtil.softenExceptions(() -> this.sendFile(file, memberId, null)));
                return this.aesCipher.encrypt(bytes);
            }
            LOGGER.error("Could not find requested service {}", (Object)clusterMessage.getClusterService());
            return null;
        }
        catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public File getFile(String fileId) {
        return this.fileTransferMap.get(fileId);
    }

    private CompletableFuture<byte[]> sendMessage(String subject, Message message, MemberId memberId) throws Exception {
        byte[] bytes = message.toBytes();
        byte[] encryptedMessage = this.aesCipher.encrypt(bytes);
        return this.communicationService.send(subject, (Object)encryptedMessage, memberId);
    }

    private String sendFile(File file, MemberId memberId, AtomicBoolean disposed) throws Exception {
        FileTransfer fileTransfer;
        int read;
        String fileId = UUID.randomUUID().toString().replace("-", ".");
        BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
        byte[] buf = new byte[10000];
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        while ((read = bis.read(buf)) >= 0) {
            bos.write(buf, 0, read);
            if (bos.size() < 1000000) continue;
            fileTransfer = new FileTransfer().setFileId(fileId).setData(bos.toByteArray());
            bos.reset();
            if (disposed != null && disposed.get()) {
                return null;
            }
            this.sendMessage(FILE_TRANSFER_CHANNEL, fileTransfer, memberId);
        }
        fileTransfer = new FileTransfer().setFileId(fileId).setData(bos.toByteArray()).setFinished(true);
        if (disposed != null && disposed.get()) {
            return null;
        }
        CompletableFuture<byte[]> resultMessage = this.sendMessage(FILE_TRANSFER_CHANNEL, fileTransfer, memberId);
        byte[] bytes = resultMessage.get(60L, TimeUnit.SECONDS);
        FileTransferResponse response = new FileTransferResponse(this.aesCipher.decrypt(bytes));
        if (response.getReceivedData() == file.length()) {
            return fileId;
        }
        throw new Exception("Error sending file transfer:" + response);
    }

    @Override
    public boolean isServiceAvailable(String serviceName) {
        List serviceProvider = this.clusterServices.getOrDefault(serviceName, Collections.emptyList());
        return !serviceProvider.isEmpty();
    }

    @Override
    public <REQUEST extends Message, RESPONSE extends Message> Mono<RESPONSE> createServiceTask(String serviceName, String messageType, REQUEST request, MessageDecoder<RESPONSE> responseDecoder) {
        Mono mono = Mono.create(monoSink -> {
            List serviceProvider = this.clusterServices.getOrDefault(serviceName, Collections.emptyList());
            MemberId memberId = (MemberId)Utils.randomListEntry(serviceProvider);
            if (memberId == null) {
                LOGGER.warn("No cluster member available for service: {}, method: {}, with request: {}", new Object[]{serviceName, messageType, request});
                monoSink.error((Throwable)new Exception("Error: no cluster member available!"));
                return;
            }
            LOGGER.debug("Create cluster task for member: " + memberId);
            AtomicBoolean disposed = new AtomicBoolean();
            monoSink.onDispose(() -> disposed.set(true));
            try {
                byte[] messageBytes = request.toBytes(file -> (String)ExceptionUtil.softenExceptions(() -> this.sendFile(file, memberId, disposed)));
                if (disposed.get()) {
                    return;
                }
                ClusterMessage clusterMessage = new ClusterMessage().setMemberId(this.localId).setClusterService(serviceName).setClusterMethod(messageType).setMessageData(messageBytes);
                byte[] data = this.aesCipher.encrypt(clusterMessage.toBytes());
                CompletableFuture message = this.communicationService.send(CLUSTER_REQUEST_CHANNEL, (Object)data, memberId, Duration.ofSeconds(60L));
                monoSink.success((Object)message);
            }
            catch (Exception e) {
                monoSink.error((Throwable)e);
            }
        }).flatMap(Mono::fromFuture).map(bytes -> responseDecoder.decode(this.aesCipher.decryptSave((byte[])bytes), (FileProvider)this)).subscribeOn(Schedulers.boundedElastic()).retryWhen((Retry)Retry.backoff((long)this.retryMaxAttempts, (Duration)this.retryBackoffDuration));
        return mono;
    }

    public void connect(int localPort, String bootstrapNodes) {
        List nodes = bootstrapNodes == null || bootstrapNodes.isBlank() ? Collections.emptyList() : Arrays.stream(bootstrapNodes.split(";")).map(host -> host.split(":")).map(parts -> Node.builder().withHost(parts[0]).withPort(Integer.parseInt(parts[1])).build()).collect(Collectors.toList());
        PrimaryBackupPartitionGroup primaryBackupPartitionGroup = PrimaryBackupPartitionGroup.builder((String)"mmg").withNumPartitions(71).withMemberGroupStrategy(MemberGroupStrategy.RACK_AWARE).build();
        RaftPartitionGroup raftPartitionGroup = RaftPartitionGroup.builder((String)"mmg").withNumPartitions(1).withMembers((Member[])nodes.stream().map(node -> Member.builder().withAddress(node.address()).build()).toArray(Member[]::new)).build();
        this.atomix = Atomix.builder().withClusterId(this.clusterId).withPort(localPort).withMulticastEnabled(false).withMembershipProvider(BootstrapDiscoveryProvider.builder().withNodes(nodes).withHeartbeatInterval(Duration.ofMillis(500L)).withFailureTimeout(Duration.ofMillis(3000L)).build()).withManagementGroup((ManagedPartitionGroup)primaryBackupPartitionGroup).withPartitionGroups(new ManagedPartitionGroup[]{PrimaryBackupPartitionGroup.builder((String)"data").withMemberGroupStrategy(MemberGroupStrategy.RACK_AWARE).withNumPartitions(71).build()}).withShutdownHookEnabled().build();
        CompletableFuture startFuture = this.atomix.start();
        this.localId = (String)((Object)this.atomix.getMembershipService().getLocalMember().id().id());
        LOGGER.info("Start cluster with local-id: {} and cluster id: {}", (Object)this.localId, (Object)this.clusterId);
        this.communicationService = this.atomix.getCommunicationService();
        this.atomix.getMembershipService().addListener((EventListener)((ClusterMembershipEventListener)event -> {
            Member member = (Member)event.subject();
            if (((String)((Object)member.id().id())).equals(this.localId)) {
                this.localMember = member;
            } else {
                LOGGER.info("Member update: {}", (Object)event);
                switch ((ClusterMembershipEvent.Type)event.type()) {
                    case MEMBER_ADDED: {
                        this.handleNewMember(member);
                        break;
                    }
                    case METADATA_CHANGED: {
                        this.handleNewServices(member);
                        break;
                    }
                    case REACHABILITY_CHANGED: {
                        break;
                    }
                    case MEMBER_REMOVED: {
                        this.handleRemovedMember(member);
                    }
                }
            }
        }));
        this.communicationService.subscribe(FILE_TRANSFER_CHANNEL, this::handleFileMessage, (Executor)Executors.newSingleThreadExecutor());
        this.communicationService.subscribe(CLUSTER_REQUEST_CHANNEL, this::handleClusterRequest, (Executor)Executors.newWorkStealingPool(24));
        startFuture.join();
    }

    @Override
    public void registerService(AbstractClusterService clusterService) {
        this.localServices.put(clusterService.getServiceName(), clusterService);
        this.localMember.properties().setProperty(SERVICES_PROPERTY, String.join((CharSequence)",", this.localServices.keySet()));
    }

    private void handleNewMember(Member member) {
        this.members.add(member);
        this.onMemberAdded.fire((Object)member);
        this.handleNewServices(member);
    }

    private void handleRemovedMember(Member member) {
        this.members.remove(member);
        MemberId memberId = member.id();
        for (String key : this.clusterServices.keySet()) {
            this.clusterServices.get(key).remove(memberId);
        }
        this.onMemberRemoved.fire((Object)member);
    }

    private void handleNewServices(Member member) {
        MemberId memberId = member.id();
        String services = member.properties().getProperty(SERVICES_PROPERTY);
        if (services != null && !services.isBlank()) {
            for (String key : this.clusterServices.keySet()) {
                this.clusterServices.get(key).remove(memberId);
            }
            Arrays.stream(services.split(",")).forEach(service -> {
                LOGGER.info("Add cluster service {} for member {}", (Object)services, (Object)memberId.id());
                this.clusterServices.putIfAbsent((String)service, new ArrayList());
                this.clusterServices.get(service).add(memberId);
            });
        }
    }

    private int getRandomId(int max) {
        int id = max + 1;
        while (id > max) {
            id = (int)(Math.random() * (double)max);
        }
        return id;
    }

    public void disconnect() {
        this.atomix.stop();
        this.atomix = null;
    }

    public void setRetryMaxAttempts(int retryMaxAttempts) {
        this.retryMaxAttempts = retryMaxAttempts;
    }

    public void setRetryBackoffDuration(Duration retryBackoffDuration) {
        this.retryBackoffDuration = retryBackoffDuration;
    }
}

