/*
 * Decompiled with CFR 0.152.
 */
package net.kuujo.copycat.raft;

import com.google.common.collect.Ordering;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import net.kuujo.copycat.CopycatException;
import net.kuujo.copycat.raft.ActiveState;
import net.kuujo.copycat.raft.RaftContext;
import net.kuujo.copycat.raft.RaftState;
import net.kuujo.copycat.raft.protocol.AppendRequest;
import net.kuujo.copycat.raft.protocol.AppendResponse;
import net.kuujo.copycat.raft.protocol.CommitRequest;
import net.kuujo.copycat.raft.protocol.CommitResponse;
import net.kuujo.copycat.raft.protocol.PollRequest;
import net.kuujo.copycat.raft.protocol.PollResponse;
import net.kuujo.copycat.raft.protocol.QueryRequest;
import net.kuujo.copycat.raft.protocol.QueryResponse;
import net.kuujo.copycat.raft.protocol.Response;
import net.kuujo.copycat.raft.protocol.VoteRequest;
import net.kuujo.copycat.raft.protocol.VoteResponse;
import net.kuujo.copycat.util.function.TriFunction;

class LeaderState
extends ActiveState {
    // Cap of 524288 (512 * 1024) on the running total of entry limits collected into one
    // append request; the same value bounds the batching loop in Replica.getEntries.
    private static final int MAX_BATCH_SIZE = 524288;
    // Handle of the periodic heartbeat task. Assigned in startHeartbeatTimer and cancelled
    // in cancelPingTimer; remains null until the heartbeat has been scheduled.
    private ScheduledFuture<?> currentTimer;
    // Sends append requests to the other active members and tracks quorum progress.
    private final Replicator replicator = new Replicator();

    /**
     * Creates the leader state for the given Raft context.
     *
     * @param context the Raft context, handed to the {@link ActiveState} constructor
     */
    public LeaderState(RaftContext context) {
        super(context);
    }

    /**
     * Identifies this state.
     *
     * @return always {@link RaftState.Type#LEADER}
     */
    @Override
    public RaftState.Type type() {
        return RaftState.Type.LEADER;
    }

    /**
     * Opens the state and then, in order: applies outstanding log entries, starts a
     * replication round, records the local member as leader and schedules the heartbeat.
     *
     * @return a future that completes once all of the steps above have run
     */
    @Override
    public synchronized CompletableFuture<Void> open() {
        CompletableFuture<Void> opened = super.open();
        CompletableFuture<Void> applied = opened.thenRun(this::applyEntries);
        CompletableFuture<Void> replicated = applied.thenRun(() -> this.replicator.commit());
        CompletableFuture<Void> elected = replicated.thenRun(this::takeLeadership);
        return elected.thenRun(this::startHeartbeatTimer);
    }

    /**
     * Records the local member as the cluster leader in the Raft context.
     */
    private void takeLeadership() {
        this.context.setLeader(this.context.getLocalMember());
    }

    /**
     * Commits and applies every entry from just past the current commit index (or from the
     * log's first index when no commit index is set) up to and including the log's last
     * index. Does nothing when the log reports no last index.
     */
    private void applyEntries() {
        Long lastIndex = this.context.log().lastIndex();
        if (lastIndex == null) {
            return;
        }
        long index = this.context.getCommitIndex() != null
                ? this.context.getCommitIndex() + 1L
                : this.context.log().firstIndex();
        int applied = 0;
        while (index <= lastIndex) {
            // The commit index is advanced before the entry is applied, one entry at a time.
            this.context.setCommitIndex(index);
            this.applyEntry(index);
            ++applied;
            ++index;
        }
        this.LOGGER.debug("{} - Applied {} entries to log", (Object)this.context.getLocalMember(), (Object)applied);
    }

    /**
     * Schedules {@link #heartbeatMembers()} on the context executor at a fixed rate, starting
     * immediately and repeating every heartbeat interval (milliseconds).
     */
    private void startHeartbeatTimer() {
        this.LOGGER.debug("{} - Setting heartbeat timer", (Object)this.context.getLocalMember());
        long intervalMillis = this.context.getHeartbeatInterval();
        this.currentTimer = this.context.executor().scheduleAtFixedRate(this::heartbeatMembers, 0L, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Heartbeat tick: starts (or queues) a replication round while this state is open.
     */
    private void heartbeatMembers() {
        this.context.checkThread();
        if (!this.isOpen()) {
            return;
        }
        this.replicator.commit();
    }

    /**
     * Rejects every poll: replies with the local member, the current term and
     * {@code accepted = false}.
     *
     * @param request the poll request (its contents are not inspected)
     * @return an already-completed future holding the rejection
     */
    @Override
    public CompletableFuture<PollResponse> poll(PollRequest request) {
        PollResponse rejection = ((PollResponse.Builder)PollResponse.builder().withUri(this.context.getLocalMember())).withTerm(this.context.getTerm()).withAccepted(false).build();
        return CompletableFuture.completedFuture(this.logResponse(rejection));
    }

    /**
     * Handles a vote request. A request carrying a term greater than the local term makes
     * this node transition to follower and hand the request to the superclass; any other
     * request is refused with {@code voted = false}.
     *
     * @param request the vote request
     * @return the vote response future
     */
    @Override
    public CompletableFuture<VoteResponse> vote(VoteRequest request) {
        if (request.term() <= this.context.getTerm()) {
            VoteResponse refusal = ((VoteResponse.Builder)VoteResponse.builder().withUri(this.context.getLocalMember())).withTerm(this.context.getTerm()).withVoted(false).build();
            return CompletableFuture.completedFuture(this.logResponse(refusal));
        }
        this.LOGGER.debug("{} - Received greater term", (Object)this.context.getLocalMember());
        this.transition(RaftState.Type.FOLLOWER);
        return super.vote(request);
    }

    /**
     * Handles an append request received while leading.
     * <ul>
     *   <li>Lower term than ours: rejected with {@code succeeded = false} and our last log index.</li>
     *   <li>Same term as ours: this node transitions to follower, then the request is
     *       delegated to the superclass.</li>
     *   <li>Greater term: delegated to the superclass directly.</li>
     * </ul>
     *
     * @param request the append request
     * @return the append response future
     */
    @Override
    public CompletableFuture<AppendResponse> append(AppendRequest request) {
        this.context.checkThread();
        long requestTerm = request.term();
        long localTerm = this.context.getTerm();
        if (requestTerm < localTerm) {
            AppendResponse rejection = ((AppendResponse.Builder)AppendResponse.builder().withUri(this.context.getLocalMember())).withTerm(this.context.getTerm()).withSucceeded(false).withLogIndex(this.context.log().lastIndex()).build();
            return CompletableFuture.completedFuture(this.logResponse(rejection));
        }
        if (requestTerm == localTerm) {
            this.transition(RaftState.Type.FOLLOWER);
        }
        return super.append(request);
    }

    /**
     * Handles a read request according to its consistency level.
     * <ul>
     *   <li>{@code WEAK}: the query is applied immediately.</li>
     *   <li>{@code DEFAULT}: the query is applied immediately when there are no replicas, or
     *       when the quorum commit time reported by the replicator is more recent than one
     *       election timeout ago; otherwise it falls through to {@code STRONG}.</li>
     *   <li>{@code STRONG}: the query is applied only after a replication round completes;
     *       a failed round or a failing consumer yields an {@code ERROR} response.</li>
     * </ul>
     *
     * @param request the query request
     * @return a future completed with the query response
     */
    @Override
    public CompletableFuture<QueryResponse> query(QueryRequest request) {
        this.context.checkThread();
        this.logRequest(request);
        CompletableFuture<QueryResponse> future = new CompletableFuture<QueryResponse>();
        TriFunction<Long, Long, ByteBuffer, ByteBuffer> consumer = this.context.consumer();
        switch (request.consistency()) {
            case WEAK: {
                future.complete((QueryResponse)this.logResponse(((QueryResponse.Builder)QueryResponse.builder().withUri(this.context.getLocalMember())).withResult(consumer.apply(this.context.getTerm(), null, request.entry())).build()));
                break;
            }
            case DEFAULT: {
                // The elapsed time is a System.nanoTime() delta, so the timeout must be
                // expressed in nanoseconds. The previous "* 1000L" produced microseconds,
                // shrinking the lease window a thousandfold.
                // NOTE(review): assumes getElectionTimeout() is in milliseconds, like the
                // heartbeat interval passed to scheduleAtFixedRate - confirm.
                if (this.replicator.replicas.isEmpty() || System.nanoTime() - this.replicator.commitTime() < TimeUnit.MILLISECONDS.toNanos(this.context.getElectionTimeout())) {
                    future.complete((QueryResponse)this.logResponse(((QueryResponse.Builder)QueryResponse.builder().withUri(this.context.getLocalMember())).withResult(consumer.apply(this.context.getTerm(), null, request.entry())).build()));
                    break;
                }
                // Lease expired: intentionally falls through to the STRONG path.
            }
            case STRONG: {
                this.LOGGER.debug("{} - Synchronizing logs to index {} for read", (Object)this.context.getLocalMember(), (Object)this.context.log().lastIndex());
                // Capture the term now; the callback runs after the replication round.
                long term = this.context.getTerm();
                this.replicator.commit().whenComplete((index, error) -> {
                    this.context.checkThread();
                    // NOTE(review): when the state has been closed by the time the round
                    // finishes, the returned future is never completed - confirm intended.
                    if (this.isOpen()) {
                        if (error == null) {
                            try {
                                future.complete((QueryResponse)this.logResponse(((QueryResponse.Builder)QueryResponse.builder().withUri(this.context.getLocalMember())).withResult((ByteBuffer)consumer.apply(term, null, request.entry())).build()));
                            }
                            catch (Exception e) {
                                future.complete((QueryResponse)this.logResponse(((QueryResponse.Builder)((QueryResponse.Builder)((QueryResponse.Builder)QueryResponse.builder().withUri(this.context.getLocalMember())).withStatus(Response.Status.ERROR)).withError(e)).build()));
                            }
                        } else {
                            future.complete((QueryResponse)this.logResponse(((QueryResponse.Builder)((QueryResponse.Builder)((QueryResponse.Builder)QueryResponse.builder().withUri(this.context.getLocalMember())).withStatus(Response.Status.ERROR)).withError((Throwable)error)).build()));
                        }
                    }
                });
            }
        }
        return future;
    }

    /**
     * Handles a write request: prefixes the entry with the current term, appends and
     * flushes it to the local log, replicates up to the new index and, once that index is
     * committed, applies the entry through the context consumer.
     *
     * @param request the commit request carrying the entry to write
     * @return a future completed with the commit response; completed exceptionally with a
     *         {@link CopycatException} when the local append or flush throws an IOException
     */
    @Override
    public CompletableFuture<CommitResponse> commit(CommitRequest request) {
        this.context.checkThread();
        this.logRequest(request);
        CompletableFuture<CommitResponse> result = new CompletableFuture<CommitResponse>();
        ByteBuffer entry = request.entry();
        TriFunction<Long, Long, ByteBuffer, ByteBuffer> consumer = this.context.consumer();

        // Log record layout: 8-byte term followed by the entry's remaining bytes.
        ByteBuffer logEntry = ByteBuffer.allocate(entry.capacity() + 8);
        long term = this.context.getTerm();
        logEntry.putLong(term);
        logEntry.put(entry);
        // put() consumed the entry; flip it so the consumer can read it again later.
        entry.flip();

        final long index;
        try {
            index = this.context.log().appendEntry(logEntry);
            this.context.log().flush();
        }
        catch (IOException e) {
            result.completeExceptionally(new CopycatException(e));
            return result;
        }

        this.LOGGER.debug("{} - Appended entry to log at index {}", (Object)this.context.getLocalMember(), (Object)index);
        this.LOGGER.debug("{} - Replicating logs up to index {} for write", (Object)this.context.getLocalMember(), (Object)index);
        this.replicator.commit(index).whenComplete((committedIndex, error) -> {
            this.context.checkThread();
            if (!this.isOpen()) {
                return;
            }
            if (error != null) {
                result.complete((CommitResponse)this.logResponse(((CommitResponse.Builder)((CommitResponse.Builder)((CommitResponse.Builder)CommitResponse.builder().withUri(this.context.getLocalMember())).withStatus(Response.Status.ERROR)).withError((Throwable)error)).build()));
                return;
            }
            try {
                result.complete((CommitResponse)this.logResponse(((CommitResponse.Builder)CommitResponse.builder().withUri(this.context.getLocalMember())).withResult((ByteBuffer)consumer.apply(term, index, entry)).build()));
            }
            catch (Exception e) {
                result.complete((CommitResponse)this.logResponse(((CommitResponse.Builder)((CommitResponse.Builder)((CommitResponse.Builder)CommitResponse.builder().withUri(this.context.getLocalMember())).withStatus(Response.Status.ERROR)).withError(e)).build()));
            }
            finally {
                // Recorded whether or not the consumer threw.
                this.context.setLastApplied(index);
            }
        });
        return result;
    }

    /**
     * Cancels the heartbeat task if one was scheduled, without interrupting a run that is
     * already in progress.
     */
    private void cancelPingTimer() {
        if (this.currentTimer == null) {
            return;
        }
        this.LOGGER.debug("{} - Cancelling ping timer", (Object)this.context.getLocalMember());
        this.currentTimer.cancel(false);
    }

    /**
     * Closes the state and then cancels the heartbeat timer.
     *
     * @return a future completed once the superclass close has finished and the timer has
     *         been cancelled
     */
    @Override
    public synchronized CompletableFuture<Void> close() {
        CompletableFuture<Void> closed = super.close();
        return closed.thenRun(this::cancelPingTimer);
    }

    private class Replicator {
        // One Replica per active member other than the local member (built in the constructor).
        private final List<Replica> replicas;
        // floor(activeMembers / 2), computed in the constructor.
        private final int quorum;
        // quorum - 1: position of the quorum-deciding element in a descending-sorted list.
        private final int quorumIndex;
        // Indexed by replica id: System.nanoTime() of that replica's latest response handled
        // without error (seeded with the construction time).
        private final List<Long> commitTimes;
        // System.nanoTime() at which the replication round currently in progress was started.
        private long commitTime;
        // Future of the round in progress; null when no round is pending.
        private CompletableFuture<Void> commitFuture;
        // Future handed to callers that ask for a round while one is already in progress.
        private CompletableFuture<Void> nextCommitFuture;
        // Number of request failures counted against the round in progress.
        private long commitFailures;
        // Futures keyed by log index, completed by commitEntries once that index is committed.
        private final TreeMap<Long, CompletableFuture<Long>> commitFutures = new TreeMap();

        /**
         * Builds one {@link Replica} per active member other than the local member, seeds
         * each replica's commit time with the current {@code System.nanoTime()}, and derives
         * the quorum size as half the active member count (rounded down).
         */
        private Replicator() {
            int memberCount = LeaderState.this.context.getActiveMembers().size();
            this.replicas = new ArrayList<Replica>(memberCount - 1);
            this.commitTimes = new ArrayList<Long>(memberCount - 1);
            int nextId = 0;
            for (String member : LeaderState.this.context.getActiveMembers()) {
                if (!member.equals(LeaderState.this.context.getLocalMember())) {
                    // Ids are handed out in insertion order so they line up with commitTimes.
                    this.replicas.add(new Replica(nextId, member));
                    this.commitTimes.add(System.nanoTime());
                    ++nextId;
                }
            }
            this.quorum = memberCount / 2;
            this.quorumIndex = this.quorum - 1;
        }

        /**
         * Requests a replication round.
         * <p>
         * With no replicas the result is already complete. If no round is in progress a new
         * one is started on every replica. Otherwise the caller is given the (shared) future
         * of the round that will follow the one in progress.
         *
         * @return the future of the round this caller is waiting on
         */
        private CompletableFuture<Void> commit() {
            if (this.replicas.isEmpty()) {
                return CompletableFuture.completedFuture(null);
            }
            if (this.commitFuture != null) {
                // A round is already running: queue behind it, sharing one follow-up future.
                if (this.nextCommitFuture == null) {
                    this.nextCommitFuture = new CompletableFuture<Void>();
                }
                return this.nextCommitFuture;
            }
            this.commitFuture = new CompletableFuture<Void>();
            this.commitTime = System.nanoTime();
            for (Replica replica : this.replicas) {
                replica.commit();
            }
            return this.commitFuture;
        }

        /**
         * Returns a future that is completed once the given log index has been committed.
         * <p>
         * With no replicas the result is already complete. The first request for an index
         * triggers a commit on every replica and registers a new future; later requests for
         * the same index get the registered future.
         *
         * @param index the log index to wait for
         * @return the future for that index
         */
        private CompletableFuture<Long> commit(long index) {
            if (this.replicas.isEmpty()) {
                return CompletableFuture.completedFuture(index);
            }
            CompletableFuture<Long> pending = this.commitFutures.get(index);
            if (pending == null) {
                for (Replica replica : this.replicas) {
                    replica.commit();
                }
                pending = new CompletableFuture<Long>();
                this.commitFutures.put(index, pending);
            }
            return pending;
        }

        /**
         * Returns the commit time found at {@code quorumIndex} after ordering a copy of the
         * per-replica commit times from most recent to oldest.
         *
         * @return the quorum commit time, in {@code System.nanoTime()} units
         */
        private long commitTime() {
            List<Long> newestFirst = new ArrayList<Long>(this.commitTimes);
            Collections.sort(newestFirst, Collections.reverseOrder());
            return newestFirst.get(this.quorumIndex);
        }

        /**
         * Records the outcome of one replica's append request against the replication round
         * in progress, and finishes the round when its outcome is decided.
         * <p>
         * A failure is counted only when the replica started its request in the current
         * round; once too few replicas remain to reach quorum the round's future is failed.
         * A success refreshes the replica's commit time; once the quorum commit time is later
         * than the round's start the round's future is completed.
         * <p>
         * Whenever a round finishes, successfully or not, the queued follow-up future (if
         * any) becomes the current round and the failure counter is reset. Previously this
         * happened only after a success, so a failed round left its already-completed future
         * installed as the current one and kept counting failures against it.
         *
         * @param id    the id of the replica reporting
         * @param error the request failure, or {@code null} when the request got a response
         */
        private void commitTime(int id, Throwable error) {
            if (this.commitFuture == null) {
                return;
            }
            synchronized (this) {
                boolean completed = false;
                if (error != null) {
                    // ++commitFailures only runs for failures that belong to the current round.
                    if (this.replicas.get(id).commitStartTime == this.commitTime && (long)this.quorum > (long)this.replicas.size() - ++this.commitFailures) {
                        this.commitFuture.completeExceptionally(new CopycatException("Failed to reach quorum", new Object[0]));
                        completed = true;
                    }
                } else {
                    this.commitTimes.set(id, System.nanoTime());
                    if (this.commitFuture != null && this.commitTime < this.commitTime()) {
                        this.commitFuture.complete(null);
                        completed = true;
                    }
                }
                if (completed) {
                    // Rotate to the queued round for both outcomes.
                    this.commitFuture = this.nextCommitFuture;
                    this.nextCommitFuture = null;
                    this.commitFailures = 0L;
                    if (this.commitFuture != null) {
                        this.commitTime = System.nanoTime();
                        this.replicas.forEach(replica -> replica.commit());
                    }
                }
            }
        }

        /**
         * Advances the commit index to the match index held by a quorum of replicas and
         * completes every per-index future at or below that index.
         * <p>
         * Replicas are ranked by match index (highest first, unknown treated as 0) on a
         * copy of the list. Sorting {@code replicas} itself would break the positional
         * lookup {@code replicas.get(id)}, which relies on each replica staying at the index
         * equal to its id.
         */
        private void commitEntries() {
            LeaderState.this.context.checkThread();
            List<Replica> byMatchIndex = new ArrayList<Replica>(this.replicas);
            Collections.sort(byMatchIndex, (first, second) -> Long.compare(second.matchIndex != null ? second.matchIndex : 0L, first.matchIndex != null ? first.matchIndex : 0L));
            Long commitIndex = byMatchIndex.get(this.quorumIndex).matchIndex;
            if (commitIndex != null) {
                LeaderState.this.context.setCommitIndex(commitIndex);
                // View of all waiting futures whose index is <= commitIndex.
                NavigableMap<Long, CompletableFuture<Long>> futures = this.commitFutures.headMap(commitIndex, true);
                for (Map.Entry<Long, CompletableFuture<Long>> entry : futures.entrySet()) {
                    entry.getValue().complete(entry.getKey());
                }
                // Clearing the view removes the completed futures from commitFutures.
                futures.clear();
            }
        }

        private class Replica {
            // Empty entry list sent with heartbeat-style requests and returned by getEntries
            // when the log is empty.
            private final List<ByteBuffer> EMPTY_LIST = new ArrayList<ByteBuffer>(0);
            // Position assigned by the Replicator constructor; used to index commitTimes and replicas.
            private final int id;
            // Identifier of the remote member; used as the request URI.
            private final String member;
            // Index of the next log entry to send; null until a response has been processed.
            private Long nextIndex;
            // Highest log index the member has reported; null when unknown.
            private Long matchIndex;
            // True while an append request to this member is in flight.
            private boolean committing;
            // Value of Replicator.commitTime when this replica last started a request.
            private long commitStartTime;

            /**
             * Creates the replication handle for one remote member.
             *
             * @param id     the replica's position, as assigned by the enclosing Replicator
             * @param member the remote member's identifier
             */
            private Replica(int id, String member) {
                this.id = id;
                this.member = member;
            }

            /**
             * Sends an append request to this member unless one is already in flight or the
             * leader state is closed. Entries are included only when the log is non-empty
             * and {@code nextIndex} is known and not past the last log index.
             */
            private void commit() {
                if (this.committing || !LeaderState.this.isOpen()) {
                    return;
                }
                // Tag this request with the round it belongs to.
                this.commitStartTime = Replicator.this.commitTime;
                boolean nothingToSend = LeaderState.this.context.log().isEmpty()
                        || this.nextIndex == null
                        || this.nextIndex > LeaderState.this.context.log().lastIndex();
                if (nothingToSend) {
                    this.emptyCommit();
                } else {
                    this.entriesCommit();
                }
            }

            /**
             * Determines the index that precedes the next entry to send.
             *
             * @return {@code nextIndex - 1} when that is positive; the log's last index when
             *         {@code nextIndex} is unknown and the log is non-empty; otherwise {@code null}
             */
            private Long getPrevIndex() {
                if (this.nextIndex != null) {
                    long previous = this.nextIndex - 1L;
                    return previous > 0L ? Long.valueOf(previous) : null;
                }
                if (LeaderState.this.context.log().isEmpty()) {
                    return null;
                }
                return LeaderState.this.context.log().lastIndex();
            }

            /**
             * Loads the log entry at the given index.
             *
             * @param prevIndex the index to read, may be {@code null}
             * @return the entry, or {@code null} when the index is {@code null} or not in the log
             */
            private ByteBuffer getPrevEntry(Long prevIndex) {
                if (prevIndex == null || !LeaderState.this.context.log().containsIndex(prevIndex)) {
                    return null;
                }
                return LeaderState.this.context.log().getEntry(prevIndex);
            }

            /**
             * Collects the batch of entries that follow {@code prevIndex}.
             * <p>
             * Reading starts at {@code prevIndex + 1} (or the log's first index when
             * {@code prevIndex} is {@code null}) and continues until the last log index is
             * passed or the summed entry limits reach {@code MAX_BATCH_SIZE}. The size is
             * checked before each read, so the entry that crosses the cap is still included.
             *
             * @param prevIndex the index preceding the first entry to send, may be {@code null}
             * @return the entries to send; the shared empty list when the log is empty
             */
            private List<ByteBuffer> getEntries(Long prevIndex) {
                if (LeaderState.this.context.log().isEmpty()) {
                    return this.EMPTY_LIST;
                }
                long index = prevIndex != null ? prevIndex + 1L : LeaderState.this.context.log().firstIndex();
                List<ByteBuffer> batch = new ArrayList<ByteBuffer>(1024);
                int batchBytes = 0;
                while (batchBytes < MAX_BATCH_SIZE && index <= LeaderState.this.context.log().lastIndex()) {
                    ByteBuffer entry = LeaderState.this.context.log().getEntry(index);
                    batch.add(entry);
                    batchBytes += entry.limit();
                    ++index;
                }
                return batch;
            }

            /**
             * Sends an append request that carries no entries, only the previous index and entry.
             */
            private void emptyCommit() {
                Long prevIndex = this.getPrevIndex();
                this.commit(prevIndex, this.getPrevEntry(prevIndex), this.EMPTY_LIST);
            }

            /**
             * Sends an append request carrying the next batch of entries after the previous index.
             */
            private void entriesCommit() {
                Long prevIndex = this.getPrevIndex();
                this.commit(prevIndex, this.getPrevEntry(prevIndex), this.getEntries(prevIndex));
            }

            /**
             * Builds and sends one append request to this member and processes the reply on
             * the context executor.
             * <p>
             * The previous entry's term is read from the first 8 bytes of {@code prevEntry}.
             * The request is flagged as starting at the first index when there is no previous
             * index or the previous index immediately precedes the log's first index.
             *
             * @param prevIndex the index preceding the entries being sent, may be {@code null}
             * @param prevEntry the log entry at {@code prevIndex}, may be {@code null}
             * @param entries   the entries to send; empty for a heartbeat-style request
             */
            private void commit(Long prevIndex, ByteBuffer prevEntry, List<ByteBuffer> entries) {
                AppendRequest request = ((AppendRequest.Builder)AppendRequest.builder().withUri(this.member)).withTerm(LeaderState.this.context.getTerm()).withLeader(LeaderState.this.context.getLocalMember()).withLogIndex(prevIndex).withLogTerm(prevEntry != null ? Long.valueOf(prevEntry.getLong()) : null).withEntries(entries).withFirstIndex(prevIndex == null || LeaderState.this.context.log().firstIndex() == prevIndex + 1L).withCommitIndex(LeaderState.this.context.getCommitIndex()).build();
                // Marks the request as in flight so Replica.commit() does not send another.
                this.committing = true;
                LeaderState.this.LOGGER.debug("{} - Sent {} to {}", new Object[]{LeaderState.this.context.getLocalMember(), request, this.member});
                ((CompletableFuture)LeaderState.this.appendHandler.apply(request)).whenCompleteAsync((response, error) -> {
                    // Cleared first, before the open check, so follow-up commits are possible.
                    this.committing = false;
                    LeaderState.this.context.checkThread();
                    if (LeaderState.this.isOpen()) {
                        if (error == null) {
                            LeaderState.this.LOGGER.debug("{} - Received {} from {}", new Object[]{LeaderState.this.context.getLocalMember(), response, this.member});
                            if (response.status() == Response.Status.OK) {
                                // Any OK response counts towards the round's quorum timing,
                                // whether or not the append itself succeeded.
                                Replicator.this.commitTime(this.id, null);
                                if (response.succeeded()) {
                                    this.updateMatchIndex((AppendResponse)response);
                                    this.updateNextIndex();
                                    // Only requests that carried entries can move the commit index.
                                    if (!entries.isEmpty()) {
                                        Replicator.this.commitEntries();
                                    }
                                    if (this.hasMoreEntries()) {
                                        this.commit();
                                    }
                                } else if (response.term() > LeaderState.this.context.getTerm()) {
                                    // The member has seen a newer term: step down.
                                    LeaderState.this.transition(RaftState.Type.FOLLOWER);
                                } else {
                                    // Append rejected in the same term: back off the indexes and retry.
                                    this.resetMatchIndex((AppendResponse)response);
                                    this.resetNextIndex();
                                    if (this.hasMoreEntries()) {
                                        this.commit();
                                    }
                                }
                            } else if (response.term() > LeaderState.this.context.getTerm()) {
                                LeaderState.this.LOGGER.debug("{} - Received higher term from {}", (Object)LeaderState.this.context.getLocalMember(), (Object)this.member);
                                LeaderState.this.transition(RaftState.Type.FOLLOWER);
                            } else {
                                // Non-OK response in the same term is only logged; it is not
                                // reported to the replicator as a failure.
                                LeaderState.this.LOGGER.warn("{} - {}", (Object)LeaderState.this.context.getLocalMember(), (Object)(response.error() != null ? response.error().getMessage() : ""));
                            }
                        } else {
                            LeaderState.this.LOGGER.debug("{} - {}", (Object)LeaderState.this.context.getLocalMember(), (Object)error.getMessage());
                            // Request-level failure: counted against the round's quorum.
                            Replicator.this.commitTime(this.id, error);
                        }
                    }
                }, (Executor)LeaderState.this.context.executor());
            }

            /**
             * Reports whether the log still holds at least one entry at or after
             * {@code nextIndex}.
             * <p>
             * The bound is inclusive: when {@code nextIndex} equals the last log index there
             * is exactly one entry left to send. This matches {@code Replica.commit()},
             * which treats only {@code nextIndex > lastIndex} as "nothing to send". The
             * previous strict comparison skipped the immediate follow-up request for that
             * final entry.
             *
             * @return {@code true} when {@code nextIndex} is known, the log is non-empty and
             *         {@code nextIndex} does not exceed the last log index
             */
            private boolean hasMoreEntries() {
                return this.nextIndex != null && !LeaderState.this.context.log().isEmpty() && this.nextIndex <= LeaderState.this.context.log().lastIndex();
            }

            /**
             * Raises {@code matchIndex} to the log index reported by the member, never
             * lowering it. A response without a log index leaves it unchanged.
             *
             * @param response the successful append response
             */
            private void updateMatchIndex(AppendResponse response) {
                Long reported = response.logIndex();
                if (reported == null) {
                    return;
                }
                if (this.matchIndex == null) {
                    this.matchIndex = reported;
                } else {
                    this.matchIndex = Math.max(this.matchIndex, reported);
                }
            }

            /**
             * Moves {@code nextIndex} forward to at least {@code matchIndex + 1}, then clamps
             * it up to the log's first index when the log no longer contains it.
             */
            private void updateNextIndex() {
                if (this.matchIndex != null) {
                    long afterMatch = this.matchIndex + 1L;
                    this.nextIndex = this.nextIndex == null ? afterMatch : Math.max(this.nextIndex, afterMatch);
                }
                if (this.nextIndex == null) {
                    return;
                }
                Long firstIndex = LeaderState.this.context.log().firstIndex();
                if (this.nextIndex < firstIndex) {
                    LeaderState.this.LOGGER.info("Log does not contain nextIndex {} due to rollover. Setting nextIndex for {} to log's firstIndex: {}", new Object[]{this.nextIndex, this.member, firstIndex});
                    this.nextIndex = firstIndex;
                }
            }

            /**
             * Lowers {@code matchIndex} after a rejected append.
             * <p>
             * When either the current match index or the reported log index is unknown, the
             * reported value (possibly {@code null}) is adopted. Otherwise the match index
             * becomes the smaller of {@code matchIndex - 1} and the reported index, with a
             * result of 0 stored as {@code null}.
             *
             * @param response the rejecting append response
             */
            private void resetMatchIndex(AppendResponse response) {
                Long reported = response.logIndex();
                if (this.matchIndex == null || reported == null) {
                    this.matchIndex = reported;
                } else {
                    long lowered = Math.min(this.matchIndex - 1L, reported);
                    this.matchIndex = lowered == 0L ? null : Long.valueOf(lowered);
                }
                LeaderState.this.LOGGER.debug("{} - Reset match index for {} to {}", new Object[]{LeaderState.this.context.getLocalMember(), this.member, this.matchIndex});
            }

            /**
             * Re-derives {@code nextIndex} from {@code matchIndex}: one past the match index
             * when it is known, otherwise the log's first index.
             */
            private void resetNextIndex() {
                if (this.matchIndex != null) {
                    this.nextIndex = this.matchIndex + 1L;
                } else {
                    this.nextIndex = LeaderState.this.context.log().firstIndex();
                }
                LeaderState.this.LOGGER.debug("{} - Reset next index for {} to {}", new Object[]{LeaderState.this.context.getLocalMember(), this.member, this.nextIndex});
            }
        }
    }
}

