/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.MustBeRunFromActorThread;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Scheduler;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.annotations.Local;
import org.nustaq.kontraktor.annotations.Remoted;
import org.nustaq.kontraktor.impl.ActorProxyFactory;
import org.nustaq.kontraktor.impl.DispatcherThread;
import org.nustaq.kontraktor.impl.InternalActorStoppedException;
import org.nustaq.kontraktor.monitoring.Monitorable;
import org.nustaq.kontraktor.remoting.base.ConnectionRegistry;
import org.nustaq.kontraktor.remoting.base.ObjectSocket;
import org.nustaq.kontraktor.remoting.base.RateLimitException;
import org.nustaq.kontraktor.remoting.base.RemotedActor;
import org.nustaq.kontraktor.remoting.base.ServingActor;
import org.nustaq.kontraktor.remoting.encoding.RemoteCallEntry;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.kontraktor.util.TicketMachine;
import org.nustaq.serialization.util.FSTUtil;

public class Actor<SELF extends Actor>
extends Actors
implements Serializable,
Monitorable,
Executor {
    public static ThreadLocal<Actor> sender = new ThreadLocal();
    public static ThreadLocal<ConnectionRegistry> connection = new ThreadLocal();
    public Object userData;
    public Queue __mailbox;
    public int __mbCapacity;
    public Queue __cbQueue;
    public Thread __currentDispatcher;
    public Scheduler __scheduler;
    public volatile boolean __stopped;
    public Actor __self;
    private Map<String, Runnable> _debounceMap;
    public long __remoteId;
    public volatile ConcurrentLinkedQueue<ConnectionRegistry> __connections;
    public ConnectionRegistry __clientConnection;
    public boolean zzRoutingGCEnabled;
    ConcurrentLinkedQueue<Callback<SELF>> __stopHandlers;
    public int __mailboxCapacity;
    protected TicketMachine __ticketMachine;
    ConcurrentHashMap methodCache;
    ConcurrentHashMap interceptedCache;
    public Callback<RemoteCallEntry> zzServerMsgCallback;
    private long[] zzLastLiveActorIds;
    public Actor __publishTarget;

    public static Actor current() {
        Actor actor = sender.get();
        if (actor == null) {
            throw new MustBeRunFromActorThread();
        }
        return actor;
    }

    public static boolean inside() {
        return sender.get() != null;
    }

    public void __submit(Runnable toRun) {
        toRun.run();
    }

    protected SELF self() {
        return (SELF)this.__self;
    }

    public ActorProxyFactory getFactory() {
        return Actors.instance.getFactory();
    }

    public SELF getActor() {
        return (SELF)this;
    }

    @CallerSideMethod
    public void stop() {
        if (this.isRemote()) {
            throw new RuntimeException("Cannot stop remote ref");
        }
        SELF self = this.self();
        if (self != null) {
            ((Actor)self).ping().then(() -> {
                SELF selfInner = this.self();
                if (selfInner != null) {
                    ((Actor)selfInner).asyncstop();
                }
            });
        }
    }

    @CallerSideMethod
    public boolean isStopped() {
        return this.__stopped;
    }

    @CallerSideMethod
    public boolean isProxy() {
        return this.getActor() != this;
    }

    public IPromise askMsg(String messageId, Object ... args) {
        return Actor.resolve(null);
    }

    public void tellMsg(String messageId, Object ... args) {
    }

    @CallerSideMethod
    public IPromise ask(String messageId, Object ... args) {
        boolean isCB;
        boolean bl = isCB = args != null && args.length > 0 && args[args.length - 1] instanceof Callback;
        if (this.isRemote() && this.__clientConnection instanceof ConnectionRegistry) {
            Actor sendingActor = sender.get();
            Object[] newArgs = new Object[args.length + 1];
            System.arraycopy(args, 0, newArgs, 1, args.length);
            newArgs[0] = messageId;
            return (IPromise)this.getScheduler().enqueueCall(this.__clientConnection, sendingActor, (Actor)this.getActor(), "ask", newArgs, isCB);
        }
        return (IPromise)this.getScheduler().enqueueCall(sender.get(), (Actor)this.getActor(), messageId, args, isCB);
    }

    @CallerSideMethod
    public void tell(String messageId, Object ... args) {
        boolean isCB;
        boolean bl = isCB = args != null && args.length > 0 && args[args.length - 1] instanceof Callback;
        if (this.isRemote() && this.__clientConnection instanceof ConnectionRegistry) {
            Actor sendingActor = sender.get();
            Object[] newArgs = new Object[args.length + 1];
            System.arraycopy(args, 0, newArgs, 1, args.length);
            newArgs[0] = messageId;
            this.getScheduler().enqueueCall(this.__clientConnection, sendingActor, (Actor)this.getActor(), "tell", newArgs, isCB);
            return;
        }
        this.getScheduler().enqueueCall(sender.get(), (Actor)this.getActor(), messageId, args, isCB);
    }

    public <T> IPromise<T> execInThreadPool(Callable<T> callable) {
        Promise prom = new Promise();
        this.__scheduler.runBlockingCall((Actor)this.self(), callable, prom);
        return prom;
    }

    @Local
    public void cyclic(long interval, Callable<Boolean> toRun) {
        if (!this.isStopped()) {
            Boolean res = true;
            try {
                res = toRun.call();
            }
            catch (Exception e) {
                Log.Warn((Object)this, e);
            }
            if (res.booleanValue()) {
                ((Actor)this.self()).delayed(interval, () -> this.cyclic(interval, toRun));
            }
        }
    }

    @Local
    public void debounce(long timeout, String tag, Runnable toRun) {
        if (this._debounceMap == null) {
            this._debounceMap = new HashMap<String, Runnable>(7);
        }
        this._debounceMap.put(tag, toRun);
        this.delayed(timeout, () -> {
            if (this._debounceMap.get(tag) == toRun) {
                this._debounceMap.remove(tag);
                toRun.run();
            }
        });
    }

    @CallerSideMethod
    @Local
    public void delayed(long millis, Runnable toRun) {
        this.__scheduler.delayedCall(millis, this.inThread((Actor)this.self(), (Object)toRun));
    }

    @CallerSideMethod
    public boolean isMailboxPressured() {
        return this.__mailbox.size() * 2 > this.__mbCapacity;
    }

    @CallerSideMethod
    public boolean isEmpty() {
        return this.__mailbox.isEmpty() && this.__cbQueue.isEmpty();
    }

    @CallerSideMethod
    public Scheduler getScheduler() {
        return this.__scheduler;
    }

    @CallerSideMethod
    public boolean isCallbackQPressured() {
        return this.__cbQueue.size() * 2 > this.__mbCapacity;
    }

    @CallerSideMethod
    public int getMailboxSize() {
        return this.__mailbox.size();
    }

    @CallerSideMethod
    public int getQSizes() {
        return this.getCallbackSize() + this.getMailboxSize();
    }

    @CallerSideMethod
    public int getCallbackSize() {
        return this.__cbQueue.size();
    }

    @CallerSideMethod
    protected <T> T inThread(Actor proxy, T cbInterface) {
        return this.__scheduler.inThread(proxy, cbInterface);
    }

    protected final void checkThread() {
        if (this.getCurrentDispatcher() != null && this.getCurrentDispatcher() != Thread.currentThread()) {
            Log.Error((Object)this, "UNEXPECTED MULTITHREADING");
            throw new RuntimeException("Wrong Thread");
        }
        if (this.getCurrentDispatcher() == null) {
            Log.Error((Object)this, "Not in Dispatcher Thread");
            throw new RuntimeException("Not in Dispatcher Thread:" + Thread.currentThread().getName());
        }
    }

    @CallerSideMethod
    public Actor getActorRef() {
        return this.__self;
    }

    @CallerSideMethod
    public boolean isRemote() {
        return this.__remoteId != 0L;
    }

    @Local
    public void close() {
        if (this.__connections != null) {
            ConcurrentLinkedQueue<ConnectionRegistry> prevCon = this.getActorRef().__connections;
            this.getActorRef().__connections = null;
            ((Actor)this.getActor()).__connections = null;
            prevCon.forEach(con -> con.close(this));
        }
    }

    protected void closeCurrentClient() {
        ConnectionRegistry remoteConnection = connection.get();
        if (remoteConnection != null) {
            this.delayed(1000L, () -> remoteConnection.close(this));
        }
    }

    @Local
    public void asyncstop() {
        this.hasStopped();
        this.__stop();
    }

    protected void hasStopped() {
    }

    @CallerSideMethod
    public void stopSafeClose() {
        if (this.isStopped()) {
            ((Actor)this.getActor()).close();
        } else {
            ((Actor)this.self()).close();
        }
    }

    public IPromise ping() {
        return new Promise<Integer>(0);
    }

    protected void serialOn(Object transactionKey, Consumer<IPromise> toRun) {
        if (this.isProxy()) {
            throw new RuntimeException("cannot call on actor proxy object");
        }
        if (this.__ticketMachine == null) {
            this.__ticketMachine = new TicketMachine();
        }
        this.__ticketMachine.getTicket(transactionKey).onResult(finSig -> {
            try {
                toRun.accept((IPromise)finSig);
            }
            catch (Throwable th) {
                Log.Warn((Object)this, th);
            }
        });
    }

    @CallerSideMethod
    public boolean isPublished() {
        return this.__connections != null && this.__connections.peek() != null;
    }

    @Override
    @CallerSideMethod
    @Local
    public void execute(Runnable command) {
        ((Actor)this.self()).__submit(command);
    }

    @CallerSideMethod
    @Local
    public DispatcherThread getCurrentDispatcher() {
        return (DispatcherThread)this.__currentDispatcher;
    }

    protected ConcurrentLinkedQueue<ConnectionRegistry> getConnections() {
        return this.__connections;
    }

    @CallerSideMethod
    public Actor getUntypedRef() {
        Actor<SELF> actor = new Actor<SELF>();
        actor.__publishTarget = this.self();
        return actor;
    }

    @CallerSideMethod
    public void __addStopHandler(Callback<SELF> cb) {
        if (this.__stopHandlers == null) {
            ((Actor)this.getActor()).__stopHandlers = this.getActorRef().__stopHandlers = new ConcurrentLinkedQueue();
        }
        this.__stopHandlers.add(cb);
    }

    @CallerSideMethod
    public void __addRemoteConnection(ConnectionRegistry con) {
        if (this.__connections == null) {
            ((Actor)this.getActor()).__connections = this.getActorRef().__connections = new ConcurrentLinkedQueue();
        }
        if (!this.__connections.contains(con)) {
            this.__connections.add(con);
            if (this instanceof RemotedActor) {
                String connectionIdentifier = con.getSocketRef().getConnectionIdentifier();
                ((RemotedActor)((Object)this)).hasBeenPublished(connectionIdentifier);
            }
        }
    }

    @CallerSideMethod
    public void __removeRemoteConnection(ConnectionRegistry con) {
        if (this.__connections != null) {
            this.__connections.remove(con);
        }
    }

    @CallerSideMethod
    public void __stop() {
        this.__stopImpl();
    }

    @CallerSideMethod
    protected void __stopImpl() {
        Log.Debug((Object)this, "stopping actor " + this.getClass().getSimpleName());
        Actor self = this.__self;
        if (self == null || this.getActor() == null || self.isStopped() && ((Actor)this.getActor()).isStopped()) {
            return;
        }
        this.getActorRef().__stopped = true;
        ((Actor)this.getActor()).__stopped = true;
        if (this.__stopHandlers != null) {
            this.__stopHandlers.forEach((Consumer<Callback<SELF>>)((Consumer<Callback>)cb -> cb.complete(this.self(), null)));
            this.__stopHandlers.clear();
        }
        throw InternalActorStoppedException.Instance;
    }

    @CallerSideMethod
    public Object __enqueueCall(Actor receiver, String methodName, Object[] args, boolean isCB) {
        if (this.__stopped) {
            if (methodName.equals("stop")) {
                return null;
            }
            this.__addDeadLetter(receiver, methodName);
        }
        return this.__scheduler.enqueueCall(sender.get(), receiver, methodName, args, isCB);
    }

    @CallerSideMethod
    public void __addDeadLetter(Actor receiver, String methodName) {
        String senderString = sender.get() == null ? "null" : sender.get().getClass().getName();
        Object s = "DEAD LETTER: sender:" + senderString + " receiver::msg:" + receiver.getClass().getSimpleName() + "::" + methodName;
        s = ((String)s).replace("_ActorProxy", "");
        Actors.AddDeadLetter((String)s);
    }

    @CallerSideMethod
    public Method __getCachedMethod(String methodName, Actor actor, BiFunction<Actor, String, Boolean> callInterceptor) {
        ConcurrentHashMap mcache;
        Method method;
        if (callInterceptor != null) {
            if (this.interceptedCache == null) {
                this.interceptedCache = new ConcurrentHashMap(7);
            }
        } else if (this.methodCache == null) {
            this.methodCache = new ConcurrentHashMap(7);
        }
        if ((method = (Method)(mcache = callInterceptor != null ? this.interceptedCache : this.methodCache).get(methodName)) == null) {
            Method[] methods = actor.getActor().getClass().getMethods();
            for (int i = 0; i < methods.length; ++i) {
                Method m = methods[i];
                if (!m.getName().equals(methodName) || callInterceptor != null && !callInterceptor.apply(actor, methodName).booleanValue()) continue;
                mcache.put(methodName, m);
                method = m;
                break;
            }
        } else if (callInterceptor != null && !callInterceptor.apply(actor, methodName).booleanValue()) {
            return null;
        }
        return method;
    }

    @CallerSideMethod
    public boolean __dispatchRemoteCall(ObjectSocket objSocket, RemoteCallEntry rce, ConnectionRegistry registry, List<IPromise> createdFutures, Object authContext, BiFunction<Actor, String, Boolean> callInterceptor, long delayCode) {
        block7: {
            rce.unpackArgs(registry.getConf());
            try {
                if (delayCode == -1L) {
                    throw new RateLimitException();
                }
                Object future = this.getScheduler().enqueueCallFromRemote(registry, null, (Actor)this.self(), rce.getMethod(), rce.getArgs(), false, null, callInterceptor, rce);
                if (future instanceof IPromise) {
                    Promise p = null;
                    if (createdFutures != null) {
                        p = new Promise();
                        createdFutures.add(p);
                    }
                    Promise finalP = p;
                    RemoteCallEntry finalRce = rce;
                    ((IPromise)future).then((r, e) -> {
                        Runnable runnable = () -> {
                            try {
                                Object finalR = r;
                                if (registry.isJsonSerialized() && finalRce.getMethodHandle() != null) {
                                    finalR = this.getScheduler().mapResult(r, finalRce);
                                }
                                registry.receiveCBResult(objSocket, finalRce.getFutureKey(), finalR, e);
                                if (finalP != null) {
                                    finalP.resolve();
                                }
                            }
                            catch (Exception ex) {
                                Log.Warn(this, ex, "--");
                            }
                        };
                        if (Thread.currentThread() != this.__currentDispatcher) {
                            ((Actor)this.self()).execute(runnable);
                        } else {
                            runnable.run();
                        }
                    });
                }
            }
            catch (Throwable th) {
                if (!(th instanceof RateLimitException)) {
                    Log.Warn((Object)this, th);
                }
                if (rce.getFutureKey() != 0L) {
                    ((Actor)this.self()).execute(() -> {
                        try {
                            registry.receiveCBResult(objSocket, rce.getFutureKey(), null, th instanceof RateLimitException ? "" + th : FSTUtil.toString((Throwable)th));
                        }
                        catch (Exception e) {
                            Log.Error((Object)this, e);
                        }
                    });
                }
                if (th instanceof RateLimitException) break block7;
                FSTUtil.rethrow((Throwable)th);
            }
        }
        return createdFutures != null && createdFutures.size() > 0;
    }

    @Local
    public void unpublish() {
        if (this.__connections != null) {
            this.__connections.forEach(conreg -> conreg.unpublishActor(this));
        }
    }

    @Local
    @CallerSideMethod
    public void setServerMsgCallback(Callback<RemoteCallEntry> cb) {
        this.zzServerMsgCallback = cb;
    }

    @Remoted
    public IPromise<Long> router$clientPing(long tim, long[] publishedActorIds) {
        ConnectionRegistry remoteConnection = connection.get();
        if (remoteConnection != null) {
            remoteConnection.pingFromRoutingClient();
        }
        return Actor.resolve(tim);
    }

    @Remoted
    public void zzRoutingRefGC(long[] ids) {
        if (this.zzRoutingGCEnabled) {
            ConnectionRegistry connectionRegistry = connection.get();
            if (this.zzLastLiveActorIds == null) {
                this.zzLastLiveActorIds = connectionRegistry.getPublishedActorIds();
            } else {
                for (int i = 0; i < this.zzLastLiveActorIds.length; ++i) {
                    Actor publishedActor;
                    long zzLastLiveActorId = this.zzLastLiveActorIds[i];
                    if (zzLastLiveActorId < 2L) continue;
                    boolean alive = false;
                    for (int j = 0; j < ids.length; ++j) {
                        long id = ids[j];
                        if (id != zzLastLiveActorId) continue;
                        alive = true;
                        break;
                    }
                    if (alive || (publishedActor = connectionRegistry.getPublishedActor(zzLastLiveActorId)) == null || publishedActor.isStopped() || !publishedActor.isPublished()) continue;
                    System.out.println("unpublishing " + zzLastLiveActorId + " " + publishedActor);
                    connectionRegistry.unpublishActor(publishedActor);
                    publishedActor.stop();
                }
                this.zzLastLiveActorIds = connectionRegistry.getPublishedActorIds();
            }
        }
    }

    @Remoted
    public void zzkrouterLostClient() {
        ConnectionRegistry remoteConnection = connection.get();
        if (this instanceof ServingActor) {
            ((ServingActor)((Object)this)).clientDisconnected(null, null);
        }
    }

    @Override
    @Local
    public IPromise getReport() {
        return new Promise<ActorReport>(new ActorReport(this.getActor().getClass().getSimpleName(), this.getMailboxSize(), this.getCallbackSize()));
    }

    @Override
    @Local
    public IPromise<Monitorable[]> getSubMonitorables() {
        return new Promise<Monitorable[]>(new Monitorable[0]);
    }

    public static class ActorReport {
        String clz;
        int mailboxSize;
        int cbqSize;

        public ActorReport() {
        }

        public ActorReport(String clz, int mailboxSize, int cbqSize) {
            this.clz = clz;
            this.mailboxSize = mailboxSize;
            this.cbqSize = cbqSize;
        }

        public String getClz() {
            return this.clz;
        }

        public int getMailboxSize() {
            return this.mailboxSize;
        }

        public int getCbqSize() {
            return this.cbqSize;
        }
    }
}

