/*
 * Decompiled with CFR 0.152.
 */
package org.piax.gtrans.impl;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.piax.common.Endpoint;
import org.piax.common.ObjectId;
import org.piax.common.PeerId;
import org.piax.common.TransportId;
import org.piax.gtrans.Channel;
import org.piax.gtrans.ChannelListener;
import org.piax.gtrans.ChannelTransport;
import org.piax.gtrans.GTransConfigValues;
import org.piax.gtrans.IdConflictException;
import org.piax.gtrans.NetworkTimeoutException;
import org.piax.gtrans.Peer;
import org.piax.gtrans.ProtocolUnsupportedException;
import org.piax.gtrans.ReceivedMessage;
import org.piax.gtrans.Transport;
import org.piax.gtrans.TransportListener;
import org.piax.gtrans.impl.AcceptedChannelMgr;
import org.piax.gtrans.impl.ChannelImpl;
import org.piax.gtrans.impl.ChannelTransportImpl;
import org.piax.gtrans.impl.NestedMessage;
import org.piax.util.UniqNumberGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DatagramBasedTransport<U extends Endpoint, L extends Endpoint>
extends ChannelTransportImpl<U>
implements TransportListener<L>,
ChannelListener<L> {
    private static final Logger logger = LoggerFactory.getLogger(DatagramBasedTransport.class);
    /** Control command sent by {@code newChannel} to ask the remote peer to open a channel. */
    public static final byte CH_NEW_CMD = 1;
    /** Control command sent back in reply to {@link #CH_NEW_CMD}; releases the waiting {@code newChannel} call. */
    public static final byte CH_NEW_ACK_CMD = 2;
    /** Negative reply code. NOTE(review): nothing in this class sends or handles it. */
    public static final byte CH_NEW_NACK_CMD = 3;
    /** Control command sent by {@code closeCh} to tell the remote peer that a channel was closed. */
    public static final byte CH_CLOSE_CMD = 4;
    // Registry of channels created locally through newChannel, keyed by the number it hands out.
    // NOTE(review): the (1, 1, MAX_CHANNELS) arguments presumably mean start/step/limit - confirm
    // against UniqNumberGenerator.
    private final UniqNumberGenerator<DatagramChannel<U>> clientChMgr = new UniqNumberGenerator(1, 1, GTransConfigValues.MAX_CHANNELS);
    // Registry of channels opened by remote peers, looked up by (creator peer id, channel number).
    // Also used as the monitor guarding create-if-absent and removal.
    private final AcceptedChannelMgr<U> acceptChMgr = new AcceptedChannelMgr();

    /**
     * Creates the transport by forwarding every argument unchanged to the
     * {@code ChannelTransportImpl} constructor.
     *
     * @param peer the owning peer
     * @param transId the id of this transport
     * @param lowerTrans the transport layered underneath this one
     * @param supportsDuplex passed through to the superclass
     * @throws IdConflictException propagated from the superclass constructor
     */
    protected DatagramBasedTransport(Peer peer, TransportId transId, Transport<?> lowerTrans, boolean supportsDuplex) throws IdConflictException {
        super(peer, transId, lowerTrans, supportsDuplex);
    }

    /**
     * Opens a channel to {@code dst} by sending {@link #CH_NEW_CMD} and blocking until the
     * matching {@link #CH_NEW_ACK_CMD} arrives or the timeout elapses.
     *
     * <p>If the handshake does not complete (send failure, timeout or interruption) the
     * channel number reserved for the attempt is released again, so failed attempts do not
     * accumulate in the client channel registry.
     *
     * @param sender object id of the local endpoint of the channel
     * @param receiver object id of the remote endpoint of the channel
     * @param dst destination endpoint
     * @param isDuplex not used by this implementation
     * @param timeout maximum wait for the ACK in milliseconds; {@code 0} selects
     *        {@code GTransConfigValues.newChannelTimeout}
     * @return the established channel
     * @throws NetworkTimeoutException if no ACK arrived in time, or the waiting thread was
     *         interrupted (the thread's interrupt flag is restored in that case)
     * @throws IOException if the open request could not be sent
     */
    @Override
    public Channel<U> newChannel(ObjectId sender, ObjectId receiver, U dst, boolean isDuplex, int timeout) throws ProtocolUnsupportedException, IOException {
        DatagramChannel<U> ch = new DatagramChannel<U>(this, sender, receiver, dst);
        int chNo = this.clientChMgr.newNo(ch);
        ch.setChannelNo(chNo);
        boolean established = false;
        try {
            this.sendCmd(dst, CH_NEW_CMD, ch);
            int waitMillis = timeout == 0 ? GTransConfigValues.newChannelTimeout : timeout;
            try {
                if (!ch.latch.await(waitMillis, TimeUnit.MILLISECONDS)) {
                    throw new NetworkTimeoutException("newChannel timed out: " + dst);
                }
            }
            catch (InterruptedException e) {
                // Keep the interruption visible to the caller's thread.
                Thread.currentThread().interrupt();
                throw new NetworkTimeoutException("newChannel interrupted: " + dst);
            }
            established = true;
            logger.trace("EXIT:");
            return ch;
        }
        finally {
            if (!established) {
                ch.latch.countDown();
                // Release the reserved number; otherwise every failed attempt leaks one entry.
                this.clientChMgr.discardNo(chNo);
            }
        }
    }

    /**
     * Looks up a locally created channel by its channel number.
     *
     * @param channelNo the number assigned when the channel was created
     * @return whatever the client channel registry holds for that number
     */
    protected DatagramChannel<U> getClientCh(int channelNo) {
        DatagramChannel<U> registered = clientChMgr.getObject(channelNo);
        return registered;
    }

    /**
     * Looks up a channel that was opened by a remote peer.
     *
     * @param creator id of the peer that created the channel
     * @param channelNo the channel number chosen by that peer
     * @return whatever the accepted-channel registry holds for that pair
     */
    protected DatagramChannel<U> getAcceptCh(PeerId creator, int channelNo) {
        return (DatagramChannel<U>)acceptChMgr.getChannel(creator, channelNo);
    }

    /**
     * Returns the accepted channel registered for {@code (creator, channelNo)}, creating and
     * registering a new one first when none exists. Lookup and registration run under the
     * monitor of the accepted-channel registry.
     *
     * @param creator id of the peer that opened the channel
     * @param localObjId object id on this side of the channel
     * @param remoteObjId object id on the remote side of the channel
     * @param channelNo the channel number chosen by the creator
     * @param dst endpoint of the remote side
     * @return the existing or newly created channel
     */
    protected DatagramChannel<U> newAcceptChIfAbsent(PeerId creator, ObjectId localObjId, ObjectId remoteObjId, int channelNo, U dst) {
        synchronized (acceptChMgr) {
            DatagramChannel<U> existing = getAcceptCh(creator, channelNo);
            if (existing != null) {
                logger.trace("EXIT:");
                return existing;
            }
            DatagramChannel<U> created = new DatagramChannel<U>(creator, this, localObjId, remoteObjId, dst);
            created.setChannelNo(channelNo);
            acceptChMgr.putChannel(creator, channelNo, created);
            logger.trace("EXIT:");
            return created;
        }
    }

    /**
     * Releases the channel number of a locally created channel.
     *
     * @param ch the channel whose number is given back to the client registry
     */
    private void unbindClientCh(ChannelImpl<U> ch) {
        int releasedNo = ch.getChannelNo();
        clientChMgr.discardNo(releasedNo);
        logger.trace("EXIT:");
    }

    /**
     * Removes a remotely opened channel from the accepted-channel registry, holding the
     * registry's monitor while doing so.
     *
     * @param ch the channel to remove
     */
    private void unbindAcceptCh(ChannelImpl<U> ch) {
        synchronized (acceptChMgr) {
            acceptChMgr.removeChannel(ch);
        }
        logger.trace("EXIT:");
    }

    /**
     * Tears down the local bookkeeping of a channel and, for a locally initiated close,
     * notifies the remote side first.
     *
     * <p>The notification is best effort: a failure to send {@link #CH_CLOSE_CMD} is logged
     * and the local cleanup still takes place.
     *
     * @param isDirect {@code true} when the close was requested locally and the remote peer
     *        must be told; {@code false} when reacting to a close command received from it
     * @param ch the channel being closed
     */
    protected void closeCh(boolean isDirect, ChannelImpl<U> ch) {
        if (isDirect) {
            try {
                this.sendCmd(ch.getRemote(), CH_CLOSE_CMD, ch);
            }
            catch (IOException e) {
                // Do not abort the close; the local state must be released regardless.
                logger.warn("failed to send CH_CLOSE_CMD to {}", ch.getRemote(), e);
            }
        }
        if (ch.isCreatorSide()) {
            this.unbindClientCh(ch);
        } else {
            this.unbindAcceptCh(ch);
        }
    }

    /**
     * Hands a wrapped message to the layer below; implemented by subclasses. Within this
     * class it is invoked by {@code sendCmd} and {@code _send}.
     *
     * @param var1 the destination endpoint
     * @param var2 the message envelope to transmit
     * @throws ProtocolUnsupportedException declared for implementations to use
     * @throws IOException declared for implementations to use
     */
    protected abstract void lowerSend(U var1, NestedMessage var2) throws ProtocolUnsupportedException, IOException;

    /**
     * Sends a control command for a channel. The channel number is transmitted as is when
     * this side created the channel and negated otherwise. The envelope carries the command
     * byte as its option and no payload.
     *
     * @param dst destination endpoint
     * @param cmd one of the {@code CH_*_CMD} command bytes
     * @param ch the channel the command refers to
     * @throws IOException if the lower layer fails to send
     */
    private void sendCmd(U dst, byte cmd, ChannelImpl<U> ch) throws IOException {
        final int wireChNo;
        if (ch.isCreatorSide()) {
            wireChNo = ch.getChannelNo();
        } else {
            wireChNo = -ch.getChannelNo();
        }
        lowerSend(dst, new NestedMessage(ch.localObjectId, ch.remoteObjectId, getPeerId(), (Endpoint)ch.getLocal(), wireChNo, cmd, null));
    }

    /**
     * Wraps a payload in a {@code NestedMessage} (with no option set) and passes it to the
     * lower layer.
     *
     * @param sender object id of the sending side
     * @param receiver object id of the receiving side
     * @param channelNo channel number to put on the wire; {@code 0} is what {@code send} uses
     *        for channel-less messages
     * @param dst destination endpoint
     * @param msg the payload
     * @throws ProtocolUnsupportedException propagated from the lower layer
     * @throws IOException propagated from the lower layer
     */
    protected void _send(ObjectId sender, ObjectId receiver, int channelNo, U dst, Object msg) throws ProtocolUnsupportedException, IOException {
        logger.trace("ENTRY:");
        lowerSend(dst, new NestedMessage(sender, receiver, getPeerId(), getEndpoint(), channelNo, null, msg));
    }

    /**
     * Sends a message outside of any channel, i.e. with channel number {@code 0}.
     *
     * @param sender object id of the sending side
     * @param receiver object id of the receiving side
     * @param dst destination endpoint
     * @param msg the payload
     */
    @Override
    public void send(ObjectId sender, ObjectId receiver, U dst, Object msg) throws ProtocolUnsupportedException, IOException {
        final int channelLess = 0;
        _send(sender, receiver, channelLess, dst, msg);
    }

    /**
     * Turns a message received from the lower layer into the envelope this class dispatches;
     * implemented by subclasses. Called first by {@code _onReceive}, which drops the message
     * when this returns {@code null}.
     *
     * @param var1 the message as delivered by the lower layer
     * @return the envelope to dispatch, or {@code null} to stop processing
     */
    protected abstract NestedMessage _preReceive(ReceivedMessage var1);

    /**
     * Decides whether an incoming message is processed through {@code peer.execute(...)}
     * ({@code true}) or directly on the thread that delivered it ({@code false}); implemented
     * by subclasses.
     *
     * @param var1 the only call in this class passes {@code 1}.
     *        NOTE(review): the meaning of this value is not visible here - confirm.
     * @return {@code true} to hand processing to the peer's executor
     */
    protected abstract boolean useReceiverThread(int var1);

    /**
     * Handles a control message, i.e. one whose option field carries a command byte.
     *
     * <ul>
     * <li>{@link #CH_NEW_CMD}: registers (or reuses) the accepted channel, notifies the
     *     listener and replies with {@link #CH_NEW_ACK_CMD}.</li>
     * <li>{@link #CH_NEW_ACK_CMD}: releases the {@code newChannel} call waiting on the
     *     corresponding client channel.</li>
     * <li>{@link #CH_CLOSE_CMD}: notifies the listener and drops the local channel state.</li>
     * </ul>
     * Any other command is logged as an error.
     *
     * <p>A negative channel number in the message refers to a channel created on this side,
     * a positive one to a channel created by the sender.
     *
     * @param nmsg the control message; {@code nmsg.option} must be a {@code Byte}
     */
    private void dispatchCmd(NestedMessage nmsg) {
        ChannelListener<U> listener = this.getChannelListener(nmsg.receiver);
        if (listener == null) {
            // Only a warning: the command is still processed below without listener callbacks.
            logger.warn("message purged as no listener: {}", nmsg.receiver);
        }
        // The envelope stores the source as a plain Endpoint; this transport treats it as U.
        @SuppressWarnings("unchecked")
        U src = (U)nmsg.src;
        byte cmd = (Byte)nmsg.option;
        switch (cmd) {
            case CH_NEW_CMD: {
                DatagramChannel<U> ch = this.newAcceptChIfAbsent(nmsg.srcPeerId, nmsg.receiver, nmsg.sender, nmsg.channelNo, src);
                if (listener != null) {
                    // The listener's verdict is not evaluated; the request is always acknowledged.
                    listener.onAccepting(ch);
                }
                try {
                    this.sendCmd(src, CH_NEW_ACK_CMD, ch);
                }
                catch (IOException e) {
                    logger.error("", e);
                }
                break;
            }
            case CH_NEW_ACK_CMD: {
                if (nmsg.channelNo < 0) {
                    DatagramChannel<U> ch = this.getClientCh(-nmsg.channelNo);
                    if (ch == null) {
                        logger.error("CH_NEW_ACK_CMD accepted with invalid channelNo:{}", -nmsg.channelNo);
                        break;
                    }
                    ch.latch.countDown();
                    break;
                }
                logger.error("invalid channelNo: {}", nmsg.channelNo);
                break;
            }
            case CH_CLOSE_CMD: {
                DatagramChannel<U> ch = nmsg.channelNo < 0 ? this.getClientCh(-nmsg.channelNo) : this.getAcceptCh(nmsg.srcPeerId, nmsg.channelNo);
                if (ch == null) break;
                if (listener != null) {
                    listener.onClosed(ch);
                }
                // Remote initiated the close, so no close command is sent back.
                this.closeCh(false, ch);
                break;
            }
            default: {
                logger.error("invalid control command in message header");
            }
        }
    }

    /**
     * Delivers a channel-less message to the transport listener registered for its receiver.
     * When no listener is registered a warning is logged and the message is dropped.
     *
     * @param nmsg the envelope to deliver
     */
    private void dispatchTrans(NestedMessage nmsg) {
        ReceivedMessage delivered = new ReceivedMessage(nmsg.sender, nmsg.src, nmsg.getInner());
        TransportListener<U> target = getListener(nmsg.receiver);
        if (target != null) {
            target.onReceive(this, delivered);
        } else {
            logger.warn("message purged as transport has no listener: {}", nmsg.receiver);
        }
    }

    /**
     * Queues a channel message on its channel and then tells the channel listener, if one is
     * registered for the receiver, that data is available. The payload is queued even when
     * there is no listener.
     *
     * @param ch the channel the message belongs to
     * @param nmsg the envelope whose inner payload is queued
     */
    private void dispatchCh(ChannelImpl<U> ch, NestedMessage nmsg) {
        ChannelListener<U> target = getChannelListener(nmsg.receiver);
        ch.putReceiveQueue(nmsg.getInner());
        if (target == null) {
            return;
        }
        target.onReceive(ch);
    }

    /**
     * Routes a data message to the upper layer according to its channel number.
     *
     * <ul>
     * <li>{@code 0}: a channel-less message, delivered to the transport listener.</li>
     * <li>negative: belongs to a channel created on this side; dropped with a warning when
     *     that channel is not registered.</li>
     * <li>positive: belongs to a channel created by the sender; the accepted channel is
     *     created on demand if it is not registered yet.</li>
     * </ul>
     *
     * @param nmsg the envelope to route
     */
    protected void raiseUpperListener(NestedMessage nmsg) {
        if (nmsg.channelNo == 0) {
            this.dispatchTrans(nmsg);
            return;
        }
        if (nmsg.channelNo < 0) {
            DatagramChannel<U> ch = this.getClientCh(-nmsg.channelNo);
            if (ch == null) {
                logger.warn("unknown channelNo: {}", -nmsg.channelNo);
                logger.debug("clientChMgr {}", this.clientChMgr);
                return;
            }
            this.dispatchCh(ch, nmsg);
            return;
        }
        // The envelope stores the source as a plain Endpoint; this transport treats it as U.
        @SuppressWarnings("unchecked")
        U src = (U)nmsg.src;
        DatagramChannel<U> ch = this.newAcceptChIfAbsent(nmsg.srcPeerId, nmsg.receiver, nmsg.sender, nmsg.channelNo, src);
        this.dispatchCh(ch, nmsg);
    }

    /**
     * Processes one message from the lower layer: converts it with {@code _preReceive},
     * then treats it as a control command when it has no payload and its option is a
     * {@code Byte}, and as a data message otherwise.
     *
     * @param rmsg the message as delivered by the lower layer
     */
    protected void _onReceive(ReceivedMessage rmsg) {
        logger.trace("ENTRY:");
        NestedMessage envelope = _preReceive(rmsg);
        if (envelope == null) {
            return;
        }
        boolean isCommand = envelope.getInner() == null && envelope.option instanceof Byte;
        if (!isCommand) {
            raiseUpperListener(envelope);
            return;
        }
        dispatchCmd(envelope);
    }

    /**
     * Entry point for messages arriving from the lower transport. Messages are discarded
     * (with an info log) while this transport is not active. Otherwise they are processed
     * either inline or through {@code peer.execute(...)}, depending on
     * {@code useReceiverThread(1)}.
     *
     * @param lowerTrans the lower transport that delivered the message (not used)
     * @param rmsg the received message
     */
    @Override
    public void onReceive(Transport<L> lowerTrans, final ReceivedMessage rmsg) {
        peer.concatPeerId2ThreadName();
        if (!isActive) {
            logger.info("received msg purged {}", rmsg.getMessage());
            return;
        }
        if (useReceiverThread(1)) {
            Runnable deferred = new Runnable(){

                @Override
                public void run() {
                    DatagramBasedTransport.this.peer.concatPeerId2ThreadName();
                    DatagramBasedTransport.this._onReceive(rmsg);
                }
            };
            peer.execute(deferred);
            return;
        }
        _onReceive(rmsg);
    }

    /**
     * Channel callback from the lower layer. This class treats it as unexpected: it logs an
     * error and refuses the channel.
     *
     * @param channel the lower-layer channel (ignored)
     * @return always {@code false}
     */
    @Override
    public boolean onAccepting(Channel<L> channel) {
        logger.error("unexpected onAccepting");
        return false;
    }

    /**
     * Channel callback from the lower layer. This class treats it as unexpected and only
     * logs an error.
     *
     * @param channel the lower-layer channel (ignored)
     */
    @Override
    public void onClosed(Channel<L> channel) {
        logger.error("unexpected onClosed");
    }

    /**
     * Channel callback from the lower layer. This class treats it as unexpected and only
     * logs an error; {@code cause} is not included in the log entry.
     *
     * @param channel the lower-layer channel (ignored)
     * @param cause the reported failure (ignored)
     */
    @Override
    public void onFailure(Channel<L> channel, Exception cause) {
        logger.error("unexpected onFailure");
    }

    /**
     * Channel callback from the lower layer. This class treats it as unexpected and only
     * logs an error; nothing is read from the channel.
     *
     * @param lowerCh the lower-layer channel (ignored)
     */
    @Override
    public void onReceive(Channel<L> lowerCh) {
        logger.error("unexpected onReceive");
    }

    /**
     * Channel implementation used by {@code DatagramBasedTransport}. Sending and closing are
     * delegated to the owning transport.
     *
     * @param <E> endpoint type of the channel
     */
    protected static class DatagramChannel<E extends Endpoint>
    extends ChannelImpl<E> {
        /**
         * Released when the open request is acknowledged; {@code null} for channels built
         * with the constructor that takes a creator peer id.
         */
        private final CountDownLatch latch;

        /**
         * Builds a channel without a creator peer id and gives it a latch with a count of one.
         */
        DatagramChannel(ChannelTransport<E> mother, ObjectId localObjId, ObjectId remoteObjId, E remote) {
            super(mother, localObjId, remoteObjId, remote, true);
            latch = new CountDownLatch(1);
        }

        /**
         * Builds a channel for the given creator peer id; such a channel has no latch.
         */
        DatagramChannel(PeerId creator, ChannelTransport<E> mother, ObjectId localObjId, ObjectId remoteObjId, E remote) {
            super(creator, mother, localObjId, remoteObjId, remote, true);
            latch = null;
        }

        /** Returns the owning transport, viewed as a {@code DatagramBasedTransport}. */
        private DatagramBasedTransport<E, ?> owner() {
            return (DatagramBasedTransport<E, ?>)mother;
        }

        /**
         * Closes the channel via the superclass, then asks the owning transport to notify
         * the remote side and release the channel's registration.
         */
        @Override
        public void close() {
            super.close();
            owner().closeCh(true, this);
        }

        /**
         * Sends a message over this channel. The channel number is put on the wire as is when
         * this side created the channel and negated otherwise.
         *
         * @param msg the payload
         * @throws IOException propagated from the owning transport
         */
        @Override
        public void send(Object msg) throws IOException {
            final int wireChNo;
            if (isCreatorSide()) {
                wireChNo = getChannelNo();
            } else {
                wireChNo = -getChannelNo();
            }
            owner()._send(localObjectId, remoteObjectId, wireChNo, remote, msg);
        }
    }
}

