/*
 * Decompiled with CFR 0.152.
 */
package org.piax.gtrans.netty.loctrans;

import io.netty.bootstrap.AbstractBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.piax.common.Endpoint;
import org.piax.common.ObjectId;
import org.piax.common.PeerId;
import org.piax.common.TransportId;
import org.piax.gtrans.Channel;
import org.piax.gtrans.ChannelListener;
import org.piax.gtrans.ChannelTransport;
import org.piax.gtrans.IdConflictException;
import org.piax.gtrans.Peer;
import org.piax.gtrans.ProtocolUnsupportedException;
import org.piax.gtrans.ReceivedMessage;
import org.piax.gtrans.TransOptions;
import org.piax.gtrans.TransportListener;
import org.piax.gtrans.impl.ChannelTransportImpl;
import org.piax.gtrans.netty.ControlMessage;
import org.piax.gtrans.netty.NettyEndpoint;
import org.piax.gtrans.netty.NettyLocator;
import org.piax.gtrans.netty.NettyMessage;
import org.piax.gtrans.netty.bootstrap.NettyBootstrap;
import org.piax.gtrans.netty.bootstrap.SslBootstrap;
import org.piax.gtrans.netty.bootstrap.TcpBootstrap;
import org.piax.gtrans.netty.bootstrap.UdtBootstrap;
import org.piax.gtrans.netty.loctrans.NettyChannel;
import org.piax.gtrans.netty.loctrans.NettyInboundHandler;
import org.piax.gtrans.netty.loctrans.NettyOutboundHandler;
import org.piax.gtrans.netty.loctrans.NettyRawChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class NettyChannelTransport<E extends NettyEndpoint>
extends ChannelTransportImpl<E>
implements ChannelTransport<E> {
    protected static final Logger logger = LoggerFactory.getLogger((String)NettyChannelTransport.class.getName());
    protected EventLoopGroup bossGroup;
    protected EventLoopGroup serverGroup;
    protected EventLoopGroup clientGroup;
    boolean supportsDuplex = true;
    protected E ep = null;
    protected final PeerId peerId;
    protected final ConcurrentHashMap<E, NettyRawChannel<E>> raws = new ConcurrentHashMap();
    protected final ConcurrentHashMap<String, NettyChannel<E>> channels = new ConcurrentHashMap();
    protected final Random rand = new Random(System.currentTimeMillis());
    protected boolean isRunning = false;
    protected final ChannelGroup schannels = new DefaultChannelGroup((EventExecutor)GlobalEventExecutor.INSTANCE);
    protected final ChannelGroup cchannels = new DefaultChannelGroup((EventExecutor)GlobalEventExecutor.INSTANCE);
    protected NettyBootstrap<E> bs;
    protected AtomicInteger seq;
    public static int RAW_POOL_SIZE = 30;
    public AttributeKey<String> rawChannelKey = AttributeKey.valueOf((String)"rawKey");
    protected static final int CHANNEL_ESTABLISH_TIMEOUT = 10000;

    public NettyChannelTransport(Peer peer, TransportId transId, PeerId peerId, NettyLocator peerLocator) throws IdConflictException, IOException {
        super(peer, transId, null, true);
        this.ep = peerLocator;
        this.peerId = peerId;
        this.seq = new AtomicInteger(0);
        if (peerLocator != null) {
            switch (peerLocator.getType()) {
                case TCP: {
                    this.bs = new TcpBootstrap();
                    break;
                }
                case SSL: {
                    this.bs = new SslBootstrap(this.ep.getHost(), this.ep.getPort());
                    break;
                }
                case UDT: {
                    this.bs = new UdtBootstrap();
                    break;
                }
                default: {
                    throw new ProtocolUnsupportedException("not implemented yet.");
                }
            }
            this.bossGroup = this.bs.getParentEventLoopGroup();
            this.serverGroup = this.bs.getChildEventLoopGroup();
            this.clientGroup = this.bs.getClientEventLoopGroup();
            AbstractBootstrap b = this.bs.getServerBootstrap(new NettyInboundHandler(this));
            b.bind((SocketAddress)new InetSocketAddress(this.ep.getPort())).syncUninterruptibly();
            logger.debug("bound " + this.ep);
        }
        this.isRunning = true;
    }

    public NettyChannelTransport(Peer peer, TransportId transId, PeerId peerId) throws IdConflictException, IOException {
        super(peer, transId, null, true);
        this.peerId = peerId;
    }

    @Override
    public void fin() {
        logger.debug("running fin.");
        this.isRunning = false;
        this.cchannels.close().awaitUninterruptibly();
        this.schannels.close().awaitUninterruptibly();
        this.bossGroup.shutdownGracefully();
        this.serverGroup.shutdownGracefully();
        this.clientGroup.shutdownGracefully();
    }

    @Override
    public void send(ObjectId sender, ObjectId receiver, E dst, Object msg, TransOptions opts) throws ProtocolUnsupportedException, IOException {
        E src = this.ep;
        this.channelSendHook(src, dst);
        NettyMessage<Object> nmsg = new NettyMessage<Object>(receiver, src, dst, null, this.getPeerId(), msg, false, 0);
        NettyRawChannel<Object> raw = this.getRawCreateAsClient(dst, nmsg);
        if (raw == null) {
            throw new IOException("Getting new raw channel failed (maybe peer down).");
        }
        logger.debug("oneway send to {} from {} msg={}", new Object[]{dst, this.ep, msg});
        raw.touch();
        raw.send(nmsg);
    }

    void putChannel(NettyChannel<E> ch) {
        logger.debug("" + ch.getChannelNo() + ch.channelInitiator.hashCode() + "->" + ch + " on " + this.ep);
        this.channels.put("" + ch.getChannelNo() + ch.channelInitiator.hashCode(), ch);
    }

    protected NettyChannel<E> getChannel(int channelNo, E channelInitiator) {
        logger.debug("" + channelNo + channelInitiator.hashCode() + " on " + this.ep);
        return this.channels.get("" + channelNo + channelInitiator.hashCode());
    }

    void deleteChannel(NettyChannel<?> ch) {
        this.channels.remove("" + ch.getChannelNo() + ch.channelInitiator.hashCode(), ch);
    }

    void putRaw(E ep, NettyRawChannel<E> ch) {
        if (this.raws.size() == 0) {
            ch.setPriority(1);
        }
        this.raws.put(ep, ch);
    }

    protected NettyRawChannel<E> getRaw(E ep) {
        return this.raws.get(ep);
    }

    void deleteRaw(NettyRawChannel<E> raw) {
        this.raws.remove(raw.getRemote(), raw);
    }

    void deleteRaw(String key) {
        this.raws.remove(key);
    }

    public List<NettyRawChannel<E>> getCreatedRawChannels() {
        return this.raws.values().stream().filter(x -> x.isCreatorSide()).sorted((x, y) -> (int)(y.lastUse - x.lastUse)).sorted((x, y) -> y.priority - x.priority).collect(Collectors.toList());
    }

    public List<NettyEndpoint> getRawChannelLocators() {
        return this.raws.values().stream().sorted((x, y) -> (int)(y.lastUse - x.lastUse)).sorted((x, y) -> y.priority - x.priority).map(x -> x.isClosed() ? null : x.getRemote()).filter(x -> x != null).collect(Collectors.toList());
    }

    protected abstract NettyRawChannel<E> getRawCreateAsClient(E var1, NettyMessage<E> var2) throws IOException;

    protected abstract boolean filterMessage(NettyMessage<E> var1);

    protected abstract NettyRawChannel<E> getResolvedRawChannel(E var1) throws IOException;

    protected abstract NettyLocator directLocator(E var1);

    protected void channelSendHook(E src, E dst) {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    protected NettyRawChannel<E> getRawCreateAsClient0(E dst) throws IOException {
        if (!this.isRunning) {
            return null;
        }
        var3_2 = this.raws;
        synchronized (var3_2) {
            cached = this.getRaw(dst);
            if (cached != null) {
                while (cached.getStat() == NettyRawChannel.Stat.INIT || cached.getStat() == NettyRawChannel.Stat.WAIT || cached.getStat() == NettyRawChannel.Stat.DENIED) {
                    try {
                        var5_6 = cached;
                        synchronized (var5_6) {
                            cached.wait(10000L);
                        }
                    }
                    catch (InterruptedException var5_7) {
                        // empty catch block
                    }
                }
                if (cached.getStat() == NettyRawChannel.Stat.RUN) {
                    return cached;
                }
            }
            count = 0;
            for (NettyRawChannel<E> r : this.getCreatedRawChannels()) {
                if (NettyChannelTransport.RAW_POOL_SIZE - 1 <= count) {
                    NettyChannelTransport.logger.debug("closing {}, curtime={}", r, (Object)System.currentTimeMillis());
                    r.close();
                }
                ++count;
            }
            raw = new NettyRawChannel<E>(dst, this, true);
            l = this.directLocator(dst);
            if (l != null) {
                b = this.bs.getBootstrap(l, new NettyOutboundHandler(raw, this));
                this.bs.connect(b, l.getHost(), l.getPort());
            } else {
                NettyChannelTransport.logger.debug("destination is not directly connectable.");
            }
            // MONITOREXIT @DISABLED, blocks:[0, 7] lbl42 : MonitorExitStatement: MONITOREXIT : var3_2
            if (true) ** GOTO lbl52
        }
        do {
            try {
                var3_2 = raw;
                synchronized (var3_2) {
                    raw.wait(10000L);
                }
            }
            catch (InterruptedException var3_3) {
                // empty catch block
            }
lbl52:
            // 3 sources

        } while (raw.getStat() == NettyRawChannel.Stat.INIT || raw.getStat() == NettyRawChannel.Stat.WAIT);
        if (raw.getStat() != NettyRawChannel.Stat.RUN) ** GOTO lbl65
        return raw;
lbl-1000:
        // 1 sources

        {
            try {
                var3_2 = raw;
                synchronized (var3_2) {
                    raw.wait(10000L);
                    continue;
                }
            }
            catch (InterruptedException var3_4) {
                // empty catch block
            }
lbl65:
            // 3 sources

            ** while (raw.getStat() == NettyRawChannel.Stat.DENIED)
        }
lbl66:
        // 1 sources

        if (raw.getStat() == NettyRawChannel.Stat.RUN) {
            return raw;
        }
        NettyChannelTransport.logger.error("getRawChannelAsClient: illegal state: " + (Object)raw.getStat());
        raw.close();
        throw new IOException("Channel establish failed.");
    }

    protected abstract E createEndpoint(String var1, int var2);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void outboundActive(NettyRawChannel<E> raw, ChannelHandlerContext ctx) {
        logger.debug("outbound active: " + ctx.channel().remoteAddress());
        ctx.channel().attr(this.rawChannelKey).set((Object)raw.getRemote().getKeyString());
        this.cchannels.add((Object)ctx.channel());
        int attemptRand = this.rand.nextInt();
        InetSocketAddress sa = (InetSocketAddress)ctx.channel().remoteAddress();
        E dst = this.createEndpoint(sa.getHostName(), sa.getPort());
        ControlMessage<Object> attempt = new ControlMessage<Object>(ControlMessage.ControlType.ATTEMPT, this.ep, null, attemptRand);
        ConcurrentHashMap<E, NettyRawChannel<E>> concurrentHashMap = this.raws;
        synchronized (concurrentHashMap) {
            NettyRawChannel<E> nettyRawChannel = raw;
            synchronized (nettyRawChannel) {
                raw.setAttempt(attemptRand);
                raw.setContext(ctx);
                if (raw.getStat() == NettyRawChannel.Stat.INIT) {
                    raw.setStat(NettyRawChannel.Stat.WAIT);
                }
            }
            this.putRaw(dst, raw);
        }
        ctx.writeAndFlush(attempt);
        logger.debug("sent attempt to " + dst + " : " + ctx);
    }

    protected void outboundInactive(ChannelHandlerContext ctx) {
        logger.debug("outbound inactive: " + ctx.channel().remoteAddress());
        String key = (String)ctx.channel().attr(this.rawChannelKey).get();
        this.deleteRaw(key);
        ctx.close();
    }

    protected void inboundActive(ChannelHandlerContext ctx) {
        logger.debug("inbound active: " + ctx.channel().remoteAddress());
        this.schannels.add((Object)ctx.channel());
    }

    protected void inboundInactive(ChannelHandlerContext ctx) {
        logger.debug("inbound inactive: " + ctx.channel().remoteAddress());
        String key = (String)ctx.channel().attr(this.rawChannelKey).get();
        if (key != null) {
            this.deleteRaw(key);
        }
        ctx.close();
    }

    protected void handleControlMessage(ControlMessage<E> cmsg) {
        logger.warn("unhandled control message:" + cmsg);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void inboundReceive(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof ControlMessage) {
            ControlMessage cmsg = (ControlMessage)msg;
            logger.debug("received attempt: " + cmsg.getArg() + " from " + ctx);
            switch (cmsg.type) {
                case ATTEMPT: {
                    ConcurrentHashMap<E, NettyRawChannel<E>> concurrentHashMap = this.raws;
                    synchronized (concurrentHashMap) {
                        NettyRawChannel raw = this.getRaw(cmsg.getSource());
                        if (raw != null && this.ep.equals(cmsg.getSource())) {
                            NettyRawChannel nettyRawChannel = raw;
                            synchronized (nettyRawChannel) {
                                raw.touch();
                                raw.setStat(NettyRawChannel.Stat.RUN);
                                raw.setContext(ctx);
                                ctx.writeAndFlush(new ControlMessage<E>(ControlMessage.ControlType.ACK, this.ep, this.ep, null));
                            }
                        }
                        if (raw != null && raw.attempt != null) {
                            NettyRawChannel nettyRawChannel = raw;
                            synchronized (nettyRawChannel) {
                                raw.touch();
                                if (raw.attempt > (Integer)cmsg.getArg()) {
                                    logger.debug("attempt won on " + raw);
                                    ctx.writeAndFlush(new ControlMessage<Object>(ControlMessage.ControlType.NACK, this.ep, null, null));
                                } else {
                                    logger.debug("attempt lose on " + raw);
                                    ctx.writeAndFlush(new ControlMessage<Object>(ControlMessage.ControlType.ACK, this.ep, null, null));
                                    raw.setContext(ctx);
                                    raw.setStat(NettyRawChannel.Stat.RUN);
                                    raw.notifyAll();
                                }
                            }
                        }
                        NettyRawChannel nettyRawChannel = raw = new NettyRawChannel(cmsg.getSource(), this);
                        synchronized (nettyRawChannel) {
                            ctx.channel().attr(this.rawChannelKey).set((Object)raw.getRemote().getKeyString());
                            raw.setStat(NettyRawChannel.Stat.RUN);
                            raw.setContext(ctx);
                            logger.debug("set run stat for raw from source=" + cmsg.getSource());
                        }
                        ctx.writeAndFlush(new ControlMessage<Object>(ControlMessage.ControlType.ACK, this.ep, null, null));
                        this.putRaw(cmsg.getSource(), raw);
                        break;
                    }
                }
                case ACK: {
                    logger.debug("illegal attempt ACK received from client");
                    break;
                }
                case NACK: {
                    logger.debug("illegal attempt NACK received from client");
                    break;
                }
                default: {
                    this.handleControlMessage(cmsg);
                    break;
                }
            }
        } else if (msg instanceof NettyMessage) {
            NettyMessage nmsg = (NettyMessage)msg;
            logger.debug("inbound received msg: " + nmsg.getMsg() + " on " + this.ep + " from " + nmsg.getSource() + " to " + nmsg.getDestination());
            if (this.filterMessage(nmsg)) {
                return;
            }
            if (nmsg.isChannelSend()) {
                NettyChannel ch = null;
                ConcurrentHashMap<String, NettyChannel<E>> concurrentHashMap = this.channels;
                synchronized (concurrentHashMap) {
                    ch = this.getChannel(nmsg.channelNo(), nmsg.getChannelInitiator());
                    if (ch == null) {
                        ConcurrentHashMap<E, NettyRawChannel<E>> concurrentHashMap2 = this.raws;
                        synchronized (concurrentHashMap2) {
                            NettyRawChannel raw = this.getRaw(nmsg.getSource());
                            if (raw == null || raw.getStat() != NettyRawChannel.Stat.RUN) {
                                logger.warn("receive in illegal state {} from {} (channel not running): throwing it away.", raw == null ? "null" : raw.getStat(), nmsg.getSource());
                            } else {
                                ch = new NettyChannel(nmsg.channelNo(), nmsg.getChannelInitiator(), nmsg.getChannelInitiator(), nmsg.getObjectId(), nmsg.getObjectId(), false, raw, this);
                                this.putChannel(ch);
                            }
                        }
                    } else {
                        if (ch.isClosed()) {
                            return;
                        }
                        logger.debug("response for call from inbound on {} received.", ch);
                    }
                }
                if (ch != null) {
                    ch.raw.touch();
                    this.messageReceived(ch, nmsg);
                }
            } else {
                logger.debug("received oneway msg={}", msg);
                this.messageReceived(null, nmsg);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void outboundReceive(NettyRawChannel<E> raw, ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof ControlMessage) {
            ControlMessage resp = (ControlMessage)msg;
            logger.debug("outbound attempt response=" + (Object)((Object)resp.type));
            switch (resp.type) {
                case ATTEMPT: {
                    logger.debug("illegal attempt received from server");
                    break;
                }
                case ACK: {
                    ConcurrentHashMap<E, NettyRawChannel<E>> concurrentHashMap = this.raws;
                    synchronized (concurrentHashMap) {
                        NettyRawChannel<E> nettyRawChannel = raw;
                        synchronized (nettyRawChannel) {
                            raw.setStat(NettyRawChannel.Stat.RUN);
                            raw.notifyAll();
                        }
                    }
                }
                case NACK: {
                    ctx.close();
                    ConcurrentHashMap<E, NettyRawChannel<E>> concurrentHashMap = this.raws;
                    synchronized (concurrentHashMap) {
                        NettyRawChannel<E> nettyRawChannel = raw;
                        synchronized (nettyRawChannel) {
                            switch (raw.getStat()) {
                                case RUN: {
                                    break;
                                }
                                case WAIT: {
                                    raw.setStat(NettyRawChannel.Stat.DENIED);
                                    raw.setContext(null);
                                    raw.notifyAll();
                                    break;
                                }
                                default: {
                                    logger.debug("illegal raw state {}" + raw);
                                }
                            }
                        }
                    }
                }
            }
        } else if (msg instanceof NettyMessage) {
            NettyMessage nmsg = (NettyMessage)msg;
            logger.debug("outbound received msg: " + nmsg.getMsg() + " on " + this.ep + " from " + nmsg.getSource() + " to " + nmsg.getDestination());
            if (this.filterMessage(nmsg)) {
                return;
            }
            NettyRawChannel<E> nettyRawChannel = raw;
            synchronized (nettyRawChannel) {
                if (raw.getStat() != NettyRawChannel.Stat.RUN && raw.getStat() != NettyRawChannel.Stat.DEFUNCT) {
                    raw.setStat(NettyRawChannel.Stat.RUN);
                }
            }
            if (nmsg.isChannelSend()) {
                NettyChannel ch;
                ConcurrentHashMap<String, NettyChannel<E>> concurrentHashMap = this.channels;
                synchronized (concurrentHashMap) {
                    ch = this.getChannel(nmsg.channelNo(), nmsg.getChannelInitiator());
                    logger.debug("got stored ch=" + ch + " for msg: " + nmsg.getMsg());
                    if (ch == null) {
                        ch = new NettyChannel(nmsg.channelNo(), nmsg.getChannelInitiator(), nmsg.getChannelInitiator(), nmsg.getObjectId(), nmsg.getObjectId(), false, raw, this);
                        this.putChannel(ch);
                    }
                }
                if (ch != null) {
                    this.messageReceived(ch, nmsg);
                }
            } else {
                logger.debug("received oneway msg={}", msg);
                this.messageReceived(null, nmsg);
            }
        }
    }

    void messageReceived(NettyChannel<E> c, NettyMessage<E> nmsg) {
        if (!nmsg.isChannelSend()) {
            TransportListener listener = this.getListener(nmsg.getObjectId());
            if (listener != null) {
                ReceivedMessage rmsg = new ReceivedMessage(nmsg.getObjectId(), (Endpoint)nmsg.getSource(), nmsg.getMsg());
                logger.debug("trans received {} on {}", rmsg.getMessage(), nmsg.getSource());
                listener.onReceive(this, rmsg);
            }
        } else {
            ChannelListener<E> clistener = this.getChannelListener(nmsg.getObjectId());
            c.putReceiveQueue(nmsg.getMsg());
            if (clistener != null) {
                clistener.onReceive(c);
            }
        }
    }

    @Override
    public E getEndpoint() {
        return this.ep;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Channel<E> newChannel(ObjectId sender, ObjectId receiver, E dst, boolean isDuplex, int timeout) throws ProtocolUnsupportedException, IOException {
        NettyChannel<E> ch;
        logger.debug("new channel for: " + dst + " on " + this.ep);
        NettyRawChannel<E> raw = this.getRawCreateAsClient(dst, null);
        if (raw == null) {
            throw new IOException("Getting new raw channel failed (maybe peer down).");
        }
        ConcurrentHashMap<String, NettyChannel<E>> concurrentHashMap = this.channels;
        synchronized (concurrentHashMap) {
            ch = new NettyChannel<E>(this.seq.incrementAndGet(), this.ep, dst, sender, receiver, true, raw, this);
            this.putChannel(ch);
        }
        return ch;
    }
}

