/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.remoting.tcp;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.function.Consumer;
import java.util.function.Function;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.asyncio.AsyncServerSocket;
import org.nustaq.kontraktor.asyncio.ObjectAsyncSocketConnection;
import org.nustaq.kontraktor.remoting.base.ActorServer;
import org.nustaq.kontraktor.remoting.base.ActorServerConnector;
import org.nustaq.kontraktor.remoting.base.ObjectSink;
import org.nustaq.kontraktor.remoting.base.ObjectSocket;
import org.nustaq.kontraktor.remoting.encoding.Coding;

/**
 * {@link ActorServerConnector} built on top of {@link AsyncServerSocket}: it listens on a
 * TCP port and wraps every accepted channel in an object-level connection whose incoming
 * objects are forwarded to an {@link ObjectSink}.
 */
public class NIOServerConnector
extends AsyncServerSocket
implements ActorServerConnector {
    /** Port handed to {@code connect(int, ...)} when {@link #connect(Actor, Function)} is called. */
    int port;

    /**
     * Publishes {@code facade} on {@code port} without a disconnect handler.
     *
     * @param facade actor to publish
     * @param port   TCP port to listen on
     * @param coding encoding passed through to the {@link ActorServer}
     * @return promise resolved with the started {@link ActorServer}, or rejected on failure
     */
    public static IPromise<ActorServer> Publish(Actor facade, int port, Coding coding) {
        return NIOServerConnector.Publish(facade, port, coding, null);
    }

    /**
     * Publishes {@code facade} on {@code port}. The server is started via
     * {@code facade.execute(...)}; the returned promise is completed from inside that task.
     *
     * @param facade            actor to publish
     * @param port              TCP port to listen on
     * @param coding            encoding passed through to the {@link ActorServer}
     * @param disconnectHandler passed to {@code ActorServer.start}; may be {@code null}
     * @return promise resolved with the started {@link ActorServer}; rejected with the
     *         exception if start-up fails, or returned already failed if the server could
     *         not even be created or scheduled
     */
    public static Promise<ActorServer> Publish(Actor facade, int port, Coding coding, Consumer<Actor> disconnectHandler) {
        Promise<ActorServer> finished = new Promise<ActorServer>();
        try {
            ActorServer publisher = new ActorServer(new NIOServerConnector(port), facade, coding);
            facade.execute(() -> {
                try {
                    publisher.start(disconnectHandler);
                    finished.resolve(publisher);
                }
                catch (Exception e) {
                    finished.reject(e);
                }
            });
        }
        catch (Exception e) {
            e.printStackTrace();
            // Must be Promise<ActorServer>: Promise<Object> is not assignable to the
            // declared return type because generic types are invariant.
            return new Promise<ActorServer>(null, e);
        }
        return finished;
    }

    /**
     * Creates a connector for the given port. Nothing is bound until
     * {@link #connect(Actor, Function)} is called.
     *
     * @param port TCP port to listen on
     */
    public NIOServerConnector(int port) {
        this.port = port;
    }

    /**
     * Starts accepting connections on the configured port. For every accepted channel a
     * connection object is created, {@code factory} is asked for the sink that will consume
     * its objects, and the sink is attached before the connection is handed back.
     *
     * @param facade  not used by this implementation
     * @param factory produces the {@link ObjectSink} for a newly accepted {@link ObjectSocket}
     * @throws Exception propagated from the underlying {@code connect(int, ...)} call
     */
    @Override
    public void connect(Actor facade, Function<ObjectSocket, ObjectSink> factory) throws Exception {
        this.connect(this.port, (SelectionKey key, SocketChannel channel) -> {
            MyObjectAsyncSocketConnection sc = new MyObjectAsyncSocketConnection(key, channel);
            ObjectSink sink = factory.apply(sc);
            sc.init(sink);
            return sc;
        });
    }

    /**
     * Closes the server socket via {@code super.close()}.
     *
     * @return a promise completed with a {@code null} result, or one carrying the
     *         {@link IOException} as error if closing failed
     */
    @Override
    public IPromise closeServer() {
        try {
            super.close();
        }
        catch (IOException e) {
            return new Promise<Object>(null, e);
        }
        return new Promise<Object>(null);
    }

    /**
     * Connection for one accepted channel; forwards received objects and close events to
     * the {@link ObjectSink} set through {@link #init(ObjectSink)}.
     */
    static class MyObjectAsyncSocketConnection
    extends ObjectAsyncSocketConnection {
        /** Receiver of objects and close notifications; {@code null} until {@link #init} is called. */
        ObjectSink sink;

        public MyObjectAsyncSocketConnection(SelectionKey key, SocketChannel chan) {
            super(key, chan);
        }

        /**
         * Attaches the sink. Must be called before any object is received or the
         * connection is closed, since the callbacks dereference the sink unconditionally.
         *
         * @param sink receiver for this connection's objects and close events
         */
        public void init(ObjectSink sink) {
            this.sink = sink;
        }

        /** Forwards a received object to the sink; the second argument is always {@code null}. */
        @Override
        public void receivedObject(Object o) {
            this.sink.receiveObject(o, null);
        }

        /**
         * Marks the connection as closed and notifies the sink.
         *
         * @param ioe cause of the close; not used here
         */
        @Override
        public void closed(Throwable ioe) {
            this.isClosed = true;
            this.sink.sinkClosed();
        }

        /**
         * Closes the channel and notifies the sink. The sink is notified even when closing
         * the channel fails, so it is not left waiting on a connection that is being torn down.
         *
         * @throws IOException if closing the channel fails
         */
        @Override
        public void close() throws IOException {
            try {
                this.chan.close();
            }
            finally {
                this.sink.sinkClosed();
            }
        }
    }
}

