/*
 * Decompiled with CFR 0.152.
 */
package org.epics.ca.impl;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.epics.ca.impl.ContextImpl;
import org.epics.ca.impl.Header;
import org.epics.ca.impl.Messages;
import org.epics.ca.impl.ResponseHandlers;
import org.epics.ca.impl.Transport;
import org.epics.ca.impl.TransportClient;
import org.epics.ca.impl.reactor.ReactorHandler;
import org.epics.ca.util.ResettableLatch;

public class TCPTransport
implements Transport,
ReactorHandler,
Runnable {
    private static final Logger logger = Logger.getLogger(TCPTransport.class.getName());
    private final AtomicBoolean closed = new AtomicBoolean();
    private final ContextImpl context;
    private final ResponseHandlers.ResponseHandler responseHandler;
    private final SocketChannel channel;
    private final InetSocketAddress socketAddress;
    private final int priority;
    private ByteBuffer receiveBuffer;
    private static final int FLOW_CONTROL_BUFFER_FULL_COUNT_LIMIT = 4;
    private final AtomicBoolean flowControlState = new AtomicBoolean();
    private final AtomicReference<Boolean> flowControlChangeRequest = new AtomicReference();
    private final short remoteTransportRevision;
    private final Set<TransportClient> owners = new HashSet<TransportClient>();
    private static final int INITIAL_RX_BUFFER_SIZE = 64000;
    private static final int INITIAL_TX_BUFFER_SIZE = 1024;
    private final Header header = new Header();
    private final Lock sendBufferLock = new ReentrantLock();
    private ByteBuffer sendBuffer;
    private int lastSendBufferPosition = 0;
    private final ScheduledFuture<?> echoTimer;
    private final ResettableLatch sendCompletedLatch = new ResettableLatch(1);
    private int startPosition;

    public TCPTransport(ContextImpl context, TransportClient client, ResponseHandlers.ResponseHandler responseHandler, SocketChannel channel, short remoteTransportRevision, int priority) {
        this.context = context;
        this.responseHandler = responseHandler;
        this.channel = channel;
        this.remoteTransportRevision = remoteTransportRevision;
        this.priority = priority;
        this.socketAddress = (InetSocketAddress)channel.socket().getRemoteSocketAddress();
        this.receiveBuffer = ByteBuffer.allocateDirect(64000);
        this.sendBuffer = ByteBuffer.allocateDirect(1024);
        this.acquire(client);
        long echoPeriod = (long)(context.getConnectionTimeout() * 1000.0f);
        this.echoTimer = echoPeriod >= 0L ? context.getScheduledExecutor().scheduleWithFixedDelay(this, 0L, echoPeriod, TimeUnit.MILLISECONDS) : null;
        context.getTransportRegistry().put(this.socketAddress, this);
    }

    public void close(boolean remotelyClosed) {
        if (this.closed.getAndSet(true)) {
            return;
        }
        if (this.echoTimer != null) {
            this.echoTimer.cancel(false);
        }
        this.context.getTransportRegistry().remove(this.socketAddress, this.priority);
        if (!remotelyClosed) {
            this.flush();
        }
        this.closedNotifyClients();
        logger.log(Level.FINER, "Connection to " + this.socketAddress + " closed.");
        this.context.getReactor().unregisterAndClose(this.channel);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closedNotifyClients() {
        TransportClient[] clients;
        Set<TransportClient> set = this.owners;
        synchronized (set) {
            int refs = this.owners.size();
            if (refs == 0) {
                return;
            }
            logger.log(Level.FINE, "Transport to " + this.socketAddress + " still has " + refs + " client(s) active and closing...");
            clients = new TransportClient[refs];
            this.owners.toArray(clients);
            this.owners.clear();
        }
        for (int i = 0; i < clients.length; ++i) {
            try {
                clients[i].transportClosed();
                continue;
            }
            catch (Throwable th) {
                logger.log(Level.SEVERE, "Unexpected exception caught while calling TransportClient.transportClosed().", th);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean acquire(TransportClient client) {
        if (this.closed.get()) {
            return false;
        }
        logger.log(Level.FINER, "Acquiring transport to " + this.socketAddress + ".");
        Set<TransportClient> set = this.owners;
        synchronized (set) {
            if (this.closed.get()) {
                return false;
            }
            this.owners.add(client);
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void release(TransportClient client) {
        if (this.closed.get()) {
            return;
        }
        logger.log(Level.FINER, "Releasing transport to " + this.socketAddress + ".");
        Set<TransportClient> set = this.owners;
        synchronized (set) {
            this.owners.remove(client);
            if (this.owners.size() == 0) {
                this.close(false);
            }
        }
    }

    @Override
    public short getMinorRevision() {
        return this.remoteTransportRevision;
    }

    @Override
    public void handleEvent(SelectionKey key) {
        if (key.isValid() && key.isReadable()) {
            this.processRead();
        }
        if (key.isValid() && key.isWritable()) {
            this.processWrite();
        }
    }

    protected void processRead() {
        try {
            this.receiveBuffer.limit(this.receiveBuffer.capacity());
            int bufferFullCount = 0;
            while (!this.closed.get()) {
                logger.log(Level.FINEST, "About to read into buffer starting at pos: " + String.valueOf(this.receiveBuffer.position()));
                int bytesRead = this.channel.read(this.receiveBuffer);
                logger.log(Level.FINEST, "Read #bytes from channel: " + String.valueOf(bytesRead));
                if (bytesRead < 0) {
                    logger.log(Level.FINEST, "End of stream ");
                    this.close(true);
                    return;
                }
                if (bytesRead == 0) {
                    bufferFullCount = 0;
                    logger.log(Level.FINEST, "Disabling flow control...");
                    this.disableFlowControl();
                    break;
                }
                logger.log(Level.FINEST, "Received " + bytesRead + " bytes from " + this.socketAddress + ".");
                if (this.receiveBuffer.hasRemaining()) {
                    bufferFullCount = 0;
                    logger.log(Level.FINEST, "Disabling flow control...");
                    this.disableFlowControl();
                } else if (bufferFullCount >= 4) {
                    logger.log(Level.FINEST, "Enabling flow control...");
                    this.enableFlowControl();
                } else {
                    ++bufferFullCount;
                }
                logger.log(Level.FINEST, "Flipping buffer.");
                this.receiveBuffer.flip();
                logger.log(Level.FINEST, "ReceiveBuffer now has #bytes: " + String.valueOf(this.receiveBuffer.remaining()));
                logger.log(Level.FINEST, "CA: processing new data...");
                this.processReadBuffer();
            }
        }
        catch (IOException ioex) {
            logger.log(Level.FINEST, "CA: socket exception. Closing connection.");
            this.close(true);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void processReadBuffer() {
        int lastMessageStartPosition = 0;
        int lastMessageBytesAvailable = 0;
        logger.log(Level.FINEST, "\n\nProcessing READ buffer from thread: " + Thread.currentThread());
        while (!this.closed.get()) {
            lastMessageStartPosition = this.receiveBuffer.position();
            lastMessageBytesAvailable = this.receiveBuffer.remaining();
            logger.log(Level.FINEST, "Processing NEXT loop iteration...");
            logger.log(Level.FINEST, "- lastMessagePosition = " + String.valueOf(lastMessageStartPosition));
            logger.log(Level.FINEST, "- lastMessageBytesAvailable = " + String.valueOf(lastMessageBytesAvailable));
            if (lastMessageBytesAvailable < 16) {
                logger.log(Level.FINEST, "Not enough bytes for normal header - breaking from loop.");
                break;
            }
            if (!this.header.read(this.receiveBuffer)) {
                logger.log(Level.FINEST, "Not enough bytes for extended header - breaking from loop.");
                break;
            }
            if (this.receiveBuffer.remaining() < this.header.payloadSize) {
                logger.log(Level.FINEST, "Not enough bytes for payload: " + String.valueOf(this.header.payloadSize));
                if (this.header.payloadSize <= this.receiveBuffer.capacity() - 24) break;
                logger.log(Level.FINEST, "Not enough room to read payload: need to resize buffer!");
                int PAGE_SIZE = 4096;
                int newSize = (this.header.payloadSize + 24 & 0xFFFFF000) + 4096;
                int maxBufferSize = this.context.getMaxArrayBytes();
                if (maxBufferSize > 0 && newSize > maxBufferSize) {
                    logger.log(Level.SEVERE, "Received payload size (" + this.header.payloadSize + ") is larger than configured maximum array size (" + maxBufferSize + "), disconnecting from " + this.socketAddress + ".");
                    this.close(true);
                    return;
                }
                ByteBuffer newBuffer = ByteBuffer.allocateDirect(newSize);
                this.receiveBuffer.position(lastMessageStartPosition);
                newBuffer.put(this.receiveBuffer);
                this.receiveBuffer = newBuffer;
                return;
            }
            int endOfMessage = this.receiveBuffer.position() + this.header.payloadSize;
            try {
                logger.log(Level.FINEST, "Processing message starting at position:" + String.valueOf(this.receiveBuffer.position()));
                logger.log(Level.FINEST, "Payload size is: " + String.valueOf(this.header.payloadSize));
                this.responseHandler.handleResponse(this.socketAddress, this, this.header, this.receiveBuffer);
            }
            catch (Throwable th) {
                logger.log(Level.WARNING, th, () -> "Unexpected exception caught while processing CA message over TCP from " + this.socketAddress);
            }
            finally {
                this.receiveBuffer.position(endOfMessage);
            }
        }
        logger.log(Level.FINEST, "Checking for any remaining bytes.");
        int unprocessedBytes = this.receiveBuffer.limit() - lastMessageStartPosition;
        if (unprocessedBytes > 0) {
            logger.log(Level.FINEST, "- moving remaining bytes to start of buffer. Unprocessed bytes = " + String.valueOf(unprocessedBytes));
            if (unprocessedBytes < 1024) {
                logger.log(Level.FINEST, "- using copy algorithm 1");
                for (int i = 0; i < unprocessedBytes; ++i) {
                    this.receiveBuffer.put(i, this.receiveBuffer.get(lastMessageStartPosition++));
                }
                this.receiveBuffer.position(unprocessedBytes);
            } else {
                logger.log(Level.FINEST, "- using copy algorithm 2");
                this.receiveBuffer.position(lastMessageStartPosition);
                ByteBuffer remainingBuffer = this.receiveBuffer.slice();
                this.receiveBuffer.position(0);
                this.receiveBuffer.put(remainingBuffer);
            }
        } else {
            logger.log(Level.FINEST, "No remaining bytes to copy.");
            this.receiveBuffer.position(0);
        }
        this.receiveBuffer.limit(this.receiveBuffer.capacity());
        logger.log(Level.FINEST, "Done with read processing for now. Buffer Position is: " + String.valueOf(this.receiveBuffer.position()));
    }

    protected void processWrite() {
    }

    protected void disableFlowControl() {
        if (this.flowControlState.getAndSet(false)) {
            this.flowControlChangeRequest.set(Boolean.FALSE);
            this.flush();
        }
    }

    protected void enableFlowControl() {
        if (!this.flowControlState.getAndSet(true)) {
            this.flowControlChangeRequest.set(Boolean.TRUE);
            this.flush();
        }
    }

    private void noSyncSend(ByteBuffer buffer) throws IOException {
        try {
            int SEND_BUFFER_LIMIT = 64000;
            int bufferLimit = buffer.limit();
            logger.log(Level.FINEST, "Sending " + bufferLimit + " bytes to " + this.socketAddress + ".");
            int parts = (buffer.limit() - 1) / 64000 + 1;
            block4: for (int part = 1; part <= parts; ++part) {
                if (parts > 1) {
                    buffer.limit(Math.min(part * 64000, bufferLimit));
                    logger.log(Level.FINEST, "[Parted] Sending (part " + part + "/" + parts + ") " + (buffer.limit() - buffer.position()) + " bytes to " + this.socketAddress + ".");
                }
                int tries = 0;
                while (true) {
                    int bytesSent;
                    if ((bytesSent = this.channel.write(buffer)) < 0) {
                        throw new IOException("bytesSent < 0");
                    }
                    if (buffer.position() == buffer.limit()) continue block4;
                    if (this.closed.get()) {
                        throw new IOException("transport closed on the client side");
                    }
                    int WARNING_MESSAGE_TRIES = 10;
                    if (tries >= 10) {
                        logger.log(Level.WARNING, "Failed to send message to " + this.socketAddress + " - buffer full, will retry.");
                    }
                    logger.log(Level.FINEST, "Send buffer full for " + this.socketAddress + ", waiting...");
                    try {
                        Thread.sleep(Math.min(15000, 10 + tries * 100));
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                    ++tries;
                }
            }
        }
        catch (IOException ioex) {
            this.close(true);
            throw ioex;
        }
    }

    @Override
    public ContextImpl getContext() {
        return this.context;
    }

    @Override
    public ByteBuffer acquireSendBuffer(int requiredSize) {
        if (this.closed.get()) {
            throw new RuntimeException("transport closed");
        }
        this.sendBufferLock.lock();
        this.lastSendBufferPosition = this.sendBuffer.position();
        if (this.sendBuffer.remaining() >= requiredSize) {
            return this.sendBuffer;
        }
        try {
            this.flush(true);
        }
        catch (Throwable th) {
            this.sendBufferLock.unlock();
            throw th;
        }
        if (this.sendBuffer.capacity() < requiredSize) {
            int PAGE_SIZE = 4096;
            int newSize = (requiredSize + 16 & 0xFFFFF000) + 4096;
            int maxBufferSize = this.context.getMaxArrayBytes();
            if (maxBufferSize > 0 && newSize > maxBufferSize) {
                throw new RuntimeException("requiredSize > maxArrayBytes");
            }
            try {
                this.sendBuffer = ByteBuffer.allocate(newSize);
                this.clearSendBuffer();
            }
            catch (Throwable th) {
                this.sendBufferLock.unlock();
                throw th;
            }
        }
        this.lastSendBufferPosition = this.sendBuffer.position();
        return this.sendBuffer;
    }

    private ByteBuffer acquireSendBufferNoBlocking(int requiredSize, long time, TimeUnit timeUnit) {
        if (this.closed.get()) {
            throw new RuntimeException("transport closed");
        }
        try {
            if (!this.sendBufferLock.tryLock(time, timeUnit)) {
                return null;
            }
        }
        catch (InterruptedException e) {
            return null;
        }
        this.lastSendBufferPosition = this.sendBuffer.position();
        if (this.sendBuffer.remaining() >= requiredSize) {
            return this.sendBuffer;
        }
        if (this.sendBuffer.capacity() < requiredSize) {
            throw new RuntimeException("sendBuffer.capacity() < requiredSize");
        }
        return null;
    }

    @Override
    public void releaseSendBuffer(boolean ignore, boolean flush) {
        try {
            if (ignore) {
                this.sendBuffer.position(this.lastSendBufferPosition);
            } else if (flush) {
                this.flush();
            }
        }
        finally {
            this.sendBufferLock.unlock();
        }
    }

    @Override
    public void flush() {
        this.flush(false);
    }

    private final void clearSendBuffer() {
        this.sendBuffer.clear();
        this.sendBuffer.position(16);
        this.startPosition = this.sendBuffer.position();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void flush(boolean wait) {
        this.sendBufferLock.lock();
        try {
            Boolean insertFlowControlMessage = this.flowControlChangeRequest.getAndSet(null);
            if (insertFlowControlMessage != null) {
                long offOn = insertFlowControlMessage != false ? 0x8000000000000L : 0x9000000000000L;
                this.sendBuffer.putLong(0, offOn);
                this.sendBuffer.putLong(8, 0L);
                this.startPosition = 0;
            }
            this.sendBuffer.limit(this.sendBuffer.position());
            this.sendBuffer.position(this.startPosition);
            this.noSyncSend(this.sendBuffer);
            this.clearSendBuffer();
        }
        catch (IOException e1) {
            e1.printStackTrace();
        }
        finally {
            this.sendBufferLock.unlock();
        }
    }

    @Override
    public InetSocketAddress getRemoteAddress() {
        return this.socketAddress;
    }

    @Override
    public int getPriority() {
        return this.priority;
    }

    @Override
    public void run() {
        ByteBuffer buffer = this.acquireSendBufferNoBlocking(16, 1L, TimeUnit.SECONDS);
        if (buffer != null) {
            Messages.generateEchoMessage(this, buffer);
            this.releaseSendBuffer(false, true);
        }
    }
}

