/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.impl;

import io.jaq.mpsc.MpscConcurrentQueue;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.Future;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Scheduler;
import org.nustaq.kontraktor.impl.CallEntry;
import org.nustaq.kontraktor.impl.ElasticScheduler;
import org.nustaq.kontraktor.impl.InternalActorStoppedException;
import org.nustaq.kontraktor.monitoring.Monitorable;
import org.nustaq.kontraktor.util.Log;

/**
 * Thread that executes the queued calls of the actors assigned to it.
 *
 * <p>The thread polls the callback queue and the mailbox of each assigned actor in
 * round-robin order, invokes the polled {@link CallEntry} reflectively and, when the
 * entry carries a future, forwards the result of the invocation to it. Actors are handed
 * over through a concurrent queue ({@link #addActor(Actor)}) and merged into the actor
 * array by the dispatcher thread itself ({@link #schedulePendingAdds()}).
 *
 * <p>The {@code actors} array is replaced as a whole, never modified in place; readers
 * in this class that may run on other threads take a snapshot of the reference first.
 */
public class DispatcherThread extends Thread implements Monitorable {
    /** Minimum time in nanoseconds between two housekeeping passes of a busy poll loop. */
    public static int SCHEDULE_TICK_NANOS = 500000;
    /** Queue fill level (percent) above which {@link #checkForSplit()} requests a rebalance. */
    public static int QUEUE_PERCENTAGE_TRIGGERING_REBALANCE = 50;
    /** Age of this thread in milliseconds that must be exceeded before a rebalance is requested. */
    public static int MILLIS_AFTER_CREATION_BEFORE_REBALANCING = 2;

    private Scheduler scheduler;
    /** Actors currently polled by this thread; replaced wholesale on add/remove. */
    private Actor[] actors = new Actor[0];
    /** Actors handed over by {@link #addActor(Actor)} and not yet merged into {@link #actors}. */
    ConcurrentLinkedQueue<Actor> toAdd = new ConcurrentLinkedQueue<Actor>();
    protected boolean shutDown = false;
    /** Running counter used to give each dispatcher thread a unique name. */
    static AtomicInteger dtcount = new AtomicInteger(0);
    // Not referenced inside this class; NOTE(review): presumably used by other classes of the package.
    int stackDepth = 0;
    volatile boolean isIsolated = false;
    /** Number of empty polls seen since the last housekeeping pass of the busy loop. */
    int emptySinceLastCheck = 0;
    /** Index of the actor to poll next (round-robin cursor of {@link #pollQueues(Actor[])}). */
    int count = 0;
    /** Creation time of this object, from {@link System#currentTimeMillis()}. */
    long created = System.currentTimeMillis();

    /**
     * Creates a dispatcher thread named {@code "DispatcherThread <n>"}; the thread is not started.
     *
     * @param scheduler scheduler consulted for back-off, rebalancing and thread shutdown
     */
    public DispatcherThread(Scheduler scheduler) {
        this.scheduler = scheduler;
        this.setName("DispatcherThread " + dtcount.incrementAndGet());
    }

    @Override
    public String toString() {
        return "DispatcherThread{ name:" + this.getName() + '}';
    }

    /** @return the current value of the isolated flag */
    public boolean isIsolated() {
        return this.isIsolated;
    }

    /** @param isIsolated new value of the isolated flag */
    public void setIsolated(boolean isIsolated) {
        this.isIsolated = isIsolated;
    }

    /**
     * Assigns an actor to this dispatcher. Both the actor reference and the underlying
     * actor get this thread as their current dispatcher; the reference is then queued and
     * becomes active on the next {@link #schedulePendingAdds()}.
     *
     * @param act actor (or actor reference) to schedule on this thread
     */
    public void addActor(Actor act) {
        Actor ref = act.getActorRef();
        Actor impl = (Actor)act.getActor();
        impl.__currentDispatcher = this;
        ref.__currentDispatcher = this;
        this.toAdd.offer(act.getActorRef());
    }

    /**
     * Removes an actor from the polled set. Must be called from this dispatcher thread.
     *
     * @param act the actor reference to remove (compared by identity)
     * @throws RuntimeException "wrong thread" when called from another thread, or
     *         "could not remove actor" when the actor is not contained exactly once
     */
    void removeActorImmediate(Actor act) {
        if (Thread.currentThread() != this) {
            throw new RuntimeException("wrong thread");
        }
        Actor[] current = this.actors;
        int occurrences = 0;
        for (Actor candidate : current) {
            if (candidate == act) {
                ++occurrences;
            }
        }
        // Validate before allocating: an absent actor (or an empty array) must yield the
        // documented error instead of an array-size / index exception.
        if (occurrences != 1) {
            throw new RuntimeException("could not remove actor");
        }
        Actor[] remaining = new Actor[current.length - 1];
        int idx = 0;
        for (Actor candidate : current) {
            if (candidate != act) {
                remaining[idx++] = candidate;
            }
        }
        this.actors = remaining;
    }

    /**
     * Main loop: polls actor queues until the shut-down flag is observed, then reports
     * the stop to the scheduler and waits until no actors are left on this thread.
     */
    @Override
    public void run() {
        int emptyCount = 0;
        long scheduleTickTime = System.nanoTime();
        boolean isShutDown = false;
        while (!isShutDown) {
            try {
                if (this.pollQs()) {
                    // Busy path: housekeeping only once per SCHEDULE_TICK_NANOS.
                    emptyCount = 0;
                    if (System.nanoTime() - scheduleTickTime <= (long)SCHEDULE_TICK_NANOS) continue;
                    // No empty poll during the whole tick => check whether load must be split.
                    if (this.emptySinceLastCheck == 0) {
                        this.checkForSplit();
                    }
                    this.emptySinceLastCheck = 0;
                    scheduleTickTime = System.nanoTime();
                    this.schedulePendingAdds();
                    continue;
                }
                // Idle path: let the scheduler decide how to back off.
                ++this.emptySinceLastCheck;
                this.scheduler.yield(++emptyCount);
                if (this.shutDown) {
                    isShutDown = true;
                }
                if (!this.scheduler.getBackoffStrategy().isSleeping(emptyCount)) continue;
                // Force a housekeeping pass on the next successful poll.
                scheduleTickTime = 0L;
                this.schedulePendingAdds();
                if (System.currentTimeMillis() - this.created <= 1000L) continue;
                if (this.actors.length == 0 && this.toAdd.peek() == null) {
                    // Nothing scheduled and nothing pending: terminate on the next idle pass.
                    this.shutDown();
                    continue;
                }
                this.scheduler.tryStopThread(this);
            }
            catch (Throwable th) {
                Log.Warn(this, th, "from main poll loop");
            }
        }
        this.scheduler.threadStopped(this);
        // Grace period: require 100 consecutive clean checks, 5 ms apart. Whenever actors are
        // still attached the counter restarts; the park at the top of each iteration keeps
        // the retries from busy-spinning.
        for (int i = 0; i < 100; ++i) {
            LockSupport.parkNanos(5000000L);
            if (this.actors.length > 0 || this.toAdd.peek() != null) {
                if (ElasticScheduler.DEBUG_SCHEDULING) {
                    Log.Lg.warn(this, "Severe: zombie dispatcher thread detected");
                }
                this.scheduler.tryStopThread(this);
                i = 0;
            }
        }
        if (ElasticScheduler.DEBUG_SCHEDULING) {
            Log.Info(this, "dispatcher thread terminated " + this.getName());
        }
    }

    /**
     * Drains {@link #toAdd} and appends the drained actors to the polled actor array.
     * Does nothing when no actor is pending.
     */
    public void schedulePendingAdds() {
        ArrayList<Actor> newOnes = new ArrayList<Actor>();
        Actor pending;
        while ((pending = this.toAdd.poll()) != null) {
            newOnes.add(pending);
        }
        if (newOnes.isEmpty()) {
            return;
        }
        Actor[] current = this.actors;
        Actor[] merged = new Actor[current.length + newOnes.size()];
        System.arraycopy(current, 0, merged, 0, current.length);
        for (int i = 0; i < newOnes.size(); ++i) {
            merged[current.length + i] = newOnes.get(i);
        }
        this.actors = merged;
    }

    /**
     * Polls one actor, selected round-robin via {@link #count}. The actor's callback queue
     * is tried first, its mailbox second.
     *
     * @param actors actors to choose from
     * @return the polled entry, or {@code null} when the selected actor had nothing queued
     *         or {@code actors} is empty
     */
    protected CallEntry pollQueues(Actor[] actors) {
        if (this.count >= actors.length) {
            this.count = 0;
            if (actors.length == 0) {
                return null;
            }
        }
        Actor actor2poll = actors[this.count];
        CallEntry res = (CallEntry)actor2poll.__cbQueue.poll();
        if (res == null) {
            res = (CallEntry)actor2poll.__mailbox.poll();
        }
        ++this.count;
        return res;
    }

    /**
     * Polls and executes at most one call entry of this thread's own actors.
     *
     * @return see {@link #pollQs(Actor[])}
     */
    public boolean pollQs() {
        return this.pollQs(this.actors);
    }

    /**
     * Polls at most one call entry from the given actors and invokes it.
     *
     * <p>If the reflective call is rejected with an {@link IllegalArgumentException},
     * {@code Byte} arguments passed for {@code boolean}/{@code Boolean} parameters are
     * converted (non-zero means {@code true}) and the call is retried once. When nothing
     * can be converted, the arguments are dumped to {@code System.out} and the original
     * exception is handled like any other invocation failure.
     *
     * <p>An invocation failing with {@link InternalActorStoppedException#Instance} marks
     * the target actor as stopped and removes it from this thread. Any other failure is
     * passed to the entry's future (if present) and logged.
     *
     * @param actors actors to poll
     * @return {@code true} if an entry was executed successfully or its actor was stopped;
     *         {@code false} if nothing was polled or the invocation failed
     */
    public boolean pollQs(Actor[] actors) {
        CallEntry callEntry = this.pollQueues(actors);
        if (callEntry != null) {
            try {
                Actor.sender.set(callEntry.getTargetActor());
                Object invoke = null;
                try {
                    invoke = this.invoke(callEntry);
                }
                catch (IllegalArgumentException iae) {
                    if (!DispatcherThread.coerceByteArgsToBoolean(callEntry)) {
                        // Nothing to repair: report the mismatch and fail with the original error.
                        DispatcherThread.printArgumentMismatch(callEntry);
                        throw iae;
                    }
                    // Arguments were repaired; a successful retry counts as a normal invocation.
                    invoke = this.invoke(callEntry);
                }
                if (callEntry.getFutureCB() != null) {
                    final Future futureCB = callEntry.getFutureCB();
                    Promise invokeResult = (Promise)invoke;
                    invokeResult.then(new Callback(){

                        public void receive(Object result, Object error) {
                            futureCB.receive(result, error);
                        }
                    });
                }
                return true;
            }
            catch (Throwable e) {
                if (e instanceof InvocationTargetException && ((InvocationTargetException)e).getTargetException() == InternalActorStoppedException.Instance) {
                    Actor actor = (Actor)callEntry.getTarget();
                    actor.__stopped = true;
                    this.removeActorImmediate(actor.getActorRef());
                    return true;
                }
                if (e instanceof InvocationTargetException) {
                    // Report the exception thrown by the invoked method, not the reflection wrapper.
                    e = e.getCause();
                }
                if (callEntry.getFutureCB() != null) {
                    Log.Info(this, e, "returned catched exception to future");
                    callEntry.getFutureCB().receive(null, e);
                }
                Log.Warn(this, e, "");
            }
        }
        return false;
    }

    /**
     * Replaces, in place, every {@code Byte} argument whose parameter type is
     * {@code boolean} or {@code Boolean} by the corresponding {@code Boolean}.
     *
     * @return {@code true} if at least one argument was replaced; {@code false} if the
     *         argument count differs from the parameter count or nothing needed conversion
     */
    private static boolean coerceByteArgsToBoolean(CallEntry callEntry) {
        Class<?>[] parameterTypes = callEntry.getMethod().getParameterTypes();
        Object[] args = callEntry.getArgs();
        if (args.length != parameterTypes.length) {
            return false;
        }
        boolean coerced = false;
        for (int i = 0; i < args.length; ++i) {
            boolean wantsBoolean = parameterTypes[i] == Boolean.TYPE || parameterTypes[i] == Boolean.class;
            if (wantsBoolean && args[i] instanceof Byte) {
                args[i] = Boolean.valueOf(((Byte)args[i]).intValue() != 0);
                coerced = true;
            }
        }
        return coerced;
    }

    /** Dumps the call entry and the runtime type of each of its arguments to {@code System.out}. */
    private static void printArgumentMismatch(CallEntry callEntry) {
        System.out.println("mismatch when invoking method " + callEntry);
        for (int i = 0; i < callEntry.getArgs().length; ++i) {
            Object o = callEntry.getArgs()[i];
            System.out.println("arg " + i + " " + o + (o != null ? " " + o.getClass().getSimpleName() : "") + ",");
        }
        System.out.println();
    }

    /** Reflectively invokes the entry's method on the entry's target with the entry's arguments. */
    private Object invoke(CallEntry poll) throws IllegalAccessException, InvocationTargetException {
        Object target = poll.getTarget();
        return poll.getMethod().invoke(target, poll.getArgs());
    }

    /**
     * Asks the scheduler to rebalance when the load exceeds
     * {@link #QUEUE_PERCENTAGE_TRIGGERING_REBALANCE}, more than one actor is scheduled here
     * and this thread is older than {@link #MILLIS_AFTER_CREATION_BEFORE_REBALANCING}.
     */
    private void checkForSplit() {
        int load = this.getLoad();
        if (load > QUEUE_PERCENTAGE_TRIGGERING_REBALANCE && this.actors.length > 1 && System.currentTimeMillis() - this.created > (long)MILLIS_AFTER_CREATION_BEFORE_REBALANCING) {
            this.scheduler.rebalance(this);
        }
    }

    /**
     * @return the highest fill level, in percent of queue capacity, found among the
     *         mailboxes and callback queues of the scheduled actors; 0 without actors
     */
    public int getLoad() {
        int res = 0;
        Actor[] actors = this.actors;
        for (int i = 0; i < actors.length; ++i) {
            MpscConcurrentQueue queue = (MpscConcurrentQueue)actors[i].__mailbox;
            int load = queue.size() * 100 / queue.getCapacity();
            if (load > res) {
                res = load;
            }
            queue = (MpscConcurrentQueue)actors[i].__cbQueue;
            load = queue.size() * 100 / queue.getCapacity();
            if (load > res) {
                res = load;
            }
        }
        return res;
    }

    /** @return the sum of {@code getQSizes()} over all scheduled actors */
    public int getAccumulatedQSizes() {
        int res = 0;
        Actor[] actors = this.actors;
        for (int i = 0; i < actors.length; ++i) {
            res += actors[i].getQSizes();
        }
        return res;
    }

    /** @return the summed sizes of mailbox and callback queue of all scheduled actors */
    public int getQSize() {
        int res = 0;
        Actor[] actors = this.actors;
        for (int i = 0; i < actors.length; ++i) {
            Actor a = actors[i];
            res += a.__mailbox.size();
            res += a.__cbQueue.size();
        }
        return res;
    }

    /**
     * Returns the negation of the shut-down flag, i.e. {@code true} as long as
     * {@link #shutDown()} has not set it.
     *
     * <p>NOTE(review): the result is the opposite of what the method name suggests. The
     * behavior is kept because callers outside this file may rely on it - confirm before
     * changing.
     *
     * @return {@code true} if the shut-down flag is not set
     */
    public boolean isShutDown() {
        return !this.shutDown;
    }

    /** Sets the shut-down flag; {@link #run()} leaves its poll loop after observing it on an idle pass. */
    public void shutDown() {
        this.shutDown = true;
    }

    /**
     * Not implemented.
     *
     * @throws RuntimeException always ("unimplemented")
     */
    public void shutDownImmediate() {
        throw new RuntimeException("unimplemented");
    }

    /** @return {@code true} if mailbox and callback queue of every scheduled actor are empty */
    public boolean isEmpty() {
        // Snapshot: the array reference may be replaced while this loop runs.
        Actor[] actors = this.actors;
        for (int i = 0; i < actors.length; ++i) {
            Actor act = actors[i];
            if (act.__mailbox.isEmpty() && act.__cbQueue.isEmpty()) continue;
            return false;
        }
        return true;
    }

    /**
     * Blocks the calling thread until {@link #isEmpty()} returns {@code true}.
     *
     * @param nanos time to park between two checks, in nanoseconds
     */
    public void waitEmpty(long nanos) {
        while (!this.isEmpty()) {
            LockSupport.parkNanos(nanos);
        }
    }

    /** @return the scheduler passed to the constructor */
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    /** @return a copy of the array of currently scheduled actors */
    public Actor[] getActors() {
        Actor[] actors = this.actors;
        Actor[] res = new Actor[actors.length];
        System.arraycopy(actors, 0, res, 0, res.length);
        return res;
    }

    /** @return the internal actor array itself, without copying */
    Actor[] getActorsNoCopy() {
        return this.actors;
    }

    /**
     * Tells whether the given object is an actor whose current dispatcher is this thread.
     *
     * @param receiverRef object to test
     * @return {@code true} if {@code receiverRef} is an {@link Actor} dispatched by this thread
     * @throws RuntimeException if called from a thread other than this dispatcher thread
     */
    public boolean schedules(Object receiverRef) {
        if (Thread.currentThread() != this) {
            throw new RuntimeException("cannot call from foreign thread");
        }
        if (receiverRef instanceof Actor) {
            return ((Actor)receiverRef).__currentDispatcher == this;
        }
        return false;
    }

    /** @return a promise holding a {@link DispatcherReport} with name, actor count, load and queue sizes */
    @Override
    public Future $getReport() {
        return new Promise<DispatcherReport>(new DispatcherReport(this.getName(), this.actors.length, this.getLoad(), this.getAccumulatedQSizes()));
    }

    /** @return a promise holding a copy of the scheduled actors */
    @Override
    public Future<Monitorable[]> $getSubMonitorables() {
        return new Promise<Actor[]>(this.getActors());
    }

    /** Snapshot of a dispatcher thread's state as produced by {@link DispatcherThread#$getReport()}. */
    public static class DispatcherReport {
        String name;
        int numActors;
        int loadPerc;
        int qSizes;

        /** Creates an empty report with all fields at their default values. */
        public DispatcherReport() {
        }

        /**
         * @param name      name of the dispatcher thread
         * @param numActors number of actors scheduled on the thread
         * @param loadPerc  load in percent
         * @param qSizes    accumulated queue sizes
         */
        public DispatcherReport(String name, int numActors, int loadPerc, int qSizes) {
            this.name = name;
            this.numActors = numActors;
            this.loadPerc = loadPerc;
            this.qSizes = qSizes;
        }

        public String getName() {
            return this.name;
        }

        public int getNumActors() {
            return this.numActors;
        }
    }
}

