/*
 * Decompiled with CFR 0.152.
 */
package org.piax.gtrans;

import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.nio.channels.ClosedChannelException;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.piax.common.CalleeId;
import org.piax.common.Endpoint;
import org.piax.common.ObjectId;
import org.piax.common.PeerId;
import org.piax.common.TransportId;
import org.piax.gtrans.Channel;
import org.piax.gtrans.ChannelListener;
import org.piax.gtrans.ChannelTransport;
import org.piax.gtrans.DynamicStub;
import org.piax.gtrans.GTransConfigValues;
import org.piax.gtrans.IdConflictException;
import org.piax.gtrans.NetworkTimeoutException;
import org.piax.gtrans.NoSuchRemoteObjectException;
import org.piax.gtrans.Peer;
import org.piax.gtrans.RPCException;
import org.piax.gtrans.RPCHook;
import org.piax.gtrans.RPCIf;
import org.piax.gtrans.RPCMode;
import org.piax.gtrans.ReceivedMessage;
import org.piax.gtrans.ReturnValue;
import org.piax.gtrans.Transport;
import org.piax.gtrans.TransportListener;
import org.piax.gtrans.impl.RPCInvocationHandler;
import org.piax.util.ClassUtil;
import org.piax.util.MethodUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RPCInvoker<T extends RPCIf, E extends Endpoint>
implements RPCIf {
    private static final Logger logger = LoggerFactory.getLogger(RPCInvoker.class);
    /** When true, sendInvoke keeps the channels it opens in {@link #channelPool} and reuses them. */
    public static boolean POOL_CHANNEL = true;
    /** Pool size above which expireIfNeeded starts expiring entries. */
    public static int POOL_CHANNEL_SIZE = 100;
    /** Pooled channels keyed by remote endpoint; created in the constructor as a ConcurrentHashMap. */
    public Map<E, ChannelPoolEntry> channelPool;
    /** When true (and POOL_CHANNEL is also true), sendOnewayInvoke is routed through sendInvoke. */
    public static boolean USE_CHANNEL_FOR_ONEWAY = false;
    /** Per-thread source peer ID of the MethodCall most recently received on that thread. */
    private ThreadLocal<PeerId> srcPeerId = new ThreadLocal();
    /** Transport ID under which this invoker's listeners are registered on {@link #trans}. */
    protected final TransportId transId;
    /** Object ID under which this invoker itself is registered with {@link #peer}. */
    protected final ObjectId objId;
    /** Underlying transport; replaced by changeTransport. */
    protected volatile ChannelTransport<E> trans;
    /** Peer instance used as the registry of RPC objects. */
    final Peer peer;
    /** Listener for messages delivered without a channel; forwards to _onReceive(ReceivedMessage). */
    final TransportListener<E> listener = new TransportListener<E>(){

        @Override
        public void onReceive(Transport<E> trans, ReceivedMessage rmsg) {
            RPCInvoker.this._onReceive(rmsg);
        }
    };
    /** Channel listener: accepts every channel, ignores close/failure, forwards to _onReceive(Channel). */
    final ChannelListener<E> chListener = new ChannelListener<E>(){

        @Override
        public boolean onAccepting(Channel<E> ch) {
            return true;
        }

        @Override
        public void onClosed(Channel<E> ch) {
        }

        @Override
        public void onFailure(Channel<E> ch, Exception cause) {
        }

        @Override
        public void onReceive(Channel<E> ch) {
            RPCInvoker.this._onReceive(ch);
        }
    };
    /** Cleared by fin(); checkActive() throws once this is false. */
    protected volatile boolean isActive = true;

    /**
     * Returns the source peer ID stored for the calling thread.
     *
     * @return the value held by the thread-local {@code srcPeerId}, or {@code null} if nothing
     *         has been stored on this thread
     */
    public PeerId getSrcPeerId() {
        final PeerId callerId = srcPeerId.get();
        return callerId;
    }

    /**
     * Creates an invoker bound to the given transport and registers it as an RPC object.
     *
     * <p>All internal state (including the channel pool) is initialised and the object is
     * registered with the peer <em>before</em> any listener is attached to the transport.
     * This way a rejected registration ({@link IdConflictException}) does not leave listeners
     * for {@code rpcId} pointing at a half-constructed invoker, and no message can be
     * dispatched to this object while {@code channelPool} is still {@code null}.
     *
     * @param rpcId transport ID under which the listeners are registered
     * @param trans underlying channel transport
     * @throws IdConflictException if the derived object ID is already registered with the peer
     * @throws IOException declared for callers; propagated from the underlying calls
     */
    @SuppressWarnings("unchecked")
    public RPCInvoker(TransportId rpcId, ChannelTransport<? super E> trans) throws IdConflictException, IOException {
        this.transId = rpcId;
        this.objId = this.createObjId(trans, rpcId);
        this.trans = (ChannelTransport<E>) trans;
        this.channelPool = new ConcurrentHashMap<E, ChannelPoolEntry>();
        this.peer = Peer.getInstance(trans.getPeerId());
        // Register first: if the ID conflicts we fail before touching the transport's listeners.
        this.peer.registerRPCObject(this.objId, this);
        // Only now let messages reach this (fully initialised) object.
        this.trans.setListener(this.transId, this.listener);
        this.trans.setChannelListener(this.transId, this.chListener);
        logger.debug("RPCInvoker: rpcId={}, objId={}", (Object)rpcId, (Object)this.objId);
    }

    /**
     * Builds the object ID of this invoker as {@code "<transport id path>:<rpcId>"}.
     *
     * @param trans transport whose transport-ID path forms the first part
     * @param rpcId ID forming the second part
     * @return a new ObjectId made from the two string forms joined by a colon
     */
    private ObjectId createObjId(ChannelTransport<? super E> trans, ObjectId rpcId) {
        final String pathPart = trans.getTransportIdPath().toString();
        final String idPart = rpcId.toString();
        return new ObjectId(pathPart + ":" + idPart);
    }

    /**
     * Shuts this invoker down: removes the channel listener, marks the invoker inactive,
     * unregisters it from the peer and, when pooling is enabled, closes every pooled channel
     * that is still open and empties the pool.
     */
    public void fin() {
        offline();
        isActive = false;
        peer.unregisterRPCObject(objId, this);
        if (!POOL_CHANNEL) {
            return;
        }
        logger.debug("pool size ={}", Integer.valueOf(channelPool.size()));
        for (ChannelPoolEntry entry : channelPool.values()) {
            if (entry.channel.isClosed()) {
                continue;
            }
            entry.channel.close();
        }
        channelPool.clear();
    }

    /**
     * Guards operations that require a live invoker.
     *
     * @throws IllegalStateException if {@code isActive} is false (set by {@code fin()})
     */
    protected void checkActive() throws IllegalStateException {
        if (isActive) {
            return;
        }
        throw new IllegalStateException("this RPCInvoker is already finished");
    }

    /**
     * Replaces the underlying transport. If the invoker is online, the channel listener is
     * removed from the old transport before the switch and installed on the new one afterwards.
     *
     * @param trans the transport to use from now on
     * @throws IllegalStateException if this invoker has already been finished
     */
    public synchronized void changeTransport(ChannelTransport<?> trans) {
        checkActive();
        final boolean wasOnline = isOnline();
        if (wasOnline) {
            offline();
        }
        this.trans = trans;
        if (wasOnline) {
            online();
        }
    }

    /**
     * Registers an RPC object with the peer after verifying that this invoker is active.
     *
     * @param objId ID to register the object under
     * @param obj the object to register
     * @throws IdConflictException propagated from the peer's registration
     * @throws IllegalStateException if this invoker has already been finished
     */
    public void registerRPCObject(ObjectId objId, RPCIf obj) throws IdConflictException {
        checkActive();
        peer.registerRPCObject(objId, obj);
    }

    /**
     * Unregisters an RPC object from the peer after verifying that this invoker is active.
     *
     * @param objId ID the object was registered under
     * @param obj the object to unregister
     * @return whatever the peer's {@code unregisterRPCObject} returns
     * @throws IllegalStateException if this invoker has already been finished
     */
    public boolean unregisterRPCObject(ObjectId objId, RPCIf obj) {
        checkActive();
        final boolean outcome = peer.unregisterRPCObject(objId, obj);
        return outcome;
    }

    /**
     * Looks up an RPC object in the peer's registry.
     *
     * @param objId ID of the object to look up
     * @return the peer's answer for that ID (callers in this class treat {@code null} as "not found")
     */
    public RPCIf getRPCObject(ObjectId objId) {
        final RPCIf found = peer.getRPCObject(objId);
        return found;
    }

    /**
     * Installs this invoker's channel listener on the current transport under {@code transId}.
     */
    @Deprecated
    public synchronized void online() {
        trans.setChannelListener(transId, chListener);
    }

    /**
     * Replaces the channel listener registered under {@code transId} on the current transport
     * with {@code null}.
     */
    @Deprecated
    public synchronized void offline() {
        trans.setChannelListener(transId, null);
    }

    /**
     * Tells whether a channel listener is currently registered under {@code transId}.
     *
     * @return {@code true} if the transport reports a non-null channel listener for {@code transId}
     */
    @Deprecated
    public synchronized boolean isOnline() {
        final Object registered = trans.getChannelListener(transId);
        return registered != null;
    }

    /**
     * Returns the transport currently used by this invoker.
     *
     * @return the current value of the {@code trans} field
     */
    public ChannelTransport<E> getTransport() {
        final ChannelTransport<E> current = trans;
        return current;
    }

    /**
     * Returns the endpoint reported by the current transport.
     *
     * @return the result of {@code getEndpoint()} on the current transport
     */
    public E getEndpoint() {
        final E self = trans.getEndpoint();
        return self;
    }

    /**
     * Creates a stub for this invoker's own object ID on {@code remotePeer}, using
     * {@code GTransConfigValues.rpcTimeout} as the timeout.
     *
     * @param remotePeer endpoint the stub will call
     * @return the stub
     */
    public T getStub(E remotePeer) {
        final T stub = getStub(remotePeer, GTransConfigValues.rpcTimeout);
        return stub;
    }

    /**
     * Creates a stub for this invoker's own object ID on {@code remotePeer}. The proxy is built
     * from this object's runtime class with {@code RPCMode.AUTO}.
     *
     * @param remotePeer endpoint the stub will call
     * @param timeout timeout handed to the invocation handler
     * @return the stub, cast to {@code T}
     * @throws IllegalStateException if this invoker has already been finished
     */
    public T getStub(E remotePeer, int timeout) {
        final RPCIf proxy = this.getStubFor(this.getClass(), this.objId, remotePeer, timeout, RPCMode.AUTO);
        return (T)proxy;
    }

    /**
     * Builds a dynamic proxy that forwards calls to {@code targetId} on {@code remotePeer}.
     * The proxy is defined in the class loader of {@code clz}, implements the interfaces that
     * {@code ClassUtil.gatherLowerBoundSuperInterfaces(clz, RPCIf.class)} returns, and is backed
     * by a new {@code RPCInvocationHandler}.
     *
     * @param clz class or interface the proxy interfaces are derived from
     * @param targetId ID of the remote object
     * @param remotePeer endpoint the stub will call
     * @param timeout timeout handed to the invocation handler
     * @param rpcMode mode handed to the invocation handler
     * @return the proxy instance
     * @throws IllegalStateException if this invoker has already been finished
     */
    private <S extends RPCIf> S getStubFor(Class<? extends RPCIf> clz, ObjectId targetId, E remotePeer, int timeout, RPCMode rpcMode) {
        checkActive();
        // Arguments are evaluated left to right: class loader, interface list, handler.
        final Object proxy = Proxy.newProxyInstance(
                clz.getClassLoader(),
                ClassUtil.gatherLowerBoundSuperInterfaces(clz, RPCIf.class),
                new RPCInvocationHandler<E>(this, targetId, remotePeer, timeout, rpcMode));
        return (S)((RPCIf)proxy);
    }

    /**
     * Creates a stub of interface {@code clz} for a remote object.
     *
     * @param clz interface the stub implements
     * @param targetId ID of the remote object
     * @param remotePeer endpoint the stub will call
     * @param timeout timeout handed to the invocation handler
     * @param rpcMode mode handed to the invocation handler
     * @return the stub
     * @throws IllegalArgumentException if {@code clz} is not an interface
     * @throws IllegalStateException if this invoker has already been finished
     */
    public <S extends RPCIf> S getStub(Class<S> clz, ObjectId targetId, E remotePeer, int timeout, RPCMode rpcMode) {
        if (clz.isInterface()) {
            return getStubFor(clz, targetId, remotePeer, timeout, rpcMode);
        }
        throw new IllegalArgumentException("specified class is not an interface");
    }

    /**
     * Creates a stub of interface {@code clz} using {@code GTransConfigValues.rpcTimeout}.
     *
     * @param clz interface the stub implements
     * @param targetId ID of the remote object
     * @param remotePeer endpoint the stub will call
     * @param rpcMode mode handed to the invocation handler
     * @return the stub
     */
    public <S extends RPCIf> S getStub(Class<S> clz, ObjectId targetId, E remotePeer, RPCMode rpcMode) {
        final S stub = getStub(clz, targetId, remotePeer, GTransConfigValues.rpcTimeout, rpcMode);
        return stub;
    }

    /**
     * Creates a stub of interface {@code clz} with {@code RPCMode.AUTO}.
     *
     * @param clz interface the stub implements
     * @param targetId ID of the remote object
     * @param remotePeer endpoint the stub will call
     * @param timeout timeout handed to the invocation handler
     * @return the stub
     * @throws IllegalArgumentException if {@code clz} is not an interface
     * @throws IllegalStateException if this invoker has already been finished
     */
    public <S extends RPCIf> S getStub(Class<S> clz, ObjectId targetId, E remotePeer, int timeout) {
        if (clz.isInterface()) {
            return getStubFor(clz, targetId, remotePeer, timeout, RPCMode.AUTO);
        }
        throw new IllegalArgumentException("Specified class is not interface");
    }

    /**
     * Creates a stub of interface {@code clz} using {@code GTransConfigValues.rpcTimeout}
     * and the mode chosen by the timeout-taking overload.
     *
     * @param clz interface the stub implements
     * @param targetId ID of the remote object
     * @param remotePeer endpoint the stub will call
     * @return the stub
     */
    public <S extends RPCIf> S getStub(Class<S> clz, ObjectId targetId, E remotePeer) {
        final S stub = getStub(clz, targetId, remotePeer, GTransConfigValues.rpcTimeout);
        return stub;
    }

    /**
     * Creates a stub for the object and peer named by {@code cid}.
     *
     * @param clz interface the stub implements
     * @param cid supplies the target ID and the peer reference
     * @param timeout timeout handed to the invocation handler
     * @param rpcMode mode handed to the invocation handler
     * @return the stub
     */
    public <S extends RPCIf> S getStub(Class<S> clz, CalleeId cid, int timeout, RPCMode rpcMode) {
        final S stub = getStub(clz, cid.getTargetId(), cid.getPeerRef(), timeout, rpcMode);
        return stub;
    }

    /**
     * Creates a stub for the object and peer named by {@code cid}, using
     * {@code GTransConfigValues.rpcTimeout}.
     *
     * @param clz interface the stub implements
     * @param cid supplies the target ID and the peer reference
     * @param rpcMode mode handed to the invocation handler
     * @return the stub
     */
    public <S extends RPCIf> S getStub(Class<S> clz, CalleeId cid, RPCMode rpcMode) {
        final S stub = getStub(clz, cid.getTargetId(), cid.getPeerRef(), GTransConfigValues.rpcTimeout, rpcMode);
        return stub;
    }

    /**
     * Creates a stub for the object and peer named by {@code cid} with an explicit timeout.
     *
     * @param clz interface the stub implements
     * @param cid supplies the target ID and the peer reference
     * @param timeout timeout handed to the invocation handler
     * @return the stub
     */
    public <S extends RPCIf> S getStub(Class<S> clz, CalleeId cid, int timeout) {
        final S stub = getStub(clz, cid.getTargetId(), cid.getPeerRef(), timeout);
        return stub;
    }

    /**
     * Creates a stub for the object and peer named by {@code cid} with default settings.
     *
     * @param clz interface the stub implements
     * @param cid supplies the target ID and the peer reference
     * @return the stub
     */
    public <S extends RPCIf> S getStub(Class<S> clz, CalleeId cid) {
        final S stub = getStub(clz, cid.getTargetId(), cid.getPeerRef());
        return stub;
    }

    /**
     * Invokes a remote method by name through a {@code DynamicStub}.
     *
     * @param targetId ID of the remote object
     * @param remotePeer endpoint to call
     * @param timeout timeout handed to the stub
     * @param rpcMode mode handed to the stub
     * @param method name of the method to invoke
     * @param args arguments of the invocation
     * @return the value returned by {@code DynamicStub.method}
     * @throws Throwable whatever the stub call throws
     */
    public Object rcall(ObjectId targetId, E remotePeer, int timeout, RPCMode rpcMode, String method, Object ... args) throws Throwable {
        final DynamicStub dynamic = getStub(DynamicStub.class, targetId, remotePeer, timeout, rpcMode);
        return dynamic.method(method, args);
    }

    /**
     * Invokes a remote method by name using {@code GTransConfigValues.rpcTimeout}.
     *
     * @param targetId ID of the remote object
     * @param remotePeer endpoint to call
     * @param rpcMode mode handed to the stub
     * @param method name of the method to invoke
     * @param args arguments of the invocation
     * @return the result of the delegated call
     * @throws Throwable whatever the delegated call throws
     */
    public Object rcall(ObjectId targetId, E remotePeer, RPCMode rpcMode, String method, Object ... args) throws Throwable {
        final Object result = rcall(targetId, remotePeer, GTransConfigValues.rpcTimeout, rpcMode, method, args);
        return result;
    }

    /**
     * Invokes a remote method by name with {@code RPCMode.AUTO}.
     *
     * @param targetId ID of the remote object
     * @param remotePeer endpoint to call
     * @param timeout timeout handed to the stub
     * @param method name of the method to invoke
     * @param args arguments of the invocation
     * @return the result of the delegated call
     * @throws Throwable whatever the delegated call throws
     */
    public Object rcall(ObjectId targetId, E remotePeer, int timeout, String method, Object ... args) throws Throwable {
        final Object result = rcall(targetId, remotePeer, timeout, RPCMode.AUTO, method, args);
        return result;
    }

    /**
     * Invokes a remote method by name using {@code GTransConfigValues.rpcTimeout}.
     *
     * @param targetId ID of the remote object
     * @param remotePeer endpoint to call
     * @param method name of the method to invoke
     * @param args arguments of the invocation
     * @return the result of the delegated call
     * @throws Throwable whatever the delegated call throws
     */
    public Object rcall(ObjectId targetId, E remotePeer, String method, Object ... args) throws Throwable {
        final Object result = rcall(targetId, remotePeer, GTransConfigValues.rpcTimeout, method, args);
        return result;
    }

    /**
     * Invokes a remote method by name on the object and peer named by {@code cid}.
     *
     * @param cid supplies the target ID and the peer reference
     * @param timeout timeout handed to the stub
     * @param rpcMode mode handed to the stub
     * @param method name of the method to invoke
     * @param args arguments of the invocation
     * @return the result of the delegated call
     * @throws Throwable whatever the delegated call throws
     */
    public Object rcall(CalleeId cid, int timeout, RPCMode rpcMode, String method, Object ... args) throws Throwable {
        final Object result = rcall(cid.getTargetId(), cid.getPeerRef(), timeout, rpcMode, method, args);
        return result;
    }

    /**
     * Invokes a remote method by name on the callee {@code cid}, using
     * {@code GTransConfigValues.rpcTimeout}.
     *
     * @param cid supplies the target ID and the peer reference
     * @param rpcMode mode handed to the stub
     * @param method name of the method to invoke
     * @param args arguments of the invocation
     * @return the result of the delegated call
     * @throws Throwable whatever the delegated call throws
     */
    public Object rcall(CalleeId cid, RPCMode rpcMode, String method, Object ... args) throws Throwable {
        final Object result = rcall(cid, GTransConfigValues.rpcTimeout, rpcMode, method, args);
        return result;
    }

    /**
     * Invokes a remote method by name on the callee {@code cid} with {@code RPCMode.AUTO}.
     *
     * @param cid supplies the target ID and the peer reference
     * @param timeout timeout handed to the stub
     * @param method name of the method to invoke
     * @param args arguments of the invocation
     * @return the result of the delegated call
     * @throws Throwable whatever the delegated call throws
     */
    public Object rcall(CalleeId cid, int timeout, String method, Object ... args) throws Throwable {
        final Object result = rcall(cid.getTargetId(), cid.getPeerRef(), timeout, RPCMode.AUTO, method, args);
        return result;
    }

    /**
     * Invokes a remote method by name on the callee {@code cid}, using
     * {@code GTransConfigValues.rpcTimeout} and {@code RPCMode.AUTO}.
     *
     * @param cid supplies the target ID and the peer reference
     * @param method name of the method to invoke
     * @param args arguments of the invocation
     * @return the result of the delegated call
     * @throws Throwable whatever the delegated call throws
     */
    public Object rcall(CalleeId cid, String method, Object ... args) throws Throwable {
        final Object result = rcall(cid, GTransConfigValues.rpcTimeout, RPCMode.AUTO, method, args);
        return result;
    }

    /**
     * Changes the timeout of a stub previously obtained from an RPCInvoker.
     *
     * @param stub the stub whose timeout is changed
     * @param timeout the new timeout, passed to the handler's {@code setTimeout}
     * @throws IllegalArgumentException if {@code stub} is not a proxy instance, or is a proxy
     *         whose invocation handler is not an {@code RPCInvocationHandler}
     * @throws IllegalStateException if this invoker has already been finished
     */
    public void changeRPCTimeout(RPCIf stub, int timeout) throws IllegalArgumentException {
        this.checkActive();
        Object handler = Proxy.getInvocationHandler(stub);
        // A proxy built by someone else would otherwise surface as a ClassCastException,
        // which is not the failure this method declares.
        if (!(handler instanceof RPCInvocationHandler)) {
            throw new IllegalArgumentException("specified stub is not backed by an RPCInvocationHandler");
        }
        ((RPCInvocationHandler<?>)handler).setTimeout(timeout);
    }

    /**
     * Creates the message describing one invocation, stamped with this peer's ID as the source.
     * {@code remotePeer} is not used by this implementation.
     *
     * @param target ID of the remote object
     * @param remotePeer destination endpoint (unused here)
     * @param oneway whether no reply is expected
     * @param method name of the method to invoke
     * @param args arguments of the invocation
     * @return the new MethodCall
     */
    protected MethodCall newMethodCall(ObjectId target, E remotePeer, boolean oneway, String method, Object ... args) {
        final PeerId self = peer.getPeerId();
        return new MethodCall(target, self, oneway, method, args);
    }

    /**
     * Sends an invocation for which no reply is expected. When both
     * {@code USE_CHANNEL_FOR_ONEWAY} and {@code POOL_CHANNEL} are set, the call goes through
     * {@code sendInvoke} with a timeout of -1; otherwise the MethodCall is sent directly with
     * the transport's {@code send}.
     *
     * @param target ID of the remote object
     * @param remotePeer endpoint to call
     * @param method name of the method to invoke
     * @param args arguments of the invocation
     * @throws RPCException if the direct send fails with an IOException, or from sendInvoke
     */
    public void sendOnewayInvoke(ObjectId target, E remotePeer, String method, Object ... args) throws RPCException {
        if (USE_CHANNEL_FOR_ONEWAY && POOL_CHANNEL) {
            sendInvoke(target, remotePeer, -1, method, args);
            return;
        }
        try {
            final MethodCall call = newMethodCall(target, remotePeer, true, method, args);
            trans.send(transId, remotePeer, (Object)call);
        }
        catch (IOException ioe) {
            throw new RPCException(ioe);
        }
    }

    /**
     * Shrinks the channel pool when it holds more than {@code POOL_CHANNEL_SIZE} entries by
     * closing and removing the idle entries with the oldest timestamps.
     *
     * <p>Only entries that are not in use are considered. An in-use entry is left in the pool:
     * dropping it without closing its channel would leave the channel open with nothing left
     * to close it, because the releasing caller only discards channels that are already closed.
     */
    private void expireIfNeeded() {
        final int excess = this.channelPool.size() - POOL_CHANNEL_SIZE;
        if (excess <= 0) {
            return;
        }
        this.channelPool.values().stream()
                .filter(cpe -> !cpe.isInUse())
                .sorted(Comparator.comparingLong(ChannelPoolEntry::getTimestamp))
                .limit(excess)
                .forEach(cpe -> {
                    logger.debug("Expiring pooled channel: {}", cpe.channel);
                    synchronized (cpe) {
                        if (cpe.isInUse()) {
                            // picked up since the filter ran; keep it pooled
                            return;
                        }
                        cpe.channel.close();
                    }
                    // Two-argument remove: never drop a newer entry stored under the same key.
                    this.channelPool.remove(cpe.getRemote(), cpe);
                });
    }

    /**
     * Sends a method invocation to {@code remotePeer} over a channel and, unless the call is
     * oneway, waits for the reply.
     *
     * <p>With {@code POOL_CHANNEL} enabled, one channel per remote endpoint is kept in
     * {@code channelPool}. A synchronous call that finds the pooled channel in use opens a
     * temporary channel which is closed when the call ends. A pooled channel found closed is
     * replaced by a fresh pooled channel. Without pooling, a channel is opened and closed for
     * every call.
     *
     * @param target ID of the remote object
     * @param remotePeer endpoint to call
     * @param timeout receive timeout; a negative value makes the call oneway
     * @param method name of the method to invoke
     * @param args arguments of the invocation
     * @return the {@code ReturnValue} received from the remote side, or {@code null} for a
     *         oneway call
     * @throws RPCException if the call times out, fails with an I/O error, is interrupted, or
     *         the reply is {@code null} or not a {@code ReturnValue}
     */
    public ReturnValue<?> sendInvoke(ObjectId target, E remotePeer, int timeout, String method, Object ... args) throws RPCException {
        // Read the public mutable flag once so that acquisition and release agree.
        final boolean pooling = POOL_CHANNEL;
        final boolean oneway = timeout < 0;
        Channel<?> ch = null;
        ChannelPoolEntry cpe = null;
        // True only after startUsing() has been called on cpe on behalf of this call;
        // every other channel held in ch belongs to this call alone and is closed at the end.
        boolean holdsPooled = false;
        try {
            if (pooling) {
                synchronized (this.channelPool) {
                    cpe = this.channelPool.get(remotePeer);
                    if (cpe != null && cpe.channel.isClosed()) {
                        // A dead entry is useless: drop it and pool a fresh channel instead.
                        logger.debug("reused but closed channel:{}", cpe.channel);
                        this.channelPool.remove(remotePeer, cpe);
                        cpe = null;
                    }
                    if (cpe == null) {
                        ch = this.trans.newChannel(this.transId, remotePeer);
                        cpe = new ChannelPoolEntry(ch);
                        this.expireIfNeeded();
                        cpe.startUsing();
                        holdsPooled = true;
                        this.channelPool.put(remotePeer, cpe);
                    } else if (cpe.isInUse() && !oneway) {
                        // The pooled channel is busy; a synchronous call needs its own channel.
                        ch = this.trans.newChannel(this.transId, remotePeer);
                    } else {
                        logger.debug("reused pooled channel:{}", cpe.channel);
                        ch = cpe.channel;
                        cpe.startUsing();
                        holdsPooled = true;
                    }
                }
            } else {
                ch = this.trans.newChannel(this.transId, remotePeer);
            }
            MethodCall mc = this.newMethodCall(target, remotePeer, oneway, method, args);
            ch.send(mc);
            if (oneway) {
                return null;
            }
            Object r = ch.receive(timeout);
            if (r == null) {
                if (Thread.currentThread().isInterrupted()) {
                    if (pooling) {
                        ch.close();
                    }
                    throw new RPCException(new InterruptedException());
                }
                throw new RPCException("RPC return message is null");
            }
            if (!(r instanceof ReturnValue)) {
                throw new RPCException("RPC return message is not ReturnValue");
            }
            return (ReturnValue<?>)r;
        }
        catch (NetworkTimeoutException e) {
            if (pooling && ch != null) {
                ch.close();
            }
            throw new RPCException(new NetworkTimeoutException(method + " call timed out"));
        }
        catch (IOException e) {
            // Channel creation may have failed, so neither the channel nor the MethodCall can
            // be relied on here; log from the method parameters instead.
            logger.warn("sendInvoke {} \"{}\" got an exception: {}, remote={}", new Object[]{oneway ? "oneway" : "sync", method, e, remotePeer});
            if (pooling && ch != null) {
                ch.close();
            }
            throw new RPCException(e);
        }
        finally {
            if (ch != null) {
                if (holdsPooled) {
                    cpe.endUsing();
                    if (ch.isClosed()) {
                        // Remove only our own entry, never a replacement stored meanwhile.
                        this.channelPool.remove(remotePeer, cpe);
                    }
                } else if (!ch.isClosed()) {
                    // Temporary or non-pooled channel owned by this call.
                    ch.close();
                }
            }
        }
    }

    /**
     * Invokes the method described by {@code mc} on the local object {@code obj}.
     * If {@code RPCHook.hook} is set, its {@code calleeHook} may first replace the method name
     * and arguments stored in {@code mc}. {@code isOneway} is not used by this implementation.
     *
     * @param isOneway whether the call is oneway (unused here)
     * @param obj the local target object
     * @param mc the received call; its {@code method} and {@code args} may be overwritten
     * @return the value returned by {@code MethodUtil.strictInvoke}
     * @throws NoSuchMethodException propagated from {@code MethodUtil.strictInvoke}
     * @throws InvocationTargetException propagated from {@code MethodUtil.strictInvoke}
     */
    protected Object invokeInReceive(boolean isOneway, RPCIf obj, MethodCall mc) throws NoSuchMethodException, InvocationTargetException {
        if (RPCHook.hook != null) {
            final RPCHook.RValue hooked = RPCHook.hook.calleeHook(mc.method, mc.args);
            mc.method = hooked.method;
            mc.args = hooked.args;
        }
        // true when the call originated from this very peer
        final boolean fromSelf = peer.getPeerId().equals(mc.srcPeerId);
        return MethodUtil.strictInvoke(obj, RPCIf.class, fromSelf, mc.method, mc.args);
    }

    /**
     * Executes a received oneway call. Nothing is reported back to the caller: an exception
     * thrown by the invoked method is logged at info level, any other failure (including an
     * unknown target object) at warn level.
     *
     * @param mc the received call
     */
    protected void receiveOneway(MethodCall mc) {
        try {
            final RPCIf targetObj = getRPCObject(mc.target);
            if (targetObj != null) {
                invokeInReceive(true, targetObj, mc);
                return;
            }
            throw new NoSuchRemoteObjectException("no such object of ID: " + mc.target + " in " + this.trans.getPeerId());
        }
        catch (InvocationTargetException ite) {
            logger.info("oneway RPC callee got Exception: \"{}\"", (Object)mc.method);
            logger.info("", ite.getCause());
        }
        catch (Throwable t) {
            logger.warn("", t);
        }
    }

    /**
     * Executes a received synchronous call and wraps the outcome for the reply.
     *
     * @param mc the received call
     * @return a ReturnValue holding the method's result; or the cause of the
     *         InvocationTargetException raised by the invocation; or an RPCException wrapping
     *         NoSuchRemoteObjectException when the target is not registered; or any other
     *         Throwable raised while handling the call
     */
    protected ReturnValue<?> receiveSync(MethodCall mc) {
        try {
            final RPCIf targetObj = getRPCObject(mc.target);
            if (targetObj == null) {
                return new ReturnValue(new RPCException(new NoSuchRemoteObjectException("target object of ID not found: " + mc.target)));
            }
            return new ReturnValue<Object>(this.invokeInReceive(false, targetObj, mc));
        }
        catch (InvocationTargetException ite) {
            return new ReturnValue(ite.getCause());
        }
        catch (Throwable t) {
            logger.info("", t);
            return new ReturnValue(t);
        }
    }

    /**
     * Handles a message delivered through the transport listener (no channel): records the
     * sender's peer ID for the current thread and executes the call as a oneway call.
     *
     * <p>A {@code null} payload or a payload that is not a {@code MethodCall} is logged and
     * dropped, matching the channel-based receive path, instead of failing with a
     * ClassCastException in the transport's delivery thread.
     *
     * @param rmsg the received message
     */
    private void _onReceive(ReceivedMessage rmsg) {
        Object msg = rmsg.getMessage();
        if (msg == null) {
            logger.info("null message received");
            return;
        }
        if (!(msg instanceof MethodCall)) {
            logger.info("unexpected message type received: {}", (Object)msg.getClass().getName());
            return;
        }
        MethodCall mc = (MethodCall)msg;
        this.srcPeerId.set(mc.srcPeerId);
        this.receiveOneway(mc);
    }

    /**
     * Handles data arriving on a channel. Channels created by this side are ignored. A received
     * {@code MethodCall} is executed; for a non-oneway call the result is sent back on the same
     * channel unless the channel has been closed in the meantime. Failures to reply are logged.
     *
     * @param ch the channel that has data available
     */
    private void _onReceive(Channel<E> ch) {
        if (ch.isCreatorSide()) {
            return;
        }
        final Object received = ch.receive();
        if (received == null) {
            logger.info("null message received");
            return;
        }
        if (!(received instanceof MethodCall)) {
            logger.info("Maybe the reply is received after the caller-channel is closed");
            return;
        }
        final MethodCall call = (MethodCall)received;
        srcPeerId.set(call.srcPeerId);
        if (call.oneway) {
            receiveOneway(call);
            return;
        }
        try {
            final ReturnValue<?> reply = receiveSync(call);
            if (!ch.isClosed()) {
                ch.send(reply);
            } else {
                logger.info("channel already closed on the return of \"{}\" method", (Object)call.method);
            }
        }
        catch (ClosedChannelException cce) {
            logger.warn("", (Throwable)cce);
            logger.info("closed channel exception occured to reply to \"{}\", args={}", (Object)call.method, (Object)call.args);
        }
        catch (IOException ioe) {
            logger.warn("", (Throwable)ioe);
            logger.info("caller could not receive the return of \"{}\" method", (Object)call.method);
        }
    }

    /**
     * Serializable message describing one remote invocation: which object to call, which
     * method with which arguments, who is calling, and whether a reply is expected.
     */
    public static class MethodCall
    implements Serializable {
        private static final long serialVersionUID = 1L;
        /** ID of the object the call is addressed to. */
        protected final ObjectId target;
        /** True when the caller does not expect a reply. */
        protected boolean oneway;
        /** Name of the method to invoke. */
        protected String method;
        /** Arguments of the invocation. */
        protected Object[] args;
        /** Peer ID of the caller. */
        protected PeerId srcPeerId;

        /**
         * @param target ID of the object the call is addressed to
         * @param srcPeerId peer ID of the caller
         * @param oneway whether no reply is expected
         * @param method name of the method to invoke
         * @param args arguments of the invocation
         */
        protected MethodCall(ObjectId target, PeerId srcPeerId, boolean oneway, String method, Object ... args) {
            this.target = target;
            this.method = method;
            this.args = args;
            this.oneway = oneway;
            this.srcPeerId = srcPeerId;
        }
    }

    /**
     * Book-keeping record for one pooled channel: the channel itself, the number of calls
     * currently using it, and the time it was created or last released.
     */
    class ChannelPoolEntry {
        /**
         * Creation time or time of the last release, in milliseconds. Volatile because
         * endUsing() writes it without holding the pool lock while expireIfNeeded() reads it
         * from another thread; a plain long gives neither visibility nor atomic 64-bit writes.
         */
        public volatile long timestamp;
        /** The pooled channel. */
        public Channel<?> channel;
        /** Number of calls currently using the channel. */
        public AtomicInteger useCount;

        /**
         * Creates an entry with a use count of zero and the current time as timestamp.
         *
         * @param channel the channel to pool
         */
        public ChannelPoolEntry(Channel<?> channel) {
            this.channel = channel;
            this.useCount = new AtomicInteger(0);
            this.timestamp = System.currentTimeMillis();
        }

        /** Marks the start of one use of the channel. */
        public void startUsing() {
            this.useCount.incrementAndGet();
        }

        /**
         * Marks the end of one use of the channel. The timestamp is refreshed before the use
         * count is released so that the entry never appears idle with a stale timestamp.
         */
        public void endUsing() {
            this.timestamp = System.currentTimeMillis();
            this.useCount.decrementAndGet();
        }

        /** @return true if at least one call is currently using the channel */
        public boolean isInUse() {
            return this.useCount.get() >= 1;
        }

        /** @return the current use count */
        public int useCount() {
            return this.useCount.get();
        }

        /** @return creation time or time of the last release, in milliseconds */
        public long getTimestamp() {
            return this.timestamp;
        }

        /** @return the remote endpoint reported by the channel */
        public Endpoint getRemote() {
            return this.channel.getRemote();
        }
    }
}

