/*
 * Decompiled with CFR 0.152.
 */
package org.aoju.bus.socket;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.aoju.bus.core.exception.InternalException;
import org.aoju.bus.socket.NetMonitor;
import org.aoju.bus.socket.ServerConfig;
import org.aoju.bus.socket.SocketStatus;
import org.aoju.bus.socket.UdpAioSession;
import org.aoju.bus.socket.UdpChannel;
import org.aoju.bus.socket.buffers.BufferPool;
import org.aoju.bus.socket.buffers.VirtualBuffer;

/**
 * Worker loop shared by a fixed pool of threads that serve UDP channels.
 *
 * <p>All pool threads execute {@link #run()} and pull work from one bounded queue. At most one
 * {@code SELECTOR_CHANNEL} token circulates through that queue: the thread that takes it performs
 * a single selection pass ({@link #doSelector()}) and then puts the token back, while the other
 * threads run the decode/process tasks that the selection pass enqueued. A
 * {@code SHUTDOWN_CHANNEL} token makes every thread that sees it leave the loop.
 */
public final class WorkerRegister implements Runnable {
    /** Maximum number of datagrams received from one channel in a single {@link #doRead} call. */
    private static final int MAX_READ_TIMES = 16;
    /** Sentinel token: the thread that takes it runs one selection pass. Compared by identity. */
    private static final Runnable SELECTOR_CHANNEL = () -> {};
    /** Sentinel token: the thread that takes it re-queues it and stops. Compared by identity. */
    private static final Runnable SHUTDOWN_CHANNEL = () -> {};
    private final Selector selector;
    private final BufferPool bufferPool;
    /** Bounded work queue holding the sentinel tokens and the per-datagram tasks. */
    private final BlockingQueue<Runnable> requestQueue = new ArrayBlockingQueue<>(256);
    /** Pending actions to be applied to the selector at the start of the next selection pass. */
    private final ConcurrentLinkedQueue<Consumer<Selector>> registers = new ConcurrentLinkedQueue<>();
    private final ExecutorService executorService;
    /** Buffer kept ready for the next receive; only touched inside {@link #doRead}. */
    private VirtualBuffer standbyBuffer;

    /**
     * Creates the worker, opens its selector and starts {@code threadNum} pool threads that all
     * execute {@link #run()}.
     *
     * @param bufferPool pool used to allocate a buffer page for each received datagram's session
     * @param threadNum  number of pool threads (used as both core and maximum pool size)
     * @throws IOException if the selector cannot be opened
     */
    public WorkerRegister(BufferPool bufferPool, int threadNum) throws IOException {
        this.bufferPool = bufferPool;
        // Build the executor first: it rejects an invalid threadNum and starts no thread until
        // execute() is called, so a failure here or in Selector.open() leaves nothing to clean up.
        this.executorService = new ThreadPoolExecutor(threadNum, threadNum, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new ThreadFactory() {
            int i = 0;

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "bus-socket:udp-" + WorkerRegister.this.hashCode() + "-" + ++this.i);
            }
        });
        this.selector = Selector.open();
        // The queue was just created and is empty, so this non-blocking offer always succeeds;
        // unlike put() it cannot fail with InterruptedException on an already-interrupted thread.
        this.requestQueue.offer(SELECTOR_CHANNEL);
        for (int i = 0; i < threadNum; ++i) {
            this.executorService.execute(this);
        }
    }

    /**
     * Queues an action to be applied to the selector at the start of the next selection pass and
     * wakes the selector so that pass starts promptly.
     *
     * @param register action that receives this worker's selector
     */
    void addRegister(Consumer<Selector> register) {
        this.registers.offer(register);
        this.selector.wakeup();
    }

    /**
     * Worker loop: takes entries from the request queue until the shutdown token is seen, the
     * thread is interrupted, or a selection pass fails.
     */
    @Override
    public void run() {
        try {
            while (true) {
                Runnable task = this.requestQueue.take();
                if (task == SHUTDOWN_CHANNEL) {
                    // Hand the token on so the remaining workers stop as well.
                    this.requestQueue.put(SHUTDOWN_CHANNEL);
                    this.selector.wakeup();
                    break;
                }
                if (task == SELECTOR_CHANNEL) {
                    if (!this.selector.isOpen()) {
                        // This class closes the selector only in shutdown(), after the shutdown
                        // token was queued. Drop the selector token instead of failing on a
                        // closed selector; the shutdown token ends the loop.
                        continue;
                    }
                    try {
                        this.doSelector();
                    }
                    finally {
                        this.requestQueue.put(SELECTOR_CHANNEL);
                    }
                    continue;
                }
                this.runTask(task);
            }
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the executor and stop this worker.
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Runs one queued task and reports, rather than propagates, any exception it throws, so that
     * a single failing task does not end the calling worker's loop.
     */
    private void runTask(Runnable task) {
        try {
            task.run();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Performs one selection pass: applies pending registrations, waits for ready keys if none
     * are left over from the previous pass, then handles each selected key.
     *
     * @throws IOException if selecting, writing or reading fails
     */
    private void doSelector() throws IOException {
        Consumer<Selector> register;
        while ((register = this.registers.poll()) != null) {
            register.accept(this.selector);
        }
        Set<SelectionKey> keySet = this.selector.selectedKeys();
        // Keys can be left in the selected set when the previous pass stopped early (see the
        // break below); in that case they are handled first without blocking in select().
        if (keySet.isEmpty()) {
            this.selector.select();
        }
        Iterator<SelectionKey> keyIterator = keySet.iterator();
        while (keyIterator.hasNext()) {
            SelectionKey key = keyIterator.next();
            UdpChannel udpChannel = (UdpChannel) key.attachment();
            if (!key.isValid()) {
                keyIterator.remove();
                udpChannel.close();
                continue;
            }
            if (key.isWritable()) {
                udpChannel.doWrite();
            }
            // doRead returns false when the request queue is full: stop this pass and leave the
            // key in the selected set so reading resumes on the next pass.
            if (key.isReadable() && !this.doRead(udpChannel)) {
                break;
            }
            keyIterator.remove();
        }
    }

    /**
     * Receives up to {@link #MAX_READ_TIMES} datagrams from the channel and hands each one to the
     * request queue as a decode/process task.
     *
     * @param channel channel to receive from
     * @return {@code false} if the request queue was full and reading was cut short,
     *         {@code true} otherwise
     * @throws IOException if receiving from the underlying channel fails
     */
    private boolean doRead(UdpChannel channel) throws IOException {
        int count = MAX_READ_TIMES;
        ServerConfig config = channel.config;
        while (count-- > 0) {
            if (this.standbyBuffer == null) {
                this.standbyBuffer = channel.getBufferPage().allocate(config.getReadBufferSize());
            }
            ByteBuffer buffer = this.standbyBuffer.buffer();
            SocketAddress remote = channel.getChannel().receive(buffer);
            if (remote == null) {
                // Nothing was received: reset the standby buffer and keep it for the next call.
                buffer.clear();
                return true;
            }
            // The filled buffer now belongs to the task below; allocate a new standby buffer.
            VirtualBuffer readyBuffer = this.standbyBuffer;
            this.standbyBuffer = channel.getBufferPage().allocate(config.getReadBufferSize());
            buffer.flip();
            Runnable runnable = () -> {
                UdpAioSession session = new UdpAioSession(channel, remote, this.bufferPool.allocateBufferPage());
                try {
                    NetMonitor netMonitor = config.getMonitor();
                    if (netMonitor != null) {
                        netMonitor.beforeRead(session);
                        netMonitor.afterRead(session, buffer.remaining());
                    }
                    do {
                        Object request;
                        if ((request = config.getProtocol().decode(buffer, session)) == null) {
                            config.getProcessor().stateEvent(session, SocketStatus.DECODE_EXCEPTION, new InternalException("decode result is null, buffer size: " + buffer.remaining()));
                            break;
                        }
                        config.getProcessor().process(session, request);
                    } while (buffer.hasRemaining());
                }
                catch (Throwable e) {
                    e.printStackTrace();
                    config.getProcessor().stateEvent(session, SocketStatus.DECODE_EXCEPTION, e);
                }
                finally {
                    session.writeBuffer().flush();
                    readyBuffer.clean();
                }
            };
            if (!this.requestQueue.offer(runnable)) {
                // The queue is full. Run the task on this worker thread so the datagram that was
                // already received is not lost and its buffer is released, then report the
                // overload so the caller stops reading.
                this.runTask(runnable);
                return false;
            }
        }
        return true;
    }

    /**
     * Queues the shutdown token, wakes the selector, shuts the executor down and closes the
     * selector.
     *
     * @throws RuntimeException if the calling thread is interrupted while queueing the shutdown
     *                          token, or if closing the selector fails
     */
    void shutdown() {
        try {
            this.requestQueue.put(SHUTDOWN_CHANNEL);
        }
        catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        this.selector.wakeup();
        this.executorService.shutdown();
        try {
            this.selector.close();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

