/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.remoting;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.Future;
import org.nustaq.kontraktor.RemoteConnection;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.Timeout;
import org.nustaq.kontraktor.impl.BackOffStrategy;
import org.nustaq.kontraktor.impl.CallEntry;
import org.nustaq.kontraktor.impl.CallbackWrapper;
import org.nustaq.kontraktor.impl.InternalActorStoppedException;
import org.nustaq.kontraktor.impl.RemoteScheduler;
import org.nustaq.kontraktor.remoting.ActorRefSerializer;
import org.nustaq.kontraktor.remoting.CallbackRefSerializer;
import org.nustaq.kontraktor.remoting.Coding;
import org.nustaq.kontraktor.remoting.ObjectSocket;
import org.nustaq.kontraktor.remoting.RemotableActor;
import org.nustaq.kontraktor.remoting.RemoteCallEntry;
import org.nustaq.kontraktor.remoting.SerializerType;
import org.nustaq.kontraktor.remoting.SporeRefSerializer;
import org.nustaq.kontraktor.remoting.TimeoutSerializer;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.FSTObjectSerializer;

/**
 * Per-connection registry that maps locally published actors and callbacks to
 * integer ids, tracks proxies for actors living on the remote side, and drives
 * the send/receive loops that move {@link RemoteCallEntry} messages over an
 * {@link ObjectSocket}.
 *
 * <p>Two message queues are distinguished by {@link RemoteCallEntry#getQueue()}:
 * {@code 0} is a method call addressed to a published actor, {@code 1} is a
 * callback/future result addressed to a published callback.
 *
 * <p>Published actors and published callbacks share a single id space
 * ({@link #actorIdCount}) and a single id-to-object map.
 */
public abstract class RemoteRefRegistry implements RemoteConnection {
    /** Serialization configuration, created and set up in the constructor. */
    protected FSTConfiguration conf;
    /** Scheduler handed to remote actor proxies created by this registry. */
    RemoteScheduler scheduler = new RemoteScheduler();
    /** Source of ids for published actors and callbacks; the first id handed out is 1. */
    AtomicInteger actorIdCount = new AtomicInteger(0);
    /** id -> published object (an actor ref or a {@link Callback}). */
    ConcurrentHashMap<Integer, Object> publishedActorMapping = new ConcurrentHashMap<>();
    /** published object -> id; the inverse of {@link #publishedActorMapping}. */
    ConcurrentHashMap<Object, Integer> publishedActorMappingReverse = new ConcurrentHashMap<>();
    /** Back-off applied by {@link #sendLoop(ObjectSocket)} between polling rounds. */
    BackOffStrategy backOffStrategy = new BackOffStrategy();
    /** Proxies of remote actors whose mailboxes are polled by {@link #singleSendLoop(ObjectSocket)}. */
    ConcurrentLinkedQueue<Actor> remoteActors = new ConcurrentLinkedQueue<>();
    /** remote id -> proxy, used to reuse an existing proxy for a known remote id. */
    ConcurrentHashMap<Integer, Actor> remoteActorSet = new ConcurrentHashMap<>();
    /** Not read or written inside this class; exposed for collaborators. */
    public ThreadLocal<ObjectSocket> currentObjectSocket = new ThreadLocal<>();
    /** When set, the send and receive loops exit at their next condition check. */
    protected volatile boolean terminated = false;
    /** Optional filter consulted before an incoming call is enqueued; {@code false} blocks the call. */
    BiFunction<Actor, String, Boolean> remoteCallInterceptor;
    /** Optional hook invoked for each remote actor proxy in {@link #stopRemoteRefs()}. */
    protected Consumer<Actor> disconnectHandler;

    /** Creates a registry using the default coding ({@link SerializerType#FSTSer}). */
    public RemoteRefRegistry() {
        this(null);
    }

    /**
     * Creates a registry for the given coding.
     *
     * <p>Note: this constructor calls the overridable
     * {@link #configureConfiguration(Coding)}; subclass fields are not yet
     * initialised when that override runs.
     *
     * @param code wire coding to use; {@code null} selects {@link SerializerType#FSTSer}
     */
    public RemoteRefRegistry(Coding code) {
        if (code == null) {
            code = new Coding(SerializerType.FSTSer);
        }
        switch (code.getCoding()) {
            case MinBin: {
                this.conf = FSTConfiguration.createMinBinConfiguration();
                break;
            }
            default: {
                this.conf = FSTConfiguration.createDefaultConfiguration();
            }
        }
        this.configureConfiguration(code);
    }

    /** @return the interceptor consulted for incoming actor calls, or {@code null} if none is set */
    public BiFunction<Actor, String, Boolean> getRemoteCallInterceptor() {
        return this.remoteCallInterceptor;
    }

    /**
     * Installs a filter for incoming actor calls.
     *
     * @param remoteCallInterceptor receives the target actor and the method name;
     *                              returning {@code false} blocks the call
     */
    public void setRemoteCallInterceptor(BiFunction<Actor, String, Boolean> remoteCallInterceptor) {
        this.remoteCallInterceptor = remoteCallInterceptor;
    }

    /**
     * Registers the serializers, classes and cross-platform names this registry
     * needs on {@link #conf}, then applies the coding's own configurator if it
     * has one.
     *
     * @param code coding whose optional configurator is applied last
     */
    protected void configureConfiguration(Coding code) {
        this.conf.registerSerializer(Actor.class, (FSTObjectSerializer)new ActorRefSerializer(this), true);
        this.conf.registerSerializer(CallbackWrapper.class, (FSTObjectSerializer)new CallbackRefSerializer(this), true);
        this.conf.registerSerializer(Spore.class, (FSTObjectSerializer)new SporeRefSerializer(), true);
        this.conf.registerClass(new Class[]{RemoteCallEntry.class});
        this.conf.registerCrossPlatformClassMapping(new String[][]{
            {"call", RemoteCallEntry.class.getName()},
            {"cbw", CallbackWrapper.class.getName()}
        });
        this.conf.registerSerializer(Timeout.class, (FSTObjectSerializer)new TimeoutSerializer(), false);
        if (code.getConfigurator() != null) {
            code.getConfigurator().accept(this.conf);
        }
    }

    /**
     * Looks up a published actor.
     *
     * @param id id previously returned by {@link #publishActor(Actor)}
     * @return the published actor ref, or {@code null} if the id is unknown or
     *         refers to a callback rather than an actor
     */
    public Actor getPublishedActor(int id) {
        Object published = this.publishedActorMapping.get(id);
        // Actors and callbacks share one id space; never cast blindly, since the
        // id may come straight from a remote message.
        return published instanceof Actor ? (Actor)published : null;
    }

    /**
     * Looks up a published callback.
     *
     * @param id id previously returned by {@link #registerPublishedCallback(Callback)}
     * @return the published callback, or {@code null} if the id is unknown or
     *         refers to an actor rather than a callback
     */
    public Callback getPublishedCallback(int id) {
        Object published = this.publishedActorMapping.get(id);
        return published instanceof Callback ? (Callback)published : null;
    }

    /** @return the scheduler used for remote actor proxies created by this registry */
    public RemoteScheduler getScheduler() {
        return this.scheduler;
    }

    /** @return the live (not copied) queue of remote actor proxies */
    public ConcurrentLinkedQueue<Actor> getRemoteActors() {
        return this.remoteActors;
    }

    /** @return whether the termination flag is set */
    public boolean isTerminated() {
        return this.terminated;
    }

    /** @param terminated new value of the termination flag checked by the send/receive loops */
    public void setTerminated(boolean terminated) {
        this.terminated = terminated;
    }

    /**
     * Publishes an actor under an id, reusing the existing id if its actor ref
     * is already published. On first publication this registry is added to the
     * actor as a remote connection.
     *
     * @param act actor to publish; its {@code getActorRef()} is what gets stored
     * @return the id under which the actor ref is published
     */
    public int publishActor(Actor act) {
        Integer id = this.publishedActorMappingReverse.get(act.getActorRef());
        if (id == null) {
            id = this.actorIdCount.incrementAndGet();
            this.publishedActorMapping.put(id, act.getActorRef());
            this.publishedActorMappingReverse.put(act.getActorRef(), id);
            act.__addRemoteConnection(this);
        }
        return id;
    }

    /**
     * Removes a published actor from both mappings, detaches this registry from
     * it, and notifies it via {@code $hasBeenUnpublished()} if it is a
     * {@link RemotableActor}. Does nothing if the actor is not published.
     *
     * @param act actor to unpublish
     */
    public void unpublishActor(Actor act) {
        Integer id = this.publishedActorMappingReverse.get(act.getActorRef());
        if (id == null) {
            return;
        }
        this.publishedActorMapping.remove(id);
        this.publishedActorMappingReverse.remove(act.getActorRef());
        act.__removeRemoteConnection(this);
        if (act instanceof RemotableActor) {
            ((RemotableActor)act).$hasBeenUnpublished();
        }
    }

    /**
     * Publishes a callback under an id, reusing the existing id if the callback
     * is already registered.
     *
     * @param cb callback to publish
     * @return the id under which the callback is published
     */
    public int registerPublishedCallback(Callback cb) {
        Integer id = this.publishedActorMappingReverse.get(cb);
        if (id == null) {
            id = this.actorIdCount.incrementAndGet();
            this.publishedActorMapping.put(id, cb);
            this.publishedActorMappingReverse.put(cb, id);
        }
        return id;
    }

    /**
     * Removes whatever is published under the given id from both mappings.
     * Does nothing if the id is unknown.
     *
     * @param receiverKey id of the published object
     */
    public void removePublishedObject(int receiverKey) {
        Object removed = this.publishedActorMapping.remove(receiverKey);
        if (removed != null) {
            this.publishedActorMappingReverse.remove(removed);
        }
    }

    /**
     * Registers an already existing remote actor proxy with this registry and
     * hooks its stop handler to {@link #remoteRefStopped(Actor)}.
     *
     * @param act proxy to register; its {@code getActorRef()} is what gets stored
     */
    public void registerRemoteRefDirect(Actor act) {
        act = act.getActorRef();
        this.remoteActorSet.put(act.__remoteId, act);
        this.remoteActors.add(act);
        act.__addStopHandler((actor, err) -> this.remoteRefStopped((Actor)actor));
    }

    /**
     * Returns the proxy for a remote actor id, creating and registering one on
     * this registry's scheduler if none exists yet.
     *
     * @param actorClazz actor class to instantiate the proxy from
     * @param remoteId   id of the actor on the remote side
     * @param client     not used by this implementation
     * @return the existing or newly created proxy
     */
    public Actor registerRemoteActorRef(Class actorClazz, int remoteId, Object client) {
        Actor known = this.remoteActorSet.get(remoteId);
        if (known != null) {
            return known;
        }
        // Unchecked: the parameter is a raw Class for interface compatibility.
        @SuppressWarnings("unchecked")
        Actor created = (Actor)Actors.AsActor((Class<? extends Actor>)actorClazz, this.getScheduler());
        created.__remoteId = remoteId;
        this.remoteActorSet.put(remoteId, created);
        this.remoteActors.add(created);
        created.__addStopHandler((actor, err) -> this.remoteRefStopped((Actor)actor));
        return created;
    }

    /**
     * Stop-handler target for remote proxies: deregisters the proxy and flags
     * both its ref and (if present) its underlying actor as stopped.
     *
     * @param actor proxy that has stopped
     */
    protected void remoteRefStopped(Actor actor) {
        this.removeRemoteActor(actor);
        this.markStopped(actor);
    }

    /**
     * Deregisters and flags every currently known remote proxy as stopped,
     * calling the disconnect handler (if set) for each one first. Works on a
     * snapshot of {@link #remoteActors} because entries are removed while
     * iterating.
     */
    protected void stopRemoteRefs() {
        for (Actor actor : new ArrayList<Actor>(this.remoteActors)) {
            if (this.disconnectHandler != null) {
                this.disconnectHandler.accept(actor);
            }
            this.removeRemoteActor(actor);
            this.markStopped(actor);
        }
    }

    /** Sets {@code __stopped} on the actor's ref and, when {@code getActor()} is non-null, on that actor too. */
    private void markStopped(Actor actor) {
        actor.getActorRef().__stopped = true;
        Actor target = (Actor)actor.getActor();
        if (target != null) {
            target.__stopped = true;
        }
    }

    /** Drops the proxy from both remote-actor collections and calls {@code __stop()} on it. */
    private void removeRemoteActor(Actor act) {
        this.remoteActorSet.remove(act.__remoteId);
        this.remoteActors.remove(act);
        try {
            act.__stop();
        }
        catch (InternalActorStoppedException ignored) {
            // Deliberately ignored: the exception only signals the stop itself.
        }
    }

    /**
     * Repeatedly calls {@link #singleSendLoop(ObjectSocket)} until terminated,
     * backing off with a counter that is reset whenever a round sent something.
     * Always calls {@link #stopRemoteRefs()} on exit.
     *
     * @param channel socket to write to
     * @throws IOException propagated from {@link #singleSendLoop(ObjectSocket)}
     */
    protected void sendLoop(ObjectSocket channel) throws IOException {
        try {
            int idleRounds = 0;
            while (!this.isTerminated()) {
                if (this.singleSendLoop(channel)) {
                    idleRounds = 0;
                }
                this.backOffStrategy.yield(idleRounds++);
            }
        }
        finally {
            this.stopRemoteRefs();
        }
    }

    /**
     * Repeatedly calls {@link #singleReceive(ObjectSocket)} until terminated or
     * until it throws; an exception is logged and ends the loop. Always calls
     * {@link #cleanUp()} on exit.
     *
     * @param channel socket to read from
     */
    protected void receiveLoop(ObjectSocket channel) {
        try {
            while (!this.isTerminated()) {
                // The boolean result carries no meaning for this loop.
                this.singleReceive(channel);
            }
        }
        catch (Exception e) {
            Log.Lg.infoLong(this, e, "");
        }
        finally {
            this.cleanUp();
        }
    }

    /**
     * Reads one object from the channel and dispatches it.
     *
     * <ul>
     *   <li>Queue {@code 0}: enqueues the call on the addressed published actor
     *       (unless the interceptor blocks it) and, if the call yields a
     *       {@link Future}, sends its result back via
     *       {@link #receiveCBResult(ObjectSocket, int, Object, Object)}.</li>
     *   <li>Queue {@code 1}: delivers the two arguments to the addressed
     *       published callback and unpublishes it unless the second argument is
     *       the {@code "CNT"} continuation marker.</li>
     * </ul>
     *
     * @param channel socket to read from
     * @return {@code true} if the message was skipped (not a call entry, unknown
     *         actor, or blocked by the interceptor), {@code false} otherwise
     * @throws RuntimeException if a queue-1 message addresses a callback that is
     *                          no longer published
     * @throws Exception propagated from reading the channel
     */
    public boolean singleReceive(ObjectSocket channel) throws Exception {
        Object response = channel.readObject();
        if (!(response instanceof RemoteCallEntry)) {
            if (response != null) {
                Log.Lg.error(this, null, "unexpected response:" + response);
            }
            return true;
        }
        RemoteCallEntry read = (RemoteCallEntry)response;
        boolean isContinue = read.getArgs().length > 1 && "CNT".equals(read.getArgs()[1]);
        if (isContinue) {
            // Swap the deserialized marker for the literal instance; presumably so
            // that identity (==) checks against the continuation constant succeed
            // downstream. NOTE(review): confirm against Callback's constant.
            read.getArgs()[1] = "CNT";
        }
        if (read.getQueue() == 0) {
            Actor targetActor = this.getPublishedActor(read.getReceiverKey());
            if (targetActor == null) {
                Log.Lg.error(this, null, "no actor found for key " + read);
                return true;
            }
            BiFunction<Actor, String, Boolean> interceptor = this.remoteCallInterceptor;
            if (interceptor != null && !interceptor.apply(targetActor, read.getMethod()).booleanValue()) {
                Log.Warn((Object)this, "remote message blocked by securityinterceptor " + targetActor.getClass().getName() + " " + read.getMethod());
                return true;
            }
            Object future = targetActor.getScheduler().enqueueCall(null, targetActor, read.getMethod(), read.getArgs(), false);
            if (future instanceof Future) {
                ((Future)future).then((r, e) -> {
                    try {
                        this.receiveCBResult(channel, read.getFutureKey(), r, e);
                    }
                    catch (Exception ex) {
                        Log.Warn(this, ex, "");
                    }
                });
            }
        } else if (read.getQueue() == 1) {
            Callback publishedCallback = this.getPublishedCallback(read.getReceiverKey());
            if (publishedCallback == null) {
                throw new RuntimeException("Publisher already deregistered, use Actor.CONT in order to signal more messages will be sent");
            }
            publishedCallback.receive(read.getArgs()[0], read.getArgs()[1]);
            if (!isContinue) {
                // Final result delivered: the callback id is no longer needed.
                this.removePublishedObject(read.getReceiverKey());
            }
        }
        return false;
    }

    /**
     * Stops all remote proxies, unpublishes every published actor (published
     * callbacks are left in place), and detaches this registry from the facade
     * proxy.
     */
    public void cleanUp() {
        this.stopRemoteRefs();
        // ConcurrentHashMap iteration tolerates the removals unpublishActor performs.
        for (Object published : this.publishedActorMappingReverse.keySet()) {
            if (published instanceof Actor) {
                this.unpublishActor((Actor)published);
            }
        }
        this.getFacadeProxy().__removeRemoteConnection(this);
    }

    /**
     * Performs one polling round over all remote proxies, taking at most one
     * queued call from each mailbox and writing it to the channel.
     *
     * <p>{@code $close} closes the channel and {@code $stop} stops the proxy's
     * actor on a separate thread; neither is forwarded. If writing fails, the
     * error is recorded on the channel, the affected proxy is stopped and
     * removed, and the round ends early. The channel is flushed only when the
     * summed remaining mailbox sizes are below 100.
     *
     * @param chan socket to write to
     * @return {@code true} if at least one call was written in this round
     * @throws IOException propagated from closing or flushing the channel
     */
    public boolean singleSendLoop(ObjectSocket chan) throws IOException {
        boolean sentSomething = false;
        int sumQueued = 0;
        ArrayList<Actor> toRemove = null;
        for (Actor remoteActor : this.remoteActors) {
            CallEntry ce = (CallEntry)remoteActor.__mailbox.poll();
            if (ce == null) {
                continue;
            }
            String methodName = ce.getMethod().getName();
            if (methodName.equals("$close")) {
                chan.close();
                continue;
            }
            if (methodName.equals("$stop")) {
                new Thread(() -> {
                    try {
                        ((Actor)remoteActor.getActor()).$stop();
                    }
                    catch (InternalActorStoppedException ignored) {
                        // Deliberately ignored: the exception only signals the stop itself.
                    }
                }, "stopper thread").start();
                continue;
            }
            sumQueued += remoteActor.__mailbox.size();
            int futId = 0;
            if (ce.hasFutureResult()) {
                // Publish the future's callback so the remote side can address the result.
                futId = this.registerPublishedCallback(ce.getFutureCB());
            }
            try {
                RemoteCallEntry rce = new RemoteCallEntry(futId, remoteActor.__remoteId, methodName, ce.getArgs());
                rce.setQueue(0);
                this.writeObject(chan, rce);
                sentSomething = true;
            }
            catch (Exception ex) {
                chan.setLastError(ex);
                if (toRemove == null) {
                    toRemove = new ArrayList<Actor>();
                }
                toRemove.add(remoteActor);
                remoteActor.$stop();
                Log.Lg.infoLong(this, ex, "connection closed");
                break;
            }
        }
        if (toRemove != null) {
            for (Actor failed : toRemove) {
                this.removeRemoteActor(failed);
            }
        }
        if (sumQueued < 100) {
            chan.flush();
        }
        return sentSomething;
    }

    /**
     * Writes a call entry to the channel. Subclasses may override to change how
     * entries are written.
     *
     * @param chan socket to write to
     * @param rce  entry to write
     * @throws Exception propagated from the channel
     */
    protected void writeObject(ObjectSocket chan, RemoteCallEntry rce) throws Exception {
        chan.writeObject(rce);
    }

    /**
     * Sends a callback/future result to the remote side as a queue-1 entry.
     * Results whose error equals {@code "EOT"} are not sent.
     *
     * @param chan   socket to write to
     * @param id     receiver key of the callback on the remote side
     * @param result result value
     * @param error  error value
     * @throws Exception propagated from {@link #writeObject(ObjectSocket, RemoteCallEntry)}
     */
    public void receiveCBResult(ObjectSocket chan, int id, Object result, Object error) throws Exception {
        if ("EOT".equals(error)) {
            return;
        }
        RemoteCallEntry rce = new RemoteCallEntry(0, id, null, new Object[]{result, error});
        rce.setQueue(1);
        this.writeObject(chan, rce);
    }

    /** Closes this connection by delegating to {@link #cleanUp()}. */
    @Override
    public void close() {
        this.cleanUp();
    }

    /** @return the serialization configuration used by this registry */
    public FSTConfiguration getConf() {
        return this.conf;
    }

    /**
     * @return the facade actor of this connection; {@link #cleanUp()} detaches
     *         this registry from it
     */
    public abstract Actor getFacadeProxy();

    /** @param disconnectHandler hook invoked for each remote proxy in {@link #stopRemoteRefs()}; may be {@code null} */
    public void setDisconnectHandler(Consumer<Actor> disconnectHandler) {
        this.disconnectHandler = disconnectHandler;
    }

    /** @return the disconnect hook, or {@code null} if none is set */
    public Consumer<Actor> getDisconnectHandler() {
        return this.disconnectHandler;
    }
}

