/*
 * Decompiled with CFR 0.152.
 */
package de.ruedigermoeller.kontraktor.impl;

import de.ruedigermoeller.kontraktor.Actor;
import de.ruedigermoeller.kontraktor.Callback;
import de.ruedigermoeller.kontraktor.Future;
import de.ruedigermoeller.kontraktor.Promise;
import de.ruedigermoeller.kontraktor.impl.BackOffStrategy;
import de.ruedigermoeller.kontraktor.impl.CallEntry;
import de.ruedigermoeller.kontraktor.impl.CallbackWrapper;
import io.jaq.mpsc.MpscConcurrentQueue;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

public class DispatcherThread
extends Thread {
    public static AtomicInteger instanceCount = new AtomicInteger(0);
    public static BackOffStrategy backOffStrategy = new BackOffStrategy();
    AtomicInteger usingActors = new AtomicInteger(0);
    volatile boolean shutDown = false;
    private boolean dead;
    public static int DEFAULT_QUEUE_SIZE = 30000;
    Queue queue;
    Queue cbQueue;
    int instanceNum;
    String stack;

    public DispatcherThread() {
        this.init(DEFAULT_QUEUE_SIZE);
    }

    public DispatcherThread(int qSize) {
        this.init(qSize);
    }

    public boolean isEmpty() {
        return this.queue.isEmpty() && this.cbQueue.isEmpty();
    }

    public InvocationHandler getInvoker(Object toWrap) {
        return new CallbackInvokeHandler(toWrap);
    }

    protected void init(int qSize) {
        if (qSize <= 0) {
            qSize = DEFAULT_QUEUE_SIZE;
        }
        this.queue = new MpscConcurrentQueue(qSize);
        this.cbQueue = new MpscConcurrentQueue(qSize);
        this.instanceNum = instanceCount.incrementAndGet();
        StringWriter stringWriter = new StringWriter(1000);
        PrintWriter s = new PrintWriter(stringWriter);
        new Exception().printStackTrace(s);
        s.flush();
        this.stack = stringWriter.getBuffer().toString();
        this.setName("ActorDisp spawned from [" + Thread.currentThread().getName() + "] " + System.identityHashCode(this));
        this.start();
    }

    @Override
    public String toString() {
        return "DispatcherThread{ name:" + this.getName() + '}';
    }

    public void actorAdded(Actor a) {
        this.usingActors.incrementAndGet();
    }

    public void actorStopped(Actor a) {
        int count = this.usingActors.decrementAndGet();
        if (count == 0) {
            this.shutDown();
        }
    }

    public static Future pollDispatchOnObject(DispatcherThread currentThreadDispatcher, CallEntry e) {
        Promise fut;
        if (e.hasFutureResult()) {
            fut = new Promise();
            e.setFutureCB(new CallbackWrapper(currentThreadDispatcher, new Callback(){

                public void receiveResult(Object result, Object error) {
                    fut.receiveResult(result, error);
                }
            }));
        } else {
            fut = null;
        }
        int count = 0;
        while (e.getDispatcher().dispatchOnObject(e)) {
            if (currentThreadDispatcher != null) {
                DispatcherThread.yield(count++);
                continue;
            }
            DispatcherThread.yield(count++);
        }
        return fut;
    }

    public boolean dispatchOnObject(CallEntry entry) {
        if (this.dead) {
            throw new RuntimeException("received message on terminated dispatcher " + this);
        }
        return !this.queue.offer(entry);
    }

    public boolean dispatchCallback(CallEntry ce) {
        if (this.dead) {
            throw new RuntimeException("received message on terminated dispatcher " + this);
        }
        DispatcherThread sender = DispatcherThread.getThreadDispatcher();
        return !this.cbQueue.offer(ce);
    }

    public static DispatcherThread getThreadDispatcher() {
        DispatcherThread sender = null;
        if (Thread.currentThread() instanceof DispatcherThread) {
            sender = (DispatcherThread)Thread.currentThread();
        }
        return sender;
    }

    @Override
    public void run() {
        int emptyCount = 0;
        boolean isShutDown = false;
        while (!isShutDown) {
            if (this.pollQs()) {
                emptyCount = 0;
                continue;
            }
            DispatcherThread.yield(++emptyCount);
            if (!this.shutDown) continue;
            isShutDown = true;
        }
        this.dead = true;
        instanceCount.decrementAndGet();
    }

    public boolean pollQs() {
        CallEntry poll = (CallEntry)this.cbQueue.poll();
        if (poll == null) {
            poll = (CallEntry)this.queue.poll();
        }
        if (poll != null) {
            try {
                Object invoke = poll.getMethod().invoke(poll.getTarget(), poll.getArgs());
                if (poll.getFutureCB() != null) {
                    final Future futureCB = poll.getFutureCB();
                    Promise invokeResult = (Promise)invoke;
                    invokeResult.then(new Callback(){

                        public void receiveResult(Object result, Object error) {
                            futureCB.receiveResult(result, error);
                        }
                    });
                }
                return true;
            }
            catch (Exception e) {
                if (poll.getFutureCB() != null) {
                    poll.getFutureCB().receiveResult(null, e);
                }
                if (e.getCause() != null) {
                    e.getCause().printStackTrace();
                }
                e.printStackTrace();
            }
        }
        return false;
    }

    public static void yield(int count) {
        backOffStrategy.yield(count);
    }

    public int getQSize() {
        return this.queue.size() + this.cbQueue.size();
    }

    public boolean isShutDown() {
        return !this.shutDown;
    }

    public void shutDown() {
        this.shutDown = true;
    }

    public void shutDownImmediate() {
        throw new RuntimeException("unimplemented");
    }

    public void waitEmpty(long nanos) {
        while (!this.isEmpty()) {
            LockSupport.parkNanos(nanos);
        }
    }

    class CallbackInvokeHandler
    implements InvocationHandler {
        final Object target;

        public CallbackInvokeHandler(Object target) {
            this.target = target;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(proxy, args);
            }
            if (this.target != null) {
                CallEntry<Object> ce = new CallEntry<Object>(this.target, method, args, DispatcherThread.this);
                while (DispatcherThread.this.dispatchCallback(ce)) {
                }
            }
            return null;
        }
    }
}

