/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.routers;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiFunction;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.annotations.Local;
import org.nustaq.kontraktor.impl.CallbackWrapper;
import org.nustaq.kontraktor.remoting.base.ConnectionRegistry;
import org.nustaq.kontraktor.remoting.base.ObjectSocket;
import org.nustaq.kontraktor.remoting.base.RemotedActor;
import org.nustaq.kontraktor.remoting.base.ServingActor;
import org.nustaq.kontraktor.remoting.encoding.CallbackRefSerializer;
import org.nustaq.kontraktor.remoting.encoding.RemoteCallEntry;
import org.nustaq.kontraktor.util.Log;

public abstract class AbstractKrouter<T extends AbstractKrouter>
extends Actor<T>
implements RemotedActor,
ServingActor {
    public static final String SERVICE_UNAVAILABLE = "Service unavailable";
    public static long CLIENT_PING_INTERVAL_MS = 5000L;
    protected HashMap<Object, Long> timeoutMap;
    protected HashMap<String, ConnectionRegistry> clients;
    protected Set<Long> nextAliveRemoteActors;
    protected long lastSwitch;
    private boolean stateful = false;

    public IPromise router$RegisterService(Actor remoteRef, boolean stateful) {
        ((AbstractKrouter)this.getActor()).stateful = stateful;
        ((AbstractKrouter)this.getActorRef()).stateful = stateful;
        Log.Info((Object)this, (this.stateful ? "stateful " : "") + "service registered ");
        return null;
    }

    @CallerSideMethod
    protected boolean isStateful() {
        return this.stateful;
    }

    @Local
    public abstract void router$handleServiceDisconnect(Actor var1);

    @Override
    public IPromise<Long> router$clientPing(long tim, long[] publishedActorIds) {
        for (int i = 0; i < publishedActorIds.length; ++i) {
            long publishedActorId = publishedActorIds[i];
            this.nextAliveRemoteActors.add(publishedActorId);
        }
        if (this.lastSwitch == 0L) {
            this.lastSwitch = System.currentTimeMillis();
        } else {
            long now = System.currentTimeMillis();
            if (now - this.lastSwitch > CLIENT_PING_INTERVAL_MS * 2L) {
                Set<Long> tmp = this.nextAliveRemoteActors;
                this.getServices().forEach(serv -> tmp.add(serv.__remoteId));
                this.nextAliveRemoteActors = new HashSet<Long>();
                this.lastSwitch = now;
                ConnectionRegistry reg = (ConnectionRegistry)connection.get();
                long[] alive = new long[tmp.size()];
                int idx = 0;
                for (Long next : tmp) {
                    alive[idx++] = next;
                }
                if (reg != null) {
                    RemoteCallEntry rce = new RemoteCallEntry(0L, 1L, "zzRoutingRefGC", null, reg.getConf().asByteArray((Object)new Object[]{alive}));
                    this.getServices().forEach(service -> this.forwardMultiCall(rce, (Actor)service, reg, null, null));
                }
            }
        }
        return super.router$clientPing(tim, publishedActorIds);
    }

    @Local
    public void init() {
        this.timeoutMap = new HashMap();
        this.clients = new HashMap();
        this.nextAliveRemoteActors = new HashSet<Long>();
        this.delayed(this.getServicePingTimeout(), () -> this.cyclic(this.getServicePingTimeout(), () -> {
            this.pingServices();
            return true;
        }));
        this.delayed(this.getClientPingTimeout(), () -> this.cyclic(this.getClientPingTimeout(), () -> {
            this.checkPingOnClients();
            return true;
        }));
        this.delayed(CLIENT_PING_INTERVAL_MS * 2L, () -> this.cyclic(CLIENT_PING_INTERVAL_MS * 2L, () -> {
            this.timeoutMap.clear();
            return true;
        }));
    }

    @Override
    @CallerSideMethod
    public boolean __dispatchRemoteCall(ObjectSocket objSocket, RemoteCallEntry rce, ConnectionRegistry clientRemoteRegistry, List<IPromise> createdFutures, Object authContext, BiFunction<Actor, String, Boolean> callInterceptor, long delayCode) {
        boolean isCB;
        boolean bl = isCB = rce.getMethod() != null;
        if (isCB && rce.getMethod().startsWith("router$")) {
            return super.__dispatchRemoteCall(objSocket, rce, clientRemoteRegistry, createdFutures, authContext, callInterceptor, delayCode);
        }
        boolean success = this.dispatchRemoteCall(rce, clientRemoteRegistry);
        if (!success) {
            if (rce.getCB() != null) {
                rce.getCB().reject(SERVICE_UNAVAILABLE);
            }
            if (rce.getFutureKey() != 0L) {
                RemoteCallEntry cbrce = this.createErrorPromiseResponse(rce, clientRemoteRegistry);
                clientRemoteRegistry.inFacadeThread(() -> clientRemoteRegistry.forwardRemoteMessage(cbrce));
            }
        }
        return false;
    }

    @Local
    public void pingServices() {
        this.getServices().forEach(serv -> serv.ping().then(r -> this.timeoutMap.put(serv, System.currentTimeMillis())));
        this.getServices().forEach(serv -> {
            Long tim = this.timeoutMap.get(serv);
            if (tim != null && System.currentTimeMillis() - tim > this.getServicePingTimeout() * 2L) {
                Log.Info((Object)this, "service timeout, closing " + serv);
                this.handleServiceDiscon((Actor)serv);
                if (serv.isPublished()) {
                    serv.close();
                }
            }
        });
    }

    @Local
    public void checkPingOnClients() {
        long now = System.currentTimeMillis();
        this.clients.forEach((id, reg) -> {
            if (this.isService((ConnectionRegistry)reg)) {
                boolean bl = true;
            } else if (now - reg.getLastRoutingClientPing() > this.getClientPingTimeout() || reg.isTerminated()) {
                ((AbstractKrouter)this.self()).clientDisconnected((ConnectionRegistry)reg, (String)id);
            }
        });
    }

    private boolean isService(ConnectionRegistry reg) {
        return reg.getLastRoutingClientPing() == 0L;
    }

    protected long getServicePingTimeout() {
        return 1000L;
    }

    protected long getClientPingTimeout() {
        return CLIENT_PING_INTERVAL_MS * 2L;
    }

    @CallerSideMethod
    protected void sendFailoverNotification(ConnectionRegistry clientRemoteRegistry) {
        ((AbstractKrouter)this.getActor()).sendFailoverNotificationInternal(clientRemoteRegistry);
    }

    protected void sendFailoverNotificationInternal(ConnectionRegistry registry) {
        RemoteCallEntry rce = new RemoteCallEntry(0L, 0L, "krouterTargetDidChange", null, registry.getConf().asByteArray((Object)new Object[0]));
        registry.inFacadeThread(() -> registry.forwardRemoteMessage(rce));
    }

    protected abstract List<Actor> getServices();

    @CallerSideMethod
    protected RemoteCallEntry createErrorPromiseResponse(RemoteCallEntry rce, ConnectionRegistry clientRemoteRegistry) {
        RemoteCallEntry cbrce = new RemoteCallEntry();
        cbrce.setReceiverKey(rce.getFutureKey());
        cbrce.setSerializedArgs(clientRemoteRegistry.getConf().asByteArray((Object)new Object[]{null, SERVICE_UNAVAILABLE}));
        cbrce.setQueue(1);
        return cbrce;
    }

    @CallerSideMethod
    protected void forwardMultiCall(RemoteCallEntry rce, Actor remoteRef, ConnectionRegistry clientRemoteRegistry, boolean[] done, Callback[] selected) {
        ((AbstractKrouter)this.getActor()).forwardMultiCallInternal(rce, remoteRef, clientRemoteRegistry, done, selected);
    }

    @CallerSideMethod
    protected void forwardMultiCallInternal(RemoteCallEntry rceIn, Actor remoteRef, ConnectionRegistry clientRemoteRegistry, boolean[] done, Callback[] selected) {
        RemoteCallEntry rce = rceIn.createCopy();
        Runnable toRun = () -> {
            ConnectionRegistry serviceRemoteReg = remoteRef.__self.__clientConnection;
            if (rce.getReceiverKey() == 1L) {
                rce.setReceiverKey(remoteRef.__remoteId);
            }
            if (rce.getFutureKey() != 0L) {
                long prevFuturekey = rce.getFutureKey();
                Promise p = new Promise();
                long cbid = serviceRemoteReg.registerPublishedCallback(p);
                rce.setFutureKey(-cbid);
                p.then((r, e) -> ((AbstractKrouter)this.self()).execute(() -> {
                    serviceRemoteReg.removePublishedObject(cbid);
                    if (serviceRemoteReg.isTerminated()) {
                        this.handleServiceDiscon(remoteRef);
                        return;
                    }
                    if (!done[0]) {
                        done[0] = true;
                        RemoteCallEntry cbrce = (RemoteCallEntry)r;
                        cbrce.setReceiverKey(prevFuturekey);
                        clientRemoteRegistry.forwardRemoteMessage(cbrce);
                    }
                }));
            }
            if (rce.getCB() != null) {
                Callback cb = rce.getCB();
                CallbackWrapper[] wrapperTrick = new CallbackWrapper[]{null};
                CallbackWrapper<Object> wrapper = wrapperTrick[0] = new CallbackWrapper<Object>((Actor)this.self(), (r, e) -> {
                    if (selected[0] == null) {
                        selected[0] = wrapperTrick[0];
                    }
                    if (selected[0] == wrapperTrick[0]) {
                        cb.complete(r, e);
                    }
                });
                rce.setCB(wrapper);
            }
            if (!serviceRemoteReg.isTerminated()) {
                serviceRemoteReg.forwardRemoteMessage(rce);
            } else {
                this.handleServiceDiscon(remoteRef);
            }
        };
        if (Thread.currentThread() != this.getCurrentDispatcher()) {
            ((AbstractKrouter)this.self()).execute(toRun);
        } else {
            toRun.run();
        }
    }

    @CallerSideMethod
    protected abstract boolean dispatchRemoteCall(RemoteCallEntry var1, ConnectionRegistry var2);

    @CallerSideMethod
    protected void forwardCall(RemoteCallEntry rce, Actor remoteRef, ConnectionRegistry clientRemoteRegistry) {
        ((AbstractKrouter)this.getActor()).forwardCallInternal(rce, remoteRef, clientRemoteRegistry);
    }

    protected void forwardCallInternal(RemoteCallEntry rce, Actor remoteRef, ConnectionRegistry clientRemoteRegistry) {
        RemoteCallEntry finalRce = rce.createCopy();
        Runnable toRun = () -> {
            ConnectionRegistry serviceRemoteReg = remoteRef.__self.__clientConnection;
            if (finalRce.getReceiverKey() == 1L) {
                finalRce.setReceiverKey(remoteRef.__remoteId);
            }
            if (finalRce.getFutureKey() != 0L) {
                long prevFuturekey = finalRce.getFutureKey();
                Promise p = new Promise();
                long cbid = serviceRemoteReg.registerPublishedCallback(p);
                finalRce.setFutureKey(-cbid);
                p.then((r, e) -> {
                    serviceRemoteReg.removePublishedObject(cbid);
                    if (serviceRemoteReg.isTerminated()) {
                        this.handleServiceDiscon(remoteRef);
                        return;
                    }
                    RemoteCallEntry cbrce = (RemoteCallEntry)r;
                    cbrce.setReceiverKey(prevFuturekey);
                    ((AbstractKrouter)this.self()).execute(() -> clientRemoteRegistry.forwardRemoteMessage(cbrce));
                });
            }
            if (finalRce.getCB() != null) {
                CallbackRefSerializer.MyRemotedCallback cb = (CallbackRefSerializer.MyRemotedCallback)finalRce.getCB();
                long id = cb.getId();
                finalRce.setCB(new CallbackWrapper((Actor)this.self(), (res, err) -> {
                    RemoteCallEntry rcerouted = (RemoteCallEntry)res;
                    rcerouted.setReceiverKey(id);
                    clientRemoteRegistry.forwardRemoteMessage(rcerouted);
                }){

                    @Override
                    public boolean isRouted() {
                        return true;
                    }
                });
            }
            if (!serviceRemoteReg.isTerminated()) {
                serviceRemoteReg.forwardRemoteMessage(finalRce);
            } else {
                this.handleServiceDiscon(remoteRef);
            }
        };
        if (Thread.currentThread() != this.getCurrentDispatcher()) {
            ((AbstractKrouter)this.self()).execute(toRun);
        } else {
            toRun.run();
        }
    }

    @CallerSideMethod
    protected void handleServiceDiscon(Actor remoteRef) {
        ((AbstractKrouter)this.self()).router$handleServiceDisconnect(remoteRef);
    }

    @Override
    public void hasBeenUnpublished(String connectionIdentifier) {
        Log.Info((Object)this, "Krouter lost client " + connectionIdentifier);
    }

    @Override
    public void clientConnected(ConnectionRegistry connectionRegistry, String connectionIdentifier) {
        Log.Info((Object)this, "client connected " + connectionIdentifier);
        this.clients.put(connectionIdentifier, connectionRegistry);
    }

    @Override
    public void clientDisconnected(ConnectionRegistry connectionRegistry, String connectionIdentifier) {
        Log.Info((Object)this, "client disconnected " + connectionIdentifier);
        this.clients.remove(connectionIdentifier, connectionRegistry);
    }
}

