/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.grpc.server;

import com.codahale.metrics.Timer;
import java.io.IOException;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
import org.apache.ratis.grpc.server.GrpcServerProtocolClient;
import org.apache.ratis.grpc.server.GrpcService;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.server.leader.LogAppenderBase;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.AutoCloseableReadWriteLock;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GrpcLogAppender
extends LogAppenderBase {
    public static final Logger LOG = LoggerFactory.getLogger(GrpcLogAppender.class);
    private static final Comparator<Long> CALL_ID_COMPARATOR = (left, right) -> {
        long diff = left - right;
        return diff == 0L ? 0 : (diff > 0L ? 1 : -1);
    };
    private final AtomicLong callId = new AtomicLong();
    private final RequestMap pendingRequests = new RequestMap();
    private final int maxPendingRequestsNum;
    private volatile boolean firstResponseReceived = false;
    private final boolean installSnapshotEnabled;
    private final TimeDuration requestTimeoutDuration;
    private final TimeDuration installSnapshotStreamTimeout;
    private final int maxOutstandingInstallSnapshots;
    private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
    private volatile StreamObservers appendLogRequestObserver;
    private final boolean useSeparateHBChannel;
    private final GrpcServerMetrics grpcServerMetrics;
    private final AutoCloseableReadWriteLock lock;
    private final StackTraceElement caller;

    public GrpcLogAppender(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
        super(server, leaderState, f);
        Preconditions.assertNotNull(this.getServerRpc(), "getServerRpc()");
        RaftProperties properties = server.getRaftServer().getProperties();
        this.maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(properties);
        this.requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(properties);
        this.maxOutstandingInstallSnapshots = GrpcConfigKeys.Server.installSnapshotRequestElementLimit(properties);
        this.installSnapshotStreamTimeout = GrpcConfigKeys.Server.installSnapshotRequestTimeout(properties).multiply(this.maxOutstandingInstallSnapshots);
        this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
        this.useSeparateHBChannel = GrpcConfigKeys.Server.heartbeatChannel(properties);
        this.grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString());
        this.grpcServerMetrics.addPendingRequestsCount(this.getFollowerId().toString(), this.pendingRequests::logRequestsSize);
        this.lock = new AutoCloseableReadWriteLock(this);
        this.caller = LOG.isTraceEnabled() ? JavaUtils.getCallerStackTraceElement() : null;
    }

    @Override
    public GrpcService getServerRpc() {
        return (GrpcService)super.getServerRpc();
    }

    private GrpcServerProtocolClient getClient() throws IOException {
        return (GrpcServerProtocolClient)((PeerProxyMap)this.getServerRpc().getProxies()).getProxy(this.getFollowerId());
    }

    private void resetClient(AppendEntriesRequest request, boolean onError) {
        try (AutoCloseableLock writeLock = this.lock.writeLock(this.caller, LOG::trace);){
            this.getClient().resetConnectBackoff();
            this.appendLogRequestObserver = null;
            this.firstResponseReceived = false;
            this.pendingRequests.clear();
            FollowerInfo f = this.getFollower();
            long nextIndex = 1L + Optional.ofNullable(request).map(AppendEntriesRequest::getPreviousLog).map(TermIndex::getIndex).orElseGet(f::getMatchIndex);
            if (onError && f.getMatchIndex() == 0L && request == null) {
                LOG.warn("{}: Follower failed when matchIndex == 0,  keep nextIndex ({}) unchanged and retry.", (Object)this, (Object)f.getNextIndex());
                return;
            }
            if (request != null && request.isHeartbeat()) {
                return;
            }
            this.getFollower().decreaseNextIndex(nextIndex);
        }
        catch (IOException ie) {
            LOG.warn(this + ": Failed to getClient for " + this.getFollowerId(), ie);
        }
    }

    private boolean isFollowerCommitBehindLastCommitIndex() {
        return this.getRaftLog().getLastCommittedIndex() > this.getFollower().getCommitIndex();
    }

    @Override
    public void run() throws IOException {
        while (this.isRunning()) {
            boolean installSnapshotRequired = false;
            if (this.shouldSendAppendEntries() || this.isFollowerCommitBehindLastCommitIndex()) {
                if (this.installSnapshotEnabled) {
                    SnapshotInfo snapshot = this.shouldInstallSnapshot();
                    if (snapshot != null) {
                        this.installSnapshot(snapshot);
                        installSnapshotRequired = true;
                    }
                } else {
                    TermIndex installSnapshotNotificationTermIndex = this.shouldNotifyToInstallSnapshot();
                    if (installSnapshotNotificationTermIndex != null) {
                        this.installSnapshot(installSnapshotNotificationTermIndex);
                        installSnapshotRequired = true;
                    }
                }
                this.appendLog(installSnapshotRequired || this.haveTooManyPendingRequests());
            }
            this.getLeaderState().checkHealth(this.getFollower());
            this.mayWait();
        }
        Optional.ofNullable(this.appendLogRequestObserver).ifPresent(StreamObservers::onCompleted);
    }

    public long getWaitTimeMs() {
        if (this.haveTooManyPendingRequests()) {
            return this.getHeartbeatWaitTimeMs();
        }
        if (this.shouldSendAppendEntries() && !this.isSlowFollower()) {
            return Math.max(this.getMinWaitTimeMs(), 0L);
        }
        return this.getHeartbeatWaitTimeMs();
    }

    private boolean isSlowFollower() {
        TimeDuration elapsedTime = this.getFollower().getLastRpcResponseTime().elapsedTime();
        return elapsedTime.compareTo(this.getServer().properties().rpcSlownessTimeout()) > 0;
    }

    private void mayWait() {
        try {
            this.getEventAwaitForSignal().await(this.getWaitTimeMs(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ie) {
            LOG.warn(this + ": Wait interrupted by " + ie);
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void stop() {
        this.grpcServerMetrics.unregister();
        super.stop();
    }

    @Override
    public boolean shouldSendAppendEntries() {
        return this.appendLogRequestObserver == null || super.shouldSendAppendEntries();
    }

    @Override
    public boolean hasPendingDataRequests() {
        return this.pendingRequests.logRequestsSize() > 0;
    }

    private boolean haveTooManyPendingRequests() {
        int size = this.pendingRequests.logRequestsSize();
        if (size == 0) {
            return false;
        }
        return !this.firstResponseReceived || size >= this.maxPendingRequestsNum;
    }

    @Override
    public long getCallId() {
        return this.callId.get();
    }

    @Override
    public Comparator<Long> getCallIdComparator() {
        return CALL_ID_COMPARATOR;
    }

    private void appendLog(boolean heartbeat) throws IOException {
        AppendEntriesRequest request;
        RaftProtos.AppendEntriesRequestProto pending;
        try (AutoCloseableLock writeLock = this.lock.writeLock(this.caller, LOG::trace);){
            pending = this.newAppendEntriesRequest(this.callId.getAndIncrement(), heartbeat);
            if (pending == null) {
                return;
            }
            request = new AppendEntriesRequest(pending, this.getFollowerId(), this.grpcServerMetrics);
            this.pendingRequests.put(request);
            this.increaseNextIndex(pending);
            if (this.appendLogRequestObserver == null) {
                this.appendLogRequestObserver = new StreamObservers(this.getClient(), new AppendLogResponseHandler(), this.useSeparateHBChannel);
            }
        }
        long waitMs = this.getMinWaitTimeMs();
        if (waitMs > 0L) {
            try {
                Thread.sleep(waitMs);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw IOUtils.toInterruptedIOException("Interrupted appendLog, heartbeat? " + heartbeat, e);
            }
        }
        if (this.isRunning()) {
            this.sendRequest(request, pending);
        }
    }

    private void sendRequest(AppendEntriesRequest request, RaftProtos.AppendEntriesRequestProto proto) {
        CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST, this.getServer().getId(), null, proto);
        request.startRequestTimer();
        this.resetHeartbeatTrigger();
        boolean sent = Optional.ofNullable(this.appendLogRequestObserver).map(observer -> {
            observer.onNext(proto);
            return true;
        }).isPresent();
        if (sent) {
            this.getFollower().updateLastRpcSendTime(request.isHeartbeat());
            this.scheduler.onTimeout(this.requestTimeoutDuration, () -> this.timeoutAppendRequest(request.getCallId(), request.isHeartbeat()), LOG, () -> "Timeout check failed for append entry request: " + request);
        } else {
            request.stopRequestTimer();
        }
    }

    private void timeoutAppendRequest(long cid, boolean heartbeat) {
        AppendEntriesRequest pending = this.pendingRequests.handleTimeout(cid, heartbeat);
        if (pending != null) {
            LOG.warn("{}: {} appendEntries Timeout, request={}", this, heartbeat ? "HEARTBEAT" : "", pending);
            this.grpcServerMetrics.onRequestTimeout(this.getFollowerId().toString(), heartbeat);
            pending.stopRequestTimer();
        }
    }

    private void increaseNextIndex(RaftProtos.AppendEntriesRequestProto request) {
        int count = request.getEntriesCount();
        if (count > 0) {
            this.getFollower().increaseNextIndex(request.getEntries(count - 1).getIndex() + 1L);
        }
    }

    private void increaseNextIndex(long installedSnapshotIndex, Object reason) {
        long newNextIndex = installedSnapshotIndex + 1L;
        LOG.info("{}: updateNextIndex {} for {}", this, newNextIndex, reason);
        this.getFollower().updateNextIndex(newNextIndex);
    }

    private void updateNextIndex(long replyNextIndex) {
        try (AutoCloseableLock writeLock = this.lock.writeLock(this.caller, LOG::trace);){
            this.pendingRequests.clear();
            this.getFollower().setNextIndex(replyNextIndex);
        }
    }

    private void installSnapshot(SnapshotInfo snapshot) {
        LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, send snapshot {} to follower", this, this.getFollower().getNextIndex(), this.getRaftLog().getStartIndex(), snapshot);
        InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler();
        StreamObserver<RaftProtos.InstallSnapshotRequestProto> snapshotRequestObserver = null;
        String requestId = UUID.randomUUID().toString();
        try {
            snapshotRequestObserver = this.getClient().installSnapshot(this.getFollower().getName() + "-installSnapshot-" + requestId, this.installSnapshotStreamTimeout, this.maxOutstandingInstallSnapshots, responseHandler);
            for (RaftProtos.InstallSnapshotRequestProto request : this.newInstallSnapshotRequests(requestId, snapshot)) {
                if (!this.isRunning()) break;
                snapshotRequestObserver.onNext(request);
                this.getFollower().updateLastRpcSendTime(false);
                responseHandler.addPending(request);
            }
            snapshotRequestObserver.onCompleted();
            this.grpcServerMetrics.onInstallSnapshot();
        }
        catch (Exception e) {
            LOG.warn("{}: failed to install snapshot {}: {}", this, snapshot.getFiles(), e);
            if (snapshotRequestObserver != null) {
                snapshotRequestObserver.onError(e);
            }
            return;
        }
        while (this.isRunning() && !responseHandler.isDone()) {
            try {
                this.getEventAwaitForSignal().await();
            }
            catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        }
        if (responseHandler.hasAllResponse()) {
            this.getFollower().setSnapshotIndex(snapshot.getTermIndex().getIndex());
            LOG.info("{}: installed snapshot {} successfully", (Object)this, (Object)snapshot);
        }
    }

    private void installSnapshot(TermIndex firstAvailableLogTermIndex) {
        LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, notify follower to install snapshot-{}", this, this.getFollower().getNextIndex(), this.getRaftLog().getStartIndex(), firstAvailableLogTermIndex);
        InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler(true);
        StreamObserver<RaftProtos.InstallSnapshotRequestProto> snapshotRequestObserver = null;
        RaftProtos.InstallSnapshotRequestProto request = this.newInstallSnapshotNotificationRequest(firstAvailableLogTermIndex);
        if (LOG.isInfoEnabled()) {
            LOG.info("{}: send {}", (Object)this, (Object)ServerStringUtils.toInstallSnapshotRequestString(request));
        }
        try {
            snapshotRequestObserver = this.getClient().installSnapshot(this.getFollower().getName() + "-notifyInstallSnapshot", this.requestTimeoutDuration, 0, responseHandler);
            snapshotRequestObserver.onNext(request);
            this.getFollower().updateLastRpcSendTime(false);
            responseHandler.addPending(request);
            snapshotRequestObserver.onCompleted();
        }
        catch (Exception e) {
            GrpcUtil.warn(LOG, () -> this + ": Failed to notify follower to install snapshot.", e);
            if (snapshotRequestObserver != null) {
                snapshotRequestObserver.onError(e);
            }
            return;
        }
        while (this.isRunning() && !responseHandler.isDone()) {
            try {
                this.getEventAwaitForSignal().await();
            }
            catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private TermIndex shouldNotifyToInstallSnapshot() {
        FollowerInfo follower = this.getFollower();
        long leaderNextIndex = this.getRaftLog().getNextIndex();
        boolean isFollowerBootstrapping = this.getLeaderState().isFollowerBootstrapping(follower);
        long leaderStartIndex = this.getRaftLog().getStartIndex();
        TermIndex firstAvailable = Optional.ofNullable(this.getRaftLog().getTermIndex(leaderStartIndex)).orElseGet(() -> TermIndex.valueOf(this.getServer().getInfo().getCurrentTerm(), leaderNextIndex));
        if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
            LOG.debug("{}: follower is bootstrapping, notify to install snapshot to {}.", (Object)this, (Object)firstAvailable);
            return firstAvailable;
        }
        long followerNextIndex = follower.getNextIndex();
        if (followerNextIndex >= leaderNextIndex) {
            return null;
        }
        if (followerNextIndex < leaderStartIndex) {
            return firstAvailable;
        }
        if (leaderStartIndex == -1L) {
            return firstAvailable;
        }
        return null;
    }

    static class RequestMap {
        private final Map<Long, AppendEntriesRequest> logRequests = new ConcurrentHashMap<Long, AppendEntriesRequest>();
        private final Map<Long, AppendEntriesRequest> heartbeats = new ConcurrentHashMap<Long, AppendEntriesRequest>();

        RequestMap() {
        }

        int logRequestsSize() {
            return this.logRequests.size();
        }

        void clear() {
            this.logRequests.clear();
            this.heartbeats.clear();
        }

        void put(AppendEntriesRequest request) {
            if (request.isHeartbeat()) {
                this.heartbeats.put(request.getCallId(), request);
            } else {
                this.logRequests.put(request.getCallId(), request);
            }
        }

        AppendEntriesRequest remove(RaftProtos.AppendEntriesReplyProto reply) {
            return this.remove(reply.getServerReply().getCallId(), reply.getIsHearbeat());
        }

        AppendEntriesRequest remove(long cid, boolean isHeartbeat) {
            return isHeartbeat ? this.heartbeats.remove(cid) : this.logRequests.remove(cid);
        }

        public AppendEntriesRequest handleTimeout(long callId, boolean heartbeat) {
            return heartbeat ? this.heartbeats.remove(callId) : this.logRequests.get(callId);
        }
    }

    static class AppendEntriesRequest {
        private final Timer timer;
        private volatile Timer.Context timerContext;
        private final long callId;
        private final TermIndex previousLog;
        private final int entriesCount;
        private final TermIndex lastEntry;

        AppendEntriesRequest(RaftProtos.AppendEntriesRequestProto proto, RaftPeerId followerId, GrpcServerMetrics grpcServerMetrics) {
            this.callId = proto.getServerRequest().getCallId();
            this.previousLog = proto.hasPreviousLog() ? TermIndex.valueOf(proto.getPreviousLog()) : null;
            this.entriesCount = proto.getEntriesCount();
            this.lastEntry = this.entriesCount > 0 ? TermIndex.valueOf(proto.getEntries(this.entriesCount - 1)) : null;
            this.timer = grpcServerMetrics.getGrpcLogAppenderLatencyTimer(followerId.toString(), this.isHeartbeat());
            grpcServerMetrics.onRequestCreate(this.isHeartbeat());
        }

        long getCallId() {
            return this.callId;
        }

        TermIndex getPreviousLog() {
            return this.previousLog;
        }

        void startRequestTimer() {
            this.timerContext = this.timer.time();
        }

        void stopRequestTimer() {
            this.timerContext.stop();
        }

        boolean isHeartbeat() {
            return this.entriesCount == 0;
        }

        public String toString() {
            return JavaUtils.getClassSimpleName(this.getClass()) + ":cid=" + this.callId + ",entriesCount=" + this.entriesCount + ",lastEntry=" + this.lastEntry;
        }
    }

    private class InstallSnapshotResponseHandler
    implements StreamObserver<RaftProtos.InstallSnapshotReplyProto> {
        private final String name;
        private final Queue<Integer> pending;
        private final AtomicBoolean done;
        private final boolean isNotificationOnly;

        InstallSnapshotResponseHandler() {
            this(false);
        }

        InstallSnapshotResponseHandler(boolean notifyOnly) {
            this.name = GrpcLogAppender.this.getFollower().getName() + "-" + JavaUtils.getClassSimpleName(this.getClass());
            this.done = new AtomicBoolean(false);
            this.pending = new LinkedList<Integer>();
            this.isNotificationOnly = notifyOnly;
        }

        void addPending(RaftProtos.InstallSnapshotRequestProto request) {
            try (AutoCloseableLock writeLock = GrpcLogAppender.this.lock.writeLock(GrpcLogAppender.this.caller, LOG::trace);){
                this.pending.offer(request.getSnapshotChunk().getRequestIndex());
            }
        }

        void removePending(RaftProtos.InstallSnapshotReplyProto reply) {
            try (AutoCloseableLock writeLock = GrpcLogAppender.this.lock.writeLock(GrpcLogAppender.this.caller, LOG::trace);){
                Integer index = this.pending.poll();
                Objects.requireNonNull(index, "index == null");
                Preconditions.assertTrue(index.intValue() == reply.getRequestIndex());
            }
        }

        void onFollowerCatchup(long followerSnapshotIndex) {
            long followerNextIndex = followerSnapshotIndex + 1L;
            long leaderStartIndex = GrpcLogAppender.this.getRaftLog().getStartIndex();
            if (followerNextIndex >= leaderStartIndex) {
                LOG.info("{}: Follower can catch up leader after install the snapshot, as leader's start index is {}", (Object)this, (Object)followerNextIndex);
                this.notifyInstallSnapshotFinished(RaftProtos.InstallSnapshotResult.SUCCESS, followerSnapshotIndex);
            }
        }

        void notifyInstallSnapshotFinished(RaftProtos.InstallSnapshotResult result, long snapshotIndex) {
            GrpcLogAppender.this.getServer().getStateMachine().event().notifySnapshotInstalled(result, snapshotIndex, GrpcLogAppender.this.getFollower().getPeer());
        }

        boolean isDone() {
            return this.done.get();
        }

        void close() {
            this.done.set(true);
            GrpcLogAppender.this.notifyLogAppender();
        }

        boolean hasAllResponse() {
            try (AutoCloseableLock readLock = GrpcLogAppender.this.lock.readLock(GrpcLogAppender.this.caller, LOG::trace);){
                boolean bl = this.pending.isEmpty();
                return bl;
            }
        }

        @Override
        public void onNext(RaftProtos.InstallSnapshotReplyProto reply) {
            if (LOG.isInfoEnabled()) {
                LOG.info("{}: received {} reply {}", this, GrpcLogAppender.this.firstResponseReceived ? "a" : "the first", ServerStringUtils.toInstallSnapshotReplyString(reply));
            }
            GrpcLogAppender.this.getFollower().updateLastRpcResponseTime();
            if (!GrpcLogAppender.this.firstResponseReceived) {
                GrpcLogAppender.this.firstResponseReceived = true;
            }
            switch (reply.getResult()) {
                case SUCCESS: {
                    LOG.info("{}: Completed InstallSnapshot. Reply: {}", (Object)this, (Object)reply);
                    GrpcLogAppender.this.getFollower().setAttemptedToInstallSnapshot();
                    this.removePending(reply);
                    break;
                }
                case IN_PROGRESS: {
                    LOG.info("{}: InstallSnapshot in progress.", (Object)this);
                    this.removePending(reply);
                    break;
                }
                case ALREADY_INSTALLED: {
                    long followerSnapshotIndex = reply.getSnapshotIndex();
                    LOG.info("{}: Follower snapshot is already at index {}.", (Object)this, (Object)followerSnapshotIndex);
                    GrpcLogAppender.this.getFollower().setSnapshotIndex(followerSnapshotIndex);
                    GrpcLogAppender.this.getFollower().setAttemptedToInstallSnapshot();
                    GrpcLogAppender.this.getLeaderState().onFollowerCommitIndex(GrpcLogAppender.this.getFollower(), followerSnapshotIndex);
                    GrpcLogAppender.this.increaseNextIndex(followerSnapshotIndex, reply.getResult());
                    this.removePending(reply);
                    break;
                }
                case NOT_LEADER: {
                    GrpcLogAppender.this.onFollowerTerm(reply.getTerm());
                    break;
                }
                case CONF_MISMATCH: {
                    LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}", this, "raft.server.log.appender.install.snapshot.enabled", GrpcLogAppender.this.getServer().getId(), GrpcLogAppender.this.installSnapshotEnabled, GrpcLogAppender.this.getFollowerId(), !GrpcLogAppender.this.installSnapshotEnabled);
                    break;
                }
                case SNAPSHOT_INSTALLED: {
                    long followerSnapshotIndex = reply.getSnapshotIndex();
                    LOG.info("{}: Follower installed snapshot at index {}", (Object)this, (Object)followerSnapshotIndex);
                    GrpcLogAppender.this.getFollower().setSnapshotIndex(followerSnapshotIndex);
                    GrpcLogAppender.this.getFollower().setAttemptedToInstallSnapshot();
                    GrpcLogAppender.this.getLeaderState().onFollowerCommitIndex(GrpcLogAppender.this.getFollower(), followerSnapshotIndex);
                    GrpcLogAppender.this.increaseNextIndex(followerSnapshotIndex, reply.getResult());
                    this.onFollowerCatchup(followerSnapshotIndex);
                    this.removePending(reply);
                    break;
                }
                case SNAPSHOT_UNAVAILABLE: {
                    LOG.info("{}: Follower could not install snapshot as it is not available.", (Object)this);
                    GrpcLogAppender.this.getFollower().setAttemptedToInstallSnapshot();
                    this.notifyInstallSnapshotFinished(RaftProtos.InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1L);
                    this.removePending(reply);
                    break;
                }
                case UNRECOGNIZED: {
                    LOG.error("Unrecognized the reply result {}: Leader is {}, follower is {}", reply.getResult(), GrpcLogAppender.this.getServer().getId(), GrpcLogAppender.this.getFollowerId());
                    break;
                }
            }
        }

        @Override
        public void onError(Throwable t2) {
            if (!GrpcLogAppender.this.isRunning()) {
                LOG.info("{} is stopped", (Object)GrpcLogAppender.this);
                return;
            }
            GrpcUtil.warn(LOG, () -> this + ": Failed InstallSnapshot", t2);
            GrpcLogAppender.this.grpcServerMetrics.onRequestRetry();
            GrpcLogAppender.this.resetClient(null, true);
            this.close();
        }

        @Override
        public void onCompleted() {
            if (!this.isNotificationOnly || LOG.isDebugEnabled()) {
                LOG.info("{}: follower responded installSnapshot COMPLETED", (Object)this);
            }
            this.close();
        }

        public String toString() {
            return this.name;
        }
    }

    private class AppendLogResponseHandler
    implements StreamObserver<RaftProtos.AppendEntriesReplyProto> {
        private final String name;

        private AppendLogResponseHandler() {
            this.name = GrpcLogAppender.this.getFollower().getName() + "-" + JavaUtils.getClassSimpleName(this.getClass());
        }

        @Override
        public void onNext(RaftProtos.AppendEntriesReplyProto reply) {
            AppendEntriesRequest request = GrpcLogAppender.this.pendingRequests.remove(reply);
            if (request != null) {
                request.stopRequestTimer();
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("{}: received {} reply {}, request={}", this, GrpcLogAppender.this.firstResponseReceived ? "a" : "the first", ServerStringUtils.toAppendEntriesReplyString(reply), request);
            }
            try {
                this.onNextImpl(reply);
            }
            catch (Exception t2) {
                LOG.error("Failed onNext request=" + request + ", reply=" + ServerStringUtils.toAppendEntriesReplyString(reply), t2);
            }
        }

        private void onNextImpl(RaftProtos.AppendEntriesReplyProto reply) {
            GrpcLogAppender.this.getFollower().updateLastRpcResponseTime();
            if (!GrpcLogAppender.this.firstResponseReceived) {
                GrpcLogAppender.this.firstResponseReceived = true;
            }
            switch (reply.getResult()) {
                case SUCCESS: {
                    GrpcLogAppender.this.grpcServerMetrics.onRequestSuccess(GrpcLogAppender.this.getFollowerId().toString(), reply.getIsHearbeat());
                    GrpcLogAppender.this.getLeaderState().onFollowerCommitIndex(GrpcLogAppender.this.getFollower(), reply.getFollowerCommit());
                    if (!GrpcLogAppender.this.getFollower().updateMatchIndex(reply.getMatchIndex())) break;
                    GrpcLogAppender.this.getLeaderState().onFollowerSuccessAppendEntries(GrpcLogAppender.this.getFollower());
                    break;
                }
                case NOT_LEADER: {
                    GrpcLogAppender.this.grpcServerMetrics.onRequestNotLeader(GrpcLogAppender.this.getFollowerId().toString());
                    LOG.warn("{}: received {} reply with term {}", this, reply.getResult(), reply.getTerm());
                    if (!GrpcLogAppender.this.onFollowerTerm(reply.getTerm())) break;
                    return;
                }
                case INCONSISTENCY: {
                    GrpcLogAppender.this.grpcServerMetrics.onRequestInconsistency(GrpcLogAppender.this.getFollowerId().toString());
                    LOG.warn("{}: received {} reply with nextIndex {}", this, reply.getResult(), reply.getNextIndex());
                    GrpcLogAppender.this.updateNextIndex(reply.getNextIndex());
                    break;
                }
                default: {
                    throw new IllegalStateException("Unexpected reply result: " + reply.getResult());
                }
            }
            GrpcLogAppender.this.getLeaderState().onAppendEntriesReply(GrpcLogAppender.this, reply);
            GrpcLogAppender.this.notifyLogAppender();
        }

        @Override
        public void onError(Throwable t2) {
            if (!GrpcLogAppender.this.isRunning()) {
                LOG.info("{} is already stopped", (Object)GrpcLogAppender.this);
                return;
            }
            GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries", t2);
            GrpcLogAppender.this.grpcServerMetrics.onRequestRetry();
            AppendEntriesRequest request = GrpcLogAppender.this.pendingRequests.remove(GrpcUtil.getCallId(t2), GrpcUtil.isHeartbeat(t2));
            GrpcLogAppender.this.resetClient(request, true);
        }

        @Override
        public void onCompleted() {
            LOG.info("{}: follower responses appendEntries COMPLETED", (Object)this);
            GrpcLogAppender.this.resetClient(null, false);
        }

        public String toString() {
            return this.name;
        }
    }

    static class StreamObservers {
        private final StreamObserver<RaftProtos.AppendEntriesRequestProto> appendLog;
        private final StreamObserver<RaftProtos.AppendEntriesRequestProto> heartbeat;

        StreamObservers(GrpcServerProtocolClient client, AppendLogResponseHandler handler, boolean separateHeartbeat) {
            this.appendLog = client.appendEntries(handler, false);
            this.heartbeat = separateHeartbeat ? client.appendEntries(handler, true) : null;
        }

        void onNext(RaftProtos.AppendEntriesRequestProto proto) {
            if (this.heartbeat != null && proto.getEntriesCount() == 0) {
                this.heartbeat.onNext(proto);
            } else {
                this.appendLog.onNext(proto);
            }
        }

        void onCompleted() {
            this.appendLog.onCompleted();
            Optional.ofNullable(this.heartbeat).ifPresent(StreamObserver::onCompleted);
        }
    }
}

