/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.impl;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Queue;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Scheduler;
import org.nustaq.kontraktor.impl.ActorBlockedException;
import org.nustaq.kontraktor.impl.BackOffStrategy;
import org.nustaq.kontraktor.impl.CallEntry;
import org.nustaq.kontraktor.impl.CallbackWrapper;
import org.nustaq.kontraktor.impl.DispatcherThread;
import org.nustaq.kontraktor.impl.SchedulingReport;
import org.nustaq.kontraktor.impl.StoppedActorTargetedException;
import org.nustaq.kontraktor.monitoring.Monitorable;
import org.nustaq.kontraktor.remoting.base.RemoteRegistry;
import org.nustaq.kontraktor.util.Log;

public class SimpleScheduler
implements Scheduler {
    public static final boolean DEBUG_SCHEDULING = true;
    public static long BLOCKED_MS_TIL_WARN = 5000L;
    public static int DEFQSIZE = 32768;
    protected BackOffStrategy backOffStrategy = new BackOffStrategy();
    protected DispatcherThread myThread;
    int qsize = DEFQSIZE;

    protected SimpleScheduler(String dummy) {
    }

    public SimpleScheduler() {
        this.myThread = new DispatcherThread(this, true);
        this.myThread.start();
    }

    public SimpleScheduler(int qsize) {
        this.qsize = qsize;
        this.myThread = new DispatcherThread(this, true);
        this.myThread.start();
    }

    public SimpleScheduler(int qsize, boolean keepAlive) {
        this.qsize = qsize;
        this.myThread = new DispatcherThread(this, !keepAlive);
        this.myThread.start();
    }

    @Override
    public int getDefaultQSize() {
        return this.qsize;
    }

    @Override
    public void pollDelay(int count) {
        this.backOffStrategy.yield(count);
    }

    @Override
    public void put2QueuePolling(Queue q, boolean isCBQ, Object o, Object receiver) {
        int count = 0;
        long sleepStart = 0L;
        boolean warningPrinted = false;
        while (!q.offer(o)) {
            this.pollDelay(count++);
            if (!this.backOffStrategy.isYielding(count)) continue;
            Actor sendingActor = Actor.sender.get();
            if (receiver instanceof Actor && ((Actor)receiver).__stopped) {
                String dl = o instanceof CallEntry ? ((CallEntry)o).getMethod().getName() : "" + o;
                if (sendingActor != null) {
                    sendingActor.__addDeadLetter((Actor)receiver, dl);
                }
                throw new StoppedActorTargetedException(dl);
            }
            if (sendingActor != null && sendingActor.__throwExAtBlock) {
                throw ActorBlockedException.Instance;
            }
            if (!this.backOffStrategy.isSleeping(count)) continue;
            if (sleepStart == 0L) {
                sleepStart = System.currentTimeMillis();
                continue;
            }
            if (warningPrinted || System.currentTimeMillis() - sleepStart <= BLOCKED_MS_TIL_WARN) continue;
            warningPrinted = true;
            String receiverString = receiver instanceof Actor ? (q == ((Actor)receiver).__cbQueue ? receiver.getClass().getSimpleName() + " callbackQ" : (q == ((Actor)receiver).__mailbox ? receiver.getClass().getSimpleName() + " mailbox" : receiver.getClass().getSimpleName() + " unknown queue")) : "" + receiver;
            String sender = "";
            if (sendingActor != null) {
                sender = ", sender:" + sendingActor.getActor().getClass().getSimpleName();
            }
            Log.Lg.warn(this, "Warning: Thread " + Thread.currentThread().getName() + " blocked more than " + BLOCKED_MS_TIL_WARN + "ms trying to put message on " + receiverString + sender + " msg:" + o);
        }
    }

    public IPromise put2QueuePolling(CallEntry e) {
        Promise fut;
        if (e.hasFutureResult() && !(e.getFutureCB() instanceof CallbackWrapper)) {
            fut = new Promise();
            e.setFutureCB(new CallbackWrapper(e.getSendingActor(), new Callback(){

                public void complete(Object result, Object error) {
                    fut.complete(result, error);
                }
            }));
        } else {
            fut = null;
        }
        Actor targetActor = e.getTargetActor();
        this.put2QueuePolling(e.isCallback() ? targetActor.__cbQueue : targetActor.__mailbox, false, e, targetActor);
        return fut;
    }

    @Override
    public Object enqueueCall(Actor sendingActor, Actor receiver, String methodName, Object[] args, boolean isCB) {
        return this.enqueueCallFromRemote(null, sendingActor, receiver, methodName, args, isCB);
    }

    @Override
    public Object enqueueCallFromRemote(RemoteRegistry reg, Actor sendingActor, Actor receiver, String methodName, Object[] args, boolean isCB) {
        Object actor = receiver.getActor();
        Method method = ((Actor)actor).__getCachedMethod(methodName, (Actor)actor);
        if (method == null) {
            throw new RuntimeException("unknown method " + methodName + " on " + actor);
        }
        for (int i = 0; i < args.length; ++i) {
            Object arg = args[i];
            if (!(arg instanceof Callback)) continue;
            args[i] = new CallbackWrapper(sendingActor, (Callback)arg);
        }
        CallEntry e = new CallEntry(actor, method, args, Actor.sender.get(), (Actor)actor, isCB);
        e.setRemoteRefRegistry(reg);
        return this.put2QueuePolling(e);
    }

    @Override
    public void terminateIfIdle() {
        this.myThread.setAutoShutDown(true);
    }

    @Override
    public void threadStopped(DispatcherThread th) {
    }

    @Override
    public InvocationHandler getInvoker(Actor dispatcher, Object toWrap) {
        return new CallbackInvokeHandler(toWrap, dispatcher);
    }

    @Override
    public <T> T inThread(Actor actor, T callback) {
        Class<?>[] interfaces = callback.getClass().getInterfaces();
        InvocationHandler invoker = actor.__scheduler.getInvoker(actor, callback);
        if (invoker == null) {
            return callback;
        }
        return (T)Proxy.newProxyInstance(callback.getClass().getClassLoader(), interfaces, invoker);
    }

    @Override
    public void delayedCall(long millis, final Runnable toRun) {
        Actors.delayedCalls.schedule(new TimerTask(){

            @Override
            public void run() {
                toRun.run();
            }
        }, millis);
    }

    @Override
    public <T> void runBlockingCall(Actor emitter, Callable<T> toCall, Callback<T> resultHandler) {
        CallbackWrapper resultWrapper = new CallbackWrapper(emitter, resultHandler);
        Actors.exec.execute(() -> {
            try {
                resultWrapper.complete(toCall.call(), null);
            }
            catch (Throwable th) {
                resultWrapper.complete(null, th);
            }
        });
    }

    @Override
    public DispatcherThread assignDispatcher(int minLoadPerc) {
        return this.myThread;
    }

    @Override
    public void rebalance(DispatcherThread dispatcherThread) {
    }

    @Override
    public BackOffStrategy getBackoffStrategy() {
        return this.backOffStrategy;
    }

    @Override
    public void tryStopThread(DispatcherThread dispatcherThread) {
    }

    @Override
    public void tryIsolate(DispatcherThread dp, Actor actorRef) {
    }

    @Override
    public int getNumActors() {
        return this.myThread.getActorsNoCopy().length;
    }

    @Override
    public IPromise getReport() {
        return new Promise<SchedulingReport>(new SchedulingReport(1, this.getDefaultQSize(), 0));
    }

    @Override
    public IPromise<Monitorable[]> getSubMonitorables() {
        return new Promise<Monitorable[]>(new Monitorable[]{this.myThread});
    }

    class CallbackInvokeHandler
    implements InvocationHandler {
        final Object target;
        final Actor targetActor;

        public CallbackInvokeHandler(Object target, Actor act) {
            this.target = target;
            this.targetActor = act;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(proxy, args);
            }
            if (this.target != null) {
                CallEntry<Object> ce = new CallEntry<Object>(this.target, method, args, Actor.sender.get(), this.targetActor, true);
                SimpleScheduler.this.put2QueuePolling(this.targetActor.__cbQueue, true, ce, this.targetActor);
            }
            return null;
        }
    }
}

