/*
 * Decompiled with CFR 0.152.
 */
package org.zstacks.znet.nio;

import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zstacks.znet.Helper;
import org.zstacks.znet.nio.Dispatcher;
import org.zstacks.znet.nio.Session;

public class SelectorThread
extends Thread {
    private static final Logger log = LoggerFactory.getLogger(SelectorThread.class);
    protected volatile Selector selector = null;
    protected final Dispatcher dispatcher;
    private final Queue<Object[]> register = new LinkedBlockingQueue<Object[]>();
    private final Queue<Session> unregister = new LinkedBlockingQueue<Session>();

    public SelectorThread(Dispatcher dispatcher, String name) throws IOException {
        super(name);
        this.dispatcher = dispatcher;
        this.selector = Selector.open();
    }

    public SelectorThread(Dispatcher dispatcher) throws IOException {
        this(dispatcher, "Selector");
    }

    public void registerChannel(SelectableChannel channel, int ops) throws IOException {
        this.registerChannel(channel, ops, null);
    }

    public void registerSession(int ops, Session sess) throws IOException {
        this.registerChannel(sess.getChannel(), ops, sess);
    }

    public void registerChannel(SelectableChannel channel, int ops, Session sess) throws IOException {
        if (Thread.currentThread() == this) {
            SelectionKey key = channel.register(this.selector, ops, sess);
            if (sess != null) {
                sess.setRegisteredKey(key);
                sess.setStatus(Session.SessionStatus.CONNECTED);
                sess.getIoAdaptor().onSessionRegistered(sess);
            }
        } else {
            this.register.offer(new Object[]{channel, ops, sess});
            this.selector.wakeup();
        }
    }

    public void unregisterSession(Session sess) {
        if (this.unregister.contains(sess)) {
            return;
        }
        this.unregister.add(sess);
        this.selector.wakeup();
    }

    @Override
    public void interrupt() {
        super.interrupt();
        try {
            this.selector.close();
        }
        catch (IOException e) {
            log.error(e.getMessage(), (Throwable)e);
        }
    }

    @Override
    public void run() {
        try {
            while (!this.isInterrupted()) {
                this.selector.select();
                this.handleRegister();
                Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (!key.isValid()) continue;
                    Object att = key.attachment();
                    if (att != null && att instanceof Session) {
                        ((Session)att).updateLastOperationTime();
                    }
                    try {
                        if (key.isAcceptable()) {
                            this.handleAcceptEvent(key);
                            continue;
                        }
                        if (key.isConnectable()) {
                            this.handleConnectEvent(key);
                            continue;
                        }
                        if (key.isReadable()) {
                            this.handleReadEvent(key);
                            continue;
                        }
                        if (!key.isWritable()) continue;
                        this.handleWriteEvent(key);
                    }
                    catch (Throwable e) {
                        this.disconnectWithException(key, e);
                    }
                }
                this.handleUnregister();
            }
        }
        catch (Throwable e) {
            if (!this.dispatcher.isStarted()) {
                if (log.isDebugEnabled()) {
                    log.debug(e.getMessage(), e);
                }
            }
            log.error(e.getMessage(), e);
        }
    }

    private void disconnectWithException(SelectionKey key, Throwable e) {
        Session sess = (Session)key.attachment();
        try {
            sess.setStatus(Session.SessionStatus.ON_ERROR);
            sess.getIoAdaptor().onException(e, sess);
        }
        catch (Throwable ex) {
            if (!this.dispatcher.isStarted()) {
                log.debug(e.getMessage(), ex);
            }
            log.error(e.getMessage(), ex);
        }
        try {
            if (sess != null) {
                sess.close();
            } else {
                key.channel().close();
            }
            key.cancel();
        }
        catch (Throwable ex) {
            log.error(e.getMessage(), ex);
        }
    }

    protected void handleRegister() {
        Object[] item = null;
        while ((item = this.register.poll()) != null) {
            try {
                SelectableChannel channel = (SelectableChannel)item[0];
                if (!channel.isOpen()) continue;
                int ops = (Integer)item[1];
                Session sess = (Session)item[2];
                SelectionKey key = channel.register(this.selector, ops, sess);
                if (sess == null) continue;
                sess.setRegisteredKey(key);
                sess.getIoAdaptor().onSessionRegistered(sess);
            }
            catch (Exception e) {
                log.error(e.getMessage(), (Throwable)e);
            }
        }
    }

    protected void handleUnregister() {
        Session sess = null;
        while ((sess = this.unregister.poll()) != null) {
            try {
                sess.close();
            }
            catch (IOException e) {
                log.error(e.getMessage(), (Throwable)e);
            }
        }
    }

    protected void handleAcceptEvent(SelectionKey key) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel)key.channel();
        SocketChannel channel = server.accept();
        channel.configureBlocking(false);
        if (log.isDebugEnabled()) {
            log.debug("ACCEPT: {}=>{}", (Object)Helper.remoteAddress(channel), (Object)Helper.localAddress(channel));
        }
        Session sess = new Session(this.dispatcher, channel, this.dispatcher.serverIoAdaptor());
        sess.setStatus(Session.SessionStatus.CONNECTED);
        sess.getIoAdaptor().onSessionAccepted(sess);
    }

    protected void handleConnectEvent(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel)key.channel();
        Session sess = (Session)key.attachment();
        if (sess == null) {
            throw new IOException("Session not attached yet to SelectionKey");
        }
        if (channel.finishConnect()) {
            sess.finishConnect();
            if (log.isDebugEnabled()) {
                log.debug("CONNECT: {}=>{}", (Object)Helper.localAddress(channel), (Object)Helper.remoteAddress(channel));
            }
        }
        sess.setStatus(Session.SessionStatus.CONNECTED);
        key.interestOps(0);
        sess.getIoAdaptor().onSessionConnected(sess);
    }

    protected void handleReadEvent(SelectionKey key) throws IOException {
        Session sess = (Session)key.attachment();
        if (sess == null) {
            throw new IOException("Session not attached yet to SelectionKey");
        }
        if (log.isDebugEnabled()) {
            // empty if block
        }
        sess.doRead();
    }

    protected void handleWriteEvent(SelectionKey key) throws IOException {
        Session sess = (Session)key.attachment();
        if (sess == null) {
            throw new IOException("Session not attached yet to SelectionKey");
        }
        if (log.isDebugEnabled()) {
            // empty if block
        }
        sess.doWrite();
    }
}

