/*
 * Decompiled with CFR 0.152.
 */
package org.netcrusher.tcp;

import java.io.EOFException;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.netcrusher.core.meter.RateMeter;
import org.netcrusher.core.meter.RateMeterImpl;
import org.netcrusher.core.nio.NioUtils;
import org.netcrusher.core.nio.SelectionKeyControl;
import org.netcrusher.core.reactor.NioReactor;
import org.netcrusher.core.state.BitState;
import org.netcrusher.tcp.TcpQueue;
import org.netcrusher.tcp.TcpQueueBuffers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * One side of a proxied TCP connection, driven by selector callbacks.
 *
 * <p>Bytes read from the socket are stored into {@code outgoingQueue}; bytes
 * taken from {@code incomingQueue} are written to the socket. The peer
 * channel ({@link #setOther(TcpChannel)}) is poked whenever this side makes
 * progress, so presumably the two channels share the same pair of queues in
 * opposite roles (not visible in this file).
 *
 * <p>The class has no synchronization of its own. NOTE(review): it appears to
 * rely on every method running on the reactor's selector thread - confirm
 * against {@code NioReactor}.
 */
class TcpChannel {
    private static final Logger LOGGER = LoggerFactory.getLogger(TcpChannel.class);
    /** Delay (500 ms, in nanoseconds) passed to the scheduler before the deferred full close after EOF. */
    private static final long LINGER_PERIOD_NS = TimeUnit.MILLISECONDS.toNanos(500L);
    private final String name;
    private final NioReactor reactor;
    /** Callback run by {@link #closeAll()} after this channel has been closed. */
    private final Runnable ownerClose;
    private final SocketChannel channel;
    private final SelectionKeyControl selectionKeyControl;
    /** Source of the bytes that are written to the socket. */
    private final TcpQueue incomingQueue;
    /** Destination of the bytes that are read from the socket. */
    private final TcpQueue outgoingQueue;
    private final Meters meters;
    private final State state;
    /** Actions postponed until {@link #processPostOperations()} finds its conditions met. */
    private final Queue<Runnable> postOperations;
    private TcpChannel other;

    /**
     * Registers the socket channel with the reactor's selector (with no
     * interest ops) and starts in the {@code FROZEN} state.
     *
     * @param name          label used in log messages
     * @param reactor       reactor whose selector drives this channel
     * @param ownerClose    action run by {@link #closeAll()} after closing this channel
     * @param channel       socket channel to read from and write to
     * @param incomingQueue queue whose content is written to the socket
     * @param outgoingQueue queue that receives what is read from the socket
     * @throws IOException if the selector registration fails
     */
    TcpChannel(String name, NioReactor reactor, Runnable ownerClose, SocketChannel channel, TcpQueue incomingQueue, TcpQueue outgoingQueue) throws IOException {
        this.name = name;
        this.reactor = reactor;
        this.ownerClose = ownerClose;
        this.channel = channel;
        this.postOperations = new LinkedList<>();
        this.incomingQueue = incomingQueue;
        this.outgoingQueue = outgoingQueue;
        this.meters = new Meters();
        SelectionKey selectionKey = reactor.getSelector().register(channel, 0, this::callback);
        this.selectionKeyControl = new SelectionKeyControl(selectionKey);
        this.state = new State(State.FROZEN);
    }

    /**
     * Closes this channel through the reactor's selector. Does nothing when
     * the channel is already {@code CLOSED}.
     *
     * <p>An open channel is frozen first. The socket is closed with
     * {@code NioUtils.close} when at least one byte has been sent through it
     * and with {@code NioUtils.closeNoLinger} otherwise.
     */
    void close() {
        this.reactor.getSelector().execute(() -> {
            if (this.state.not(State.CLOSED)) {
                if (this.state.is(State.OPEN)) {
                    this.freeze();
                }
                if (this.meters.sentBytes.getTotalCount() > 0L) {
                    NioUtils.close(this.channel);
                } else {
                    NioUtils.closeNoLinger(this.channel);
                }
                this.state.set(State.CLOSED);
                if (LOGGER.isDebugEnabled()) {
                    // Report data that will never be delivered to this socket.
                    long incomingBytes = this.incomingQueue.calculateReadableBytes();
                    if (incomingBytes > 0L) {
                        LOGGER.debug("Channel {} has {} incoming bytes when closing", this.name, incomingBytes);
                    }
                }
                return true;
            }
            return false;
        });
    }

    /** Closes this channel and then runs the owner's close action. */
    private void closeAll() {
        this.close();
        this.ownerClose.run();
    }

    /** Schedules {@link #closeAll()} on the selector with {@link #LINGER_PERIOD_NS} as the delay. */
    private void closeAllDeferred() {
        this.reactor.getSelector().schedule(this::closeAll, LINGER_PERIOD_NS);
    }

    /**
     * Handles end-of-stream on this side's socket: records the EOF, queues a
     * write shutdown on the peer (executed once the peer's post-operation
     * conditions hold) and, when the peer has also reached EOF and both
     * queues are drained, schedules the deferred full close.
     */
    private void closeEOF() {
        this.state.setReadEof(true);
        this.other.postOperations.add(() -> this.other.shutdownWrite());
        this.other.processPostOperations();
        if (this.other.state.isReadEof() && !this.incomingQueue.hasReadable() && !this.outgoingQueue.hasReadable()) {
            this.closeAllDeferred();
        }
    }

    /**
     * Closes this channel immediately and queues a full close on the peer,
     * executed once the peer's post-operation conditions hold.
     */
    private void closeLocal() {
        this.close();
        this.other.postOperations.add(() -> this.other.closeAll());
        this.other.processPostOperations();
    }

    /**
     * Selector callback: runs the write phase, then the read phase.
     *
     * <p>If the write phase fails, the channel has already been handed to
     * {@link #closeLocal()} or {@link #closeAll()}, so the read phase is
     * skipped. Probing the key again after the close would (once the close
     * has cancelled the key) throw {@code CancelledKeyException}, which used
     * to be logged as an error and trigger a second {@link #closeAll()}.
     *
     * @param selectionKey key whose ready set is inspected
     * @throws IOException declared for the callback contract; failures of the
     *                     two phases are handled here
     */
    private void callback(SelectionKey selectionKey) throws IOException {
        boolean writeFailed = false;
        try {
            if (selectionKey.isWritable()) {
                this.handleWritableEvent(false);
            }
        }
        catch (ClosedChannelException e) {
            LOGGER.debug("Channel closed on write {}", this.name);
            this.closeLocal();
            writeFailed = true;
        }
        catch (Exception e) {
            LOGGER.error("Exception on {}", this.name, e);
            this.closeAll();
            writeFailed = true;
        }
        if (writeFailed) {
            // The channel is being torn down; there is nothing left to read.
            return;
        }
        try {
            if (selectionKey.isReadable()) {
                this.handleReadableEvent();
            }
        }
        catch (EOFException e) {
            LOGGER.debug("EOF on transfer on {}", this.name);
            this.closeEOF();
        }
        catch (ClosedChannelException e) {
            LOGGER.debug("Channel closed on {}", this.name);
            this.closeLocal();
        }
        catch (Exception e) {
            LOGGER.error("Exception on {}", this.name, e);
            this.closeAll();
        }
    }

    /**
     * Writes as much of {@code incomingQueue} to the socket as possible.
     *
     * <p>The loop stops when the channel is no longer writable, when the
     * queue has nothing to hand out (either throttling the sender for the
     * delay the queue reports, or disabling write interest), or when the
     * socket accepts zero bytes. Afterwards the peer is offered a chance to
     * resume reading and pending post-operations are processed.
     *
     * @param forced unused by this method; callers pass {@code true} for an
     *               immediate send and {@code false} from the selector callback
     * @throws IOException if writing to the socket fails
     */
    private void handleWritableEvent(boolean forced) throws IOException {
        TcpQueue queue = this.incomingQueue;
        while (this.state.isWritable()) {
            TcpQueueBuffers queueBuffers = queue.requestReadableBuffers();
            if (queueBuffers.isEmpty()) {
                long delayNs = queueBuffers.getDelayNs();
                if (delayNs > 0L) {
                    this.throttleSend(delayNs);
                } else {
                    this.selectionKeyControl.disableWrites();
                }
                break;
            }
            long sent;
            try {
                sent = this.channel.write(queueBuffers.getArray(), queueBuffers.getOffset(), queueBuffers.getCount());
            }
            finally {
                // Buffers are handed back even when the write throws.
                queue.releaseReadableBuffers();
            }
            if (sent == 0L) {
                break;
            }
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("Written {} bytes to {}", sent, this.name);
            }
            this.meters.sentBytes.update(sent);
        }
        this.other.suggestDeferredRead();
        this.processPostOperations();
    }

    /**
     * Reads from the socket into {@code outgoingQueue} while the channel is
     * readable and the queue has room.
     *
     * <p>After every successful read the peer is asked to send immediately;
     * when the loop ends the peer is asked to enable write interest if it
     * still has data.
     *
     * @throws EOFException           if the socket reports end-of-stream while still open
     *                                (read interest is disabled first)
     * @throws ClosedChannelException if the socket reports end-of-stream and is no longer open
     * @throws IOException            if reading, or the peer's immediate send, fails
     */
    private void handleReadableEvent() throws IOException {
        TcpQueue queue = this.outgoingQueue;
        while (this.state.isReadable()) {
            TcpQueueBuffers queueBuffers = queue.requestWritableBuffers();
            if (queueBuffers.isEmpty()) {
                // No room in the queue: stop reading until the peer drains it.
                this.selectionKeyControl.disableReads();
                break;
            }
            long read;
            try {
                read = this.channel.read(queueBuffers.getArray(), queueBuffers.getOffset(), queueBuffers.getCount());
            }
            finally {
                // Buffers are handed back even when the read throws.
                queue.releaseWritableBuffers();
            }
            if (read < 0L) {
                if (this.channel.isOpen()) {
                    this.selectionKeyControl.disableReads();
                    throw new EOFException();
                }
                throw new ClosedChannelException();
            }
            if (read == 0L) {
                break;
            }
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("Read {} bytes from {}", read, this.name);
            }
            this.meters.readBytes.update(read);
            this.other.suggestImmediateSent();
        }
        this.other.suggestDeferredSent();
    }

    /**
     * Runs all queued post-operations, in insertion order, once
     * {@code incomingQueue} has nothing left to read and the peer has reached
     * read EOF (or is closed). Otherwise leaves the queue untouched.
     */
    private void processPostOperations() {
        if (!this.incomingQueue.hasReadable() && this.other.state.isReadEof()) {
            while (!this.postOperations.isEmpty()) {
                Runnable operation = this.postOperations.poll();
                operation.run();
            }
        }
    }

    /** Shuts down the output side of the socket if it is still open; a failure is logged, not propagated. */
    private void shutdownWrite() {
        if (this.channel.isOpen()) {
            try {
                this.channel.shutdownOutput();
            }
            catch (IOException e) {
                LOGGER.error("Fail to shutdown output", e);
            }
        }
    }

    /** Enables read interest when {@code outgoingQueue} has room and the channel is readable. */
    private void suggestDeferredRead() {
        if (this.outgoingQueue.hasWritable() && this.state.isReadable()) {
            this.selectionKeyControl.enableReads();
        }
    }

    /** Enables write interest when {@code incomingQueue} has data and the channel is writable. */
    private void suggestDeferredSent() {
        if (this.incomingQueue.hasReadable() && this.state.isWritable()) {
            this.selectionKeyControl.enableWrites();
        }
    }

    /**
     * Writes right away, without waiting for a selector event, when
     * {@code incomingQueue} has data and the channel is writable.
     *
     * @throws IOException if the write fails
     */
    private void suggestImmediateSent() throws IOException {
        if (this.incomingQueue.hasReadable() && this.state.isWritable()) {
            this.handleWritableEvent(true);
        }
    }

    /**
     * Moves an {@code OPEN} channel to {@code FROZEN}, clearing the key's
     * interest set when the key is still valid. Logs a warning in any other
     * state.
     */
    void freeze() {
        if (this.state.is(State.OPEN)) {
            if (this.selectionKeyControl.isValid()) {
                this.selectionKeyControl.setNone();
            }
            this.state.set(State.FROZEN);
        } else {
            LOGGER.warn("Freezing while not open");
        }
    }

    /**
     * Moves a {@code FROZEN} channel to {@code OPEN}. Interest is set to
     * reads and writes when {@code incomingQueue} already has data, to reads
     * only otherwise. Logs a warning in any other state.
     */
    void unfreeze() {
        if (this.state.is(State.FROZEN)) {
            if (this.incomingQueue.hasReadable()) {
                this.selectionKeyControl.setAll();
            } else {
                this.selectionKeyControl.setReadsOnly();
            }
            this.state.set(State.OPEN);
        } else {
            LOGGER.warn("Unfreezing while not frozen");
        }
    }

    /** @return {@code true} when the channel is either {@code FROZEN} or {@code CLOSED} */
    boolean isFrozen() {
        return this.state.isAnyOf(State.CLOSED | State.FROZEN);
    }

    /**
     * Suspends sending: marks the channel as throttled, disables write
     * interest and schedules {@link #unthrottleSend()} with the given delay.
     * Ignored unless the channel is {@code OPEN} and not already throttled.
     *
     * @param delayNs delay handed to the selector's scheduler
     */
    private void throttleSend(long delayNs) {
        if (this.state.is(State.OPEN) && !this.state.isSendThrottled()) {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("Channel {} is throttled on {}ns", this.name, delayNs);
            }
            this.state.setSendThrottled(true);
            if (this.selectionKeyControl.isValid()) {
                this.selectionKeyControl.disableWrites();
            }
            this.reactor.getSelector().schedule(this::unthrottleSend, delayNs);
        }
    }

    /**
     * Ends a throttling period started by {@link #throttleSend(long)}.
     *
     * <p>The throttled flag is cleared regardless of the channel state. The
     * timer is scheduled once per throttling period, so if the flag were kept
     * while the channel is frozen it would never be reset and the channel
     * could not send again after {@link #unfreeze()}. Write interest is only
     * re-enabled when the channel is {@code OPEN}; a frozen channel gets its
     * interest set restored by {@link #unfreeze()}.
     */
    private void unthrottleSend() {
        if (!this.state.isSendThrottled()) {
            return;
        }
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Channel {} is unthrottled", this.name);
        }
        this.state.setSendThrottled(false);
        if (this.state.is(State.OPEN) && this.selectionKeyControl.isValid()) {
            this.selectionKeyControl.enableWrites();
        }
    }

    /**
     * Sets the peer channel. It must be set before any event is processed,
     * because the event handlers dereference it unconditionally.
     *
     * @param other the opposite side of the connection
     */
    void setOther(TcpChannel other) {
        this.other = other;
    }

    /** @return meter updated with the number of bytes read from the socket */
    RateMeter getReadBytesMeter() {
        return this.meters.readBytes;
    }

    /** @return meter updated with the number of bytes written to the socket */
    RateMeter getSentBytesMeter() {
        return this.meters.sentBytes;
    }

    /** Byte counters of this channel. */
    private static final class Meters {
        private final RateMeterImpl readBytes = new RateMeterImpl();
        private final RateMeterImpl sentBytes = new RateMeterImpl();

        private Meters() {
        }
    }

    /**
     * Lifecycle state ({@code OPEN}, {@code FROZEN}, {@code CLOSED}) plus two
     * independent flags: read EOF seen and sending throttled.
     */
    private static final class State
    extends BitState {
        private static final int OPEN = State.bit(0);
        private static final int FROZEN = State.bit(1);
        private static final int CLOSED = State.bit(2);
        private boolean readEof = false;
        private boolean sendThrottled = false;

        private State(int state) {
            super(state);
        }

        private void setReadEof(boolean readEof) {
            this.readEof = readEof;
        }

        /** A closed channel is reported as EOF as well, since nothing more can be read from it. */
        private boolean isReadEof() {
            return this.is(CLOSED) || this.readEof;
        }

        private boolean isSendThrottled() {
            return this.sendThrottled;
        }

        private void setSendThrottled(boolean sendThrottled) {
            this.sendThrottled = sendThrottled;
        }

        /** @return {@code true} when the channel is open and sending is not throttled */
        private boolean isWritable() {
            return this.is(OPEN) && !this.sendThrottled;
        }

        /** @return {@code true} when the channel is open and read EOF has not been seen */
        private boolean isReadable() {
            return this.is(OPEN) && !this.readEof;
        }
    }
}

