/*
 * Decompiled with CFR 0.152.
 */
package org.reveno.atp.clustering.core.components;

import java.security.SecureRandom;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.reveno.atp.clustering.api.Address;
import org.reveno.atp.clustering.api.Cluster;
import org.reveno.atp.clustering.api.ClusterView;
import org.reveno.atp.clustering.api.message.Message;
import org.reveno.atp.clustering.core.RevenoClusterConfiguration;
import org.reveno.atp.clustering.core.api.ClusterExecutor;
import org.reveno.atp.clustering.core.api.ElectionResult;
import org.reveno.atp.clustering.core.api.MessagesReceiver;
import org.reveno.atp.clustering.core.messages.VoteAck;
import org.reveno.atp.clustering.core.messages.VoteMessage;
import org.reveno.atp.clustering.util.Utils;
import org.reveno.atp.utils.BinaryUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Decides whether the local node is the leader (master) of a cluster view by
 * exchanging {@link VoteMessage}s and {@link VoteAck}s with the view members.
 *
 * <p>Each node announces its configured priority together with a random seed.
 * The local node considers itself leader when it outranks every answer it
 * received: a higher priority wins, and among equal priorities the larger seed
 * wins. Incoming votes and acks are recorded by {@link #onMessage(Message)}
 * into concurrent maps keyed by the sender address.
 *
 * <p>NOTE(review): {@link #execute(ClusterView, Void)} and
 * {@link #revote(ClusterView)} call each other recursively; nothing in this
 * class bounds the number of rounds while the view id stays the same.
 */
public class MessagingMasterSlaveElector
implements ClusterExecutor<ElectionResult, Void>,
MessagesReceiver {
    /** Message type code handled as a {@link VoteAck} in {@link #onMessage(Message)}. */
    private static final int VOTE_ACK_TYPE = 211;
    /** Message type code handled as a {@link VoteMessage} in {@link #onMessage(Message)}. */
    private static final int VOTE_MESSAGE_TYPE = 228;
    /** Shared generator for vote seeds; {@link SecureRandom} is safe for concurrent use. */
    private static final SecureRandom RANDOM = new SecureRandom();

    protected static final Logger LOG = LoggerFactory.getLogger(MessagingMasterSlaveElector.class);
    /** Message type codes this receiver subscribes to; read-only because it is handed out to callers. */
    protected static final Set<Integer> SUBSCRIPTION = subscriptionTypes();

    protected Cluster cluster;
    protected RevenoClusterConfiguration config;
    /** Latest vote received from each address. */
    protected Map<Address, VoteMessage> votes = new ConcurrentHashMap<>(64);
    /** View id carried by the latest ack received from each address. */
    protected Map<Address, Long> acks = new ConcurrentHashMap<>(64);
    /** Random tie-breaker sent with the local vote; regenerated on every revote. */
    protected long seed = this.generateSeed();

    /**
     * Creates an elector bound to the given cluster and configuration.
     *
     * @param cluster cluster used to send messages and to read the current view
     * @param config  source of the local priority and of the election timeouts
     */
    public MessagingMasterSlaveElector(Cluster cluster, RevenoClusterConfiguration config) {
        this.cluster = cluster;
        this.config = config;
    }

    /**
     * Runs one election round for the given view.
     *
     * <p>If no complete set of answers arrives, or not every member acks in
     * time, the round is repeated through {@link #revote(ClusterView)}.
     * Otherwise the local node is leader exactly when it outranks every answer
     * by (priority, seed). Comparing both together matters when the highest
     * priority is shared by several nodes while others have a lower one: the
     * tie is then still broken by seed instead of leaving the view leaderless.
     *
     * @param currentView view the election is held for
     * @param context     unused
     * @return the election outcome for the local node
     */
    @Override
    public ElectionResult execute(ClusterView currentView, Void context) {
        LOG.info("Vote [view: {}]", currentView.viewId());
        List<VoteMessage> answers = this.sendVoteNotifications(currentView);
        if (answers.isEmpty() || !this.allAcked(currentView)) {
            return this.revote(currentView);
        }
        boolean leader = answers.stream().allMatch(this::outranks);
        LOG.trace("Vote finished [view: {}, leader: {}]", currentView.viewId(), leader);
        return new ElectionResult(leader, false);
    }

    /**
     * Records an incoming vote or ack under the sender's address, replacing
     * any earlier entry from that address. Other message types are ignored.
     *
     * @param message received message
     */
    @Override
    public void onMessage(Message message) {
        if (message.type() == VOTE_MESSAGE_TYPE) {
            this.votes.put(message.address(), (VoteMessage)message);
        } else if (message.type() == VOTE_ACK_TYPE) {
            this.acks.put(message.address(), ((VoteAck)message).viewId);
        }
    }

    /**
     * @return the message type codes this receiver handles (unmodifiable)
     */
    @Override
    public Set<Integer> interestedTypes() {
        return SUBSCRIPTION;
    }

    /**
     * Starts another election round with a fresh seed, unless the cluster has
     * meanwhile moved to a different view.
     *
     * @param currentView view the failed round was held for
     * @return the result of the new round, or {@code new ElectionResult(false, true)}
     *         when the cluster's current view id differs from {@code currentView}
     */
    protected ElectionResult revote(ClusterView currentView) {
        LOG.trace("Revote [view: {}; nodes: {}]", currentView.viewId(), currentView.members());
        this.seed = this.generateSeed();
        if (this.cluster.view().viewId() != currentView.viewId()) {
            LOG.trace("Vote aborted [view: {}]", currentView.viewId());
            return new ElectionResult(false, true);
        }
        return this.execute(currentView);
    }

    /**
     * Sends a {@link VoteAck} for the view to all its members and waits until
     * every member has acked the same view id.
     *
     * @param view view being voted on
     * @return the value returned by {@code Utils.waitFor} for the ack condition
     *         and the configured ack timeout (presumably {@code false} on timeout)
     */
    protected boolean allAcked(ClusterView view) {
        this.cluster.gateway().send(view.members(), new VoteAck(view.viewId()), this.cluster.gateway().oob());
        return Utils.waitFor(() -> this.isAckedByAll(view), this.config.revenoElectionTimeouts().ackTimeoutNanos());
    }

    /**
     * Sends the local vote (view id, priority, seed) to all view members and
     * collects their answers.
     *
     * @param view view being voted on
     * @return the answers from {@link #waitForAnswers(ClusterView)}
     */
    protected List<VoteMessage> sendVoteNotifications(ClusterView view) {
        VoteMessage message = new VoteMessage(view.viewId(), this.config.priorityInCluster(), this.seed);
        this.cluster.gateway().send(view.members(), message, this.cluster.gateway().oob());
        return this.waitForAnswers(view);
    }

    /**
     * Waits until the number of usable votes equals the number of view members.
     *
     * @param view view being voted on
     * @return the usable votes, or an empty list when the wait did not succeed
     *         within the configured vote timeout
     */
    protected List<VoteMessage> waitForAnswers(ClusterView view) {
        boolean complete = Utils.waitFor(
                () -> this.usableVotes(view).size() == view.members().size(),
                this.config.revenoElectionTimeouts().voteTimeoutNanos());
        if (!complete) {
            return Collections.emptyList();
        }
        return this.usableVotes(view);
    }

    /**
     * Tells whether the answers are non-empty and all carry the local priority.
     * Kept for subclasses; {@link #execute(ClusterView, Void)} compares priority
     * and seed together and does not call this method.
     *
     * @param answers votes to inspect
     * @return {@code true} if there is at least one answer and every answer's
     *         priority equals the configured local priority
     */
    protected boolean isAllSamePriority(List<VoteMessage> answers) {
        return !answers.isEmpty()
                && answers.stream().allMatch(a -> this.config.priorityInCluster() == a.priority);
    }

    /** Builds the read-only set of subscribed message type codes. */
    private static Set<Integer> subscriptionTypes() {
        Set<Integer> types = new HashSet<>();
        types.add(VOTE_ACK_TYPE);
        types.add(VOTE_MESSAGE_TYPE);
        return Collections.unmodifiableSet(types);
    }

    /** Returns whether the local (priority, seed) pair is strictly greater than the other vote's. */
    private boolean outranks(VoteMessage other) {
        if (this.config.priorityInCluster() != other.priority) {
            return this.config.priorityInCluster() > other.priority;
        }
        return this.seed > other.seed;
    }

    /** Returns whether every view member has an ack recorded that carries this view's id. */
    private boolean isAckedByAll(ClusterView view) {
        if (!this.acks.keySet().containsAll(view.members())) {
            return false;
        }
        long matching = this.acks.entrySet().stream()
                .filter(kv -> view.members().contains(kv.getKey()))
                .filter(kv -> view.viewId() == kv.getValue().longValue())
                .count();
        return matching == (long)view.members().size();
    }

    /**
     * Returns the recorded votes that belong to this view, come from one of its
     * members and carry a seed different from the current local seed.
     */
    private List<VoteMessage> usableVotes(ClusterView view) {
        Predicate<VoteMessage> inView = m -> m.viewId == view.viewId() && view.members().contains(m.address());
        Predicate<VoteMessage> diffSeed = m -> m.seed != this.seed;
        return this.votes.values().stream().filter(inView).filter(diffSeed).collect(Collectors.toList());
    }

    /**
     * Draws a new random seed from the shared {@link SecureRandom}. This avoids
     * the static {@code SecureRandom.getSeed}, which is retained in the JDK
     * only for backward compatibility and gathers fresh seed material per call.
     */
    private long generateSeed() {
        return RANDOM.nextLong();
    }
}

