/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols.raft;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.jgroups.Address;
import org.jgroups.EmptyMessage;
import org.jgroups.Event;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.raft.LeaderElected;
import org.jgroups.protocols.raft.Log;
import org.jgroups.protocols.raft.LogEntry;
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.protocols.raft.RaftHeader;
import org.jgroups.protocols.raft.VoteRequest;
import org.jgroups.protocols.raft.VoteResponse;
import org.jgroups.raft.util.Utils;
import org.jgroups.stack.Protocol;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.ResponseCollector;
import org.jgroups.util.Runner;

@MBean(description="Protocol performing leader election according to the RAFT paper")
public class ELECTION
extends Protocol {
    protected static final short ELECTION_ID = 520;
    protected static final short VOTE_REQ = 3000;
    protected static final short VOTE_RSP = 3001;
    protected static final short LEADER_ELECTED = 3005;
    @Property(description="Max time (ms) to wait for vote responses", type=AttributeType.TIME)
    protected long vote_timeout = 600L;
    @ManagedAttribute(description="Number of voting rounds initiated by the coordinator")
    protected int num_voting_rounds;
    protected RAFT raft;
    protected volatile View view;
    protected final Runner voting_thread = new Runner("voting-thread", this::runVotingProcess, null);
    protected ResponseCollector<VoteResponse> votes = new ResponseCollector();

    public long voteTimeout() {
        return this.vote_timeout;
    }

    public RAFT raft() {
        return this.raft;
    }

    public ELECTION raft(RAFT r) {
        this.raft = r;
        return this;
    }

    public void resetStats() {
        super.resetStats();
        this.num_voting_rounds = 0;
    }

    @ManagedAttribute(description="Is the voting thread (only on the coordinator) running?")
    public boolean isVotingThreadRunning() {
        return this.voting_thread.isRunning();
    }

    public void init() throws Exception {
        super.init();
        this.raft = this.findProtocol(RAFT.class);
    }

    public void stop() {
        this.stopVotingThread();
        this.raft.setLeaderAndTerm(null);
    }

    public Object down(Event evt) {
        switch (evt.getType()) {
            case 4: {
                this.raft.setLeaderAndTerm(null);
                break;
            }
            case 6: {
                this.handleView((View)evt.getArg());
            }
        }
        return this.down_prot.down(evt);
    }

    public Object up(Event evt) {
        if (evt.getType() == 6) {
            this.handleView((View)evt.getArg());
        }
        return this.up_prot.up(evt);
    }

    public Object up(Message msg) {
        RaftHeader hdr = (RaftHeader)msg.getHeader(this.id);
        if (hdr != null) {
            this.handleMessage(msg, hdr);
            return null;
        }
        return this.up_prot.up(msg);
    }

    public void up(MessageBatch batch) {
        Iterator it = batch.iterator();
        while (it.hasNext()) {
            Message msg = (Message)it.next();
            RaftHeader hdr = (RaftHeader)msg.getHeader(this.id);
            if (hdr == null) continue;
            it.remove();
            this.handleMessage(msg, hdr);
        }
        if (!batch.isEmpty()) {
            this.up_prot.up(batch);
        }
    }

    protected void handleView(View v) {
        Utils.Majority result = Utils.computeMajority(this.view, v, this.raft().majority(), this.raft.leader());
        this.log.debug("%s: existing view: %s, new view: %s, result: %s", new Object[]{this.local_addr, this.view, v, result});
        List joiners = View.newMembers((View)this.view, (View)v);
        boolean has_new_members = joiners != null && !joiners.isEmpty();
        this.view = v;
        switch (result) {
            case no_change: {
                if (!this.raft.isLeader() || !has_new_members) break;
                this.sendLeaderElectedMessage(this.raft.leader(), this.raft.currentTerm());
                break;
            }
            case reached: 
            case leader_lost: {
                if (!Objects.equals(this.view.getCoord(), this.local_addr)) break;
                this.log.trace("%s: starting voting process (reason: %s, view: %s)", new Object[]{this.local_addr, result, this.view});
                this.startVotingThread();
                break;
            }
            case lost: {
                this.stopVotingThread();
                this.raft.setLeaderAndTerm(null);
            }
        }
    }

    protected void handleMessage(Message msg, RaftHeader hdr) {
        if (hdr instanceof LeaderElected) {
            int term = hdr.currTerm();
            Address leader = ((LeaderElected)hdr).leader();
            this.stopVotingThread();
            this.log.trace("%s <- %s: %s", new Object[]{this.local_addr, msg.src(), hdr});
            this.raft.setLeaderAndTerm(leader, term);
            return;
        }
        if (hdr instanceof VoteRequest) {
            this.handleVoteRequest(msg.src());
            return;
        }
        if (hdr instanceof VoteResponse) {
            VoteResponse rsp = (VoteResponse)hdr;
            this.log.trace("%s <- %s: %s", new Object[]{this.local_addr, msg.src(), hdr});
            this.handleVoteResponse(msg.src(), rsp);
        }
    }

    protected void handleVoteRequest(Address sender) {
        if (this.log.isTraceEnabled()) {
            this.log.trace("%s <- %s: VoteRequest", new Object[]{this.local_addr, sender});
        }
        int new_term = this.raft.createNewTerm();
        Log log_impl = this.raft.log();
        if (log_impl == null) {
            return;
        }
        int my_last_index = log_impl.lastAppended();
        LogEntry entry = log_impl.get(my_last_index);
        int my_last_term = entry != null ? entry.term : 0;
        new_term = Math.max(new_term, my_last_term + 1);
        this.sendVoteResponse(sender, new_term, my_last_term, my_last_index);
    }

    protected void handleVoteResponse(Address sender, VoteResponse rsp) {
        this.votes.add(sender, (Object)rsp);
    }

    protected void runVotingProcess() {
        this.votes.reset(this.view.getMembersRaw());
        ++this.num_voting_rounds;
        long start = System.currentTimeMillis();
        this.sendVoteRequest();
        this.votes.waitForAllResponses(this.vote_timeout);
        long time = System.currentTimeMillis() - start;
        int majority = this.raft.majority();
        if (this.votes.numberOfValidResponses() >= majority) {
            Address leader = this.determineLeader();
            int new_term = this.highestTerm();
            this.log.trace("%s: collected votes from %s in %d ms (majority=%d) -> leader is %s (new_term=%d)", new Object[]{this.local_addr, this.votes.getValidResults(), time, majority, leader, new_term});
            this.sendLeaderElectedMessage(leader, new_term);
            this.raft.setLeaderAndTerm(leader, new_term);
            this.stopVotingThread();
        } else {
            this.log.trace("%s: collected votes from %s in %d ms (majority=%d); starting another voting round", new Object[]{this.local_addr, this.votes.getValidResults(), time, majority});
        }
    }

    protected Address determineLeader() {
        Address leader = null;
        Map results = this.votes.getResults();
        for (Address mbr : this.view.getMembersRaw()) {
            VoteResponse rsp = (VoteResponse)((Object)results.get(mbr));
            if (rsp == null) continue;
            if (leader == null) {
                leader = mbr;
            }
            if (!this.isHigher(rsp.last_log_term, rsp.last_log_index)) continue;
            leader = mbr;
        }
        return leader;
    }

    protected int highestTerm() {
        Optional<Integer> highest_term = this.votes.getResults().values().stream().filter(Objects::nonNull).map(RaftHeader::currTerm).max(Integer::compare);
        return highest_term.orElse(0);
    }

    protected boolean isHigher(int last_term, int last_index) {
        int my_last_term;
        int my_last_index = this.raft.log().lastAppended();
        LogEntry entry = this.raft.log().get(my_last_index);
        int n = my_last_term = entry != null ? entry.term : 0;
        if (last_term > my_last_term) {
            return true;
        }
        if (last_term < my_last_term) {
            return false;
        }
        return last_index > my_last_index;
    }

    protected void sendVoteResponse(int term) {
        int last_log_index = this.raft.log().lastAppended();
        LogEntry entry = this.raft.log().get(last_log_index);
        int last_log_term = entry != null ? entry.term() : 0;
        VoteResponse rsp = new VoteResponse(term, last_log_term, last_log_index);
        this.log.trace("%s -> all (-self): %s", new Object[]{this.local_addr, rsp});
        Message vote_req = new EmptyMessage(null).putHeader(this.id, (Header)rsp).setFlag(new Message.Flag[]{Message.Flag.OOB});
        this.down_prot.down(vote_req);
    }

    protected void sendVoteRequest() {
        VoteRequest req = new VoteRequest();
        this.log.trace("%s -> all (-self): %s", new Object[]{this.local_addr, req});
        Message vote_req = new EmptyMessage(null).putHeader(this.id, (Header)req).setFlag(new Message.Flag[]{Message.Flag.OOB});
        this.down_prot.down(vote_req);
    }

    protected void sendVoteResponse(Address dest, int term, int last_log_term, int last_log_index) {
        VoteResponse rsp = new VoteResponse(term, last_log_term, last_log_index);
        Message vote_rsp = new EmptyMessage(dest).putHeader(this.id, (Header)rsp).setFlag(new Message.Flag[]{Message.Flag.OOB});
        this.down_prot.down(vote_rsp);
    }

    protected void sendLeaderElectedMessage(Address leader, int term) {
        RaftHeader hdr = new LeaderElected(leader).currTerm(term);
        Message msg = new EmptyMessage(null).putHeader(this.id, (Header)hdr).setFlag(new Message.TransientFlag[]{Message.TransientFlag.DONT_LOOPBACK});
        this.log.trace("%s -> all (-self): %s", new Object[]{this.local_addr, hdr});
        this.down_prot.down(msg);
    }

    public synchronized ELECTION startVotingThread() {
        if (!this.isVotingThreadRunning()) {
            this.voting_thread.start();
        }
        return this;
    }

    public synchronized ELECTION stopVotingThread() {
        if (this.isVotingThreadRunning()) {
            this.log.debug("%s: stopping the voting thread", new Object[]{this.local_addr});
            this.voting_thread.stop();
            this.votes.reset();
        }
        return this;
    }

    protected <T extends Protocol> T findProtocol(Class<T> clazz) {
        for (Protocol p = this.up_prot; p != null; p = p.getUpProtocol()) {
            if (!clazz.isAssignableFrom(p.getClass())) continue;
            return (T)p;
        }
        throw new IllegalStateException(clazz.getSimpleName() + " not found above " + ((Object)((Object)this)).getClass().getSimpleName());
    }

    protected static long computeElectionTimeout(long min, long max) {
        long diff = max - min;
        return (long)((int)(Math.random() * 100000.0 % (double)diff)) + min;
    }

    static {
        ClassConfigurator.addProtocol((short)520, ELECTION.class);
        ClassConfigurator.add((short)3000, VoteRequest.class);
        ClassConfigurator.add((short)3001, VoteResponse.class);
        ClassConfigurator.add((short)3005, LeaderElected.class);
    }
}

