/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.ha;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.neo4j.cluster.InstanceId;
import org.neo4j.com.ComException;
import org.neo4j.com.RequestContext;
import org.neo4j.com.Response;
import org.neo4j.concurrent.BinaryLatch;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.ha.LastUpdateTime;
import org.neo4j.kernel.ha.UpdatePuller;
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.InvalidEpochException;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.ha.com.slave.InvalidEpochExceptionHandler;
import org.neo4j.kernel.impl.util.CappedLogger;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/**
 * Pulls updates from the master on a dedicated job-scheduler thread.
 *
 * <p>Coordination between callers and the pulling thread is ticket based: a caller
 * increments {@link #targetTicket} (see {@link #poke()}) and then waits until a
 * {@link UpdatePuller.Condition} evaluated against {@link #currentTicket} holds. The pulling
 * thread performs a pull whenever {@code currentTicket} is behind {@code targetTicket} and
 * otherwise parks for {@link #PARK_NANOS}.
 *
 * <p>{@link #init()} schedules the pulling job and {@link #shutdown()} halts it and waits for
 * the job to finish. Once halted, {@link #isActive()} returns {@code false}.
 */
public class SlaveUpdatePuller
extends LifecycleAdapter
implements Runnable,
UpdatePuller {
    /** Count limit applied to each capped logger; system property override, default 10. */
    public static final int LOG_CAP = Integer.getInteger("org.neo4j.kernel.ha.SlaveUpdatePuller.LOG_CAP", 10);
    /** How long the pulling thread parks when there is nothing to pull; configured in milliseconds, default 100. */
    public static final long PARK_NANOS = TimeUnit.MILLISECONDS.toNanos(Integer.getInteger("org.neo4j.kernel.ha.SlaveUpdatePuller.PARK_MILLIS", 100).intValue());
    /** Value handed to the availability guard in {@link #pullUpdates()}; system property override, default 5000. */
    public static final int AVAILABILITY_AWAIT_MILLIS = Integer.getInteger("org.neo4j.kernel.ha.SlaveUpdatePuller.AVAILABILITY_AWAIT_MILLIS", 5000);
    /** Prefix of the name given to the pulling thread while it runs; the instance id is appended. */
    public static final String UPDATE_PULLER_THREAD_PREFIX = "UpdatePuller@";
    /** Condition that holds once the current ticket has reached (or passed) the ticket being waited for. */
    static final UpdatePuller.Condition NEXT_TICKET = new UpdatePuller.Condition(){

        @Override
        public boolean evaluate(int currentTicket, int targetTicket) {
            return currentTicket >= targetTicket;
        }
    };
    private volatile boolean halted;
    private final AtomicInteger targetTicket = new AtomicInteger();
    private final AtomicInteger currentTicket = new AtomicInteger();
    private final RequestContextFactory requestContextFactory;
    private final Master master;
    private final Log logger;
    private final CappedLogger invalidEpochCappedLogger;
    private final CappedLogger comExceptionCappedLogger;
    private final LastUpdateTime lastUpdateTime;
    private final InstanceId instanceId;
    private final AvailabilityGuard availabilityGuard;
    private final InvalidEpochExceptionHandler invalidEpochHandler;
    private final Monitor monitor;
    private final JobScheduler jobScheduler;
    /** The thread currently executing {@link #run()}, or {@code null} when no pulling job is running. */
    private volatile Thread updatePullingThread;
    /** Non-null between {@link #init()} and {@link #shutdown()}; released when {@link #run()} exits. */
    private volatile BinaryLatch shutdownLatch;

    /**
     * Creates a puller. Nothing is scheduled until {@link #init()} is called.
     *
     * @param requestContextFactory source of the request context used for each pull
     * @param master master to pull updates from
     * @param lastUpdateTime updated with the wall-clock time after every pull attempt
     * @param logging provider of the log this class writes to
     * @param instanceId appended to {@link #UPDATE_PULLER_THREAD_PREFIX} to name the pulling thread
     * @param availabilityGuard consulted by {@link #pullUpdates()} before pulling
     * @param invalidEpochHandler invoked when a pull fails with an {@link InvalidEpochException}
     * @param jobScheduler scheduler that runs the pulling job
     * @param monitor notified after each successful pull
     */
    SlaveUpdatePuller(RequestContextFactory requestContextFactory, Master master, LastUpdateTime lastUpdateTime, LogProvider logging, InstanceId instanceId, AvailabilityGuard availabilityGuard, InvalidEpochExceptionHandler invalidEpochHandler, JobScheduler jobScheduler, Monitor monitor) {
        this.requestContextFactory = requestContextFactory;
        this.master = master;
        this.lastUpdateTime = lastUpdateTime;
        this.instanceId = instanceId;
        this.availabilityGuard = availabilityGuard;
        this.invalidEpochHandler = invalidEpochHandler;
        this.jobScheduler = jobScheduler;
        this.monitor = monitor;
        this.logger = logging.getLog(this.getClass());
        this.invalidEpochCappedLogger = new CappedLogger(this.logger).setCountLimit(LOG_CAP);
        this.comExceptionCappedLogger = new CappedLogger(this.logger).setCountLimit(LOG_CAP);
    }

    /**
     * Body of the pulling job. Renames the executing thread for the duration of the job, pulls
     * until halted, then restores the thread name and releases the shutdown latch so that
     * {@link #shutdown()} can return.
     */
    @Override
    public void run() {
        Thread thread = Thread.currentThread();
        this.updatePullingThread = thread;
        String oldName = thread.getName();
        thread.setName(UPDATE_PULLER_THREAD_PREFIX + this.instanceId);
        try {
            this.periodicallyPullUpdates();
        }
        finally {
            // Use the local reference: the field is cleared right below.
            thread.setName(oldName);
            this.updatePullingThread = null;
            this.shutdownLatch.release();
        }
    }

    /**
     * Loops until halted: pulls when the current ticket is behind the target ticket and then
     * advances the current ticket to the target observed before the pull; otherwise parks.
     */
    private void periodicallyPullUpdates() {
        while (!this.halted) {
            // Snapshot the target first so tickets requested during the pull trigger another round.
            int round = this.targetTicket.get();
            if (this.currentTicket.get() < round) {
                this.doPullUpdates();
                this.currentTicket.set(round);
                continue;
            }
            LockSupport.parkNanos(PARK_NANOS);
        }
    }

    /**
     * Schedules the pulling job. Does nothing if a shutdown latch already exists, i.e. if this
     * puller has been initialised and not yet shut down.
     */
    public synchronized void init() throws Throwable {
        if (this.shutdownLatch != null) {
            return;
        }
        this.shutdownLatch = new BinaryLatch();
        this.jobScheduler.schedule(JobScheduler.Groups.pullUpdates, this);
    }

    /**
     * Halts the pulling job and waits for it to finish. Does nothing if there is no shutdown
     * latch, i.e. if this puller was never initialised or has already been shut down.
     */
    public synchronized void shutdown() throws Throwable {
        if (this.shutdownLatch == null) {
            return;
        }
        Thread thread = this.updatePullingThread;
        this.halted = true;
        // Wake the puller if it is parked so it notices the halt promptly; unpark(null) is a no-op.
        LockSupport.unpark(thread);
        this.shutdownLatch.await();
        this.shutdownLatch = null;
    }

    /**
     * Requests a pull and waits for it, provided this puller is active and the availability
     * guard reports availability; otherwise returns without pulling.
     *
     * @throws InterruptedException if interrupted while waiting for the pull
     */
    @Override
    public void pullUpdates() throws InterruptedException {
        if (!this.isActive() || !this.availabilityGuard.isAvailable(AVAILABILITY_AWAIT_MILLIS)) {
            return;
        }
        this.tryPullUpdates();
    }

    /**
     * Requests a pull and waits until {@link #NEXT_TICKET} holds for the requested ticket.
     *
     * @return {@code true} if the condition was met, {@code false} if this puller is or became inactive
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public boolean tryPullUpdates() throws InterruptedException {
        return this.await(NEXT_TICKET, false);
    }

    /**
     * Requests a pull and waits until the given condition holds.
     *
     * @param condition evaluated against the current ticket and the ticket obtained for this request
     * @param strictlyAssertActive if {@code true}, an inactive puller causes an
     *        {@link IllegalStateException} instead of a silent return
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public void pullUpdates(UpdatePuller.Condition condition, boolean strictlyAssertActive) throws InterruptedException {
        this.await(condition, strictlyAssertActive);
    }

    /**
     * Takes a new ticket and polls, sleeping 1 ms between checks, until the condition holds.
     *
     * @return {@code true} once the condition holds; {@code false} if this puller is inactive
     *         before or during the wait and {@code strictlyAssertActive} is {@code false}
     * @throws IllegalStateException if inactive and {@code strictlyAssertActive} is {@code true}
     * @throws InterruptedException if interrupted while sleeping
     */
    private boolean await(UpdatePuller.Condition condition, boolean strictlyAssertActive) throws InterruptedException {
        if (!this.checkActive(strictlyAssertActive)) {
            return false;
        }
        int ticket = this.poke();
        while (!condition.evaluate(this.currentTicket.get(), ticket)) {
            // Re-check on every iteration so waiters do not spin forever after a halt.
            if (!this.checkActive(strictlyAssertActive)) {
                return false;
            }
            Thread.sleep(1L);
        }
        return true;
    }

    /**
     * @return whether this puller is active
     * @throws IllegalStateException if inactive and {@code strictlyAssertActive} is {@code true}
     */
    private boolean checkActive(boolean strictlyAssertActive) {
        if (this.isActive()) {
            return true;
        }
        if (strictlyAssertActive) {
            throw new IllegalStateException(this + " is not active");
        }
        return false;
    }

    /**
     * Increments the target ticket and wakes the pulling thread if it is parked.
     *
     * @return the newly issued ticket
     */
    private int poke() {
        int result = this.targetTicket.incrementAndGet();
        LockSupport.unpark(this.updatePullingThread);
        return result;
    }

    /** @return {@code true} until this puller has been halted by {@link #shutdown()} */
    public boolean isActive() {
        return !this.halted;
    }

    @Override
    public String toString() {
        return "UpdatePuller[halted:" + this.halted + ", current:" + this.currentTicket + ", target:" + this.targetTicket + "]";
    }

    /**
     * Performs one pull from the master. Failures are logged rather than propagated, and the
     * last-update time is recorded after every attempt, successful or not.
     */
    private void doPullUpdates() {
        try {
            RequestContext context = this.requestContextFactory.newRequestContext();
            try (Response<Void> ignored = this.master.pullUpdates(context)) {
                this.monitor.pulledUpdates(context.lastAppliedTransaction());
            }
            // A successful pull re-arms both capped loggers.
            this.invalidEpochCappedLogger.reset();
            this.comExceptionCappedLogger.reset();
        }
        catch (InvalidEpochException e) {
            this.invalidEpochHandler.handle();
            this.invalidEpochCappedLogger.warn("Pull updates by " + this + " failed at the epoch check", e);
        }
        catch (ComException e) {
            // Network failures have their own capped logger so they do not consume the
            // epoch-failure log cap (and vice versa).
            this.comExceptionCappedLogger.warn("Pull updates by " + this + " failed due to network error.", e);
        }
        catch (Throwable e) {
            this.logger.error("Pull updates by " + this + " failed", e);
        }
        this.lastUpdateTime.setLastUpdateTime(System.currentTimeMillis());
    }

    /** Callback notified about successful pulls. */
    public interface Monitor {
        /**
         * Called after updates have been pulled from the master.
         *
         * @param lastAppliedTransaction the value of {@code lastAppliedTransaction()} on the
         *        request context that was used for the pull
         */
        void pulledUpdates(long lastAppliedTransaction);
    }
}

