/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.impl;

import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.Future;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Scheduler;
import org.nustaq.kontraktor.impl.ActorBlockedException;
import org.nustaq.kontraktor.impl.BackOffStrategy;
import org.nustaq.kontraktor.impl.CallEntry;
import org.nustaq.kontraktor.impl.CallbackWrapper;
import org.nustaq.kontraktor.impl.DispatcherThread;
import org.nustaq.kontraktor.impl.StoppedActorTargetedException;
import org.nustaq.kontraktor.monitoring.Monitorable;
import org.nustaq.kontraktor.util.Log;

public class ElasticScheduler
implements Scheduler,
Monitorable {
    public static final int MAX_STACK_ON_SYNC_CBDISPATCH = 200000;
    public static int DEFQSIZE = 32768;
    public static boolean DEBUG_SCHEDULING = true;
    public static boolean REALLY_DEBUG_SCHEDULING = false;
    public static int RECURSE_ON_BLOCK_THRESHOLD = 2;
    int maxThread = Runtime.getRuntime().availableProcessors();
    protected BackOffStrategy backOffStrategy = new BackOffStrategy();
    final DispatcherThread[] threads;
    int defQSize = DEFQSIZE;
    protected ExecutorService exec = Executors.newCachedThreadPool();
    public static Timer delayedCalls = new Timer();
    private AtomicInteger isolateCount = new AtomicInteger(0);
    final Object balanceLock = new Object();

    public ElasticScheduler(int maxThreads) {
        this(maxThreads, DEFQSIZE);
    }

    public ElasticScheduler(int maxThreads, int defQSize) {
        this.maxThread = maxThreads;
        this.defQSize = defQSize;
        if (defQSize <= 1) {
            this.defQSize = DEFQSIZE;
        }
        this.threads = new DispatcherThread[maxThreads];
    }

    public int getActiveThreads() {
        int res = 0;
        for (int i = 0; i < this.threads.length; ++i) {
            if (this.threads[i] == null) continue;
            ++res;
        }
        return res;
    }

    @Override
    public int getMaxThreads() {
        return this.maxThread;
    }

    @Override
    public int getDefaultQSize() {
        return this.defQSize;
    }

    public Future put2QueuePolling(CallEntry e) {
        Promise fut;
        if (e.hasFutureResult() && !(e.getFutureCB() instanceof CallbackWrapper)) {
            fut = new Promise();
            e.setFutureCB(new CallbackWrapper(e.getSendingActor(), new Callback(){

                public void receive(Object result, Object error) {
                    fut.receive(result, error);
                }
            }));
        } else {
            fut = null;
        }
        Actor targetActor = e.getTargetActor();
        this.put2QueuePolling(e.isCallback() ? targetActor.__cbQueue : targetActor.__mailbox, false, e, targetActor);
        return fut;
    }

    @Override
    public void yield(int count) {
        this.backOffStrategy.yield(count);
    }

    @Override
    public void put2QueuePolling(Queue q, boolean isCBQ, Object o, Object receiver) {
        int count = 0;
        boolean warningPrinted = false;
        while (!q.offer(o)) {
            this.yield(count++);
            if (count > RECURSE_ON_BLOCK_THRESHOLD && isCBQ && Thread.currentThread() instanceof DispatcherThread) {
                DispatcherThread dp = (DispatcherThread)Thread.currentThread();
                if (dp.stackDepth < 200000 && dp.getActorsNoCopy().length > 1) {
                    Actor recAct = (Actor)receiver;
                    if (dp.schedules(recAct = recAct.getActorRef())) {
                        ++dp.stackDepth;
                        if (dp.pollQs(new Actor[]{recAct})) {
                            count = 0;
                        }
                        --dp.stackDepth;
                    }
                }
            }
            if (!this.backOffStrategy.isYielding(count)) continue;
            Actor sendingActor = Actor.sender.get();
            if (receiver instanceof Actor && ((Actor)receiver).__stopped) {
                String dl = o instanceof CallEntry ? ((CallEntry)o).getMethod().getName() : "" + o;
                sendingActor.__addDeadLetter((Actor)receiver, dl);
                throw new StoppedActorTargetedException(dl);
            }
            if (sendingActor != null && sendingActor.__throwExAtBlock) {
                throw ActorBlockedException.Instance;
            }
            if (!this.backOffStrategy.isSleeping(count)) continue;
            if (!warningPrinted) {
                warningPrinted = true;
                String receiverString = receiver instanceof Actor ? (q == ((Actor)receiver).__cbQueue ? receiver.getClass().getSimpleName() + " callbackQ" : (q == ((Actor)receiver).__mailbox ? receiver.getClass().getSimpleName() + " mailbox" : receiver.getClass().getSimpleName() + " unknown queue")) : "" + receiver;
                String sender = "";
                if (sendingActor != null) {
                    sender = ", sender:" + sendingActor.getActor().getClass().getSimpleName();
                }
                if (DEBUG_SCHEDULING) {
                    Log.Lg.warn(this, "Warning: Thread " + Thread.currentThread().getName() + " blocked trying to put message on " + receiverString + sender + " msg:" + o);
                }
            }
            if (sendingActor == null || !(Thread.currentThread() instanceof DispatcherThread)) continue;
            DispatcherThread dp = (DispatcherThread)Thread.currentThread();
            dp.schedulePendingAdds();
            if (dp.getActors().length > 1) {
                if (DEBUG_SCHEDULING) {
                    Log.Lg.warn(this, "  try unblock Thread " + Thread.currentThread().getName() + " actors:" + dp.getActors().length);
                }
                dp.getScheduler().tryIsolate(dp, sendingActor.getActorRef());
                if (!DEBUG_SCHEDULING) continue;
                Log.Lg.warn(this, "  unblock done Thread " + Thread.currentThread().getName() + " actors:" + dp.getActors().length);
                continue;
            }
            if (dp.getActors().length <= 1) continue;
        }
        if (warningPrinted && DEBUG_SCHEDULING) {
            Log.Lg.warn(this, "Thread " + Thread.currentThread().getName() + " continued");
        }
    }

    @Override
    public Object enqueueCall(Actor sendingActor, Actor receiver, String methodName, Object[] args, boolean isCB) {
        Object actor = receiver.getActor();
        Method method = ((Actor)actor).__getCachedMethod(methodName, (Actor)actor);
        boolean count = false;
        for (int i = 0; i < args.length; ++i) {
            Object arg = args[i];
            if (!(arg instanceof Callback)) continue;
            args[i] = new CallbackWrapper(sendingActor, (Callback)arg);
        }
        CallEntry e = new CallEntry(actor, method, args, Actor.sender.get(), (Actor)actor, isCB);
        return this.put2QueuePolling(e);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void threadStopped(DispatcherThread th) {
        DispatcherThread[] dispatcherThreadArray = this.threads;
        synchronized (this.threads) {
            for (int i = 0; i < this.threads.length; ++i) {
                if (this.threads[i] != th) continue;
                this.threads[i] = null;
                // ** MonitorExit[var2_2] (shouldn't be in output)
                return;
            }
            // ** MonitorExit[var2_2] (shouldn't be in output)
            if (th.isIsolated()) {
                if (DEBUG_SCHEDULING) {
                    Log.Info(this, "  was decoupled one.");
                }
                this.isolateCount.decrementAndGet();
            }
            return;
        }
    }

    @Override
    public InvocationHandler getInvoker(Actor dispatcher, Object toWrap) {
        return new CallbackInvokeHandler(toWrap, dispatcher);
    }

    @Override
    public <T> T inThread(Actor actor, T callback) {
        Class<?>[] interfaces = callback.getClass().getInterfaces();
        InvocationHandler invoker = actor.__scheduler.getInvoker(actor, callback);
        if (invoker == null) {
            return callback;
        }
        return (T)Proxy.newProxyInstance(callback.getClass().getClassLoader(), interfaces, invoker);
    }

    @Override
    public void delayedCall(long millis, final Runnable toRun) {
        delayedCalls.schedule(new TimerTask(){

            @Override
            public void run() {
                toRun.run();
            }
        }, millis);
    }

    @Override
    public <T> void runBlockingCall(Actor emitter, Callable<T> toCall, Callback<T> resultHandler) {
        CallbackWrapper resultWrapper = new CallbackWrapper(emitter, resultHandler);
        this.exec.execute(() -> {
            try {
                resultWrapper.receive(toCall.call(), null);
            }
            catch (Throwable th) {
                resultWrapper.receive(null, th);
            }
        });
    }

    @Override
    public Future<Future[]> yield(Future ... futures) {
        return Actors.yield(futures);
    }

    @Override
    public Future<List<Future>> yield(List<Future> futures) {
        return Actors.yield(futures);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public DispatcherThread assignDispatcher(int minLoadPerc) {
        Object object = this.balanceLock;
        synchronized (object) {
            DispatcherThread minThread = this.findMinLoadThread(minLoadPerc, null);
            if (minThread != null) {
                return minThread;
            }
            DispatcherThread newThreadIfPossible = this.createNewThreadIfPossible();
            if (newThreadIfPossible != null) {
                newThreadIfPossible.start();
                return newThreadIfPossible;
            }
            return this.findMinLoadThread(Integer.MIN_VALUE, null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private DispatcherThread findMinLoadThread(int minLoad, DispatcherThread dispatcherThread) {
        Object object = this.balanceLock;
        synchronized (object) {
            DispatcherThread minThread = null;
            for (int i = 0; i < this.threads.length; ++i) {
                int load;
                DispatcherThread thread = this.threads[i];
                if (thread == null || thread == dispatcherThread || (load = thread.getLoad()) >= minLoad) continue;
                minLoad = load;
                minThread = thread;
            }
            return minThread;
        }
    }

    private DispatcherThread createNewThreadIfPossible() {
        for (int i = 0; i < this.threads.length; ++i) {
            DispatcherThread th;
            DispatcherThread thread = this.threads[i];
            if (thread != null) continue;
            this.threads[i] = th = this.createDispatcherThread();
            return th;
        }
        return null;
    }

    protected DispatcherThread createDispatcherThread() {
        return new DispatcherThread(this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void rebalance(DispatcherThread dispatcherThread) {
        Object object = this.balanceLock;
        synchronized (object) {
            DispatcherThread minLoadThread = this.assignDispatcher(dispatcherThread.getLoad());
            if (minLoadThread == null || minLoadThread == dispatcherThread) {
                return;
            }
            int qSizes = dispatcherThread.getAccumulatedQSizes();
            Actor[] qList = dispatcherThread.getActors();
            long otherQSizes = minLoadThread.getAccumulatedQSizes();
            if (4L * otherQSizes / 3L > (long)qSizes) {
                if (REALLY_DEBUG_SCHEDULING) {
                    Log.Info(this, "no payoff, skip rebalance load:" + qSizes + " other:" + otherQSizes);
                }
                return;
            }
            for (int i = 0; i < qList.length; ++i) {
                Actor actor = qList[i];
                if (otherQSizes + (long)actor.getQSizes() >= (long)(qSizes - actor.getQSizes())) continue;
                otherQSizes += (long)actor.getQSizes();
                qSizes -= actor.getQSizes();
                if (REALLY_DEBUG_SCHEDULING) {
                    Log.Info(this, "move " + actor.getQSizes() + " myload " + qSizes + " otherload " + otherQSizes + " from " + dispatcherThread.getName() + " to " + minLoadThread.getName());
                }
                dispatcherThread.removeActorImmediate(actor);
                minLoadThread.addActor(actor);
            }
            if (!minLoadThread.isAlive()) {
                minLoadThread.start();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void tryIsolate(DispatcherThread dispatcherThread, Actor refToExclude) {
        if (dispatcherThread != Thread.currentThread()) {
            throw new RuntimeException("bad error");
        }
        Object object = this.balanceLock;
        synchronized (object) {
            int i;
            if (refToExclude == null) {
                throw new IllegalArgumentException("excluderef should not be null");
            }
            Actor[] qList = dispatcherThread.getActors();
            DispatcherThread minLoadThread = this.findMinLoadThread(Integer.MAX_VALUE, dispatcherThread);
            for (i = 0; i < this.threads.length; ++i) {
                if (this.threads[i] != dispatcherThread) continue;
                this.threads[i] = this.createDispatcherThread();
                dispatcherThread.setName(dispatcherThread.getName() + " (isolated)");
                dispatcherThread.setIsolated(true);
                this.isolateCount.incrementAndGet();
                minLoadThread = this.threads[i];
                minLoadThread.start();
                if (!DEBUG_SCHEDULING) continue;
                Log.Info(this, "created new thread to unblock " + dispatcherThread.getName());
            }
            if (minLoadThread == null) {
                minLoadThread = this.createDispatcherThread();
                minLoadThread.setName(dispatcherThread.getName() + " (isolated)");
                minLoadThread.setIsolated(true);
                this.isolateCount.incrementAndGet();
                if (DEBUG_SCHEDULING) {
                    Log.Info(this, "created new thread to unblock already isolated " + dispatcherThread.getName());
                }
            }
            for (i = 0; i < qList.length; ++i) {
                Actor actor = qList[i];
                if (actor.getActorRef() != actor) {
                    throw new RuntimeException("this should not happen ever");
                }
                if (refToExclude != null && refToExclude.getActorRef() != refToExclude) {
                    throw new RuntimeException("this also");
                }
                if (actor != refToExclude) {
                    dispatcherThread.removeActorImmediate(actor);
                    minLoadThread.addActor(actor);
                }
                if (!REALLY_DEBUG_SCHEDULING) continue;
                Log.Info(this, "move for unblock " + actor.getQSizes() + " myload " + dispatcherThread.getAccumulatedQSizes() + " actors " + qList.length);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void tryStopThread(DispatcherThread dispatcherThread) {
        if (dispatcherThread != Thread.currentThread()) {
            throw new RuntimeException("bad one");
        }
        Object object = this.balanceLock;
        synchronized (object) {
            DispatcherThread minLoadThread = this.findMinLoadThread(Integer.MAX_VALUE, dispatcherThread);
            if (minLoadThread == null) {
                return;
            }
            Actor[] qList = dispatcherThread.getActors();
            for (int i = 0; i < this.threads.length; ++i) {
                if (this.threads[i] != dispatcherThread) continue;
                this.threads[i] = null;
            }
            int maxActors2Remove = Math.min(qList.length, qList.length / 5 + 1);
            for (int i = 0; i < maxActors2Remove; ++i) {
                Actor actor = qList[i];
                if (actor.getActorRef() != actor) {
                    throw new RuntimeException("this should not happen ever");
                }
                dispatcherThread.removeActorImmediate(actor);
                minLoadThread.addActor(actor);
                if (!REALLY_DEBUG_SCHEDULING) continue;
                Log.Info(this, "move for idle " + actor.getQSizes() + " myload " + dispatcherThread.getAccumulatedQSizes() + " actors " + qList.length);
            }
        }
    }

    @Override
    public BackOffStrategy getBackoffStrategy() {
        return this.backOffStrategy;
    }

    @Override
    public Future $getReport() {
        int count = 0;
        for (int i = 0; i < this.threads.length; ++i) {
            if (this.threads[i] == null) continue;
            ++count;
        }
        return new Promise<SchedulingReport>(new SchedulingReport(count, this.defQSize, this.isolateCount.get()));
    }

    @Override
    public Future<Monitorable[]> $getSubMonitorables() {
        DispatcherThread[] current = this.threads;
        int count = 0;
        for (int i = 0; i < current.length; ++i) {
            if (current[i] == null) continue;
            ++count;
        }
        Monitorable[] res = new Monitorable[count];
        count = 0;
        for (int i = 0; i < current.length; ++i) {
            if (current[i] == null) continue;
            res[count++] = current[i];
        }
        return new Promise<Monitorable[]>(res);
    }

    public static class SchedulingReport
    implements Serializable {
        int numDispatchers;
        int defQSize;
        int isolatedThreads;

        public SchedulingReport() {
        }

        public SchedulingReport(int numDispatchers, int defQSize, int isolatedThreads) {
            this.numDispatchers = numDispatchers;
            this.defQSize = defQSize;
            this.isolatedThreads = isolatedThreads;
        }

        public int getNumDispatchers() {
            return this.numDispatchers;
        }

        public int getDefQSize() {
            return this.defQSize;
        }
    }

    class CallbackInvokeHandler
    implements InvocationHandler {
        final Object target;
        final Actor targetActor;

        public CallbackInvokeHandler(Object target, Actor act) {
            this.target = target;
            this.targetActor = act;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(proxy, args);
            }
            if (this.target != null) {
                CallEntry<Object> ce = new CallEntry<Object>(this.target, method, args, Actor.sender.get(), this.targetActor, true);
                ElasticScheduler.this.put2QueuePolling(this.targetActor.__cbQueue, true, ce, this.targetActor);
            }
            return null;
        }
    }
}

