/*
 * Decompiled with CFR 0.152.
 */
package one.jpro.platform.auth.core.http.impl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import one.jpro.platform.auth.core.http.HttpOptions;
import one.jpro.platform.auth.core.http.impl.ByteTokenizer;
import one.jpro.platform.auth.core.http.impl.Cancellable;
import one.jpro.platform.auth.core.http.impl.Handler;
import one.jpro.platform.auth.core.http.impl.Header;
import one.jpro.platform.auth.core.http.impl.Request;
import one.jpro.platform.auth.core.http.impl.RequestParser;
import one.jpro.platform.auth.core.http.impl.Response;
import one.jpro.platform.auth.core.http.impl.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Selector-driven event loop that serves a set of HTTP connections on one dedicated thread.
 *
 * <p>Socket channels are handed over through {@link #register(SocketChannel)}; the registration
 * itself, all reads and writes, request-timeout callbacks and response callbacks are executed on
 * the loop thread, either from the selector's selected keys, from the {@link Scheduler}, or from
 * the internal task queue.
 */
class ConnectionEventLoop {
    private static final Logger logger = LoggerFactory.getLogger(ConnectionEventLoop.class);

    /** Interest set used while a request is being handled and no I/O readiness is wanted. */
    private static final int NO_INTEREST_OPS = 0;

    private final HttpOptions options;
    private final Handler handler;
    /** Source of connection ids; passed in by the creator of this loop. */
    private final AtomicLong connectionCounter;
    /** Stop flag polled by the loop; set to {@code true} by this loop on a fatal I/O error. */
    private final AtomicBoolean stop;
    private final Scheduler scheduler;
    /** Work submitted to the loop thread (registrations and response writes). */
    private final Queue<Runnable> taskQueue;
    /** Direct staging buffer shared by every connection of this loop for reads and writes. */
    private final ByteBuffer buffer;
    private final Selector selector;
    private final Thread thread;

    /**
     * Creates the loop, opens its selector and prepares (but does not start) its daemon thread.
     *
     * @param options           configuration (read buffer size, resolution, timeouts, max request size)
     * @param handler           request handler invoked for every parsed request
     * @param connectionCounter counter used to assign connection ids
     * @param stop              flag that terminates the loop once it becomes {@code true}
     * @throws IOException if the selector cannot be opened
     */
    ConnectionEventLoop(HttpOptions options, Handler handler, AtomicLong connectionCounter, AtomicBoolean stop) throws IOException {
        this.options = options;
        this.handler = handler;
        this.connectionCounter = connectionCounter;
        this.stop = stop;
        this.scheduler = new Scheduler();
        this.taskQueue = new ConcurrentLinkedQueue<>();
        this.buffer = ByteBuffer.allocateDirect(options.getReadBufferSize());
        this.selector = Selector.open();
        this.thread = new Thread(this::run, "connection-event-loop");
        this.thread.setDaemon(true);
    }

    /**
     * @return the number of keys currently registered with this loop's selector
     */
    int numConnections() {
        return this.selector.keys().size();
    }

    /** Starts the loop thread. */
    void start() {
        this.thread.start();
    }

    /**
     * Waits for the loop thread to terminate.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    void join() throws InterruptedException {
        this.thread.join();
    }

    /** Thread body: runs the loop and raises the stop flag if it dies with an I/O error. */
    private void run() {
        try {
            this.doStart();
        } catch (IOException e) {
            // A failing selector is not recoverable; report it instead of dying silently.
            logger.error("Connection event loop terminated by I/O error", e);
            this.stop.set(true);
        }
    }

    /**
     * Main loop. Each iteration waits on the selector for at most the configured resolution,
     * dispatches ready keys, runs expired scheduled tasks, then drains the task queue.
     *
     * @throws IOException if the selector fails
     */
    private void doStart() throws IOException {
        while (!this.stop.get()) {
            this.selector.select(this.options.getResolution().toMillis());
            Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
            Iterator<SelectionKey> it = selectedKeys.iterator();
            while (it.hasNext()) {
                SelectionKey selKey = it.next();
                if (selKey.isReadable()) {
                    ((Connection) selKey.attachment()).onReadable();
                } else if (selKey.isWritable()) {
                    ((Connection) selKey.attachment()).onWritable();
                }
                it.remove();
            }
            this.scheduler.expired().forEach(Runnable::run);
            Runnable task;
            while ((task = this.taskQueue.poll()) != null) {
                task.run();
            }
        }
    }

    /**
     * Hands a socket channel over to this loop. The registration is queued and performed on the
     * loop thread; the selector is woken up so the task is picked up promptly. If registration
     * fails the channel is closed.
     *
     * @param socketChannel the accepted channel to serve
     */
    void register(SocketChannel socketChannel) {
        this.taskQueue.add(() -> {
            try {
                this.doRegister(socketChannel);
            } catch (IOException ex) {
                logger.error("Error on registering a new socket channel", ex);
                try {
                    socketChannel.close();
                } catch (IOException ignored) {
                    // Best-effort close of a channel that could not be registered.
                }
            }
        });
        this.selector.wakeup();
    }

    /**
     * Switches the channel to non-blocking mode, registers it for reads and attaches a new
     * {@link Connection} to its selection key.
     */
    private void doRegister(SocketChannel socketChannel) throws IOException {
        socketChannel.configureBlocking(false);
        SelectionKey selectionKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
        Connection connection = new Connection(socketChannel, selectionKey);
        selectionKey.attach(connection);
    }

    /**
     * State of one client connection: buffered request bytes, the parser for the request
     * currently being read, the pending response bytes and the request timeout.
     */
    private class Connection {
        static final String HTTP_1_0 = "HTTP/1.0";
        static final String HTTP_1_1 = "HTTP/1.1";
        static final String HEADER_CONNECTION = "Connection";
        static final String HEADER_CONTENT_LENGTH = "Content-Length";
        static final String KEEP_ALIVE = "Keep-Alive";

        final SocketChannel socketChannel;
        final SelectionKey selectionKey;
        final ByteTokenizer byteTokenizer;
        final String id;
        /** Parser for the request currently being received; replaced after every parsed request. */
        RequestParser requestParser;
        /** Serialized response still to be written, or {@code null} when nothing is pending. */
        ByteBuffer writeBuffer;
        /** Pending request timeout, or {@code null} while a request is being handled. */
        Cancellable requestTimeoutTask;
        /** Whether the last parsed request used HTTP/1.0. */
        boolean httpOneDotZero;
        /** Whether the last parsed request carried {@code Connection: Keep-Alive}. */
        boolean keepAlive;

        /** Creates the connection state and schedules the first request timeout. */
        private Connection(SocketChannel socketChannel, SelectionKey selectionKey) {
            this.socketChannel = socketChannel;
            this.selectionKey = selectionKey;
            this.byteTokenizer = new ByteTokenizer();
            this.id = Long.toString(ConnectionEventLoop.this.connectionCounter.getAndIncrement());
            this.requestParser = new RequestParser(this.byteTokenizer);
            this.requestTimeoutTask = ConnectionEventLoop.this.scheduler.schedule(this::onRequestTimeout, ConnectionEventLoop.this.options.getRequestTimeout());
        }

        /** Scheduler callback: closes the connection when no complete request arrived in time. */
        private void onRequestTimeout() {
            logger.trace("Request timeout in connection with id: {}", this.id);
            this.failSafeClose();
        }

        /** Selector callback for read readiness; any failure closes the connection. */
        private void onReadable() {
            try {
                this.doOnReadable();
            } catch (IOException | RuntimeException ex) {
                // Trailing throwable argument makes SLF4J record the cause with the message.
                logger.error("Read error in connection with id: {}", this.id, ex);
                this.failSafeClose();
            }
        }

        /**
         * Reads available bytes into the shared buffer, appends them to the tokenizer and tries
         * to parse a request. Closes the connection on end-of-stream or when the buffered,
         * still unparsed request exceeds the configured maximum size.
         */
        private void doOnReadable() throws IOException {
            ConnectionEventLoop.this.buffer.clear();
            int numBytes = this.socketChannel.read(ConnectionEventLoop.this.buffer);
            if (numBytes < 0) {
                logger.trace("Close read in connection with id: {}", this.id);
                this.failSafeClose();
                return;
            }
            ConnectionEventLoop.this.buffer.flip();
            this.byteTokenizer.add(ConnectionEventLoop.this.buffer);
            logger.trace("Read bytes in connection with id: {}, read_bytes: {}, request_bytes: {}", this.id, numBytes, this.byteTokenizer.remaining());
            if (this.requestParser.parse()) {
                logger.trace("Read request with connection id: {} and request_bytes: {}", this.id, this.byteTokenizer.remaining());
                this.onParseRequest();
            } else if (this.byteTokenizer.size() > ConnectionEventLoop.this.options.getMaxRequestSize()) {
                logger.trace("Exceed request max_size in connection with id: {} and request_size: {}", this.id, this.byteTokenizer.size());
                this.failSafeClose();
            }
        }

        /**
         * Handles a fully parsed request: suspends I/O interest, cancels the request timeout,
         * records protocol version and keep-alive, resets the parser for the next request and
         * passes the request to the handler with {@link #onResponse(Response)} as callback.
         */
        private void onParseRequest() {
            // No reads or writes are wanted on this connection until the response is ready.
            if (this.selectionKey.interestOps() != NO_INTEREST_OPS) {
                this.selectionKey.interestOps(NO_INTEREST_OPS);
            }
            if (this.requestTimeoutTask != null) {
                this.requestTimeoutTask.cancel();
                this.requestTimeoutTask = null;
            }
            Request request = this.requestParser.request();
            this.httpOneDotZero = request.version().equalsIgnoreCase(HTTP_1_0);
            this.keepAlive = request.hasHeader(HEADER_CONNECTION, KEEP_ALIVE);
            this.byteTokenizer.compact();
            this.requestParser = new RequestParser(this.byteTokenizer);
            ConnectionEventLoop.this.handler.handle(request, this::onResponse);
        }

        /**
         * Response callback given to the handler. The write is queued for the loop thread, and
         * the selector is woken up when the callback is invoked from a different thread.
         *
         * @param response the response produced for the last parsed request
         */
        private void onResponse(Response response) {
            ConnectionEventLoop.this.taskQueue.add(() -> {
                try {
                    this.prepareToWriteResponse(response);
                } catch (IOException | RuntimeException ex) {
                    // Runtime failures (serialization, or the handler invoked for a pipelined
                    // request) must not escape the task: that would terminate the loop thread.
                    logger.trace("Response error in connection with id: {}", this.id, ex);
                    this.failSafeClose();
                }
            });
            if (Thread.currentThread() != ConnectionEventLoop.this.thread) {
                ConnectionEventLoop.this.selector.wakeup();
            }
        }

        /**
         * Serializes the response into {@link #writeBuffer} and starts writing it. Adds
         * {@code Connection: Keep-Alive} for HTTP/1.0 keep-alive requests and a
         * {@code Content-Length} header when the response does not carry one.
         */
        private void prepareToWriteResponse(Response response) throws IOException {
            String version = this.httpOneDotZero ? HTTP_1_0 : HTTP_1_1;
            ArrayList<Header> headers = new ArrayList<>();
            if (this.httpOneDotZero && this.keepAlive) {
                headers.add(new Header(HEADER_CONNECTION, KEEP_ALIVE));
            }
            if (!response.hasHeader(HEADER_CONTENT_LENGTH)) {
                headers.add(new Header(HEADER_CONTENT_LENGTH, Integer.toString(response.body().length)));
            }
            this.writeBuffer = ByteBuffer.wrap(response.serialize(version, headers));
            logger.trace("Response ready in connection with id: {} and num_bytes: {}", this.id, this.writeBuffer.remaining());
            this.doOnWritable();
        }

        /** Selector callback for write readiness; any failure closes the connection. */
        private void onWritable() {
            try {
                this.doOnWritable();
            } catch (IOException | RuntimeException ex) {
                logger.trace("Write error in connection with id: {}", this.id, ex);
                this.failSafeClose();
            }
        }

        /**
         * Copies as much of the pending response as fits into the shared buffer, writes it to
         * the socket and advances {@link #writeBuffer} by the number of bytes actually written.
         *
         * @return the number of bytes written to the socket
         */
        private int doWrite() throws IOException {
            ConnectionEventLoop.this.buffer.clear();
            int amount = Math.min(ConnectionEventLoop.this.buffer.remaining(), this.writeBuffer.remaining());
            ConnectionEventLoop.this.buffer.put(this.writeBuffer.array(), this.writeBuffer.position(), amount);
            ConnectionEventLoop.this.buffer.flip();
            int written = this.socketChannel.write(ConnectionEventLoop.this.buffer);
            this.writeBuffer.position(this.writeBuffer.position() + written);
            return written;
        }

        /**
         * Writes pending response bytes. When the response is complete the connection is either
         * closed (HTTP/1.0 without keep-alive), continues with an already buffered pipelined
         * request, or goes back to waiting for reads with a fresh request timeout. When bytes
         * remain, write interest is enabled so the selector reports when to continue.
         */
        private void doOnWritable() throws IOException {
            int numBytes = this.doWrite();
            if (!this.writeBuffer.hasRemaining()) {
                this.writeBuffer = null;
                logger.trace("Write response with connection id: {} and num_bytes: {}", this.id, numBytes);
                if (this.httpOneDotZero && !this.keepAlive) {
                    logger.trace("Close after response with connection id: {}", this.id);
                    this.failSafeClose();
                } else if (this.requestParser.parse()) {
                    logger.trace("Pipeline request with connection id: {} and request_bytes: {}", this.id, this.byteTokenizer.remaining());
                    this.onParseRequest();
                } else {
                    this.requestTimeoutTask = ConnectionEventLoop.this.scheduler.schedule(this::onRequestTimeout, ConnectionEventLoop.this.options.getRequestTimeout());
                    this.selectionKey.interestOps(SelectionKey.OP_READ);
                }
            } else {
                if ((this.selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0) {
                    this.selectionKey.interestOps(SelectionKey.OP_WRITE);
                }
                logger.trace("Write in connection with id: {} and num_bytes: {}", this.id, numBytes);
            }
        }

        /** Cancels the request timeout and the selection key, then closes the channel quietly. */
        private void failSafeClose() {
            try {
                if (this.requestTimeoutTask != null) {
                    this.requestTimeoutTask.cancel();
                }
                this.selectionKey.cancel();
                this.socketChannel.close();
            } catch (IOException ignored) {
                // Best-effort close: the connection is being discarded either way.
            }
        }
    }
}

