/*
 * Decompiled with CFR 0.152.
 */
package org.aoju.bus.socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import org.aoju.bus.core.io.ByteBuffer;
import org.aoju.bus.core.io.PageBuffer;
import org.aoju.bus.core.io.VirtualBuffer;
import org.aoju.bus.core.lang.exception.InstrumentException;
import org.aoju.bus.socket.NetMonitor;
import org.aoju.bus.socket.Protocol;
import org.aoju.bus.socket.ServerConfig;
import org.aoju.bus.socket.SocketStatus;
import org.aoju.bus.socket.UdpAioSession;
import org.aoju.bus.socket.UdpChannel;
import org.aoju.bus.socket.UdpDispatcher;
import org.aoju.bus.socket.UdpSelectionKey;
import org.aoju.bus.socket.process.MessageProcessor;

/**
 * Bootstrap for UDP endpoints backed by non-blocking {@link DatagramChannel}s.
 *
 * <p>Every channel opened through this bootstrap is registered with one shared
 * {@link Selector}. A single "UDP-Boss" thread polls that selector, reads datagrams,
 * decodes them with the configured {@link Protocol} and hands each decoded request to
 * one of the "UDP-Worker" {@link UdpDispatcher}s. The threads are started lazily by the
 * first successful {@code open(...)} call and are stopped by {@link #shutdown()}.
 *
 * @param <R> type of the decoded request object
 */
public class UdpBootstrap<R> {
    /** Not referenced inside this class. */
    private static final int MAX_EVENT = 512;
    /** Upper bound on the number of datagrams read from one channel per readiness event. */
    private static final int MAX_READ_TIMES = 16;
    /** Counter used to give each boss thread a distinct name; guarded by the class lock. */
    private static int UID;
    /** Not referenced inside this class. */
    private final SelectionKey NEED_TO_POLL = new UdpSelectionKey();
    /** Not referenced inside this class (shutdown uses the dispatcher's own marker). */
    private final SelectionKey EXECUTE_TASK_OR_SHUTDOWN = new UdpSelectionKey();
    private final PageBuffer pageBuffer = new ByteBuffer(1024, 1, -1, true).allocatePageBuffer();
    private final ServerConfig<R> config = new ServerConfig<>();
    private volatile Status status = Status.STATUS_INIT;
    /** Lazily created in {@code open}; volatile so the double-checked initialisation is safe. */
    private volatile Selector selector;
    /** Created in {@code initThreadServer}; volatile because {@code shutdown} may run on any thread. */
    private volatile UdpDispatcher<R>[] workerGroup;

    /**
     * Creates a bootstrap that is not yet bound to any channel.
     *
     * @param protocol         decoder applied to every received datagram
     * @param messageProcessor processor handed to the worker dispatchers and notified of
     *                         decode failures
     */
    public UdpBootstrap(Protocol<R> protocol, MessageProcessor<R> messageProcessor) {
        this.config.setProtocol(protocol);
        this.config.setProcessor(messageProcessor);
    }

    /**
     * Opens an unbound channel; equivalent to {@code open(0)}.
     *
     * @return the newly registered channel
     * @throws IOException if the channel or selector cannot be opened
     */
    public UdpChannel<R> open() throws IOException {
        return this.open(0);
    }

    /**
     * Opens a channel on the given port without an explicit host; equivalent to
     * {@code open(null, port)}.
     *
     * @param port local port to bind; values {@code <= 0} leave the channel unbound
     * @return the newly registered channel
     * @throws IOException if the channel cannot be opened or bound
     */
    public UdpChannel<R> open(int port) throws IOException {
        return this.open(null, port);
    }

    /**
     * Opens a non-blocking datagram channel, optionally binds it, registers it for read
     * events on the shared selector and, on the first call, starts the boss and worker
     * threads.
     *
     * @param host local host to bind to, or {@code null} for the wildcard address
     * @param port local port to bind; values {@code <= 0} leave the channel unbound
     * @return the newly registered channel
     * @throws IOException           if the selector or channel cannot be opened, configured or bound
     * @throws IllegalStateException if this bootstrap has already been shut down
     */
    public UdpChannel<R> open(String host, int port) throws IOException {
        Status current = this.status;
        if (current == Status.STATUS_STOPPING || current == Status.STATUS_STOPPED) {
            // The event loop is gone (or going); a channel registered now would never be served.
            throw new IllegalStateException("UdpBootstrap has been shut down");
        }
        if (host != null) {
            this.config.setHost(host);
        }
        this.config.setPort(port);
        if (this.selector == null) {
            synchronized (this) {
                if (this.selector == null) {
                    this.selector = Selector.open();
                }
            }
        }
        DatagramChannel channel = DatagramChannel.open();
        UdpChannel<R> udpChannel;
        try {
            channel.configureBlocking(false);
            if (port > 0) {
                InetSocketAddress bindAddress = host == null
                        ? new InetSocketAddress(port)
                        : new InetSocketAddress(host, port);
                channel.socket().bind(bindAddress);
                if (host == null) {
                    this.config.setHost(bindAddress.getHostString());
                }
            } else {
                this.config.setHost("");
            }
            if (this.status == Status.STATUS_RUNNING) {
                // Kick the boss thread out of select() before registering a new key.
                this.selector.wakeup();
            }
            SelectionKey selectionKey = channel.register(this.selector, SelectionKey.OP_READ);
            udpChannel = new UdpChannel<R>(channel, selectionKey, this.config, this.pageBuffer);
            selectionKey.attach(udpChannel);
        } catch (IOException | RuntimeException e) {
            // Do not leak the datagram channel when bind/register fails.
            try {
                channel.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        if (this.status == Status.STATUS_INIT) {
            this.initThreadServer();
        }
        System.out.println("bus-socket server started on port " + this.config.getPort() + ",threadNum:" + this.config.getThreadNum());
        System.out.println("bus-socket server config is " + this.config);
        return udpChannel;
    }

    /**
     * Starts the worker dispatcher threads and the boss (selector) thread exactly once.
     * Subsequent calls, or calls after the status has left {@code STATUS_INIT}, are no-ops.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private synchronized void initThreadServer() {
        if (this.status != Status.STATUS_INIT) {
            return;
        }
        this.status = Status.STATUS_RUNNING;
        int uid;
        synchronized (UdpBootstrap.class) {
            // UID is static; the instance lock held by this method does not protect it.
            uid = UID++;
        }
        int threadNum = this.config.getThreadNum();
        UdpDispatcher<R>[] workers = new UdpDispatcher[threadNum];
        for (int i = 0; i < threadNum; ++i) {
            workers[i] = new UdpDispatcher<R>(this.config.getProcessor());
            new Thread(workers[i], "UDP-Worker-" + i).start();
        }
        this.workerGroup = workers;
        new Thread(this::runEventLoop, "UDP-Boss-" + uid).start();
    }

    /**
     * Body of the boss thread: polls the selector and services ready keys until the
     * status leaves {@code STATUS_RUNNING} or the selector fails with an
     * {@link IOException}. On exit the read buffer is released, the selector is closed
     * and the status becomes {@code STATUS_STOPPED}.
     */
    private void runEventLoop() {
        VirtualBuffer readBuffer = this.pageBuffer.allocate(this.config.getReadBufferSize());
        try {
            while (this.status == Status.STATUS_RUNNING) {
                Set<SelectionKey> selectionKeys = this.selector.selectedKeys();
                if (selectionKeys.isEmpty()) {
                    this.selector.select();
                }
                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    keyIterator.remove();
                    try {
                        this.handleKey(key, readBuffer);
                    } catch (RuntimeException e) {
                        // A failure tied to a single key/datagram (e.g. a decode error that
                        // doRead has already reported) must not stop service for every channel.
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            readBuffer.clean();
            try {
                this.selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            this.status = Status.STATUS_STOPPED;
        }
    }

    /**
     * Services one selected key: closes the attached channel if the key is no longer
     * valid, otherwise performs the read and/or flush the key is ready for.
     *
     * @param key        key taken from the selector's selected-key set
     * @param readBuffer buffer reused for every receive on the boss thread
     * @throws IOException if reading from or flushing the channel fails
     */
    @SuppressWarnings("unchecked")
    private void handleKey(SelectionKey key, VirtualBuffer readBuffer) throws IOException {
        UdpChannel<R> udpChannel = (UdpChannel<R>) key.attachment();
        if (udpChannel == null) {
            // open() registers the key before attaching the channel; the key will be
            // reported again on a later select() once the attachment is in place.
            return;
        }
        if (!key.isValid()) {
            udpChannel.close();
            return;
        }
        if (key.isReadable()) {
            this.doRead(readBuffer, udpChannel);
        }
        if (key.isWritable()) {
            udpChannel.flush();
        }
    }

    /**
     * Reads up to {@code MAX_READ_TIMES} datagrams from the channel, decodes each one and
     * dispatches the result to the worker selected by the sender address hash.
     *
     * <p>Reading stops early when no datagram is pending or when the protocol returns
     * {@code null} (reported to the processor as a decode exception). If the protocol
     * throws, the processor is notified, the session is closed and the exception is
     * rethrown.
     *
     * @param readBuffer buffer the datagram payload is received into
     * @param channel    channel to read from
     * @throws IOException if receiving from the channel fails
     */
    private void doRead(VirtualBuffer readBuffer, UdpChannel<R> channel) throws IOException {
        for (int remaining = MAX_READ_TIMES; remaining > 0; --remaining) {
            R request;
            java.nio.ByteBuffer buffer = readBuffer.buffer();
            buffer.clear();
            SocketAddress remote = channel.getChannel().receive(buffer);
            if (remote == null) {
                // Nothing (more) to read right now.
                return;
            }
            buffer.flip();
            UdpAioSession aioSession = channel.createAndCacheSession(remote);
            NetMonitor netMonitor = this.config.getMonitor();
            if (netMonitor != null) {
                netMonitor.beforeRead(aioSession);
                netMonitor.afterRead(aioSession, buffer.remaining());
            }
            try {
                request = this.config.getProtocol().decode(buffer, aioSession);
            }
            catch (Exception e) {
                this.config.getProcessor().stateEvent(aioSession, SocketStatus.DECODE_EXCEPTION, e);
                aioSession.close();
                throw e;
            }
            if (request == null) {
                this.config.getProcessor().stateEvent(aioSession, SocketStatus.DECODE_EXCEPTION, new InstrumentException("decode result is null"));
                return;
            }
            UdpDispatcher<R>[] workers = this.workerGroup;
            // abs(hash % n) is always in [0, n); negating the hash first would stay
            // negative for Integer.MIN_VALUE and index out of bounds.
            int workerIndex = Math.abs(remote.hashCode() % workers.length);
            workers[workerIndex].dispatch(aioSession, request);
        }
    }

    /**
     * Requests shutdown: marks the bootstrap as stopping, wakes the boss thread so it can
     * leave its event loop, and sends every worker dispatcher its shutdown marker.
     * Safe to call before any channel has been opened.
     */
    public void shutdown() {
        this.status = Status.STATUS_STOPPING;
        Selector currentSelector = this.selector;
        if (currentSelector != null) {
            currentSelector.wakeup();
        }
        UdpDispatcher<R>[] workers = this.workerGroup;
        if (workers != null) {
            for (UdpDispatcher<R> dispatcher : workers) {
                dispatcher.dispatch(dispatcher.EXECUTE_TASK_OR_SHUTDOWN);
            }
        }
    }

    /**
     * Sets the size of the read buffer allocated by the boss thread.
     *
     * @param size buffer size passed to the configuration
     * @return this bootstrap, for chaining
     */
    public final UdpBootstrap<R> setReadBufferSize(int size) {
        this.config.setReadBufferSize(size);
        return this;
    }

    /**
     * Sets the number of worker dispatcher threads started on first {@code open}.
     *
     * @param num worker thread count passed to the configuration
     * @return this bootstrap, for chaining
     */
    public final UdpBootstrap<R> setThreadNum(int num) {
        this.config.setThreadNum(num);
        return this;
    }

    /** Lifecycle states of the bootstrap. */
    enum Status {
        STATUS_INIT,
        STATUS_STARTING,
        STATUS_RUNNING,
        STATUS_STOPPING,
        STATUS_STOPPED;

    }
}

