/*
 * Decompiled with CFR 0.152.
 */
package org.piax.gtrans.base;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.piax.common.ObjectId;
import org.piax.common.PeerLocator;
import org.piax.common.TransportId;
import org.piax.gtrans.Channel;
import org.piax.gtrans.GTransConfigValues;
import org.piax.gtrans.IdConflictException;
import org.piax.gtrans.Peer;
import org.piax.gtrans.ProtocolUnsupportedException;
import org.piax.gtrans.impl.ChannelTransportImpl;
import org.piax.gtrans.impl.ExceededSizeException;
import org.piax.gtrans.impl.InvalidMessageException;
import org.piax.gtrans.impl.NestedMessage;
import org.piax.gtrans.impl.NotEnoughMessageException;
import org.piax.gtrans.impl.OneToOneMappingTransport;
import org.piax.gtrans.raw.RawTransport;
import org.piax.util.BinaryJsonabilityException;
import org.piax.util.ByteBufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseChannelTransportImpl<E extends PeerLocator>
extends OneToOneMappingTransport<E> {
    private static Logger logger = LoggerFactory.getLogger(BaseChannelTransportImpl.class);
    private final Map<Channel<E>, ByteBuffer> remainedMsgs = new ConcurrentHashMap<Channel<E>, ByteBuffer>();

    public BaseChannelTransportImpl(Peer peer, TransportId transId, E locator) throws IdConflictException, IOException {
        super(transId, ((PeerLocator)locator).newRawTransport(peer.getPeerId()));
    }

    public BaseChannelTransportImpl(Peer peer, TransportId transId, RawTransport<?> driver) throws IdConflictException, IOException {
        super(transId, driver);
    }

    @Override
    public synchronized void fin() {
        super.fin();
        this.lowerTrans.fin();
    }

    @Override
    public int getMTU() {
        return GTransConfigValues.MAX_MSG_SIZE;
    }

    @Override
    public RawTransport<E> getLowerTransport() {
        RawTransport lower = (RawTransport)this.lowerTrans;
        return lower;
    }

    @Override
    protected void removeCh(Channel<E> lowerCh) {
        super.removeCh(lowerCh);
        this.remainedMsgs.remove(lowerCh);
    }

    @Override
    protected void lowerSend(ObjectId sender, ObjectId receiver, E dst, NestedMessage nmsg) throws ProtocolUnsupportedException, IOException {
        Object bb;
        Channel<E> ch = null;
        if (GTransConfigValues.ALLOW_REF_SEND_IN_BASE_TRANSPORT && ((RawTransport)this.getLowerTransport()).canSendNormalObject()) {
            bb = nmsg;
        } else {
            try {
                bb = nmsg.serialize();
            }
            catch (BinaryJsonabilityException e) {
                logger.error("", (Throwable)e);
                return;
            }
            catch (ExceededSizeException e) {
                throw new IOException(e);
            }
        }
        ch = ((ChannelTransportImpl)this.getLowerTransport()).newChannel(null, null, dst, GTransConfigValues.newChannelTimeout);
        ch.send(bb);
    }

    @Override
    protected void lowerChSend(Channel<E> ch, NestedMessage nmsg) throws IOException {
        Object bb;
        if (GTransConfigValues.ALLOW_REF_SEND_IN_BASE_TRANSPORT && ((RawTransport)this.getLowerTransport()).canSendNormalObject()) {
            bb = nmsg;
        } else {
            try {
                bb = nmsg.serialize();
            }
            catch (BinaryJsonabilityException e) {
                logger.error("", (Throwable)e);
                return;
            }
            catch (ExceededSizeException e) {
                throw new IOException(e);
            }
        }
        ch.send(bb);
    }

    private List<NestedMessage> defragReceiveMsg(Channel<E> lowerCh, ByteBuffer newData) {
        ArrayList<NestedMessage> ret = new ArrayList<NestedMessage>();
        if (newData.remaining() == 0) {
            logger.info("newData contains no data");
            return ret;
        }
        try {
            newData.mark();
            ByteBuffer bb = this.remainedMsgs.get(lowerCh);
            if (bb == null) {
                bb = newData;
            } else {
                ByteBufferUtil.flop(bb);
                bb = ByteBufferUtil.put(bb, newData);
                ByteBufferUtil.flip(bb);
            }
            while (true) {
                int llen;
                try {
                    llen = NestedMessage.checkAndGetMessageLen(bb);
                }
                catch (NotEnoughMessageException e) {
                    logger.info("", (Throwable)e);
                    this.remainedMsgs.put(lowerCh, bb);
                    return ret;
                }
                catch (InvalidMessageException e) {
                    logger.error("", (Throwable)e);
                    this.remainedMsgs.remove(lowerCh);
                    return ret;
                }
                int plen = bb.remaining();
                if (llen > plen) {
                    this.remainedMsgs.put(lowerCh, bb);
                    return ret;
                }
                if (llen == plen) {
                    this.remainedMsgs.remove(lowerCh);
                    NestedMessage nmsg = NestedMessage.deserialize(bb);
                    ret.add(nmsg);
                    return ret;
                }
                logger.debug("llen < plen case: {} bytes remained", (Object)(plen - llen));
                byte[] b = Arrays.copyOfRange(bb.array(), bb.position() + llen, bb.limit());
                ByteBuffer newbb = ByteBuffer.wrap(b);
                newbb.mark();
                this.remainedMsgs.put(lowerCh, newbb);
                bb.limit(bb.position() + llen);
                NestedMessage nmsg = NestedMessage.deserialize(bb);
                ret.add(nmsg);
                bb = newbb;
            }
        }
        catch (BinaryJsonabilityException e) {
            logger.error("", (Throwable)e);
            return ret;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onReceive(Channel<E> lowerCh) {
        this.peer.concatPeerId2ThreadName();
        if (GTransConfigValues.ALLOW_REF_SEND_IN_BASE_TRANSPORT && ((RawTransport)this.getLowerTransport()).canSendNormalObject()) {
            NestedMessage nmsg = null;
            OneToOneMappingTransport.OneToOneChannel<E> ch = null;
            Channel<E> channel = lowerCh;
            synchronized (channel) {
                nmsg = (NestedMessage)lowerCh.receive();
                if (nmsg == null) {
                    if (this.isActive) {
                        logger.error("null message received");
                    }
                    return;
                }
                ch = this._putReceiveQueue(lowerCh, nmsg);
            }
            OneToOneMappingTransport.OneToOneChannel<E> fch = ch;
            if (fch != null) {
                NestedMessage fnmsg = nmsg;
                this.peer.execute(() -> {
                    this.peer.concatPeerId2ThreadName();
                    this._onReceive(fch, fnmsg);
                });
            }
        } else {
            ArrayList<ReceiveChannelAndMessage> dnmsgs = new ArrayList<ReceiveChannelAndMessage>();
            Channel<E> channel = lowerCh;
            synchronized (channel) {
                if (lowerCh.isClosed()) {
                    return;
                }
                ByteBuffer bbuf = (ByteBuffer)lowerCh.receive();
                if (bbuf == null) {
                    if (this.isActive) {
                        logger.error("null message received");
                    }
                    return;
                }
                List<NestedMessage> nmsgs = this.defragReceiveMsg(lowerCh, bbuf);
                for (NestedMessage nmsg : nmsgs) {
                    if (nmsg.srcPeerId == null) {
                        this.peer.execute(() -> {
                            this.peer.concatPeerId2ThreadName();
                            lowerCh.close();
                            this._onReceive(this.getLowerTransport(), nmsg);
                        });
                        continue;
                    }
                    OneToOneMappingTransport.OneToOneChannel<E> ch = this._putReceiveQueue(lowerCh, nmsg);
                    if (ch == null) continue;
                    dnmsgs.add(new ReceiveChannelAndMessage(ch, nmsg));
                }
                for (ReceiveChannelAndMessage dnmsg : dnmsgs) {
                    this.peer.execute(() -> {
                        this.peer.concatPeerId2ThreadName();
                        this._onReceive(dnmsg.ch, dnmsg.nmsg);
                    });
                }
            }
        }
    }

    class ReceiveChannelAndMessage {
        public OneToOneMappingTransport.OneToOneChannel<E> ch;
        public NestedMessage nmsg;

        public ReceiveChannelAndMessage(OneToOneMappingTransport.OneToOneChannel<E> ch, NestedMessage nmsg) {
            this.ch = ch;
            this.nmsg = nmsg;
        }
    }
}

