/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.remoting.websockets;

import io.undertow.Handlers;
import io.undertow.Undertow;
import io.undertow.server.HttpHandler;
import io.undertow.server.handlers.PathHandler;
import io.undertow.websockets.core.AbstractReceiveListener;
import io.undertow.websockets.core.BufferedBinaryMessage;
import io.undertow.websockets.core.BufferedTextMessage;
import io.undertow.websockets.core.CloseMessage;
import io.undertow.websockets.core.WebSocketCallback;
import io.undertow.websockets.core.WebSocketChannel;
import io.undertow.websockets.core.WebSockets;
import io.undertow.websockets.spi.WebSocketHttpExchange;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.remoting.base.ActorServerConnector;
import org.nustaq.kontraktor.remoting.base.ObjectSink;
import org.nustaq.kontraktor.remoting.base.ObjectSocket;
import org.nustaq.kontraktor.remoting.http.Http4K;
import org.nustaq.kontraktor.remoting.websockets.WebObjectSocket;
import org.nustaq.kontraktor.util.Pair;
import org.nustaq.serialization.util.FSTUtil;
import org.xnio.Buffers;
import org.xnio.ChannelListener;
import org.xnio.Pooled;

public class UndertowWebsocketServerConnector
implements ActorServerConnector {
    String host;
    String path;
    int port;

    public UndertowWebsocketServerConnector(String path, int port, String host) {
        this.path = path;
        this.port = port;
        this.host = host;
    }

    public void connect(Actor facade, Function<ObjectSocket, ObjectSink> factory) throws Exception {
        PathHandler server = (PathHandler)this.getServer(this.port).getFirst();
        server.addExactPath(this.path, (HttpHandler)Handlers.websocket((exchange, channel) -> {
            Runnable runnable = () -> {
                final UTWebObjectSocket objectSocket = new UTWebObjectSocket(exchange, channel);
                final ObjectSink sink = (ObjectSink)factory.apply(objectSocket);
                objectSocket.setSink(sink);
                channel.getReceiveSetter().set((ChannelListener)new AbstractReceiveListener(){

                    protected void onCloseMessage(CloseMessage cm, WebSocketChannel channel) {
                        try {
                            channel.close();
                        }
                        catch (IOException e) {
                            e.printStackTrace();
                        }
                        sink.sinkClosed();
                        try {
                            objectSocket.close();
                        }
                        catch (IOException e) {
                            e.printStackTrace();
                        }
                        channel.getReceiveSetter().set(null);
                    }

                    protected void onError(WebSocketChannel channel, Throwable error) {
                        sink.sinkClosed();
                    }

                    protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) throws IOException {
                        String data = message.getData();
                        byte[] bytez = data.getBytes("UTF-8");
                        sink.receiveObject(objectSocket.getConf().asObject(bytez), null);
                    }

                    protected void onFullBinaryMessage(WebSocketChannel channel, BufferedBinaryMessage message) throws IOException {
                        ByteBuffer[] data = (ByteBuffer[])message.getData().getResource();
                        byte[] bytez = Buffers.take((ByteBuffer[])data, (int)0, (int)data.length);
                        sink.receiveObject(objectSocket.getConf().asObject(bytez), null);
                    }
                });
            };
            facade.execute(runnable);
            channel.resumeReceives();
        }));
    }

    protected Pair<PathHandler, Undertow> getServer(int port) {
        String hostName = this.host;
        return Http4K.get().getServer(port, hostName);
    }

    public IPromise closeServer() {
        ((Undertow)this.getServer(this.port).getSecond()).stop();
        return new Promise(null);
    }

    static class UTWebObjectSocket
    extends WebObjectSocket {
        protected WebSocketChannel channel;
        protected WebSocketHttpExchange ex;
        protected WeakReference<ObjectSink> sink;

        public UTWebObjectSocket(WebSocketHttpExchange ex, WebSocketChannel channel) {
            this.ex = ex;
            this.channel = channel;
        }

        @Override
        public void sendBinary(byte[] message) {
            WebSockets.sendBinary((ByteBuffer)ByteBuffer.wrap(message), (WebSocketChannel)this.channel, (WebSocketCallback)new WebSocketCallback(){

                public void complete(WebSocketChannel channel, Object context) {
                }

                public void onError(WebSocketChannel channel, Object context, Throwable throwable) {
                    this.setLastError(throwable);
                    try {
                        isClosed = true;
                        this.close();
                    }
                    catch (IOException e) {
                        FSTUtil.rethrow((Throwable)e);
                    }
                }
            });
        }

        public void close() throws IOException {
            this.channel.getReceiveSetter().set(null);
            this.channel.close();
            ObjectSink objectSink = (ObjectSink)this.sink.get();
            if (objectSink != null) {
                objectSink.sinkClosed();
            }
            this.conf = null;
            this.channel = null;
            this.ex = null;
        }

        public void setSink(ObjectSink sink) {
            this.sink = new WeakReference<ObjectSink>(sink);
        }

        public ObjectSink getSink() {
            return (ObjectSink)this.sink.get();
        }
    }
}

