/*
 * Decompiled with CFR 0.152.
 */
package org.piax.gtrans.netty.loctrans;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.piax.common.Endpoint;
import org.piax.common.ObjectId;
import org.piax.common.TransportId;
import org.piax.gtrans.Channel;
import org.piax.gtrans.NetworkTimeoutException;
import org.piax.gtrans.netty.NettyEndpoint;
import org.piax.gtrans.netty.NettyMessage;
import org.piax.gtrans.netty.loctrans.NettyChannelTransport;
import org.piax.gtrans.netty.loctrans.NettyRawChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyChannel<E extends NettyEndpoint>
implements Channel<E> {
    final ObjectId localObjectId;
    final ObjectId remoteObjectId;
    final E channelInitiator;
    final E dst;
    NettyRawChannel<E> raw;
    final boolean isCreator;
    final NettyChannelTransport<E> trans;
    private final BlockingQueue<Object> rcvQueue;
    final int id;
    boolean isClosed;
    private static final Logger logger = LoggerFactory.getLogger((String)NettyChannel.class.getName());
    private static final Object EOF = new Object();

    public NettyChannel(int channelNo, E channelInitiator, E destination, ObjectId localObjectId, ObjectId remoteObjectId, boolean isCreator, NettyRawChannel<E> raw, NettyChannelTransport<E> trans) {
        this.id = channelNo;
        this.channelInitiator = channelInitiator;
        this.dst = destination;
        this.localObjectId = localObjectId;
        this.remoteObjectId = remoteObjectId;
        this.isCreator = isCreator;
        this.raw = raw;
        this.trans = trans;
        this.isClosed = false;
        this.rcvQueue = new LinkedBlockingQueue<Object>();
    }

    @Override
    public void close() {
        if (this.raw.getStat() == NettyRawChannel.Stat.RUN) {
            try {
                this.send(null);
            }
            catch (IOException e) {
                logger.warn("Exception occured while closing channel to {}.", this.dst);
            }
        }
        this.trans.deleteChannel(this);
        this.isClosed = true;
    }

    @Override
    public boolean isClosed() {
        return this.isClosed;
    }

    @Override
    public TransportId getTransportId() {
        return this.raw.getTransportId();
    }

    @Override
    public int getChannelNo() {
        return this.id;
    }

    @Override
    public E getLocal() {
        return (E)this.trans.getEndpoint();
    }

    @Override
    public ObjectId getLocalObjectId() {
        return this.localObjectId;
    }

    @Override
    public E getRemote() {
        return this.dst;
    }

    @Override
    public ObjectId getRemoteObjectId() {
        return this.remoteObjectId;
    }

    @Override
    public boolean isDuplex() {
        return this.raw.isDuplex();
    }

    @Override
    public boolean isCreatorSide() {
        logger.debug("{}={}?", this.channelInitiator, this.trans.ep);
        return this.channelInitiator.equals(this.trans.ep);
    }

    public E getChannelInitiator() {
        return this.channelInitiator;
    }

    @Override
    public void send(Object msg) throws IOException {
        Endpoint src = this.trans.getEndpoint();
        this.trans.channelSendHook(src, (Endpoint)this.dst);
        NettyMessage<Endpoint> nmsg = new NettyMessage<Endpoint>(this.remoteObjectId, src, (Endpoint)this.dst, (Endpoint)this.getChannelInitiator(), this.trans.getPeerId(), msg, true, this.getChannelNo());
        if (!this.raw.isClosed()) {
            logger.debug("ch {}{} send {} from {} to {}", new Object[]{this.getChannelNo(), this.getChannelInitiator(), msg, this.trans.ep, this.getRemote()});
        } else {
            logger.debug("re-creating the raw channel for {}", (Object)this.getRemote());
            this.raw = this.trans.getRawCreateAsClient(this.getRemote(), nmsg);
        }
        if (this.raw != null) {
            this.raw.send(nmsg);
        }
    }

    protected void putReceiveQueue(Object msg) {
        try {
            if (msg == null) {
                this.rcvQueue.put(EOF);
            } else {
                this.rcvQueue.put(msg);
            }
        }
        catch (InterruptedException ignore) {
            ignore.printStackTrace();
        }
    }

    @Override
    public Object receive() {
        Object msg = this.rcvQueue.poll();
        logger.debug("ch {} received {} on {} thread={}", new Object[]{this.getChannelNo(), msg, this.trans.ep, Thread.currentThread()});
        if (msg == EOF) {
            logger.debug("ch {} received EOF on {}", new Object[]{this.getChannelNo(), msg, this.trans.ep});
            this.isClosed = true;
            this.trans.deleteChannel(this);
            return null;
        }
        return msg;
    }

    @Override
    public Object receive(int timeout) throws NetworkTimeoutException {
        Object msg;
        block4: {
            try {
                msg = this.rcvQueue.poll(timeout, TimeUnit.MILLISECONDS);
                logger.debug("ch received(with timeout) {} on {} thread={}", new Object[]{msg, this, Thread.currentThread()});
                if (msg != EOF) break block4;
                logger.debug("ch {} received EOF on {}", new Object[]{this.getChannelNo(), msg, this.trans.ep});
                this.isClosed = true;
                this.trans.deleteChannel(this);
                return null;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        if (msg == null) {
            throw new NetworkTimeoutException("ch.receive timed out");
        }
        return msg;
    }

    public String toString() {
        return String.valueOf(this.getRemote().toString()) + ":" + this.getLocalObjectId() + ":" + this.getChannelNo();
    }
}

