/*
 * Decompiled with CFR 0.152.
 */
package net.kuujo.copycat.internal.state;

import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import net.kuujo.copycat.CopycatException;
import net.kuujo.copycat.CopycatState;
import net.kuujo.copycat.cluster.Cluster;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.event.MembershipChangeEvent;
import net.kuujo.copycat.internal.StateMachineExecutor;
import net.kuujo.copycat.internal.event.DefaultEventHandlerRegistry;
import net.kuujo.copycat.internal.log.ConfigurationEntry;
import net.kuujo.copycat.internal.log.OperationEntry;
import net.kuujo.copycat.internal.replication.ClusterReplicator;
import net.kuujo.copycat.internal.replication.Replicator;
import net.kuujo.copycat.internal.state.FollowerController;
import net.kuujo.copycat.internal.state.StateContext;
import net.kuujo.copycat.internal.state.StateController;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PingResponse;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.protocol.SyncRequest;
import net.kuujo.copycat.protocol.SyncResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LeaderController
extends StateController
implements Observer {
    private static final Logger LOGGER = LoggerFactory.getLogger(LeaderController.class);
    private ScheduledFuture<Void> currentTimer;
    private Replicator replicator;

    @Override
    CopycatState state() {
        return CopycatState.LEADER;
    }

    @Override
    Logger logger() {
        return LOGGER;
    }

    @Override
    public void init(StateContext context) {
        long firstEntryToApply;
        super.init(context);
        this.replicator = new ClusterReplicator(context);
        LOGGER.debug("{} - Applying pending entries to state machine", context.clusterManager().localNode());
        int count = 0;
        for (long i = firstEntryToApply = Math.max(context.lastApplied() + 1L, context.log().firstIndex()); i <= context.log().lastIndex(); ++i) {
            try {
                this.applyEntry(i);
            }
            catch (Exception e) {
                LOGGER.error("{} - Applying log {} failed with exception", new Object[]{context.clusterManager().localNode(), i, e});
                throw e;
            }
            ++count;
        }
        LOGGER.debug("{} - Applied {} entries to state machine", context.clusterManager().localNode(), (Object)count);
        ConfigurationEntry configEntry = new ConfigurationEntry(context.currentTerm(), (ClusterConfig)context.clusterManager().cluster().config().copy());
        long configIndex = context.log().appendEntry(configEntry);
        LOGGER.debug("{} - Appended {} to log at index {}", new Object[]{context.clusterManager().localNode(), configEntry, configIndex});
        context.cluster().addObserver(this);
        LOGGER.debug("{} - Observing {}", context.clusterManager().localNode(), (Object)context.cluster());
        context.currentLeader(((Member)context.clusterManager().localNode().member()).id());
        this.replicator.pingAll();
        LOGGER.debug("{} - Setting ping timer", context.clusterManager().localNode());
        this.setPingTimer();
    }

    @Override
    public void update(Observable o, Object arg) {
        this.clusterChanged(((Cluster)o).config());
    }

    private synchronized void clusterChanged(ClusterConfig cluster) {
        LOGGER.debug("{} - Detected configuration change {}", this.context.clusterManager().localNode(), (Object)cluster);
        Object userConfig = cluster.copy();
        Object internalConfig = this.context.clusterManager().cluster().config().copy();
        LOGGER.debug("{} - Committing all entries for configuration change", this.context.clusterManager().localNode());
        this.replicator.commitAll().whenComplete((arg_0, arg_1) -> this.lambda$clusterChanged$34((ClusterConfig)internalConfig, (ClusterConfig)userConfig, arg_0, arg_1));
    }

    private void setPingTimer() {
        this.currentTimer = this.context.config().getTimerStrategy().schedule(() -> {
            try {
                LOGGER.trace("Starting periodic ping all");
                this.replicator.pingAll();
            }
            catch (Exception e) {
                LOGGER.debug("Exception thrown during ping", (Throwable)e);
            }
            finally {
                this.setPingTimer();
            }
        }, this.context.config().getHeartbeatInterval(), TimeUnit.MILLISECONDS);
    }

    @Override
    public CompletableFuture<PingResponse> ping(PingRequest request) {
        if (request.term() > this.context.currentTerm()) {
            return super.ping(request);
        }
        if (request.term() < this.context.currentTerm()) {
            return CompletableFuture.completedFuture(this.logResponse(new PingResponse(this.logRequest(request).id(), this.context.currentTerm(), false)));
        }
        LOGGER.info("{} - Received ping from {} [term={}] stepping down. [local term={}]", new Object[]{this.context.clusterManager().localNode(), request.leader(), request.term(), this.context.currentTerm()});
        this.context.transition(FollowerController.class);
        return super.ping(request);
    }

    @Override
    public CompletableFuture<SyncResponse> sync(SyncRequest request) {
        if (request.term() > this.context.currentTerm()) {
            return super.sync(request);
        }
        if (request.term() < this.context.currentTerm()) {
            return CompletableFuture.completedFuture(this.logResponse(new SyncResponse(this.logRequest(request).id(), this.context.currentTerm(), false, this.context.log().lastIndex())));
        }
        this.context.transition(FollowerController.class);
        return super.sync(request);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
        this.logRequest(request);
        CompletableFuture<SubmitResponse> future = new CompletableFuture<SubmitResponse>();
        StateMachineExecutor.Operation operation = this.context.stateMachineExecutor().getOperation(request.operation());
        if (operation == null) {
            future.completeExceptionally(new CopycatException("Invalid operation", new Object[0]));
        } else if (operation.isReadOnly()) {
            if (this.context.config().isRequireQueryQuorum()) {
                long lastIndex = this.context.log().lastIndex();
                LOGGER.debug("{} - Synchronizing logs to index {} for read", this.context.clusterManager().localNode(), (Object)lastIndex);
                this.replicator.ping(lastIndex).whenComplete((index, error) -> {
                    if (error == null) {
                        try {
                            future.complete(this.logResponse(new SubmitResponse(request.id(), operation.apply(request.args()))));
                        }
                        catch (Exception e) {
                            future.completeExceptionally(e);
                        }
                    } else {
                        future.completeExceptionally((Throwable)error);
                    }
                });
            } else {
                try {
                    future.complete(this.logResponse(new SubmitResponse(request.id(), operation.apply(request.args()))));
                }
                catch (Exception e) {
                    future.completeExceptionally(e);
                }
            }
        } else {
            OperationEntry entry = new OperationEntry(this.context.currentTerm(), request.operation(), request.args());
            long index2 = this.context.log().appendEntry(entry);
            LOGGER.debug("{} - Appended {} to log at index {}", new Object[]{this.context.clusterManager().localNode(), entry, index2});
            if (this.context.config().isRequireCommandQuorum()) {
                LOGGER.debug("{} - Replicating logs up to index {} for write", this.context.clusterManager().localNode(), (Object)index2);
                this.replicator.commit(index2).whenComplete((resultIndex, error) -> {
                    if (error == null) {
                        try {
                            LOGGER.debug("{} - Completed replicating logs up to index {} for write", this.context.clusterManager().localNode(), (Object)index2);
                            future.complete(this.logResponse(new SubmitResponse(request.id(), operation.apply(request.args()))));
                        }
                        catch (Exception e) {
                            future.completeExceptionally(e);
                        }
                        finally {
                            this.context.lastApplied(index2);
                            this.compactLog();
                        }
                    } else {
                        LOGGER.debug("{} - Failed replicating logs up to index {} for write", this.context.clusterManager().localNode(), (Object)index2);
                        future.completeExceptionally((Throwable)error);
                    }
                });
            } else {
                try {
                    future.complete(this.logResponse(new SubmitResponse(request.id(), operation.apply(request.args()))));
                }
                catch (Exception e) {
                    future.completeExceptionally(e);
                }
                finally {
                    this.context.lastApplied(index2);
                    this.compactLog();
                }
            }
        }
        return future;
    }

    @Override
    void destroy() {
        if (this.currentTimer != null) {
            LOGGER.debug("{} - Cancelling ping timer", this.context.clusterManager().localNode());
            this.currentTimer.cancel(true);
        }
        this.context.cluster().deleteObserver(this);
    }

    public boolean equals(Object object) {
        return object instanceof LeaderController && ((StateController)object).context.equals(this.context);
    }

    public int hashCode() {
        int hashCode = 23;
        hashCode = 37 * hashCode + this.context.hashCode();
        return hashCode;
    }

    @Override
    public String toString() {
        return String.format("LeaderController[context=%s]", this.context);
    }

    private /* synthetic */ void lambda$clusterChanged$34(ClusterConfig clusterConfig, ClusterConfig clusterConfig2, Long commitIndex, Throwable commitError) {
        ClusterConfig jointConfig = ((ClusterConfig)clusterConfig.copy()).addRemoteMembers(clusterConfig2.getRemoteMembers());
        ConfigurationEntry jointConfigEntry = new ConfigurationEntry(this.context.currentTerm(), jointConfig);
        long configIndex = this.context.log().appendEntry(jointConfigEntry);
        LOGGER.debug("{} - Appended {} to log at index {}", new Object[]{this.context.clusterManager().localNode(), jointConfigEntry, configIndex});
        this.context.clusterManager().cluster().update(jointConfig, null);
        ((DefaultEventHandlerRegistry)this.context.events().membershipChange()).handle(new MembershipChangeEvent(jointConfig.getMembers()));
        LOGGER.debug("{} - Updated internal cluster configuration {}", this.context.clusterManager().localNode(), this.context.clusterManager().cluster());
        LOGGER.debug("{} - Committing all entries for configuration change", this.context.clusterManager().localNode());
        this.replicator.commit(configIndex).whenComplete((commitIndex2, commitError2) -> {
            ConfigurationEntry newConfigEntry = new ConfigurationEntry(this.context.currentTerm(), clusterConfig2);
            long newConfigIndex = this.context.log().appendEntry(newConfigEntry);
            LOGGER.debug("{} - Appended {} to log at index {}", new Object[]{this.context.clusterManager().localNode(), newConfigEntry, newConfigIndex});
            this.context.clusterManager().cluster().update(clusterConfig2, null);
            ((DefaultEventHandlerRegistry)this.context.events().membershipChange()).handle(new MembershipChangeEvent(clusterConfig2.getMembers()));
            LOGGER.debug("{} - Updated internal cluster configuration {}", this.context.clusterManager().localNode(), this.context.clusterManager().cluster());
            LOGGER.debug("{} - Committing all entries for configuration change", this.context.clusterManager().localNode());
            this.replicator.commitAll();
        });
    }
}

