/*
 * Decompiled with CFR 0.152.
 */
package org.piax.gtrans.netty.idtrans;

import io.netty.bootstrap.AbstractBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.piax.common.ComparableKey;
import org.piax.common.ObjectId;
import org.piax.common.Option;
import org.piax.common.PeerId;
import org.piax.common.TransportId;
import org.piax.gtrans.Channel;
import org.piax.gtrans.ChannelListener;
import org.piax.gtrans.ChannelTransport;
import org.piax.gtrans.IdConflictException;
import org.piax.gtrans.Peer;
import org.piax.gtrans.ProtocolUnsupportedException;
import org.piax.gtrans.ReceivedMessage;
import org.piax.gtrans.TransOptions;
import org.piax.gtrans.TransportListener;
import org.piax.gtrans.impl.ChannelTransportImpl;
import org.piax.gtrans.netty.ControlMessage;
import org.piax.gtrans.netty.NettyLocator;
import org.piax.gtrans.netty.NettyMessage;
import org.piax.gtrans.netty.bootstrap.NettyBootstrap;
import org.piax.gtrans.netty.bootstrap.SslBootstrap;
import org.piax.gtrans.netty.bootstrap.TcpBootstrap;
import org.piax.gtrans.netty.bootstrap.UdtBootstrap;
import org.piax.gtrans.netty.idtrans.IdChannel;
import org.piax.gtrans.netty.idtrans.LocatorChannel;
import org.piax.gtrans.netty.idtrans.LocatorManager;
import org.piax.gtrans.netty.idtrans.PrimaryKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IdChannelTransport
extends ChannelTransportImpl<PrimaryKey>
implements ChannelTransport<PrimaryKey> {
    /** Locator/key bookkeeping; consulted via updateAndGet, getLocator and updateKey, and shut down in fin(). */
    LocatorManager mgr;
    protected static final Logger logger = LoggerFactory.getLogger((String)IdChannelTransport.class.getName());
    /** Id channels known to this transport, keyed by IdChannel.getKeyString(...). */
    protected final ConcurrentHashMap<String, IdChannel> ichannels = new ConcurrentHashMap();
    // Event loop groups obtained from the bootstrap in the constructor; they stay null when no locator is given.
    protected EventLoopGroup bossGroup;
    protected EventLoopGroup serverGroup;
    protected EventLoopGroup clientGroup;
    // Not referenced anywhere in this file.
    boolean supportsDuplex = true;
    /** The endpoint of this transport, returned by getEndpoint(). */
    protected PrimaryKey ep = null;
    protected final PeerId peerId;
    /** Locator channels (with the future that completes once they are usable), keyed by remote locator. Also used as a monitor. */
    private ConcurrentHashMap<NettyLocator, LocatorChannelEntry> raws;
    /** Source of the random "attempt" value sent in the ATTEMPT control message. */
    protected final Random rand = new Random(System.currentTimeMillis());
    /** Set to true at the end of construction and to false by fin(). */
    protected boolean isRunning = false;
    /** Netty channels accepted by the server side (added in inboundActive). */
    protected final ChannelGroup schannels = new DefaultChannelGroup((EventExecutor)GlobalEventExecutor.INSTANCE);
    /** Netty channels opened by the client side (added in outboundActive). */
    protected final ChannelGroup cchannels = new DefaultChannelGroup((EventExecutor)GlobalEventExecutor.INSTANCE);
    /** Result of binding the server socket; null when the transport was created without a locator. */
    final ChannelFuture serverFuture;
    /** Protocol specific bootstrap chosen from the locator type; stays null when no locator is given. */
    private NettyBootstrap<PrimaryKey> bs;
    /** Counter used to number id channels created by this transport. */
    private AtomicInteger seq;
    /** Number of locator channels kept before getRawCreate starts closing the least recently used ones (30 is presumably the default value - confirm against Option.IntegerOption). */
    public static Option.IntegerOption RAW_POOL_SIZE = new Option.IntegerOption(30, "-pool-size");
    // Not referenced anywhere in this file.
    public AttributeKey<String> rawChannelKey = AttributeKey.valueOf((String)"rawKey");
    /** Channel attribute under which the LocatorChannelEntry of a netty channel is stored. */
    public AttributeKey<LocatorChannelEntry> LCE_KEY = AttributeKey.valueOf((String)"locatorChannelEntry");
    // FORWARD_HOPS_LIMIT and forwardCount are not referenced anywhere in this file.
    protected static final int FORWARD_HOPS_LIMIT = 5;
    public int forwardCount = 0;
    /** Limit for IdChannel.elapsedTimeAfterClose() beyond which expireClosedIdChannels() drops a channel (presumably milliseconds - confirm against IdChannel). */
    protected static final int ID_CHANNEL_REMOVE_CLOSED_THRESHOLD = 2000;

    /**
     * Records a newly accepted connection in the group of server-side channels.
     *
     * @param ctx the handler context of the connection that became active
     */
    protected void inboundActive(ChannelHandlerContext ctx) {
        final SocketAddress remote = ctx.channel().remoteAddress();
        logger.debug("inbound active: " + remote);
        schannels.add(ctx.channel());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Dispatches a message read from an accepted (inbound) connection.
     * <p>
     * {@link ControlMessage}s drive the locator channel handshake and the id
     * channel close notification; {@link NettyMessage}s carry payload and are
     * passed on to {@link #messageReceived(IdChannel, NettyMessage)}. Objects
     * of any other type are ignored.
     *
     * @param ctx the handler context of the connection the message arrived on
     * @param msg the decoded message
     */
    @SuppressWarnings({"rawtypes"})
    protected void inboundReceive(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof ControlMessage) {
            this.inboundControl(ctx, (ControlMessage)msg);
        } else if (msg instanceof NettyMessage) {
            this.inboundData((NettyMessage)msg);
        }
    }

    /** Handles a control message received on an inbound connection. */
    @SuppressWarnings({"rawtypes"})
    private void inboundControl(ChannelHandlerContext ctx, ControlMessage cmsg) {
        PrimaryKey curSrc = this.mgr.updateAndGet((PrimaryKey)cmsg.getSource());
        logger.debug("received attempt: " + cmsg.getArg() + " from " + ctx);
        switch (cmsg.type) {
            case ATTEMPT: {
                this.inboundAttempt(ctx, cmsg, curSrc);
                break;
            }
            case ACK: {
                logger.debug("attempt ACK received from client");
                break;
            }
            case NACK: {
                logger.debug("attempt NACK received from client");
                break;
            }
            case CLOSE: {
                logger.debug("close id channel: {}/{}", (Object)((Integer)cmsg.getArg()), (Object)curSrc);
                IdChannel c = this.ichannels.get(IdChannel.getKeyString((Integer)cmsg.getArg(), curSrc));
                // Reuse the looked-up channel: a second lookup could return null
                // if the entry is removed concurrently.
                if (c != null) {
                    this.closeIdChannel(c);
                }
                break;
            }
            default: {
                logger.error("Unimplemented control message was received.");
                break;
            }
        }
    }

    /**
     * Answers an ATTEMPT handshake. Exactly one of three cases applies:
     * a loop-back connection to ourselves, a simultaneous connection attempt
     * from both sides (resolved by comparing the attempt values), or a plain
     * new connection for which a fresh entry is registered.
     */
    @SuppressWarnings({"rawtypes"})
    private void inboundAttempt(ChannelHandlerContext ctx, ControlMessage cmsg, PrimaryKey curSrc) {
        synchronized (this.raws) {
            LocatorChannelEntry ent = this.raws.get(curSrc.getLocator());
            if (ent != null && this.ep.equals(curSrc)) {
                logger.debug("loop back:" + this.ep);
                synchronized (ent) {
                    ent.channel.touch();
                    ent.channel.setChannel(ctx.channel());
                    ent.channel.setStat(LocatorChannel.Stat.RUN);
                    ent.channel.setPrimaryKey(this.ep);
                    ent.future.complete(ent.channel);
                    ctx.writeAndFlush(new ControlMessage<PrimaryKey>(ControlMessage.ControlType.ACK, this.ep, this.ep, null));
                }
            } else if (ent != null && ent.channel.attempt != null) {
                // Both sides tried to connect at the same time.
                logger.debug("NOT a loop back:" + this.ep);
                synchronized (ent) {
                    ent.channel.touch();
                    if (ent.channel.attempt > (Integer)cmsg.getArg()) {
                        // Our own outgoing attempt wins; refuse the incoming one.
                        logger.debug("attempt won on " + ent.channel);
                        ctx.writeAndFlush(new ControlMessage<Object>(ControlMessage.ControlType.NACK, this.ep, null, null));
                    } else {
                        logger.debug("attempt lose on " + ent.channel);
                        ctx.writeAndFlush(new ControlMessage<Object>(ControlMessage.ControlType.ACK, this.ep, null, null));
                        if (ent.channel.getStat() == LocatorChannel.Stat.WAIT || ent.channel.getStat() == LocatorChannel.Stat.DENIED) {
                            logger.debug("set as RUN by accepting channel on loser: {}", (Object)this.ep);
                            if (ent.channel.getChannel() != null) {
                                ent.channel.getChannel().close();
                            }
                            ent.channel.setChannel(ctx.channel());
                            ent.channel.touch();
                            ent.channel.setStat(LocatorChannel.Stat.RUN);
                            ent.channel.setPrimaryKey(curSrc);
                            ent.future.complete(ent.channel);
                        }
                    }
                }
            } else {
                // No competing entry: accept the connection as a new locator channel.
                LocatorChannelEntry created = new LocatorChannelEntry(new LocatorChannel(curSrc.getLocator(), this), new CompletableFuture<LocatorChannel>());
                synchronized (created) {
                    ctx.channel().attr(this.LCE_KEY).set(created);
                    created.channel.setStat(LocatorChannel.Stat.RUN);
                    created.channel.setChannel(ctx.channel());
                    created.channel.setPrimaryKey(curSrc);
                    created.future.complete(created.channel);
                    logger.debug("set run stat for raw from source=" + curSrc);
                }
                ctx.writeAndFlush(new ControlMessage<Object>(ControlMessage.ControlType.ACK, this.ep, null, null));
                this.raws.put(curSrc.getLocator(), created);
            }
        }
    }

    /** Handles a payload message received on an inbound connection. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void inboundData(NettyMessage nmsg) {
        logger.debug("inbound received msg{}: on {}", (Object)nmsg, (Object)this.ep);
        PrimaryKey curSrc = this.mgr.updateAndGet((PrimaryKey)nmsg.getSource());
        if (!nmsg.isChannelSend()) {
            logger.debug("received oneway msg={}", (Object)nmsg);
            this.messageReceived(null, nmsg);
            return;
        }
        IdChannel ch;
        synchronized (this.ichannels) {
            ch = this.ichannels.get(IdChannel.getKeyString(nmsg.channelNo(), (PrimaryKey)nmsg.getChannelInitiator()));
            if (ch == null) {
                // First message on this id channel: create the accepting side,
                // but only if the underlying locator channel is running.
                synchronized (this.raws) {
                    LocatorChannelEntry ent = this.raws.get(curSrc.getLocator());
                    logger.debug("locator channel for {} is {} ", (Object)curSrc.getLocator(), (Object)ent);
                    if (ent == null || ent.channel.getStat() != LocatorChannel.Stat.RUN) {
                        logger.warn("receive in illegal state {} from {} (channel not running): throwing it away.", ent == null ? "null" : ent.channel.getStat(), nmsg.getSource());
                    } else {
                        this.expireClosedIdChannels();
                        ch = new IdChannel(nmsg.channelNo(), (PrimaryKey)nmsg.getChannelInitiator(), (PrimaryKey)nmsg.getChannelInitiator(), nmsg.getObjectId(), nmsg.getObjectId(), false, ent.channel, this);
                        this.ichannels.put(ch.getKeyString(), ch);
                    }
                }
            } else {
                if (ch.isClosed()) {
                    return;
                }
                logger.debug("response for call from inbound on {} received.", (Object)ch);
            }
        }
        if (ch != null) {
            ch.raw.touch();
            this.messageReceived(ch, nmsg);
        }
    }

    /**
     * Delivers a received message to the listener registered for its object id.
     * <p>
     * Channel messages are queued on {@code c} and the channel listener, if any,
     * is notified. One-way messages are wrapped in a {@link ReceivedMessage} and
     * handed to the transport listener, if any.
     *
     * @param c    the id channel the message belongs to; only dereferenced when
     *             {@code nmsg.isChannelSend()} is true
     * @param nmsg the received message
     */
    void messageReceived(IdChannel c, NettyMessage<PrimaryKey> nmsg) {
        if (nmsg.isChannelSend()) {
            ChannelListener<PrimaryKey> channelListener = getChannelListener(nmsg.getObjectId());
            // The payload is queued even when nobody is listening.
            c.putReceiveQueue(nmsg.getMsg());
            if (channelListener != null) {
                channelListener.onReceive(c);
            }
            return;
        }
        TransportListener<PrimaryKey> transportListener = getListener(nmsg.getObjectId());
        if (transportListener == null) {
            return;
        }
        ReceivedMessage received = new ReceivedMessage(nmsg.getObjectId(), nmsg.getSource(), nmsg.getMsg());
        logger.debug("trans received {} from {}", received.getMessage(), (Object)nmsg.getSource());
        transportListener.onReceive(this, received);
    }

    /**
     * Cleans up after an accepted connection went inactive: the locator channel
     * entry attached to the connection (if any) is removed from the table of
     * locator channels, then the context is closed.
     *
     * @param ctx the handler context of the connection that became inactive
     */
    protected void inboundInactive(ChannelHandlerContext ctx) {
        logger.debug("inbound inactive: " + ctx.channel().remoteAddress());
        final LocatorChannelEntry attached = ctx.channel().attr(LCE_KEY).get();
        if (attached != null) {
            raws.remove(attached.channel.getRemote());
        }
        ctx.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts the handshake on a connection this transport opened.
     * <p>
     * The entry is attached to the netty channel, a random attempt value is
     * stored on the locator channel, an {@code INIT} channel is moved to
     * {@code WAIT}, the entry is registered under the remote address and an
     * ATTEMPT control message carrying the attempt value is sent.
     *
     * @param ent the locator channel entry the connection was opened for
     * @param ctx the handler context of the connection that became active
     */
    void outboundActive(LocatorChannelEntry ent, ChannelHandlerContext ctx) {
        logger.debug("outbound active: {} on {}", (Object)ctx.channel().remoteAddress(), (Object)this.ep);
        ctx.channel().attr(LCE_KEY).set(ent);
        cchannels.add(ctx.channel());
        final int nonce = rand.nextInt();
        final InetSocketAddress peerAddress = (InetSocketAddress)ctx.channel().remoteAddress();
        final NettyLocator remote = new NettyLocator(peerAddress.getHostName(), peerAddress.getPort());
        final ControlMessage<Object> hello = new ControlMessage<Object>(ControlMessage.ControlType.ATTEMPT, this.ep, null, nonce);
        synchronized (raws) {
            synchronized (ent) {
                final LocatorChannel raw = ent.channel;
                raw.setAttempt(nonce);
                raw.setChannel(ctx.channel());
                if (raw.getStat() == LocatorChannel.Stat.INIT) {
                    raw.setStat(LocatorChannel.Stat.WAIT);
                }
            }
            raws.put(remote, ent);
        }
        ctx.writeAndFlush(hello);
        logger.debug("sent attempt to " + remote + " : " + ctx);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Dispatches a message read from a connection this transport opened.
     * <p>
     * {@link ControlMessage}s complete or deny the handshake started in
     * {@code outboundActive} and close id channels; {@link NettyMessage}s carry
     * payload and are passed on to
     * {@link #messageReceived(IdChannel, NettyMessage)}. Objects of any other
     * type are ignored.
     *
     * @param ent the locator channel entry the connection belongs to
     * @param ctx the handler context of the connection
     * @param msg the decoded message
     */
    @SuppressWarnings({"rawtypes"})
    void outboundReceive(LocatorChannelEntry ent, ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof ControlMessage) {
            this.outboundControl(ent, ctx, (ControlMessage)msg);
        } else if (msg instanceof NettyMessage) {
            this.outboundData(ent, (NettyMessage)msg);
        }
    }

    /** Handles a control message received on an outbound connection. */
    @SuppressWarnings({"rawtypes"})
    private void outboundControl(LocatorChannelEntry ent, ChannelHandlerContext ctx, ControlMessage cmsg) {
        PrimaryKey curSrc = this.mgr.updateAndGet((PrimaryKey)cmsg.getSource());
        logger.debug("outbound attempt response=" + (Object)((Object)cmsg.type) + ",from=" + curSrc + ",on=" + this.ep);
        // NOTE(review): the decompiled code switched on a synthetic ordinal table
        // (1, 2, 3, 7); the constants below are inferred from the log messages and
        // from the inbound handler - confirm against ControlMessage.ControlType.
        switch (cmsg.type) {
            case ATTEMPT: {
                logger.debug("attempt received from server");
                break;
            }
            case ACK: {
                // The peer accepted our attempt: the locator channel is usable now.
                synchronized (ent) {
                    ent.channel.setStat(LocatorChannel.Stat.RUN);
                    ent.channel.setPrimaryKey(curSrc);
                    ent.future.complete(ent.channel);
                }
                break;
            }
            case NACK: {
                // The peer refused this connection; drop it.
                ctx.close();
                synchronized (ent) {
                    LocatorChannel.Stat stat = ent.channel.getStat();
                    if (stat == LocatorChannel.Stat.WAIT) {
                        logger.debug("received NAC while WAITING, set DENIED.");
                        ent.channel.setStat(LocatorChannel.Stat.DENIED);
                        ent.channel.setChannel(null);
                    } else if (stat != LocatorChannel.Stat.RUN) {
                        logger.debug("illegal raw state {}", (Object)ent.channel);
                    }
                }
                break;
            }
            case CLOSE: {
                logger.debug("close id channel: {}/{}", (Object)((Integer)cmsg.getArg()), (Object)curSrc);
                IdChannel c = this.ichannels.get(IdChannel.getKeyString((Integer)cmsg.getArg(), curSrc));
                // Reuse the looked-up channel: a second lookup could return null
                // if the entry is removed concurrently.
                if (c != null) {
                    this.closeIdChannel(c);
                }
                break;
            }
            default: {
                break;
            }
        }
    }

    /** Handles a payload message received on an outbound connection. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void outboundData(LocatorChannelEntry ent, NettyMessage nmsg) {
        this.mgr.updateAndGet((PrimaryKey)nmsg.getSource());
        logger.debug("outbound received msg {} on {}", (Object)nmsg, (Object)this.ep);
        if (!nmsg.isChannelSend()) {
            logger.debug("received oneway msg={}", (Object)nmsg);
            this.messageReceived(null, nmsg);
            return;
        }
        IdChannel ch;
        synchronized (this.ichannels) {
            ch = this.ichannels.get(IdChannel.getKeyString(nmsg.channelNo(), (PrimaryKey)nmsg.getChannelInitiator()));
            logger.debug("got stored ch=" + ch + " for msg: " + nmsg.getMsg());
            if (ch == null) {
                // First message on this id channel: create the accepting side.
                this.expireClosedIdChannels();
                ch = new IdChannel(nmsg.channelNo(), (PrimaryKey)nmsg.getChannelInitiator(), (PrimaryKey)nmsg.getChannelInitiator(), nmsg.getObjectId(), nmsg.getObjectId(), false, ent.channel, this);
                logger.debug("new channel for {} by {}", (Object)ent, nmsg.getMsg());
                this.ichannels.put(ch.getKeyString(), ch);
            }
        }
        this.messageReceived(ch, nmsg);
    }

    /**
     * Cleans up after a connection this transport opened went inactive: the
     * locator channel entry attached to the connection (if any) is removed
     * from the table of locator channels, then the context is closed.
     *
     * @param ctx the handler context of the connection that became inactive
     */
    protected void outboundInactive(ChannelHandlerContext ctx) {
        logger.debug("outbound inactive: {} on {}", (Object)ctx.channel().remoteAddress(), (Object)this.ep);
        LocatorChannelEntry ent = ctx.channel().attr(this.LCE_KEY).get();
        // The attribute is only set once the channel became active; this method
        // is also reached from exceptionCaught, which can fire before that.
        if (ent != null) {
            this.raws.remove(ent.channel.getRemote());
        }
        ctx.close();
    }

    /**
     * Marks the given id channel as closed.
     *
     * @param ch the channel to close; {@code null} is ignored because callers
     *           pass the result of a map lookup
     */
    void closeIdChannel(IdChannel ch) {
        if (ch == null) {
            return;
        }
        ch.isClosed = true;
    }

    /**
     * Drops every id channel whose {@code elapsedTimeAfterClose()} exceeds
     * {@link #ID_CHANNEL_REMOVE_CLOSED_THRESHOLD}.
     */
    void expireClosedIdChannels() {
        this.ichannels.values().removeIf(ch -> ch.elapsedTimeAfterClose() > ID_CHANNEL_REMOVE_CLOSED_THRESHOLD);
    }

    /**
     * Creates a transport whose endpoint is a {@link PrimaryKey} built from
     * {@code peerId} and {@code peerLocator}, delegating to the
     * {@link PrimaryKey}-based constructor.
     *
     * @param peer        the peer this transport belongs to
     * @param transId     the id of this transport
     * @param peerId      the id of the local peer
     * @param peerLocator the locator to listen on
     * @throws IdConflictException declared by the delegated constructor
     * @throws IOException         declared by the delegated constructor
     */
    public IdChannelTransport(Peer peer, TransportId transId, PeerId peerId, NettyLocator peerLocator) throws IdConflictException, IOException {
        this(peer, transId, peerId, new PrimaryKey((ComparableKey<?>)peerId, peerLocator));
    }

    /**
     * Creates a transport with the given key as its endpoint.
     * <p>
     * If the key has no raw key yet, {@code peerId} is installed as its raw
     * key. When the key carries a locator, a bootstrap matching the locator
     * type is created and a server socket is bound to the locator's host and
     * port; without a locator nothing is bound and {@code serverFuture} stays
     * {@code null}.
     *
     * @param peer    the peer this transport belongs to
     * @param transId the id of this transport
     * @param peerId  the id of the local peer
     * @param key     the endpoint key of this transport
     * @throws IdConflictException declared by the superclass constructor
     * @throws IOException         also raised as
     *                             {@link ProtocolUnsupportedException} for an
     *                             unsupported locator type
     */
    public IdChannelTransport(Peer peer, TransportId transId, PeerId peerId, PrimaryKey key) throws IdConflictException, IOException {
        super(peer, transId, null, true);
        this.peerId = peerId;
        if (key.getRawKey() == null) {
            key.setRawKey((ComparableKey<?>)peerId);
        }
        this.ep = key;
        final NettyLocator self = key.getLocator();
        this.mgr = new LocatorManager();
        this.raws = new ConcurrentHashMap<>();
        this.seq = new AtomicInteger(0);
        if (self == null) {
            // No locator: nothing to listen on.
            this.serverFuture = null;
        } else {
            switch (self.getType()) {
                case TCP: {
                    this.bs = new TcpBootstrap<PrimaryKey>();
                    break;
                }
                case SSL: {
                    this.bs = new SslBootstrap<PrimaryKey>(self.getHost(), self.getPort());
                    break;
                }
                case UDT: {
                    this.bs = new UdtBootstrap<PrimaryKey>();
                    break;
                }
                default: {
                    throw new ProtocolUnsupportedException("not implemented yet.");
                }
            }
            this.bossGroup = this.bs.getParentEventLoopGroup();
            this.serverGroup = this.bs.getChildEventLoopGroup();
            this.clientGroup = this.bs.getClientEventLoopGroup();
            final AbstractBootstrap server = this.bs.getServerBootstrap(new InboundHandler(this));
            final SocketAddress bindAddress = new InetSocketAddress(self.getHost(), self.getPort());
            this.serverFuture = server.bind(bindAddress).syncUninterruptibly();
            logger.debug("bound " + this.ep);
        }
        this.isRunning = true;
    }

    /**
     * Creates a transport whose endpoint key has no locator, delegating to the
     * {@link PrimaryKey}-based constructor. With a {@code null} locator that
     * constructor binds no server socket.
     *
     * @param peer    the peer this transport belongs to
     * @param transId the id of this transport
     * @param peerId  the id of the local peer
     * @throws IdConflictException declared by the delegated constructor
     * @throws IOException         declared by the delegated constructor
     */
    public IdChannelTransport(Peer peer, TransportId transId, PeerId peerId) throws IdConflictException, IOException {
        this(peer, transId, peerId, new PrimaryKey((ComparableKey<?>)peerId, null));
    }

    /**
     * Blocks until the server channel of this transport has been closed.
     * Returns immediately when the transport was created without a locator,
     * because no server channel exists in that case.
     */
    public void waitForFin() {
        if (this.serverFuture == null) {
            return;
        }
        this.serverFuture.channel().closeFuture().awaitUninterruptibly();
    }

    /**
     * Tells whether this transport is running.
     *
     * @return {@code true} from the end of construction until {@code fin()} is called
     */
    @Override
    public boolean isUp() {
        return isRunning;
    }

    /**
     * Shuts this transport down: marks it as stopped, closes all client and
     * server side channels, closes the listening channel, shuts the event loop
     * groups down and finishes the locator manager.
     * <p>
     * The listening channel and the event loop groups only exist when the
     * transport was created with a locator, so each is skipped when absent.
     */
    @Override
    public void fin() {
        logger.debug("running fin.");
        this.isRunning = false;
        this.cchannels.close().awaitUninterruptibly();
        this.schannels.close().awaitUninterruptibly();
        if (this.serverFuture != null && this.serverFuture.channel().isOpen()) {
            this.serverFuture.channel().close().awaitUninterruptibly();
        }
        if (this.bossGroup != null) {
            this.bossGroup.shutdownGracefully();
        }
        if (this.serverGroup != null) {
            this.serverGroup.shutdownGracefully();
        }
        if (this.clientGroup != null) {
            this.clientGroup.shutdownGracefully();
        }
        this.mgr.fin();
    }

    /**
     * Starts connecting to the given locator and returns the future that
     * completes once the locator channel is usable.
     * <p>
     * The new entry is registered in the table of locator channels before the
     * connection is started, so concurrent callers share it. When the
     * connection attempt fails the entry is removed again; otherwise the failed
     * future would be handed out for every later request to the same locator.
     *
     * @param locator the remote locator to connect to
     * @param opts    options; the timeout is used as connect timeout
     * @return a future completed by the handshake, or completed exceptionally
     *         with an {@link IOException} when the transport is stopped or the
     *         connection fails
     */
    private CompletableFuture<LocatorChannel> createLocatorChannel(NettyLocator locator, TransOptions opts) {
        LocatorChannelEntry ent = new LocatorChannelEntry(new LocatorChannel(locator, this), new CompletableFuture<LocatorChannel>());
        logger.debug("initiating locator channel to {}", (Object)locator);
        if (!this.isRunning) {
            // Do not register an entry that can never become usable.
            ent.future.completeExceptionally(new IOException("generating a channel on stopped transport."));
            return ent.future;
        }
        this.raws.put(locator, ent);
        Bootstrap b = this.bs.getBootstrap(locator, new OutboundHandler(ent, this));
        b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int)opts.getTimeout());
        ChannelFuture f = this.bs.connect(b, locator.getHost(), locator.getPort());
        // NOTE(review): a null connect result leaves the future to be completed
        // elsewhere (or never); confirm what NettyBootstrap.connect guarantees.
        if (f != null) {
            f.addListener(future -> {
                if (future.isCancelled()) {
                    logger.trace("connection timed out to {}: {} sec", (Object)locator, (Object)opts.getTimeout());
                    this.raws.remove(locator, ent);
                    ent.future.completeExceptionally(new IOException("connection timed out to " + locator + ": " + opts.getTimeout() + " sec"));
                } else if (!future.isSuccess()) {
                    this.raws.remove(locator, ent);
                    ent.future.completeExceptionally(new IOException(f.cause()));
                }
                ent.channel.setNettyChannel(((ChannelFuture)future).channel());
            });
        }
        return ent.future;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the future of the locator channel to {@code dst}, creating the
     * channel when none is registered yet.
     * <p>
     * Before the lookup, the least recently used locator channels are closed
     * and unregistered until at most {@link #RAW_POOL_SIZE} remain.
     *
     * @param key  not used by this method
     * @param dst  the remote locator
     * @param opts options passed on when a new connection has to be created
     * @return the future of the existing or newly created locator channel
     */
    public CompletableFuture<LocatorChannel> getRawCreate(PrimaryKey key, NettyLocator dst, TransOptions opts) {
        synchronized (this.raws) {
            int poolSize = (Integer)RAW_POOL_SIZE.value();
            long excess = Math.max(0, this.raws.size() - poolSize);
            ArrayList<LocatorChannelEntry> obsoletes = new ArrayList<>();
            // Long.compare avoids the overflow/truncation of "(int)(a - b)".
            this.raws.values().stream()
                    .sorted((x, y) -> Long.compare(x.channel.lastUse, y.channel.lastUse))
                    .limit(excess)
                    .forEach(obsoletes::add);
            for (LocatorChannelEntry old : obsoletes) {
                NettyLocator remote = old.channel.getRemote();
                try {
                    old.channel.closeAsync(true);
                }
                catch (Exception e) {
                    // Best effort: the entry is unregistered even if closing failed.
                    logger.debug("failed to close expired channel to {}", (Object)remote, (Object)e);
                }
                finally {
                    this.raws.remove(remote);
                }
                logger.debug("channel expired for: " + old + " on " + this.ep);
            }
            LocatorChannelEntry existing = this.raws.get(dst);
            if (existing == null) {
                return this.createLocatorChannel(dst, opts);
            }
            return existing.future;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the id channel for {@code channelNo}/{@code dst}, creating one on
     * top of a locator channel when none is stored.
     * <p>
     * An existing channel is only looked up when {@code dst} has a raw key.
     * A newly created channel is numbered from the internal sequence, not from
     * {@code channelNo}. When {@code dst} had no raw key, the key learned from
     * the locator channel is recorded for the locator.
     *
     * @param channelNo the channel number used for the lookup
     * @param dst       the destination key
     * @param sender    the sending object id
     * @param receiver  the receiving object id
     * @param opts      options passed on when a connection has to be created
     * @return a future of the channel; it is completed with {@code null} when
     *         no channel is stored and no locator is known for {@code dst}
     */
    public CompletableFuture<IdChannel> getChannelCreate(int channelNo, PrimaryKey dst, ObjectId sender, ObjectId receiver, TransOptions opts) {
        synchronized (this.ichannels) {
            IdChannel ch = dst.rawKey == null ? null : this.ichannels.get(IdChannel.getKeyString(channelNo, dst));
            if (ch != null) {
                return CompletableFuture.completedFuture(ch);
            }
            NettyLocator direct = this.mgr.getLocator(dst);
            if (direct == null) {
                return CompletableFuture.<IdChannel>completedFuture(null);
            }
            PrimaryKey curDst = this.mgr.updateAndGet(dst);
            CompletableFuture<IdChannel> future = new CompletableFuture<IdChannel>();
            CompletableFuture<LocatorChannel> lfuture = this.getRawCreate(curDst, direct, opts);
            lfuture.whenComplete((ret, e) -> {
                logger.debug("getRawCreate is completed with:" + ret);
                if (e != null) {
                    future.completeExceptionally(e);
                } else {
                    this.expireClosedIdChannels();
                    IdChannel newCh = new IdChannel(this.seq.incrementAndGet(), this.ep, ret.getPrimaryKey(), sender, receiver, true, ret, this);
                    // NOTE(review): the decompiler lost the captured variable here;
                    // "dst" is assumed from the rawKey check above - confirm.
                    if (dst.rawKey == null) {
                        this.mgr.updateKey(direct, ret.getPrimaryKey());
                    }
                    this.ichannels.put(newCh.getKeyString(), newCh);
                    future.complete(newCh);
                }
            });
            return future;
        }
    }

    /**
     * Creates a channel to {@code dst}, blocking until it is available.
     *
     * @param sender   the sending object id
     * @param receiver the receiving object id
     * @param dst      the destination key
     * @param isDuplex not used by this implementation
     * @param timeout  the timeout passed on as transport option
     * @return the channel; may be {@code null} when no locator is known for
     *         {@code dst}
     * @throws IOException when channel creation fails or the waiting thread is
     *                     interrupted; the original failure is the cause
     */
    @Override
    public Channel<PrimaryKey> newChannel(ObjectId sender, ObjectId receiver, PrimaryKey dst, boolean isDuplex, int timeout) throws ProtocolUnsupportedException, IOException {
        IdChannel ret;
        logger.debug("new channel for: " + dst + " on " + this.ep);
        try {
            ret = this.getChannelCreate(this.seq.incrementAndGet(), dst, sender, receiver, new TransOptions((long)timeout)).get();
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
        catch (Exception e) {
            throw new IOException(e);
        }
        return ret;
    }

    /**
     * Sends a one-way message by delegating to
     * {@link #sendAsync(ObjectId, ObjectId, PrimaryKey, Object, TransOptions)}.
     * The returned future is not awaited, so asynchronous failures are not
     * reported to the caller.
     *
     * @param sender   the sending object id
     * @param receiver the receiving object id
     * @param dst      the destination key
     * @param msg      the payload
     * @param opts     transport options; may be {@code null}
     */
    @Override
    public void send(ObjectId sender, ObjectId receiver, PrimaryKey dst, Object msg, TransOptions opts) throws ProtocolUnsupportedException, IOException {
        // Fire and forget: the completion future is dropped.
        sendAsync(sender, receiver, dst, msg, opts);
    }

    /**
     * Sends a one-way message asynchronously.
     * <p>
     * A message addressed to this transport's own endpoint is delivered
     * locally. Otherwise an id channel to {@code dst} is obtained and the
     * message is written to it.
     *
     * @param sender   the sending object id
     * @param receiver the receiving object id
     * @param dst      the destination key
     * @param msg      the payload
     * @param opts     transport options; defaults are used when {@code null}
     * @return a future completed with {@code null} once the message has been
     *         written, or completed exceptionally when the transport is
     *         stopped, no channel could be obtained or the write failed
     */
    @Override
    public CompletableFuture<Void> sendAsync(ObjectId sender, ObjectId receiver, PrimaryKey dst, Object msg, TransOptions opts) {
        if (opts == null) {
            opts = new TransOptions();
        }
        NettyMessage<Object> nmsg = new NettyMessage<Object>(receiver, this.ep, dst, null, this.getPeerId(), msg, false, 0);
        if (this.ep.equals(dst)) {
            this.messageReceived(null, nmsg);
            return CompletableFuture.completedFuture(null);
        }
        logger.debug("sending async {}", nmsg);
        CompletableFuture<Void> retf = new CompletableFuture<Void>();
        if (!this.isRunning) {
            logger.debug("{} is not running", (Object)this);
            retf.completeExceptionally(new IOException("transport is not running"));
            return retf;
        }
        CompletableFuture<IdChannel> f = this.getChannelCreate(0, dst, sender, receiver, opts);
        f.whenComplete((ret, e) -> {
            if (e != null) {
                retf.completeExceptionally(e);
                return;
            }
            if (ret == null) {
                // getChannelCreate yields null when no locator is known for dst.
                retf.completeExceptionally(new IOException("no channel available to " + dst));
                return;
            }
            try {
                logger.debug("sending async when completed: {} {}", ret, (Object)nmsg);
                ret.sendAsync(nmsg).addListener(cf -> {
                    // Report write failures instead of signalling success unconditionally.
                    if (cf.isSuccess()) {
                        retf.complete(null);
                    } else {
                        retf.completeExceptionally(cf.cause());
                    }
                });
            }
            catch (Exception e1) {
                retf.completeExceptionally(e1);
            }
        });
        return retf;
    }

    /**
     * Returns the endpoint key of this transport.
     *
     * @return the key given at construction time
     */
    @Override
    public PrimaryKey getEndpoint() {
        return ep;
    }

    @ChannelHandler.Sharable
    class InboundHandler
    extends ChannelInboundHandlerAdapter {
        IdChannelTransport trans;

        public InboundHandler(IdChannelTransport trans) {
            this.trans = trans;
        }

        public void channelActive(ChannelHandlerContext ctx) {
            this.trans.inboundActive(ctx);
        }

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            this.trans.getPeer().execute(() -> this.trans.inboundReceive(ctx, msg));
        }

        public void channelReadComplete(ChannelHandlerContext ctx) {
        }

        public void channelInactive(ChannelHandlerContext ctx) {
            logger.debug("Inactive={}", (Object)ctx);
            this.trans.inboundInactive(ctx);
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            logger.info("Exception:", cause);
            ctx.close();
        }
    }

    /**
     * Pairs a locator channel with the future that is completed once the
     * channel has finished its handshake. Instances are also used as monitors
     * while the channel state is updated.
     */
    class LocatorChannelEntry {
        /** Completed with {@link #channel} when the channel is usable, or exceptionally when connecting failed. */
        public CompletableFuture<LocatorChannel> future;
        /** The locator channel this entry tracks. */
        public LocatorChannel channel;

        public LocatorChannelEntry(LocatorChannel channel, CompletableFuture<LocatorChannel> future) {
            this.channel = channel;
            this.future = future;
        }
    }

    @ChannelHandler.Sharable
    class OutboundHandler
    extends ChannelInboundHandlerAdapter {
        LocatorChannelEntry ent;
        IdChannelTransport trans;

        public OutboundHandler(LocatorChannelEntry ent, IdChannelTransport trans) {
            this.ent = ent;
            this.trans = trans;
        }

        public void channelActive(ChannelHandlerContext ctx) {
            this.trans.outboundActive(this.ent, ctx);
        }

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            this.trans.getPeer().execute(() -> this.trans.outboundReceive(this.ent, ctx, msg));
        }

        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }

        public void channelInactive(ChannelHandlerContext ctx) {
            logger.debug("Inactive={}", (Object)ctx);
            this.trans.outboundInactive(ctx);
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            logger.info("exception={}", cause);
            this.trans.outboundInactive(ctx);
        }
    }
}

