/*
 * Decompiled with CFR 0.152.
 */
package org.teamapps.cluster.service;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.teamapps.cluster.crypto.AesCipher;
import org.teamapps.cluster.model.cluster.ClusterTopicInfo;
import org.teamapps.cluster.model.cluster.ClusterTopicMessage;
import org.teamapps.cluster.network.RemoteClusterNode;

public class ClusterTopic {
    private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final String topicName;
    private final AesCipher aesCipher;
    private final Set<String> registeredMemberNodeIds = new HashSet<String>();
    private final List<RemoteClusterNode> members = new ArrayList<RemoteClusterNode>();
    private final Set<String> availableMembers = new HashSet<String>();
    private Consumer<ClusterTopicMessage> messageConsumer;

    public ClusterTopic(String topicName, AesCipher aesCipher) {
        this.topicName = topicName;
        this.aesCipher = aesCipher;
    }

    public ClusterTopic(String topicName, AesCipher aesCipher, String localNodeId) {
        this.topicName = topicName;
        this.aesCipher = aesCipher;
        this.registeredMemberNodeIds.add(localNodeId);
    }

    public synchronized boolean isRegistered(String nodeId) {
        return this.registeredMemberNodeIds.contains(nodeId);
    }

    public synchronized boolean isAvailableMember(String nodeId) {
        return this.availableMembers.contains(nodeId);
    }

    public synchronized void addRegisteredMember(String nodeId) {
        this.registeredMemberNodeIds.add(nodeId);
    }

    public synchronized void sendMessageAsync(byte[] data) throws Exception {
        LOGGER.debug("Add topic message: {}, length: {}", (Object)this.topicName, (Object)data.length);
        ClusterTopicMessage message = new ClusterTopicMessage().setTopic(this.topicName).setData(data);
        byte[] messageBytes = this.aesCipher.encrypt(message.toBytes());
        for (RemoteClusterNode member : this.members) {
            member.sendMessageAsync(messageBytes);
        }
    }

    public synchronized void addMember(RemoteClusterNode member) {
        if (!this.availableMembers.contains(member.getNodeId())) {
            this.members.add(member);
            this.availableMembers.add(member.getNodeId());
        }
    }

    public synchronized void removeMember(RemoteClusterNode member) {
        this.members.remove(member);
        this.availableMembers.remove(member.getNodeId());
        this.registeredMemberNodeIds.remove(member.getNodeId());
    }

    public synchronized ClusterTopicInfo createTopicInfo() {
        ClusterTopicInfo topicInfo = new ClusterTopicInfo();
        topicInfo.setTopicName(this.topicName);
        topicInfo.setNodeIds(new ArrayList<String>(this.registeredMemberNodeIds));
        return topicInfo;
    }

    public void setMessageConsumer(Consumer<ClusterTopicMessage> messageConsumer) {
        this.messageConsumer = messageConsumer;
    }

    public void handleMessage(ClusterTopicMessage message) {
        LOGGER.debug("Receive message on topic: {}, size: {}, consumer: {}", new Object[]{this.topicName, message.getData().length, this.messageConsumer != null});
        if (this.messageConsumer != null) {
            this.messageConsumer.accept(message);
        }
    }
}

