/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.remoting.tcp;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.ActorProxy;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.remoting.RemoteRefRegistry;
import org.nustaq.kontraktor.remoting.tcp.TCPSocket;
import org.nustaq.kontraktor.util.Log;

/**
 * Exposes an {@link Actor} facade on a TCP port.
 *
 * <p>An acceptor thread listens on a {@link ServerSocket}; every accepted
 * socket is wrapped in an {@link ActorServerClientConnection}, which publishes
 * the facade and runs a receiver and a sender thread for that client.
 *
 * <p>Threading: {@link #connections} is touched by the acceptor thread (add),
 * by each connection's receiver/sender threads (remove) and by whichever thread
 * calls {@link #setTerminated(boolean)} (iterate), so it is a concurrent list.
 */
public class TCPActorServer {
    /** How long {@code Publish} waits for the acceptor thread to bind the port. */
    private static final long STARTUP_TIMEOUT_MILLIS = 10000L;

    /** Currently tracked client connections; safe for concurrent add/remove/iterate. */
    protected List<ActorServerClientConnection> connections = new CopyOnWriteArrayList<ActorServerClientConnection>();
    /** Handed to every new connection as its {@code disconnectHandler}; may be {@code null}. */
    Consumer<Actor> closeListener;
    /** The actor published to every connecting client. */
    Actor facadeActor;
    /** TCP port passed to the {@link ServerSocket} constructor. */
    int port;
    /** Listening socket; {@code null} until the server has been bound. Volatile because it is closed from other threads. */
    volatile ServerSocket welcomeSocket;
    protected volatile boolean terminated = false;

    /**
     * Publishes {@code act} on {@code port} without a close listener.
     *
     * @see #Publish(Actor, int, Consumer)
     */
    public static TCPActorServer Publish(Actor act, int port) throws Exception {
        return TCPActorServer.Publish(act, port, null);
    }

    /**
     * Creates a server for {@code act}, binds it on a freshly started acceptor
     * thread and waits (up to 10 seconds) until the bind attempt has finished.
     *
     * @param act           actor to publish; must be castable to {@link ActorProxy}
     * @param port          TCP port to listen on
     * @param closeListener installed as the disconnect handler of every client
     *                      connection; may be {@code null}
     * @return the server; if the startup wait timed out or was interrupted the
     *         server is returned anyway and may not be listening yet
     * @throws Exception the exception raised while binding the port, if any
     */
    public static TCPActorServer Publish(Actor act, int port, Consumer<Actor> closeListener) throws Exception {
        TCPActorServer server = new TCPActorServer((ActorProxy)((Object)act), port);
        // Assigned before Thread.start(), so the acceptor thread is guaranteed to see it.
        server.closeListener = closeListener;
        CountDownLatch started = new CountDownLatch(1);
        AtomicReference<Exception> startupFailure = new AtomicReference<Exception>(null);
        new Thread(() -> {
            try {
                try {
                    server.bind();
                }
                catch (Exception e) {
                    // Store the failure BEFORE the latch is released (in the finally
                    // below) so the waiting caller can never observe an empty result.
                    startupFailure.set(e);
                    return;
                }
                finally {
                    // Signal as soon as the bind attempt is over; the accept loop
                    // blocks for the whole server lifetime and must not delay this.
                    started.countDown();
                }
                server.acceptLoop();
            }
            catch (IOException e) {
                Log.Warn(server, e, "accept loop failed");
            }
            finally {
                server.setTerminated(true);
            }
        }, "acceptor " + port).start();
        try {
            // A timeout is not treated as an error: the server is still returned.
            started.await(STARTUP_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            Log.Warn(server, e, "interrupted while waiting for acceptor startup");
        }
        Exception failure = startupFailure.get();
        if (failure != null) {
            throw failure;
        }
        return server;
    }

    /**
     * Stores the facade and port; no socket is opened here.
     *
     * @param proxy facade actor proxy; must also be an {@link Actor}
     * @param port  TCP port to listen on once started
     * @throws IOException declared for compatibility; this constructor does no I/O
     */
    public TCPActorServer(ActorProxy proxy, int port) throws IOException {
        this.port = port;
        this.facadeActor = (Actor)((Object)proxy);
    }

    /** @return the current value of the termination flag */
    public boolean isTerminated() {
        return this.terminated;
    }

    /**
     * Sets the termination flag. When set to {@code true} the listening socket
     * is closed, which releases the port and unblocks a thread waiting in
     * {@code accept()}.
     *
     * <p>Every tracked connection is marked terminated regardless of the value
     * passed in.
     *
     * @param terminated new value of the termination flag
     */
    public void setTerminated(boolean terminated) {
        this.terminated = terminated;
        if (terminated) {
            this.closeWelcomeSocket();
        }
        this.connections.forEach(con -> con.setTerminated(true));
    }

    /**
     * Binds the port and serves clients on the calling thread until the server
     * is terminated. On exit (normal or exceptional) the server is marked
     * terminated and the listening socket is closed.
     *
     * @throws IOException if binding fails, if accepting fails while the server
     *                     is not terminated, or if a client connection cannot
     *                     be set up
     */
    public void start() throws IOException {
        try {
            this.bind();
            this.acceptLoop();
        }
        finally {
            this.setTerminated(true);
        }
    }

    /** Opens the listening socket on {@link #port} and logs the bound port. */
    private void bind() throws IOException {
        this.welcomeSocket = new ServerSocket(this.port);
        Log.Info(this, this.facadeActor.getActor().getClass().getName() + " running on " + this.welcomeSocket.getLocalPort());
    }

    /** Accepts clients and starts a connection for each until terminated. */
    private void acceptLoop() throws IOException {
        while (!this.terminated) {
            Socket connectionSocket;
            try {
                connectionSocket = this.welcomeSocket.accept();
            }
            catch (IOException e) {
                if (this.terminated) {
                    // The socket was closed by setTerminated(true): regular shutdown.
                    return;
                }
                throw e;
            }
            ActorServerClientConnection clientConnection = new ActorServerClientConnection(connectionSocket, this.facadeActor);
            this.connections.add(clientConnection);
            clientConnection.start();
        }
    }

    /** Closes the listening socket if one is open; a close failure is only logged. */
    private void closeWelcomeSocket() {
        ServerSocket socket = this.welcomeSocket;
        if (socket == null || socket.isClosed()) {
            return;
        }
        try {
            socket.close();
        }
        catch (IOException e) {
            Log.Warn(this, e, "");
        }
    }

    /**
     * One connected client: wraps the accepted socket in a {@link TCPSocket}
     * and runs the registry's receive and send loops on dedicated threads.
     */
    public class ActorServerClientConnection
    extends RemoteRefRegistry {
        TCPSocket channel;
        Actor facade;

        /**
         * @param s      accepted client socket
         * @param facade actor published to this client
         * @throws IOException if the {@link TCPSocket} wrapper cannot be created
         */
        public ActorServerClientConnection(Socket s, Actor facade) throws IOException {
            this.channel = new TCPSocket(s, this.conf);
            this.facade = facade;
            // The server's close listener (possibly null) becomes this connection's disconnect handler.
            this.disconnectHandler = TCPActorServer.this.closeListener;
        }

        /**
         * Publishes the facade and starts the "receiver" and "sender" threads.
         * Whichever loop ends (normally or with an exception, which is logged)
         * marks this connection terminated and removes it from the server's
         * connection list.
         */
        public void start() {
            this.publishActor(this.facade);
            new Thread(() -> {
                try {
                    this.currentObjectSocket.set(this.channel);
                    this.receiveLoop(this.channel);
                }
                catch (Exception ex) {
                    Log.Warn(this, ex, "");
                }
                this.setTerminated(true);
                TCPActorServer.this.connections.remove(this);
            }, "receiver").start();
            new Thread(() -> {
                try {
                    this.currentObjectSocket.set(this.channel);
                    this.sendLoop(this.channel);
                }
                catch (Exception ex) {
                    Log.Warn(this, ex, "");
                }
                this.setTerminated(true);
                TCPActorServer.this.connections.remove(this);
            }, "sender").start();
        }

        /** Runs the inherited close logic, then closes the channel; a close failure is logged. */
        @Override
        public void close() {
            super.close();
            try {
                this.channel.close();
            }
            catch (IOException e) {
                Log.Warn(this, e, "");
            }
        }

        /** @return the facade actor this connection publishes */
        @Override
        public Actor getFacadeProxy() {
            return this.facade;
        }
    }
}

