/*
 * Decompiled with CFR 0.152.
 */
package org.piax.gtrans.netty.udp;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.piax.common.ComparableKey;
import org.piax.common.ObjectId;
import org.piax.common.PeerId;
import org.piax.common.TransportId;
import org.piax.gtrans.Channel;
import org.piax.gtrans.ChannelListener;
import org.piax.gtrans.ChannelTransport;
import org.piax.gtrans.IdConflictException;
import org.piax.gtrans.Peer;
import org.piax.gtrans.ProtocolUnsupportedException;
import org.piax.gtrans.ReceivedMessage;
import org.piax.gtrans.TransOptions;
import org.piax.gtrans.TransportListener;
import org.piax.gtrans.impl.ChannelTransportImpl;
import org.piax.gtrans.netty.NettyLocator;
import org.piax.gtrans.netty.NettyMessage;
import org.piax.gtrans.netty.kryo.KryoUtil;
import org.piax.gtrans.netty.udp.Signaling;
import org.piax.gtrans.netty.udp.UdpIdChannel;
import org.piax.gtrans.netty.udp.UdpPrimaryKey;
import org.piax.gtrans.netty.udp.UdpRawChannel;
import org.piax.gtrans.netty.udp.direct.DirectSignaling;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Channel transport that exchanges Kryo-encoded objects over a single bound
 * Netty datagram (UDP) channel.
 *
 * <p>Channels to remote endpoints are established through a {@link Signaling}
 * implementation and cached in {@link #channelFutureMap}, keyed by the
 * destination's raw key or, when the raw key is not known yet, by the key
 * string of the destination's locator.
 */
public class UdpChannelTransport
extends ChannelTransportImpl<UdpPrimaryKey>
implements ChannelTransport<UdpPrimaryKey> {
    protected static final Logger logger = LoggerFactory.getLogger(UdpChannelTransport.class);

    /** The local endpoint of this transport. */
    final UdpPrimaryKey ep;
    private EventLoopGroup workerGroup;
    /** The datagram channel bound to the local port; all traffic goes through it. */
    io.netty.channel.Channel bindChannel;
    /** Written by {@link #fin()} and read by senders on other threads, hence volatile. */
    volatile boolean isRunning;
    Signaling signaling;
    /** Source of the sequence numbers handed to newly created {@link UdpIdChannel}s. */
    AtomicInteger seq;
    /** Pending or established channels, keyed as described in the class comment. */
    ConcurrentHashMap<Comparable<?>, CompletableFuture<UdpIdChannel>> channelFutureMap;
    protected final PeerId peerId;

    /**
     * Creates a transport whose endpoint is built from the peer id and the given locator.
     *
     * @param peer the owning peer
     * @param transId the transport id
     * @param peerId the peer id, also used as the raw key of the endpoint
     * @param peerLocator the locator of the local endpoint
     * @throws IdConflictException propagated from the superclass constructor
     * @throws IOException if the requested signaling type is not supported
     */
    public UdpChannelTransport(Peer peer, TransportId transId, PeerId peerId, NettyLocator peerLocator) throws IdConflictException, IOException {
        this(peer, transId, peerId, new UdpPrimaryKey((ComparableKey<?>)peerId, peerLocator));
    }

    /**
     * Creates a transport whose endpoint is built from the peer id only.
     *
     * @param peer the owning peer
     * @param transId the transport id
     * @param peerId the peer id, also used as the raw key of the endpoint
     * @throws IdConflictException propagated from the superclass constructor
     * @throws IOException if the requested signaling type is not supported
     */
    public UdpChannelTransport(Peer peer, TransportId transId, PeerId peerId) throws IdConflictException, IOException {
        // NOTE(review): the primary constructor reads key.getLocator().getPort();
        // confirm that UdpPrimaryKey supplies a locator when constructed with null here.
        this(peer, transId, peerId, new UdpPrimaryKey((ComparableKey<?>)peerId, null));
    }

    /**
     * Creates the transport and binds a datagram channel to the port of the key's locator.
     *
     * <p>If {@code key} has no raw key, {@code peerId} is installed as its raw key.
     *
     * @param peer the owning peer
     * @param transId the transport id
     * @param peerId the peer id
     * @param key the local endpoint
     * @throws IdConflictException propagated from the superclass constructor
     * @throws IOException if the key requests a signaling type other than DIRECT
     */
    public UdpChannelTransport(Peer peer, TransportId transId, PeerId peerId, UdpPrimaryKey key) throws IdConflictException, IOException {
        super(peer, transId, null, true);
        this.peerId = peerId;
        if (key.getRawKey() == null) {
            key.setRawKey((ComparableKey<?>)peerId);
        }
        this.ep = key;
        // Reject unsupported configurations before any thread or socket is
        // allocated, so the failure path has nothing to release.
        if (!key.sigType.equals(UdpPrimaryKey.SIGTYPE.DIRECT)) {
            throw new IOException("only direct signaling is implemented for now.");
        }
        int port = key.getLocator().getPort();
        // Plain state is set up before the socket can deliver packets.
        this.seq = new AtomicInteger(0);
        this.channelFutureMap = new ConcurrentHashMap<>();
        this.workerGroup = new NioEventLoopGroup();
        boolean initialized = false;
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(this.workerGroup)
                    .channel(NioDatagramChannel.class)
                    .handler(new ServerChannelInitializer());
            this.bindChannel = bootstrap.bind(new InetSocketAddress(port)).syncUninterruptibly().channel();
            this.isRunning = true;
            this.signaling = new DirectSignaling(this);
            initialized = true;
        } finally {
            if (!initialized) {
                // Bind or signaling setup failed: release the socket and the event loop threads.
                this.isRunning = false;
                if (this.bindChannel != null) {
                    this.bindChannel.close();
                }
                this.workerGroup.shutdownGracefully();
            }
        }
    }

    /**
     * Stops the transport: clears the running flag, closes the bound channel
     * if it is still open (waiting for the close to finish) and starts a
     * graceful shutdown of the event loop group.
     */
    @Override
    public void fin() {
        logger.debug("running fin.");
        this.isRunning = false;
        if (this.bindChannel.isOpen()) {
            this.bindChannel.close().awaitUninterruptibly();
        }
        this.workerGroup.shutdownGracefully();
    }

    /**
     * @return the local endpoint passed to (or built by) the constructor
     */
    public UdpPrimaryKey getSpecifiedEndpoint() {
        return this.ep;
    }

    /**
     * Looks up the primary locator for a key via the signaling's locator manager.
     *
     * @param key the key to resolve
     * @return whatever the locator manager returns for {@code key}
     */
    public NettyLocator getPrimaryLocator(Comparable<?> key) {
        return this.signaling.getLocatorManager().getPrimaryLocator(key);
    }

    /**
     * @return the local endpoint of this transport
     */
    @Override
    public UdpPrimaryKey getEndpoint() {
        return this.ep;
    }

    /**
     * Encodes {@code obj} and sends it as one datagram to the socket address of {@code addr}.
     *
     * @param addr the destination locator
     * @param obj the object to encode and send
     * @return the Netty future of the write
     */
    public ChannelFuture rawSend(NettyLocator addr, Object obj) {
        logger.trace("sending to {}", addr);
        return this.rawSend(addr.getSocketAddress(), obj);
    }

    /**
     * Encodes {@code obj} with {@link KryoUtil} and writes it as one datagram
     * to {@code addr} through the bound channel.
     *
     * @param addr the destination address; must not be null
     * @param obj the object to encode and send
     * @return the Netty future of the write
     */
    public ChannelFuture rawSend(InetSocketAddress addr, Object obj) {
        logger.trace("raw send to={}", addr);
        assert (addr != null) : "destination addr is null";
        ByteBuf buf = Unpooled.wrappedBuffer((byte[])KryoUtil.encode(obj, 256, 256));
        logger.trace("writing length={}, to={}", buf.readableBytes(), addr);
        return this.bindChannel.writeAndFlush(new DatagramPacket(buf, addr));
    }

    /**
     * Starts signaling towards {@code dst} and, once it succeeds, wraps the
     * resulting raw channel in a new {@link UdpIdChannel}.
     *
     * @param sender the sending object id
     * @param receiver the receiving object id
     * @param dst the remote endpoint
     * @return a future completed with the new channel, or exceptionally if
     *         signaling or channel creation fails
     */
    public CompletableFuture<UdpIdChannel> newChannelAsync(ObjectId sender, ObjectId receiver, UdpPrimaryKey dst) {
        CompletableFuture<UdpIdChannel> retf = new CompletableFuture<UdpIdChannel>();
        CompletableFuture<UdpRawChannel> f = this.signaling.doSignaling(this.ep, dst);
        f.whenComplete((raw, ex) -> {
            logger.debug("signaling completed");
            if (ex != null) {
                // raw is null on failure; do not fall through and dereference it.
                retf.completeExceptionally(ex);
                return;
            }
            try {
                retf.complete(new UdpIdChannel(this.seq.incrementAndGet(), this.ep, raw.dst, sender, receiver, true, raw, this));
            }
            catch (RuntimeException e) {
                // Never leave the caller's future incomplete.
                retf.completeExceptionally(e);
            }
        });
        return retf;
    }

    /**
     * Blocking variant of {@link #newChannelAsync(ObjectId, ObjectId, UdpPrimaryKey)}.
     *
     * @param sender the sending object id
     * @param receiver the receiving object id
     * @param dst the remote endpoint
     * @param isDuplex not used by this implementation
     * @param timeout maximum time to wait, in milliseconds
     * @return the new channel, or {@code null} if signaling failed, timed out
     *         or the waiting thread was interrupted
     */
    @Override
    public Channel<UdpPrimaryKey> newChannel(ObjectId sender, ObjectId receiver, UdpPrimaryKey dst, boolean isDuplex, int timeout) throws ProtocolUnsupportedException, IOException {
        try {
            return this.newChannelAsync(sender, receiver, dst).get(timeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for the caller's thread.
            Thread.currentThread().interrupt();
            logger.info("While channel signaling: ", e);
            return null;
        }
        catch (ExecutionException | TimeoutException e) {
            logger.info("While channel signaling: ", e);
            return null;
        }
    }

    /**
     * Sends {@code msg} to {@code dst} without blocking.
     *
     * <p>A message addressed to this transport's own raw key is delivered
     * locally. Otherwise the cached channel for the destination is used, or
     * a new one is created through signaling, and the message is sent once
     * that channel is available.
     *
     * @param sender the sending object id
     * @param receiver the receiving object id
     * @param dst the destination endpoint
     * @param msg the payload
     * @param opts not used by this implementation
     * @return a future completed when the write succeeds, or exceptionally
     *         if the transport is stopped, signaling fails or the write fails
     */
    @Override
    public CompletableFuture<Void> sendAsync(ObjectId sender, ObjectId receiver, UdpPrimaryKey dst, Object msg, TransOptions opts) {
        NettyMessage<UdpPrimaryKey> nmsg = new NettyMessage<UdpPrimaryKey>(receiver, this.ep, dst, null, this.getPeerId(), msg, false, 0);
        logger.debug("dst={}, src={}", dst, this.ep);
        if (this.ep.getRawKey() != null && this.ep.getRawKey().equals(dst.getRawKey())) {
            // Loopback: hand the message straight to the local listener.
            this.messageReceived(null, nmsg);
            return CompletableFuture.completedFuture(null);
        }
        logger.debug("sending async {}", nmsg);
        CompletableFuture<Void> retf = new CompletableFuture<Void>();
        if (!this.isRunning) {
            logger.debug("{} is not running", this);
            retf.completeExceptionally(new IOException("transport is not running"));
            return retf;
        }
        final Comparable<?> cacheKey = UdpChannelTransport.channelKey(dst);
        final CompletableFuture<UdpIdChannel> channelFuture;
        // Lookup and insertion must be one step so that concurrent senders
        // to the same destination share a single signaling attempt.
        synchronized (this.channelFutureMap) {
            CompletableFuture<UdpIdChannel> cached = this.channelFutureMap.get(cacheKey);
            if (cached == null) {
                logger.debug("not found for {} on {}.", dst.getRawKey(), this.ep);
                cached = this.newChannelAsync(sender, receiver, dst);
                this.channelFutureMap.put(cacheKey, cached);
                logger.debug("put {} on {}.", dst, this.ep);
            } else {
                logger.debug("found {} for {}.", cached, dst.getRawKey());
            }
            channelFuture = cached;
        }
        channelFuture.whenComplete((ret, e) -> {
            logger.debug("COMPLETED {}", ret);
            if (e != null) {
                // Drop the failed attempt so that a later send can retry signaling
                // instead of failing forever on the cached failure.
                this.channelFutureMap.remove(cacheKey, channelFuture);
                retf.completeExceptionally(e);
                return;
            }
            try {
                if (dst.getRawKey() == null) {
                    // The channel was looked up by locator; also register it under the raw key of its destination.
                    logger.debug("add mapping from {} to {} for {}.", dst.getLocator().getKeyString(), ret.dst.getRawKey(), ret);
                    this.channelFutureMap.put((Comparable<?>)ret.dst.getRawKey(), channelFuture);
                }
                logger.debug("sending async when completed: {} {}", ret, nmsg);
                ret.sendAsync(nmsg).addListener(cf -> {
                    // Report the real outcome of the write, not unconditional success.
                    if (cf.isSuccess()) {
                        retf.complete(null);
                    } else {
                        retf.completeExceptionally(cf.cause());
                    }
                });
            }
            catch (Exception e1) {
                retf.completeExceptionally(e1);
            }
        });
        return retf;
    }

    /**
     * Returns the key under which the channel for {@code dst} is cached: the
     * raw key when present, otherwise the key string of the locator.
     */
    private static Comparable<?> channelKey(UdpPrimaryKey dst) {
        if (dst.getRawKey() == null) {
            return (Comparable<?>)dst.getLocator().getKeyString();
        }
        return (Comparable<?>)dst.getRawKey();
    }

    /**
     * Dispatches a received message to the registered listener.
     *
     * <p>Non-channel messages go to the transport listener registered for the
     * message's object id. Channel messages are queued on {@code c} and the
     * channel listener, if any, is notified.
     *
     * @param c the channel the message belongs to; may be null, in which
     *        case a channel message is dropped
     * @param nmsg the received message
     */
    void messageReceived(UdpIdChannel c, NettyMessage<UdpPrimaryKey> nmsg) {
        if (!nmsg.isChannelSend()) {
            TransportListener<UdpPrimaryKey> listener = this.getListener(nmsg.getObjectId());
            if (listener != null) {
                ReceivedMessage rmsg = new ReceivedMessage(nmsg.getObjectId(), nmsg.getSource(), nmsg.getMsg());
                logger.debug("trans received {} from {}", rmsg.getMessage(), nmsg.getSource());
                listener.onReceive(this, rmsg);
            }
            return;
        }
        logger.debug("channel trans received {} from {}", nmsg.getMsg(), nmsg.getSource());
        if (c == null) {
            // The inbound handler and the loopback path pass no channel; without
            // this guard the queueing below would throw a NullPointerException.
            logger.warn("dropping channel message from {}: no channel is associated with it", nmsg.getSource());
            return;
        }
        ChannelListener<UdpPrimaryKey> clistener = this.getChannelListener(nmsg.getObjectId());
        c.putReceiveQueue(nmsg.getMsg());
        if (clistener != null) {
            clistener.onReceive(c);
        }
    }

    /**
     * Starts sending {@code msg} to {@code dst} and returns without waiting
     * for the outcome; a failure of the asynchronous send is logged.
     *
     * @param sender the sending object id
     * @param receiver the receiving object id
     * @param dst the destination endpoint
     * @param msg the payload
     * @param opts not used by this implementation
     */
    @Override
    public void send(ObjectId sender, ObjectId receiver, UdpPrimaryKey dst, Object msg, TransOptions opts) throws ProtocolUnsupportedException, IOException {
        this.sendAsync(sender, receiver, dst, msg, opts).whenComplete((ignored, e) -> {
            if (e != null) {
                logger.warn("send to {} failed", dst, e);
            }
        });
    }

    /**
     * Decodes incoming datagrams and routes them to the signaling layer or
     * to {@link UdpChannelTransport#messageReceived}.
     */
    class InboundHandler
    extends SimpleChannelInboundHandler<DatagramPacket> {
        InboundHandler() {
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        /**
         * Logs the failure and closes the context, which closes the datagram
         * channel this handler is installed on.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            logger.warn("exception in inbound handler on {}", UdpChannelTransport.this.ep, cause);
            ctx.close();
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
            ByteBuf buf = msg.content();
            int length = buf.readableBytes();
            logger.trace("received length={}", length);
            // Always copy exactly the readable region. The backing array of a
            // heap buffer may be larger than, or offset from, the payload.
            byte[] bytes = new byte[length];
            buf.getBytes(buf.readerIndex(), bytes);
            Object obj;
            try {
                obj = KryoUtil.decode(bytes);
            }
            catch (Exception e) {
                // One malformed datagram must not take down the shared socket.
                logger.warn("dropping undecodable datagram from {}", msg.sender(), e);
                return;
            }
            // NOTE(review): the request check is deliberately not chained with the
            // checks below, as in the original; confirm whether the types can overlap.
            if (obj instanceof Signaling.Request) {
                Signaling.Request req = (Signaling.Request)obj;
                req.setSender(msg.sender());
                logger.trace("received request {} from={} on {}", req, req.sender, UdpChannelTransport.this.ep);
                UdpChannelTransport.this.signaling.received(req);
            }
            if (obj instanceof Signaling.Response) {
                Signaling.Response resp = (Signaling.Response)obj;
                resp.setSender(msg.sender());
                logger.trace("received response from={} body={} on {}", resp.sender, resp.body, UdpChannelTransport.this.ep);
                UdpChannelTransport.this.signaling.received(resp);
            } else if (obj instanceof NettyMessage) {
                @SuppressWarnings("unchecked")
                NettyMessage<UdpPrimaryKey> nmsg = (NettyMessage<UdpPrimaryKey>)obj;
                logger.trace("inbound received msg{}: on {}", nmsg, UdpChannelTransport.this.ep);
                UdpChannelTransport.this.messageReceived(null, nmsg);
            }
        }
    }

    /** Installs an {@link InboundHandler} at the end of the datagram channel's pipeline. */
    class ServerChannelInitializer
    extends ChannelInitializer<DatagramChannel> {
        ServerChannelInitializer() {
        }

        @Override
        protected void initChannel(DatagramChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new InboundHandler());
        }
    }
}

