/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols.raft;

import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.net.InetAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.ObjIntConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.jgroups.Address;
import org.jgroups.BytesMessage;
import org.jgroups.EmptyMessage;
import org.jgroups.Event;
import org.jgroups.Header;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.raft.AppendEntriesRequest;
import org.jgroups.protocols.raft.AppendEntriesResponse;
import org.jgroups.protocols.raft.AppendResult;
import org.jgroups.protocols.raft.Candidate;
import org.jgroups.protocols.raft.DynamicMembership;
import org.jgroups.protocols.raft.Follower;
import org.jgroups.protocols.raft.InstallSnapshotRequest;
import org.jgroups.protocols.raft.InternalCommand;
import org.jgroups.protocols.raft.Leader;
import org.jgroups.protocols.raft.Log;
import org.jgroups.protocols.raft.LogEntry;
import org.jgroups.protocols.raft.RaftHeader;
import org.jgroups.protocols.raft.RaftImpl;
import org.jgroups.protocols.raft.Role;
import org.jgroups.protocols.raft.Settable;
import org.jgroups.protocols.raft.StateMachine;
import org.jgroups.raft.util.CommitTable;
import org.jgroups.raft.util.RequestTable;
import org.jgroups.stack.Protocol;
import org.jgroups.util.DefaultThreadFactory;
import org.jgroups.util.ExtendedUUID;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.Runner;
import org.jgroups.util.Streamable;
import org.jgroups.util.ThreadFactory;
import org.jgroups.util.Util;

@MBean(description="Implementation of the RAFT consensus protocol")
public class RAFT
extends Protocol
implements Settable,
DynamicMembership {
    /** Key under which the raft-id is stored in (and read back from) an {@link ExtendedUUID}. */
    public static final byte[] raft_id_key = Util.stringToBytes((String)"raft-id");
    // NOTE(review): presumably the protocol ID of RAFT and the magic IDs of the header/result classes
    // (registered elsewhere, e.g. via ClassConfigurator) - confirm against the static initializer
    protected static final short RAFT_ID = 521;
    protected static final short APPEND_ENTRIES_REQ = 2000;
    protected static final short APPEND_ENTRIES_RSP = 2001;
    protected static final short APPEND_RESULT = 2002;
    protected static final short INSTALL_SNAPSHOT_REQ = 2003;
    /**
     * Renders an {@link ExtendedUUID}: the raft-id stored under {@link #raft_id_key} when present,
     * otherwise whatever {@code uuid.print()} returns.
     */
    public static final Function<ExtendedUUID, String> print_function = uuid -> {
        byte[] val = uuid.get(raft_id_key);
        return val != null ? Util.bytesToString((byte[])val) : uuid.print();
    };
    @Property(description="The identifier of this node. Needs to be unique and an element of members. Must not be null", writable=false)
    protected String raft_id;
    /** The configured members (raft-ids). Most accessors synchronize on this list before touching it. */
    protected final List<String> members = new ArrayList<String>();
    @ManagedAttribute(description="Majority needed to achieve consensus; computed from members)")
    protected int majority = -1;
    @Property(description="If true, we can change 'members' at runtime")
    protected boolean dynamic_view_changes = true;
    @Property(description="The fully qualified name of the class implementing Log")
    protected String log_class = "org.jgroups.protocols.raft.LevelDBLog";
    @Property(description="Arguments to the log impl, e.g. k1=v1,k2=v2. These will be passed to init()")
    protected String log_args;
    @Property(description="The directory in which the log and snapshots are stored. Defaults to the temp dir")
    protected String log_dir = Util.checkForMac() ? File.separator + "tmp" : System.getProperty("java.io.tmpdir", File.separator + "tmp");
    @Property(description="The prefix of the log and snapshot. If null, the logical name of the channel is used as prefix")
    protected String log_prefix;
    @ManagedAttribute(description="The name of the log")
    protected String log_name;
    @ManagedAttribute(description="The name of the snapshot")
    protected String snapshot_name;
    @Property(description="Interval (ms) at which AppendEntries messages are resent to members with missing log entries", type=AttributeType.TIME)
    protected long resend_interval = 1000L;
    @Property(description="Send commit message to followers immediately after leader commits (majority has consensus). Caution : it may generate more traffic than expected")
    protected boolean send_commits_immediately;
    @Property(description="Max number of bytes a log can have until a snapshot is created", type=AttributeType.BYTES)
    protected int max_log_size = 1000000;
    @ManagedAttribute(description="The current size of the log in bytes", type=AttributeType.BYTES)
    protected int curr_log_size;
    @ManagedAttribute(description="Number of successful AppendEntriesRequests")
    protected int num_successful_append_requests;
    @ManagedAttribute(description="Number of failed AppendEntriesRequests because the entry wasn't found in the log")
    protected int num_failed_append_requests_not_found;
    @ManagedAttribute(description="Number of failed AppendEntriesRequests because the prev entry's term didn't match")
    protected int num_failed_append_requests_wrong_term;
    /** The state machine that committed log entries are applied to; may be null. */
    protected StateMachine state_machine;
    /** Set to true once initStateMachineFromLog() has run; that method is a no-op while this is true. */
    protected boolean state_machine_loaded;
    /** The persistent log; created in start() from log_class unless one was injected via log(Log). */
    protected Log log_impl;
    /** Pending requests keyed by log index; may be null (setAsync() checks for this). */
    protected RequestTable<String> request_table;
    /** Per-member replication state used when (re)sending AppendEntries; may be null. */
    protected CommitTable commit_table;
    protected final List<RoleChange> role_change_listeners = new ArrayList<RoleChange>();
    /** Set to true via compare-and-set by changeMembers(); a second membership change is rejected while it is true. */
    protected final AtomicBoolean members_being_changed = new AtomicBoolean(false);
    /** The implementation of the current role; starts out as a Follower. */
    protected volatile RaftImpl impl = new Follower(this);
    protected volatile View view;
    @ManagedAttribute(description="the current leader")
    protected volatile Address leader;
    @ManagedAttribute(description="The current term")
    protected int current_term;
    @ManagedAttribute(description="Index of the highest log entry appended to the log", type=AttributeType.SCALAR)
    protected int last_appended;
    @ManagedAttribute(description="Index of the last committed log entry", type=AttributeType.SCALAR)
    protected int commit_index;
    @ManagedAttribute(description="The number of snapshots performed")
    protected int num_snapshots;
    @ManagedAttribute(description="The number of times AppendEntriesRequests were resent")
    protected int num_resends;
    /** True while snapshot() is running. */
    protected boolean snapshotting;
    @Property(description="Max size in items the processing queue can have", type=AttributeType.SCALAR)
    protected int processing_queue_max_size = 9182;
    /** Bounded queue (created in init()) of up/down requests, drained by processQueue(). */
    protected BlockingQueue<Request> processing_queue;
    /** Scratch list that processQueue() drains the processing queue into. */
    protected final List<Request> remove_queue = new ArrayList<Request>();
    /** Runs processQueue(); created in init(), started in start(), stopped in stop(). */
    protected Runner runner;
    /** If true, requests are handled on the calling thread instead of being added to the processing queue. */
    protected boolean synchronous;

    /** @return the identifier (raft-id) of this node */
    public String raftId() {
        return raft_id;
    }

    /**
     * Sets the raft-id of this node. A {@code null} argument leaves the current value untouched.
     *
     * @param id the new raft-id, may be {@code null}
     * @return this instance
     */
    public RAFT raftId(String id) {
        if (id == null) {
            return this;
        }
        raft_id = id;
        return this;
    }

    /** @return the implementation of the role this node currently has */
    public RaftImpl impl() {
        return impl;
    }

    /** @return the currently computed majority (-1 until it has been computed) */
    public int majority() {
        return majority;
    }

    /** @return the fully qualified class name of the log implementation */
    public String logClass() {
        return log_class;
    }

    /**
     * Sets the fully qualified class name of the log implementation.
     *
     * @param clazz the class name
     * @return this instance
     */
    public RAFT logClass(String clazz) {
        log_class = clazz;
        return this;
    }

    /** @return the arguments passed to the log implementation, e.g. {@code k1=v1,k2=v2} */
    public String logArgs() {
        return log_args;
    }

    /**
     * Sets the arguments passed to the log implementation.
     *
     * @param args comma-delimited key=value pairs
     * @return this instance
     */
    public RAFT logArgs(String args) {
        log_args = args;
        return this;
    }

    /** @return the prefix used for the log and snapshot names */
    public String logPrefix() {
        return log_prefix;
    }

    /**
     * Sets the prefix used for the log and snapshot names.
     *
     * @param name the prefix
     * @return this instance
     */
    public RAFT logPrefix(String name) {
        log_prefix = name;
        return this;
    }

    /** @return the name of the log */
    public String logName() {
        return log_name;
    }

    /** @return the name of the snapshot */
    public String snapshotName() {
        return snapshot_name;
    }

    /** @return the resend interval in milliseconds */
    public long resendInterval() {
        return resend_interval;
    }

    /**
     * Sets the resend interval.
     *
     * @param val the interval in milliseconds
     * @return this instance
     */
    public RAFT resendInterval(long val) {
        resend_interval = val;
        return this;
    }

    /** @return the value of the {@code send_commits_immediately} property */
    public boolean sendCommitsImmediately() {
        return send_commits_immediately;
    }

    /**
     * Sets the {@code send_commits_immediately} property.
     *
     * @param val the new value
     * @return this instance
     */
    public RAFT sendCommitsImmediately(boolean val) {
        send_commits_immediately = val;
        return this;
    }

    /** @return the max number of bytes the log can have until a snapshot is created */
    public int maxLogSize() {
        return max_log_size;
    }

    /**
     * Sets the max number of bytes the log can have until a snapshot is created.
     *
     * @param val the max size in bytes
     * @return this instance
     */
    public RAFT maxLogSize(int val) {
        max_log_size = val;
        return this;
    }

    /** @return the tracked size of the log in bytes */
    public int currentLogSize() {
        return curr_log_size;
    }

    /**
     * Returns the number of entries in the request table.
     *
     * @return the size of the request table, or 0 when there is no request table (setAsync()
     *         shows that the table can legitimately be null)
     */
    public int requestTableSize() {
        // copy to a local so the null check and the call see the same reference
        RequestTable<String> table = request_table;
        return table != null ? table.size() : 0;
    }

    /** @return the number of snapshots performed */
    public int numSnapshots() {
        return num_snapshots;
    }

    /** @return the address of the current leader, or {@code null} if none is known */
    public Address leader() {
        return leader;
    }

    /**
     * Sets the current leader.
     *
     * @param new_leader the address of the new leader, may be {@code null}
     * @return this instance
     */
    public RAFT leader(Address new_leader) {
        leader = new_leader;
        return this;
    }

    /** @return true if the current leader equals the local address (also true when both are null) */
    public boolean isLeader() {
        Address current_leader = leader;
        Address self = local_addr;
        if (current_leader == self) {
            return true;
        }
        return current_leader != null && current_leader.equals(self);
    }

    /** @return the logger of this protocol (not to be confused with the RAFT log returned by {@link #log()}) */
    public org.jgroups.logging.Log getLog() {
        return log;
    }

    /**
     * Sets the state machine.
     *
     * @param sm the state machine, may be {@code null}
     * @return this instance
     */
    public RAFT stateMachine(StateMachine sm) {
        state_machine = sm;
        return this;
    }

    /** @return the state machine, or {@code null} if none has been set */
    public StateMachine stateMachine() {
        return state_machine;
    }

    /** @return the commit table, may be {@code null} */
    public CommitTable commitTable() {
        return commit_table;
    }

    /** @return the current term */
    public int currentTerm() {
        return current_term;
    }

    /** @return the index of the highest log entry appended to the log */
    public int lastAppended() {
        return last_appended;
    }

    /** @return the index of the last committed log entry */
    public int commitIndex() {
        return commit_index;
    }

    /** @return the RAFT log implementation, may be {@code null} */
    public Log log() {
        return log_impl;
    }

    /**
     * Sets the RAFT log implementation. When one is set before start(), start() does not create its own.
     *
     * @param new_log the log implementation
     * @return this instance
     */
    public RAFT log(Log new_log) {
        log_impl = new_log;
        return this;
    }

    /**
     * Registers a listener in the list of role change listeners.
     *
     * @param c the listener to add
     * @return this instance
     */
    public RAFT addRoleListener(RoleChange c) {
        role_change_listeners.add(c);
        return this;
    }

    /**
     * Removes a listener from the list of role change listeners.
     *
     * @param c the listener to remove
     * @return this instance
     */
    public RAFT remRoleListener(RoleChange c) {
        role_change_listeners.remove(c);
        return this;
    }

    /**
     * Sets the flag that marks the state machine as loaded; initStateMachineFromLog() does nothing while it is true.
     *
     * @param b the new value
     * @return this instance
     */
    public RAFT stateMachineLoaded(boolean b) {
        state_machine_loaded = b;
        return this;
    }

    /** @return true if requests are handled on the calling thread rather than queued */
    public boolean synchronous() {
        return synchronous;
    }

    /**
     * Selects whether requests are handled on the calling thread (true) or added to the processing queue (false).
     *
     * @param b the new value
     * @return this instance
     */
    public RAFT synchronous(boolean b) {
        synchronous = b;
        return this;
    }

    /** Resets the superclass statistics and zeroes the append-request, resend and snapshot counters. */
    public void resetStats() {
        super.resetStats();
        num_successful_append_requests = 0;
        num_failed_append_requests_not_found = 0;
        num_failed_append_requests_wrong_term = 0;
        num_snapshots = 0;
        num_resends = 0;
    }

    /**
     * Parses a comma-delimited list of member names and installs it via {@link #members(Collection)}.
     *
     * @param list the comma-delimited member names
     */
    @Property(description="List of members (logical names); majority is computed from it")
    public void setMembers(String list) {
        Collection<String> parsed = Util.parseCommaDelimitedStrings(list);
        members(parsed);
    }

    /**
     * Replaces the members with the distinct elements of {@code list} and recomputes the majority.
     * <p>
     * The replacement happens while holding the lock on {@code members}, the same lock used by
     * {@link #members()}, {@code _addServer()} and {@code _removeServer()}, so readers never see a
     * half-updated list.
     *
     * @param list the new members; duplicates are dropped
     * @return this instance
     * @throws NullPointerException if {@code list} is null; the current members are left untouched
     */
    public RAFT members(Collection<String> list) {
        // dedupe first, so a bad argument fails before the current members are wiped
        HashSet<String> unique = new HashSet<String>(list);
        synchronized (members) {
            members.clear();
            members.addAll(unique);
            computeMajority();
        }
        return this;
    }

    /**
     * Returns a snapshot of the members, taken while holding the lock on the member list.
     *
     * @return a new list containing the current members
     */
    @Property
    public List<String> members() {
        synchronized (members) {
            return new ArrayList<String>(members);
        }
    }

    /**
     * Advances the current term to {@code new_term} if it is higher, also storing it in the log.
     *
     * @param new_term the term to compare with the current term
     * @return -1 if {@code new_term} is lower than the current term, 0 if both are equal,
     *         1 if the current term was raised to {@code new_term}
     */
    public synchronized int currentTerm(int new_term) {
        int known_term = current_term;
        if (new_term == known_term) {
            return 0;
        }
        if (new_term < known_term) {
            return -1;
        }
        log.trace("%s: changed term from %d -> %d", local_addr, known_term, new_term);
        current_term = new_term;
        log_impl.currentTerm(new_term);
        return 1;
    }

    /** @return the simple class name of the current role implementation */
    @ManagedAttribute(description="The current role")
    public String role() {
        Class<?> role_class = impl.getClass();
        return role_class.getSimpleName();
    }

    /** @return the string form of the commit table, or {@code "n/a"} when there is none */
    @ManagedOperation(description="Dumps the commit table")
    public String dumpCommitTable() {
        CommitTable table = commit_table;
        if (table == null) {
            return "n/a";
        }
        return table.toString();
    }

    /** @return the size reported by the log implementation */
    @ManagedAttribute(description="Number of log entries in the log")
    public int logSize() {
        return log_impl.size();
    }

    /**
     * Sums up {@code length()} of every entry the log implementation iterates over.
     *
     * @return the accumulated length of all visited entries
     */
    @ManagedOperation(description="Number of bytes in the log")
    public int logSizeInBytes() {
        // an AtomicInteger serves as the mutable accumulator captured by the lambda
        final AtomicInteger total = new AtomicInteger(0);
        log_impl.forEach((entry, index) -> {
            total.addAndGet(entry.length());
        });
        return total.get();
    }

    /**
     * Renders one line per log entry (index, term and command length), for the index range
     * {@code max(1, last_appended - last_n)} to {@code last_appended} as handed to the log's forEach().
     *
     * @param last_n how far back from {@code last_appended} the dump starts
     * @return the rendered entries
     */
    @ManagedOperation(description="Dumps the last N log entries")
    public String dumpLog(int last_n) {
        final StringBuilder out = new StringBuilder();
        int upper = last_appended;
        int lower = Math.max(1, upper - last_n);
        log_impl.forEach((entry, index) -> {
            out.append("index=").append(index);
            out.append(", term=").append(entry.term());
            out.append(" (").append(entry.command().length).append(" bytes)\n");
        }, lower, upper);
        return out.toString();
    }

    /** @return the dump produced by {@link #dumpLog(int)} with {@code last_appended - 1} as argument */
    @ManagedOperation(description="Dumps all log entries")
    public String dumpLog() {
        int span = last_appended - 1;
        return dumpLog(span);
    }

    /**
     * Deletes the snapshot file, if there is one.
     * <p>
     * Does nothing when no snapshot name has been set yet ({@code snapshot_name} is only assigned in
     * start(), and only when start() creates the log itself). A failed deletion is logged instead
     * of being silently ignored.
     *
     * @return this instance
     */
    public RAFT deleteSnapshot() {
        String name = snapshot_name;
        if (name == null) {
            return this;
        }
        File file = new File(name);
        if (file.exists() && !file.delete()) {
            log.warn("%s: failed deleting snapshot %s", local_addr, name);
        }
        return this;
    }

    /**
     * Deletes the log (if present) and drops the reference to it.
     *
     * @return this instance
     * @throws Exception if the log implementation fails to delete itself
     */
    public RAFT deleteLog() throws Exception {
        if (log_impl == null) {
            return this;
        }
        log_impl.delete();
        log_impl = null;
        return this;
    }

    /**
     * Passes {@code func} to the log implementation's forEach().
     *
     * @param func receives each log entry together with its index
     */
    public void logEntries(ObjIntConsumer<LogEntry> func) {
        log_impl.forEach(func);
    }

    /** Increments the current term by one (in memory) and returns the new value. */
    public synchronized int createNewTerm() {
        current_term += 1;
        return current_term;
    }

    /**
     * Updates the leader and, if {@code term} is higher than the current term, the current term.
     * <p>
     * The leader is replaced when none is known, or when {@code new_leader} is non-null and differs
     * from the known leader.
     *
     * @param term the term to adopt if it is higher than the current one
     * @param new_leader the address of the new leader, may be {@code null}
     * @return true if the current term was raised, false otherwise
     */
    public synchronized boolean updateTermAndLeader(int term, Address new_leader) {
        Address known_leader = leader;
        boolean replace_leader = known_leader == null || (new_leader != null && !known_leader.equals(new_leader));
        if (replace_leader) {
            leader = new_leader;
        }
        if (term <= current_term) {
            return false;
        }
        current_term = term;
        return true;
    }

    /**
     * Walks the protocol chain from {@code start} and returns the first protocol that is an instance of {@code clazz}.
     *
     * @param clazz the type to look for; {@code null} yields {@code null}
     * @param start the protocol to start at (inclusive), may be {@code null}
     * @param down true to follow the down protocols, false to follow the up protocols
     * @return the first matching protocol, or {@code null} if the chain ends without a match
     */
    public static <T> T findProtocol(Class<T> clazz, Protocol start, boolean down) {
        if (clazz == null) {
            return null;
        }
        for (Protocol current = start; current != null; current = down ? current.getDownProtocol() : current.getUpProtocol()) {
            if (clazz.isInstance(current)) {
                return clazz.cast(current);
            }
        }
        return null;
    }

    /**
     * Requests that {@code name} be added to the members, via changeMembers().
     *
     * @param name the raft-id of the server to add
     * @return a future for the result of the membership change
     * @throws Exception if changeMembers() rejects the request
     */
    @Override
    @ManagedOperation(description="Adds a new server to members. Prevents duplicates")
    public CompletableFuture<byte[]> addServer(String name) throws Exception {
        InternalCommand.Type op = InternalCommand.Type.addServer;
        return changeMembers(name, op);
    }

    /**
     * Requests that {@code name} be removed from the members, via changeMembers().
     *
     * @param name the raft-id of the server to remove
     * @return a future for the result of the membership change
     * @throws Exception if changeMembers() rejects the request
     */
    @Override
    @ManagedOperation(description="Removes a new server from members")
    public CompletableFuture<byte[]> removeServer(String name) throws Exception {
        InternalCommand.Type op = InternalCommand.Type.removeServer;
        return changeMembers(name, op);
    }

    /**
     * Creates a snapshot via doSnapshot() and increments the snapshot counter on success.
     * <p>
     * If a snapshot is already flagged as in progress, an error is logged and the call returns
     * without doing anything. The in-progress flag is always cleared again, even when doSnapshot() fails.
     *
     * @throws Exception if doSnapshot() fails
     */
    @ManagedOperation(description="Creates a new snapshot and truncates the log")
    public synchronized void snapshot() throws Exception {
        if (snapshotting) {
            // the format string has a %s placeholder, so the local address must be passed along
            log.error("%s: cannot create snapshot; snapshot is being created by another thread", local_addr);
            return;
        }
        try {
            snapshotting = true;
            doSnapshot();
            ++num_snapshots;
        }
        finally {
            snapshotting = false;
        }
    }

    /**
     * Initializes the state machine from the snapshot (if present) and from the committed log entries.
     * <p>
     * Does nothing when no state machine is set or when it has already been loaded. Entries flagged
     * as internal are run through executeInternalCommand() instead of being applied to the state machine.
     * Iteration stops at the first index for which the log returns no entry.
     *
     * @throws Exception if reading the snapshot or applying an entry fails
     */
    @ManagedOperation(description="Reads the snapshot (if present) and loads log entries from [first .. commit_index] into the state machine")
    public synchronized void initStateMachineFromLog() throws Exception {
        if (state_machine == null || state_machine_loaded) {
            return;
        }
        int snapshot_offset = 0;
        // snapshot_name is only assigned in start() when start() creates the log itself; as this is
        // also a managed operation, guard against a null name instead of failing with an NPE
        if (snapshot_name != null) {
            try (FileInputStream input = new FileInputStream(snapshot_name)) {
                state_machine.readContentFrom(new DataInputStream(input));
                snapshot_offset = 1;
                log.debug("%s: initialized state machine from snapshot %s", local_addr, snapshot_name);
            }
            catch (FileNotFoundException ignored) {
                // no snapshot file: the state machine is built from the log alone
            }
        }
        int from = Math.max(1, log_impl.firstAppended() + snapshot_offset);
        int to = commit_index;
        int count = 0;
        for (int i = from; i <= to; ++i) {
            LogEntry log_entry = log_impl.get(i);
            if (log_entry == null) {
                log.error("%s: log entry for index %d not found in log", local_addr, i);
                break;
            }
            if (log_entry.command == null) {
                continue;
            }
            if (log_entry.internal) {
                executeInternalCommand(null, log_entry.command, log_entry.offset, log_entry.length);
                continue;
            }
            state_machine.apply(log_entry.command, log_entry.offset, log_entry.length);
            ++count;
        }
        state_machine_loaded = true;
        if (count > 0) {
            log.debug("%s: applied %d entries from the log (%d - %d) to the state machine", local_addr, count, from, to);
        }
    }

    /**
     * Initializes the protocol: removes duplicate members, computes the majority, registers an
     * address generator with the channel (if there is one) and creates the processing queue and its runner.
     *
     * @throws Exception if the superclass initialization fails
     */
    public void init() throws Exception {
        super.init();
        synchronized (members) {
            HashSet<String> unique = new HashSet<String>(members);
            if (unique.size() != members.size()) {
                log.error("members (%s) contains duplicates; removing them and setting members to %s", members, unique);
                members.clear();
                members.addAll(unique);
            }
            computeMajority();
        }
        final JChannel ch = stack != null ? stack.getChannel() : null;
        if (ch != null) {
            // generated addresses carry the raft-id under raft_id_key and print via print_function
            ch.addAddressGenerator(() -> {
                ExtendedUUID.setPrintFunction(print_function);
                return ExtendedUUID.randomUUID(ch.getName()).put(raft_id_key, Util.stringToBytes(raft_id));
            });
        }
        processing_queue = new ArrayBlockingQueue<Request>(processing_queue_max_size);
        runner = new Runner((ThreadFactory)new DefaultThreadFactory("runner", true, true), "runner", this::processQueue, null);
    }

    /**
     * Starts the protocol: resolves the raft-id, verifies it is a configured member, creates and
     * initializes the log (unless one was injected), restores last_appended / commit_index /
     * current_term from the log, loads the state machine and starts the queue runner.
     *
     * @throws IllegalStateException if the raft-id is not listed in members, if log_class is null
     *         while a log has to be created, or if the local address is not an ExtendedUUID
     * @throws Exception if the superclass, the log or the state machine initialization fails
     */
    public void start() throws Exception {
        super.start();
        // fall back to the local host name when no raft-id was configured
        if (this.raft_id == null) {
            this.raft_id = InetAddress.getLocalHost().getHostName();
        }
        List<String> list = this.members;
        synchronized (list) {
            if (!this.members.contains(this.raft_id)) {
                throw new IllegalStateException(String.format("raft-id %s is not listed in members %s", this.raft_id, this.members));
            }
        }
        // only create a log when none was set via log(Log); in that case log_name and snapshot_name stay unset here
        if (this.log_impl == null) {
            if (this.log_class == null) {
                throw new IllegalStateException("log_class has to be defined");
            }
            Class clazz = Util.loadClass((String)this.log_class, this.getClass());
            this.log_impl = (Log)clazz.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            Map<Object, Object> args = this.log_args != null && !this.log_args.isEmpty() ? RAFT.parseCommaDelimitedProps(this.log_args) : new HashMap();
            // the prefix defaults to the raft-id
            if (this.log_prefix == null) {
                this.log_prefix = this.raft_id;
            }
            this.snapshot_name = this.log_prefix;
            this.log_name = this.createLogName(this.log_prefix, "log");
            this.snapshot_name = this.createLogName(this.snapshot_name, "snapshot");
            this.log_impl.init(this.log_name, args);
        }
        if (!(this.local_addr instanceof ExtendedUUID)) {
            throw new IllegalStateException("local address must be an ExtendedUUID but is a " + this.local_addr.getClass().getSimpleName());
        }
        // restore the volatile state from what the log has persisted
        this.last_appended = this.log_impl.lastAppended();
        this.commit_index = this.log_impl.commitIndex();
        this.current_term = this.log_impl.currentTerm();
        this.log.trace("set last_appended=%d, commit_index=%d, current_term=%d", new Object[]{this.last_appended, this.commit_index, this.current_term});
        // without a snapshot name (injected log) the state machine is not loaded here
        if (this.snapshot_name != null) {
            this.initStateMachineFromLog();
        }
        this.curr_log_size = this.logSizeInBytes();
        this.runner.start();
    }

    /** Stops the protocol: stops the queue runner, destroys the current role implementation and closes the log. */
    public void stop() {
        super.stop();
        runner.stop();
        impl.destroy();
        Util.close((Closeable)log_impl);
    }

    /**
     * Intercepts view changes travelling down the stack, then passes the event on.
     *
     * @param evt the event
     * @return whatever the protocol below returns
     */
    public Object down(Event evt) {
        // named constant instead of the decompiled magic number 6
        if (evt.getType() == Event.VIEW_CHANGE) {
            handleView((View)evt.getArg());
        }
        return down_prot.down(evt);
    }

    /**
     * Intercepts view changes travelling up the stack, then passes the event on.
     *
     * @param evt the event
     * @return whatever the protocol above returns
     */
    public Object up(Event evt) {
        // named constant instead of the decompiled magic number 6
        if (evt.getType() == Event.VIEW_CHANGE) {
            handleView((View)evt.getArg());
        }
        return up_prot.up(evt);
    }

    /**
     * Handles a message carrying a RAFT header, either inline (synchronous mode) or by queueing it;
     * messages without a RAFT header are passed up unchanged.
     *
     * @param msg the received message
     * @return {@code null} for RAFT messages, otherwise the result of the protocol above
     */
    public Object up(Message msg) {
        RaftHeader hdr = (RaftHeader)msg.getHeader(id);
        if (hdr == null) {
            return up_prot.up(msg);
        }
        if (!synchronous) {
            add(new UpRequest(msg, hdr));
            return null;
        }
        handleUpRequest(msg, hdr);
        return null;
    }

    /**
     * Removes all messages carrying a RAFT header from the batch and handles them (inline in
     * synchronous mode, queued otherwise); whatever remains is passed up.
     *
     * @param batch the received batch
     */
    public void up(MessageBatch batch) {
        for (Iterator<Message> it = batch.iterator(); it.hasNext();) {
            Message msg = it.next();
            RaftHeader hdr = (RaftHeader)msg.getHeader(id);
            if (hdr == null) {
                continue;
            }
            it.remove();
            if (synchronous) {
                handleUpRequest(msg, hdr);
            }
            else {
                add(new UpRequest(msg, hdr));
            }
        }
        if (batch.isEmpty()) {
            return;
        }
        up_prot.up(batch);
    }

    /** Runs sendAppendEntriesMessage() for every entry of the commit table; a no-op when there is no commit table. */
    @ManagedOperation(description="Sends all pending AppendEntriesRequests")
    public void flushCommitTable() {
        if (commit_table == null) {
            return;
        }
        commit_table.forEach(this::sendAppendEntriesMessage);
    }

    /**
     * Runs sendAppendEntriesMessage() for a single member, if the commit table has an entry for it.
     * <p>
     * Like the no-argument variant, this is a no-op when there is no commit table.
     *
     * @param member the member to flush to, must not be {@code null}
     * @throws NullPointerException if {@code member} is null
     */
    public void flushCommitTable(Address member) {
        Objects.requireNonNull(member);
        // copy to a local so the null check and the lookup see the same table
        CommitTable table = commit_table;
        if (table == null) {
            return;
        }
        CommitTable.Entry e = table.get(member);
        if (e != null) {
            sendAppendEntriesMessage(member, e);
        }
    }

    /**
     * Submits a change without an internal command; see the four-argument variant.
     *
     * @param buf the buffer holding the change
     * @param offset the offset into {@code buf}
     * @param length the number of bytes to use
     * @return a future for the result
     */
    @Override
    public CompletableFuture<byte[]> setAsync(byte[] buf, int offset, int length) {
        final InternalCommand no_cmd = null;
        return setAsync(buf, offset, length, no_cmd);
    }

    /**
     * Submits a change, handling it inline in synchronous mode and queueing it otherwise.
     *
     * @param buf the buffer holding the change, must not be {@code null}
     * @param offset the offset into {@code buf}
     * @param length the number of bytes to use
     * @param cmd the internal command this change represents, or {@code null} for a regular change
     * @return a future for the result; already completed exceptionally when there is no request table
     * @throws IllegalStateException if this node is not the leader
     * @throws IllegalArgumentException if {@code buf} is null
     */
    public CompletableFuture<byte[]> setAsync(byte[] buf, int offset, int length, InternalCommand cmd) {
        if (leader == null || (local_addr != null && !leader.equals(local_addr))) {
            throw new IllegalStateException("I'm not the leader (local_addr=" + local_addr + ", leader=" + leader + ")");
        }
        if (buf == null) {
            throw new IllegalArgumentException("buffer must not be null");
        }
        CompletableFuture<byte[]> future = new CompletableFuture<byte[]>();
        if (request_table == null) {
            String role_name = impl.getClass().getSimpleName();
            future.completeExceptionally(new IllegalStateException("request table was null on " + role_name));
            return future;
        }
        if (!synchronous) {
            add(new DownRequest(future, buf, offset, length, cmd));
            return future;
        }
        handleDownRequest(future, buf, offset, length, cmd);
        return future;
    }

    /** @return the class name followed by the commit index, last appended index and current term */
    public String toString() {
        String name = RAFT.class.getSimpleName();
        return String.format("%s commit=%d last-appended=%d curr-term=%d", name, commit_index, last_appended, current_term);
    }

    /**
     * Adds a request to the processing queue, blocking while the queue is full.
     * <p>
     * If the thread is interrupted while waiting, the request is dropped: the interrupt flag is
     * restored for the caller, the failure is logged and, for a down request, its future is
     * completed exceptionally so nobody waits forever on a request that was never queued.
     *
     * @param r the request to queue
     */
    protected void add(Request r) {
        try {
            processing_queue.put(r);
        }
        catch (InterruptedException ex) {
            // do not swallow the interruption; callers further up may depend on it
            Thread.currentThread().interrupt();
            log.error("%s: failed adding %s to processing queue: %s", local_addr, r, ex);
            if (r instanceof DownRequest) {
                CompletableFuture<byte[]> f = ((DownRequest)r).f;
                if (f != null) {
                    f.completeExceptionally(ex);
                }
            }
        }
    }

    /**
     * Appends a change to the log, registers it in the request table, multicasts an
     * AppendEntriesRequest and commits right away if the request table reports the entry as committed.
     * <p>
     * Failures detected before anything is appended also fail {@code retval}: when this method runs
     * from the processing queue, a thrown exception is merely logged by process(), which would leave
     * the caller of setAsync() waiting on a future that never completes.
     *
     * @param retval the future handed back to the caller of setAsync()
     * @param buf the buffer holding the change
     * @param offset the offset into {@code buf}
     * @param length the number of bytes to use
     * @param cmd the internal command this change represents, or {@code null} for a regular change
     * @throws IllegalStateException if this node is not the leader, or if there is no request table
     *         and no future to report the failure to
     */
    protected void handleDownRequest(CompletableFuture<byte[]> retval, byte[] buf, int offset, int length, InternalCommand cmd) {
        if (leader == null || (local_addr != null && !leader.equals(local_addr))) {
            IllegalStateException not_leader = new IllegalStateException("I'm not the leader (local_addr=" + local_addr + ", leader=" + leader + ")");
            if (retval != null) {
                retval.completeExceptionally(not_leader);
            }
            throw not_leader;
        }
        RequestTable<String> reqtab = request_table;
        if (reqtab == null) {
            // same handling as in setAsync(); bail out before the log is touched
            IllegalStateException no_table = new IllegalStateException("request table was null on " + impl.getClass().getSimpleName());
            if (retval == null) {
                throw no_table;
            }
            retval.completeExceptionally(no_table);
            return;
        }
        synchronized (this) {
            int prev_index = last_appended++;
            int curr_index = last_appended;
            LogEntry entry = log_impl.get(prev_index);
            int prev_term = entry != null ? entry.term : 0;
            log_impl.append(curr_index, true, new LogEntry(current_term, buf, offset, length, cmd != null));
            if (cmd != null) {
                executeInternalCommand(cmd, null, 0, 0);
            }
            reqtab.create(curr_index, raft_id, retval, majority());
            // null destination plus DONT_LOOPBACK: sent to the group, but not delivered to ourselves
            Message msg = new BytesMessage(null, buf, offset, length).putHeader(id, (Header)new AppendEntriesRequest(local_addr, current_term, prev_index, prev_term, current_term, commit_index, cmd != null)).setFlag(new Message.TransientFlag[]{Message.TransientFlag.DONT_LOOPBACK});
            down_prot.down(msg);
            snapshotIfNeeded(length);
            if (reqtab.isCommitted(curr_index)) {
                handleCommit(curr_index);
            }
        }
    }

    /**
     * Dispatches a received RAFT message to the current role implementation, based on the header type.
     * <p>
     * Messages whose term is lower than the current term are dropped. An AppendEntriesRequest is
     * answered with an AppendEntriesResponse addressed to the current leader.
     *
     * @param msg the received message
     * @param hdr the RAFT header of {@code msg}
     */
    public void handleUpRequest(Message msg, RaftHeader hdr) {
        // currentTerm(int) returns a negative value when the header's term is lower than ours
        if (currentTerm(hdr.curr_term) < 0) {
            return;
        }
        if (hdr instanceof AppendEntriesRequest) {
            AppendEntriesRequest req = (AppendEntriesRequest)hdr;
            AppendResult result = impl.handleAppendEntriesRequest(msg.getArray(), msg.getOffset(), msg.getLength(), msg.src(), req.prev_log_index, req.prev_log_term, req.entry_term, req.leader_commit, req.internal);
            result.commitIndex(commit_index);
            Message rsp = new EmptyMessage(leader).putHeader(id, (Header)new AppendEntriesResponse(current_term, result));
            down_prot.down(rsp);
            return;
        }
        if (hdr instanceof AppendEntriesResponse) {
            AppendEntriesResponse rsp = (AppendEntriesResponse)hdr;
            impl.handleAppendEntriesResponse(msg.src(), rsp.curr_term, rsp.result);
            return;
        }
        if (hdr instanceof InstallSnapshotRequest) {
            InstallSnapshotRequest req = (InstallSnapshotRequest)hdr;
            impl.handleInstallSnapshotRequest(msg, req.curr_term, req.leader, req.last_included_index, req.last_included_term);
            return;
        }
        log.warn("%s: invalid header %s", local_addr, hdr.getClass().getCanonicalName());
    }

    /**
     * One iteration of the queue runner: waits up to {@code resend_interval} ms for a request.
     * <p>
     * On timeout the commit table (if any) is flushed. Otherwise the first request plus everything
     * else currently queued is processed, and draining repeats until the queue is found empty.
     * An interruption while waiting simply ends the iteration; the interrupt flag is not restored here.
     */
    protected void processQueue() {
        final Request first;
        try {
            first = processing_queue.poll(resend_interval, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException interrupted) {
            return;
        }
        if (first == null) {
            if (commit_table != null) {
                commit_table.forEach(this::sendAppendEntriesMessage);
            }
            return;
        }
        remove_queue.clear();
        remove_queue.add(first);
        processing_queue.drainTo(remove_queue);
        do {
            process(remove_queue);
            remove_queue.clear();
            processing_queue.drainTo(remove_queue);
        } while (!remove_queue.isEmpty());
    }

    /**
     * Handles each request of {@code q} in order; a failure of one request is logged and does not
     * prevent the remaining requests from being handled.
     *
     * @param q the requests to handle
     */
    protected void process(List<Request> q) {
        for (Request r : q) {
            try {
                if (r instanceof UpRequest) {
                    UpRequest up_req = (UpRequest)r;
                    handleUpRequest(up_req.msg, up_req.hdr);
                }
                else if (r instanceof DownRequest) {
                    DownRequest down_req = (DownRequest)r;
                    handleDownRequest(down_req.f, down_req.buf, down_req.offset, down_req.length, down_req.cmd);
                }
            }
            catch (Throwable ex) {
                log.error("%s: failed handling request %s: %s", local_addr, r, ex);
            }
        }
    }

    /**
     * Creates a fresh request table and registers every appended but not yet committed index
     * ({@code commit_index + 1 .. last_appended}) in it, without a future.
     */
    protected synchronized void createRequestTable() {
        request_table = new RequestTable<String>();
        int index = commit_index + 1;
        while (index <= last_appended) {
            request_table.create(index, raft_id, null, majority());
            index++;
        }
    }

    /**
     * Creates a commit table for all members of the current view except the local address, passing
     * {@code last_appended + 1} as second constructor argument. Without a view the table has no members.
     */
    protected void createCommitTable() {
        ArrayList<Address> others = new ArrayList<Address>();
        if (view != null) {
            others.addAll(view.getMembers());
        }
        others.remove(local_addr);
        commit_table = new CommitTable(others, last_appended + 1);
    }

    /**
     * Adds {@code name} to the members unless it is null or already present, and recomputes the
     * majority when the list changed.
     *
     * @param name the raft-id of the server to add
     */
    protected void _addServer(String name) {
        if (name == null) {
            return;
        }
        synchronized (members) {
            if (members.contains(name)) {
                return;
            }
            members.add(name);
            computeMajority();
        }
    }

    /**
     * Removes {@code name} from the members and recomputes the majority when the list changed.
     * A null name is ignored.
     *
     * @param name the raft-id of the server to remove
     */
    protected void _removeServer(String name) {
        if (name == null) {
            return;
        }
        synchronized (members) {
            boolean removed = members.remove(name);
            if (removed) {
                computeMajority();
            }
        }
    }

    /**
     * Brings a member up to date, based on its commit table entry. The first matching case wins:
     * <ol>
     *   <li>next index is below the first index of the log: send a snapshot (only if
     *       {@code e.snapshotInProgress(true)} returns true)</li>
     *   <li>next index is at or below last_appended: resend from next index, either a single entry
     *       or everything up to last_appended</li>
     *   <li>match index is below last_appended: resend the last appended entry</li>
     *   <li>the member's commit index is below ours: send an empty AppendEntriesRequest carrying the commit index</li>
     *   <li>there are uncommitted entries: resend them all</li>
     * </ol>
     *
     * @param member the target member
     * @param e the commit table entry of {@code member}
     */
    protected void sendAppendEntriesMessage(Address member, CommitTable.Entry e) {
        if (e.nextIndex() < this.log().firstAppended()) {
            if (e.snapshotInProgress(true)) {
                try {
                    this.sendSnapshotTo(member);
                }
                catch (Exception ex) {
                    // include the cause; without it the failure cannot be diagnosed from the log
                    this.log.error("%s: failed sending snapshot to %s: next_index=%d, first_applied=%d: %s", new Object[]{this.local_addr, member, e.nextIndex(), this.log().firstAppended(), ex});
                }
            }
            return;
        }
        if (this.last_appended >= e.nextIndex()) {
            int to = e.sendSingleMessage() ? e.nextIndex() : this.last_appended;
            for (int i = Math.max(e.nextIndex(), 1); i <= to; ++i) {
                if (this.log.isTraceEnabled()) {
                    this.log.trace("%s: resending %d to %s\n", new Object[]{this.local_addr, i, member});
                }
                this.resend(member, i);
            }
            return;
        }
        if (this.last_appended > e.matchIndex()) {
            int index = this.last_appended;
            if (index > 0) {
                this.log.trace("%s: resending %d to %s\n", new Object[]{this.local_addr, index, member});
                this.resend(member, index);
            }
            return;
        }
        if (this.commit_index > e.commitIndex()) {
            Message msg = new EmptyMessage(member).putHeader(this.id, (Header)new AppendEntriesRequest(this.local_addr, this.current_term, 0, 0, this.current_term, this.commit_index, false));
            this.down_prot.down(msg);
            return;
        }
        if (this.commit_index < this.last_appended) {
            for (int i = this.commit_index + 1; i <= this.last_appended; ++i) {
                this.resend(member, i);
            }
        }
    }

    /**
     * Submits an internal command of the given {@code type} for member {@code name} through
     * {@code setAsync}.
     * <p>
     * NOTE(review): once {@code members_being_changed} has been set here, nothing in this method
     * resets it if marshalling or {@code setAsync} throws - confirm this is handled elsewhere.
     *
     * @param name the member the command refers to
     * @param type the kind of internal command to create
     * @return the future returned by {@code setAsync}
     * @throws Exception             if {@code dynamic_view_changes} is disabled, or marshalling fails
     * @throws IllegalStateException if this node is not the leader, or {@code members_being_changed}
     *                               is already set
     */
    protected CompletableFuture<byte[]> changeMembers(String name, InternalCommand.Type type) throws Exception {
        if (!this.dynamic_view_changes) {
            throw new Exception("dynamic view changes are not allowed; set dynamic_view_changes to true to enable it");
        }
        boolean is_leader = this.leader != null && (this.local_addr == null || this.leader.equals(this.local_addr));
        if (!is_leader) {
            throw new IllegalStateException("I'm not the leader (local_addr=" + this.local_addr + ", leader=" + this.leader + ")");
        }
        // only one membership change may be in flight: the flag acts as a gate
        if (!this.members_being_changed.compareAndSet(false, true)) {
            throw new IllegalStateException(String.format("%s(%s) cannot be invoked as previous operation has not yet been committed", type, name));
        }
        InternalCommand cmd = new InternalCommand(type, name);
        byte[] buf = Util.streamableToByteBuffer((Streamable)cmd);
        return this.setAsync(buf, 0, buf.length, cmd);
    }

    /**
     * Reads the log entry at {@code index} and sends it to {@code target} as a
     * {@link BytesMessage} with an {@link AppendEntriesRequest} header, then increments
     * {@code num_resends}. If the entry is not in the log, an error is logged and nothing is sent.
     *
     * @param target the destination of the message
     * @param index  the log index of the entry to send
     */
    protected void resend(Address target, int index) {
        LogEntry entry = this.log_impl.get(index);
        if (entry == null) {
            this.log.error("%s: resending of %d failed; entry not found", new Object[]{this.local_addr, index});
            return;
        }
        // the term of the preceding entry is 0 when there is no preceding entry
        int prev_term = 0;
        LogEntry prev = this.log_impl.get(index - 1);
        if (prev != null) {
            prev_term = prev.term;
        }
        Message payload = new BytesMessage(target).setArray(entry.command, entry.offset, entry.length);
        Header hdr = new AppendEntriesRequest(this.local_addr, this.current_term, index - 1, prev_term, entry.term, this.commit_index, entry.internal);
        Message msg = payload.putHeader(this.id, hdr);
        this.down_prot.down(msg);
        this.num_resends++;
    }

    /**
     * Writes the state machine's contents to the file {@code snapshot_name} and then truncates
     * the log at the current commit index.
     *
     * @throws IllegalStateException if no state machine is set
     * @throws Exception             if writing the snapshot or truncating the log fails
     */
    protected void doSnapshot() throws Exception {
        if (this.state_machine == null) {
            throw new IllegalStateException("state machine is null");
        }
        try (FileOutputStream file_out = new FileOutputStream(this.snapshot_name)) {
            DataOutputStream data_out = new DataOutputStream(file_out);
            this.state_machine.writeContentTo(data_out);
        }
        // the log is truncated only after the snapshot file has been written and closed
        this.log_impl.truncate(this.commitIndex());
    }

    /**
     * @return {@code true} if a file named {@code snapshot_name} exists
     */
    protected boolean snapshotExists() {
        return new File(this.snapshot_name).exists();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates a snapshot via {@link #doSnapshot()}, reads the snapshot file and sends its bytes
     * to {@code dest} in a message carrying an {@link InstallSnapshotRequest} header. Does nothing
     * if {@code snapshotting} is already set.
     * <p>
     * On every exit path the snapshot-in-progress mark for {@code dest} in the commit table is
     * cleared. The {@code snapshotting} flag is cleared only if this invocation set it, so an
     * early return does not reset a flag owned by someone else.
     *
     * @param dest the member that receives the snapshot
     * @throws IllegalStateException if the log has no entry at the commit index
     * @throws Exception             if creating, reading or sending the snapshot fails
     */
    protected synchronized void sendSnapshotTo(Address dest) throws Exception {
        boolean started = false;
        try {
            if (this.snapshotting) {
                return;
            }
            this.snapshotting = true;
            started = true;
            LogEntry last_committed_entry = this.log_impl.get(this.commitIndex());
            if (last_committed_entry == null) {
                // fail with a descriptive message instead of a bare NullPointerException
                throw new IllegalStateException(this.local_addr + ": cannot send snapshot to " + dest + ": log entry at commit index " + this.commitIndex() + " not found in log");
            }
            int last_index = this.commit_index;
            int last_term = last_committed_entry.term;
            this.doSnapshot();
            byte[] data = Files.readAllBytes(Paths.get(this.snapshot_name, new String[0]));
            this.log.debug("%s: sending snapshot (%s) to %s", new Object[]{this.local_addr, Util.printBytes((double)data.length), dest});
            Message msg = new BytesMessage(dest, data).putHeader(this.id, (Header)new InstallSnapshotRequest(this.currentTerm(), this.leader(), last_index, last_term));
            this.down_prot.down(msg);
        }
        finally {
            if (started) {
                this.snapshotting = false;
            }
            if (this.commit_table != null) {
                this.commit_table.snapshotInProgress(dest, false);
            }
        }
    }

    /**
     * Walks the indices from {@code commit_index + 1} up to {@code min(index, last_appended)} and,
     * for each one the request table reports as committed, applies it and advances
     * {@code commit_index}. Indices not reported as committed are skipped. Any failure ends the
     * walk and is logged.
     *
     * @param index the highest index to consider
     */
    protected synchronized void handleCommit(int index) {
        try {
            int i = this.commit_index + 1;
            // the upper bound is re-evaluated on every pass
            while (i <= Math.min(index, this.last_appended)) {
                if (this.request_table.isCommitted(i)) {
                    this.applyCommit(i);
                    this.commit_index = Math.max(this.commit_index, i);
                }
                i++;
            }
        }
        catch (Throwable t) {
            this.log.error("failed applying commit %d: %s", new Object[]{index, t});
        }
    }

    /**
     * Applies every entry from {@code commit_index + 1} up to
     * {@code min(last_appended, leader_commit)} and advances {@code commit_index} accordingly.
     * A failure stops the loop and is logged; it is not propagated.
     *
     * @param leader_commit the commit index to move towards
     * @return this instance
     */
    protected synchronized RAFT commitLogTo(int leader_commit) {
        final int old_commit = this.commit_index;
        final int to = Math.min(this.last_appended, leader_commit);
        try {
            int i = this.commit_index + 1;
            while (i <= to) {
                this.applyCommit(i);
                this.commit_index = Math.max(this.commit_index, i);
                i++;
            }
        }
        catch (Throwable t) {
            // commit_index + 1 is the index whose application failed
            this.log.error("%s: failed moving commit_index from (exclusive) %d to (inclusive) %d (last_appended=%d, leader's commit_index=%d, failed at commit_index %d)): %s", new Object[]{this.local_addr, old_commit, to, this.last_appended, leader_commit, this.commit_index + 1, t});
        }
        return this;
    }

    /**
     * Appends a new entry at {@code index} unless that index is not beyond {@code last_appended}.
     * After appending, {@code last_appended} is refreshed from the log and
     * {@link #snapshotIfNeeded(int)} is called with {@code length}.
     *
     * @param term     the term of the new entry
     * @param index    the log index at which to append
     * @param data     the buffer holding the command
     * @param offset   the offset of the command in {@code data}
     * @param length   the number of bytes of the command
     * @param internal whether the entry is an internal command
     * @return {@code true} if the entry was appended, {@code false} if {@code index <= last_appended}
     */
    protected synchronized boolean append(int term, int index, byte[] data, int offset, int length, boolean internal) {
        if (index > this.last_appended) {
            LogEntry new_entry = new LogEntry(term, data, offset, length, internal);
            this.log_impl.append(index, true, new_entry);
            this.last_appended = this.log_impl.lastAppended();
            this.snapshotIfNeeded(length);
            return true;
        }
        return false;
    }

    /**
     * Asks the log to delete all entries starting from {@code index}, then re-reads
     * {@code last_appended} and {@code commit_index} from the log so the cached values match it.
     *
     * @param index the first index to delete (passed through to the log unchanged)
     */
    protected void deleteAllLogEntriesStartingFrom(int index) {
        this.log_impl.deleteAllEntriesStartingFrom(index);
        // refresh the cached indices only after the deletion has been performed
        this.last_appended = this.log_impl.lastAppended();
        this.commit_index = this.log_impl.commitIndex();
    }

    /**
     * Adds {@code bytes_added} to {@code curr_log_size} and, once that reaches
     * {@code max_log_size}, creates a snapshot and recomputes {@code curr_log_size} from
     * {@code logSizeInBytes()}. A failing snapshot is logged, not propagated.
     *
     * @param bytes_added the number of bytes just added to the log
     */
    protected void snapshotIfNeeded(int bytes_added) {
        this.curr_log_size += bytes_added;
        if (this.curr_log_size < this.max_log_size) {
            return;
        }
        try {
            this.log.debug("%s: current log size is %d, exceeding max_log_size of %d: creating snapshot", new Object[]{this.local_addr, this.curr_log_size, this.max_log_size});
            this.snapshot();
            this.curr_log_size = this.logSizeInBytes();
        }
        catch (Exception ex) {
            this.log.error("%s: failed snapshotting log: %s", new Object[]{this.local_addr, ex});
        }
    }

    /**
     * Applies the log entry at {@code index}.
     * <ul>
     *   <li>A regular entry is passed to the state machine; its result becomes the response.</li>
     *   <li>An internal entry is unmarshalled; if it is an addServer or removeServer command,
     *       {@code members_being_changed} is reset. Unmarshalling failures are logged only.</li>
     * </ul>
     * Afterwards the log's commit index is set to {@code index} and, if a request table exists,
     * it is notified with the response ({@code null} for internal entries).
     *
     * @param index the log index to apply
     * @throws IllegalStateException if the entry is missing or no state machine is set
     * @throws Exception             if the state machine or the log fails
     */
    protected void applyCommit(int index) throws Exception {
        LogEntry log_entry = this.log_impl.get(index);
        if (log_entry == null) {
            throw new IllegalStateException(this.local_addr + ": log entry for index " + index + " not found in log");
        }
        if (this.state_machine == null) {
            throw new IllegalStateException(this.local_addr + ": state machine is null");
        }
        byte[] rsp = null;
        if (!log_entry.internal) {
            rsp = this.state_machine.apply(log_entry.command, log_entry.offset, log_entry.length);
        } else {
            try {
                InternalCommand cmd = (InternalCommand)Util.streamableFromByteBuffer(InternalCommand.class, log_entry.command, log_entry.offset, log_entry.length);
                InternalCommand.Type cmd_type = cmd.type();
                boolean membership_change = cmd_type == InternalCommand.Type.addServer || cmd_type == InternalCommand.Type.removeServer;
                if (membership_change) {
                    // the pending membership change is now committed: reopen the gate
                    this.members_being_changed.set(false);
                }
            }
            catch (Throwable t) {
                this.log.error("%s: failed unmarshalling internal command: %s", new Object[]{this.local_addr, t});
            }
        }
        this.log_impl.commitIndex(index);
        if (this.request_table != null) {
            this.request_table.notifyAndRemove(index, rsp);
        }
    }

    /**
     * Stores the new view and, if a commit table exists, adjusts it to the view's members other
     * than the local address, using {@code last_appended + 1} as the index argument. If the view
     * is larger than the previous one, it is also checked for duplicate raft-ids and an error is
     * logged when one is found.
     *
     * @param view the newly installed view
     */
    public void handleView(View view) {
        View previous = this.view;
        boolean grew = previous != null && previous.size() < view.size();
        this.view = view;
        if (this.commit_table != null) {
            List<Address> others = new ArrayList<>(view.getMembers());
            others.remove(this.local_addr);
            this.commit_table.adjust(others, this.last_appended + 1);
        }
        // duplicates are only checked when the view has grown
        if (grew && this.duplicatesInView(view)) {
            this.log.error("view contains duplicate raft-ids: %s", new Object[]{view});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Switches to the implementation matching {@code new_role}: {@link Follower} for
     * {@code Role.Follower}, {@link Candidate} for {@code Role.Candidate}, {@link Leader} for
     * anything else. Nothing happens if the current implementation already has the same class.
     * Otherwise the old implementation is destroyed, the new one initialised and installed, and
     * the role change listeners are notified.
     *
     * @param new_role the role to switch to
     * @return this instance
     */
    public RAFT changeRole(Role new_role) {
        RaftImpl new_impl;
        if (new_role == Role.Follower) {
            new_impl = new Follower(this);
        } else if (new_role == Role.Candidate) {
            new_impl = new Candidate(this);
        } else {
            new_impl = new Leader(this);
        }
        RaftImpl old_impl = this.impl;
        boolean same_kind = old_impl != null && old_impl.getClass().equals(new_impl.getClass());
        if (same_kind) {
            return this;
        }
        if (old_impl != null) {
            old_impl.destroy();
        }
        new_impl.init();
        // only the assignment itself is done under this object's monitor
        synchronized (this) {
            this.impl = new_impl;
        }
        String old_name = old_impl == null ? "null" : old_impl.getClass().getSimpleName();
        this.log.trace("%s: changed role from %s -> %s", new Object[]{this.local_addr, old_name, new_impl.getClass().getSimpleName()});
        this.notifyRoleChangeListeners(new_role);
        return this;
    }

    /**
     * Executes an internal command against this instance. If {@code cmd} is {@code null}, the
     * command is first unmarshalled from {@code buf}. Unmarshalling and execution failures are
     * logged and not propagated.
     *
     * @param cmd    the command to run, or {@code null} to read it from the buffer
     * @param buf    the buffer holding the marshalled command (used only if {@code cmd} is null)
     * @param offset the offset of the command in {@code buf}
     * @param length the number of bytes of the command
     */
    protected void executeInternalCommand(InternalCommand cmd, byte[] buf, int offset, int length) {
        InternalCommand command = cmd;
        if (command == null) {
            try {
                command = (InternalCommand)Util.streamableFromByteBuffer(InternalCommand.class, buf, offset, length);
            }
            catch (Exception ex) {
                this.log.error("%s: failed unmarshalling internal command: %s", new Object[]{this.local_addr, ex});
                return;
            }
        }
        try {
            command.execute(this);
        }
        catch (Exception ex) {
            this.log.error("%s: failed executing internal command %s: %s", new Object[]{this.local_addr, command, ex});
        }
    }

    /**
     * Builds a log file name from {@code name} and {@code suffix}.
     * <ul>
     *   <li>A leading {@code "."} is added to the suffix if it is missing.</li>
     *   <li>A relative {@code name} is prefixed with {@code log_dir} and the file separator.</li>
     *   <li>The suffix is appended unless {@code name} already ends with it.</li>
     * </ul>
     *
     * @param name   the base name, absolute or relative
     * @param suffix the file suffix, with or without a leading dot
     * @return the resulting file name
     */
    protected String createLogName(String name, String suffix) {
        String dotted_suffix = suffix.startsWith(".") ? suffix : "." + suffix;
        boolean needs_suffix = !name.endsWith(dotted_suffix);
        // typed as String (not Object) so the conditional below yields a String
        String retval = new File(name).isAbsolute() ? name : this.log_dir + File.separator + name;
        return needs_suffix ? retval + dotted_suffix : retval;
    }

    /**
     * Calls {@code roleChanged(role)} on every registered role change listener. A listener that
     * throws does not prevent the remaining listeners from being notified; the failure is logged
     * as a warning instead of being dropped silently.
     *
     * @param role the role passed to the listeners
     */
    protected void notifyRoleChangeListeners(Role role) {
        for (RoleChange ch : this.role_change_listeners) {
            try {
                ch.roleChanged(role);
            }
            catch (Throwable t) {
                // best effort: keep notifying the others, but leave a trace of the failure
                this.log.warn("%s: role change listener %s failed for role %s: %s", new Object[]{this.local_addr, ch, role, t});
            }
        }
    }

    /**
     * Checks whether two addresses in {@code view} carry the same raft-id. Addresses that are
     * not {@link ExtendedUUID}s (warning) or that have no raft-id (error) are logged and skipped.
     *
     * @param view the view whose members are inspected
     * @return {@code true} as soon as a raft-id is seen for the second time, otherwise {@code false}
     */
    protected boolean duplicatesInView(View view) {
        HashSet<String> seen = new HashSet<>();
        for (Address addr : view) {
            if (addr instanceof ExtendedUUID) {
                byte[] raw = ((ExtendedUUID)addr).get(raft_id_key);
                String raft_id = raw != null ? Util.bytesToString(raw) : null;
                if (raft_id == null) {
                    this.log.error("address %s doesn't have a raft-id", new Object[]{addr});
                    continue;
                }
                // add() returns false when the id is already in the set
                if (!seen.add(raft_id)) {
                    return true;
                }
            } else {
                this.log.warn("address %s is not an ExtendedUUID but a %s", new Object[]{addr, addr.getClass().getSimpleName()});
            }
        }
        return false;
    }

    /**
     * Parses a string of {@code key=value} pairs, optionally separated by commas and surrounded
     * by whitespace, into a map. A key is a run of characters containing neither {@code '='} nor
     * whitespace; a value additionally contains no comma. If a key occurs more than once, the
     * last value wins. Text that does not match the pair pattern is skipped.
     *
     * @param s the string to parse; may be {@code null}
     * @return a new map with the parsed pairs (possibly empty), or {@code null} if {@code s} is {@code null}
     */
    protected static Map<String, String> parseCommaDelimitedProps(String s) {
        if (s == null) {
            return null;
        }
        Map<String, String> result = new HashMap<>();
        Matcher pairs = Pattern.compile("\\s*([^=\\s]+)\\s*=\\s*([^=\\s,]+)\\s*,?").matcher(s);
        for (boolean found = pairs.find(); found; found = pairs.find()) {
            // group 1 is the key, group 2 the value
            result.put(pairs.group(1), pairs.group(2));
        }
        return result;
    }

    /**
     * Sets {@code majority} to half the number of members (integer division) plus one.
     */
    protected void computeMajority() {
        int member_count = this.members.size();
        this.majority = member_count / 2 + 1;
    }

    // Class-load-time registration with ClassConfigurator: the RAFT protocol under id 521 and the
    // request/response/result classes under ids 2000-2003.
    static {
        ClassConfigurator.addProtocol((short)521, RAFT.class);
        ClassConfigurator.add((short)2000, AppendEntriesRequest.class);
        ClassConfigurator.add((short)2001, AppendEntriesResponse.class);
        // note: 2003 is registered before 2002; each id is still bound to exactly one class
        ClassConfigurator.add((short)2003, InstallSnapshotRequest.class);
        ClassConfigurator.add((short)2002, AppendResult.class);
    }

    protected static class DownRequest
    extends Request {
        final CompletableFuture<byte[]> f;
        final byte[] buf;
        final int offset;
        final int length;
        final InternalCommand cmd;

        public DownRequest(CompletableFuture<byte[]> f, byte[] buf, int offset, int length, InternalCommand cmd) {
            this.f = f;
            this.buf = buf;
            this.offset = offset;
            this.length = length;
            this.cmd = cmd;
        }

        public String toString() {
            return String.format("%s %d bytes", DownRequest.class.getSimpleName(), this.length);
        }
    }

    protected static class UpRequest
    extends Request {
        private final Message msg;
        private final RaftHeader hdr;

        public UpRequest(Message msg, RaftHeader hdr) {
            this.msg = msg;
            this.hdr = hdr;
        }

        public String toString() {
            return String.format("%s %s", new Object[]{UpRequest.class.getSimpleName(), this.hdr});
        }
    }

    /**
     * Common base type of {@code DownRequest} and {@code UpRequest}; it declares no state or
     * behaviour of its own.
     */
    protected static class Request {
        // protected constructor: instances are created through subclasses
        protected Request() {
        }
    }

    public static interface RoleChange {
        public void roleChanged(Role var1);
    }
}

