/*
 * Decompiled with CFR 0.152.
 */
package ch.squaredesk.nova.comm.websockets.server;

import ch.squaredesk.nova.comm.MessageTranscriber;
import ch.squaredesk.nova.comm.websockets.CloseReason;
import ch.squaredesk.nova.comm.websockets.EndpointStreamSource;
import ch.squaredesk.nova.comm.websockets.EndpointStreamSourceFactory;
import ch.squaredesk.nova.comm.websockets.MetricsCollector;
import ch.squaredesk.nova.comm.websockets.SendAction;
import ch.squaredesk.nova.comm.websockets.WebSocket;
import ch.squaredesk.nova.comm.websockets.server.ServerEndpoint;
import ch.squaredesk.nova.comm.websockets.server.StreamCreatingWebSocketApplication;
import io.reactivex.Scheduler;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.glassfish.grizzly.websockets.WebSocketApplication;
import org.glassfish.grizzly.websockets.WebSocketEngine;

public class ServerEndpointFactory {
    private static final Scheduler lifecycleEventScheduler = Schedulers.io();
    private final ConcurrentHashMap<org.glassfish.grizzly.websockets.WebSocket, WebSocket> webSockets = new ConcurrentHashMap();
    private final MessageTranscriber<String> messageTranscriber;

    public ServerEndpointFactory(MessageTranscriber<String> messageTranscriber) {
        this.messageTranscriber = messageTranscriber;
    }

    private WebSocket instantiateNewWebSocket(final org.glassfish.grizzly.websockets.WebSocket webSocket) {
        SendAction sendAction = new SendAction(){

            @Override
            public <T> void accept(T message) throws Exception {
                String messageAsString = (String)ServerEndpointFactory.this.messageTranscriber.getOutgoingMessageTranscriber(message).apply(message);
                webSocket.send(messageAsString);
            }
        };
        return new WebSocket(sendAction, () -> ((org.glassfish.grizzly.websockets.WebSocket)webSocket).close());
    }

    private WebSocket createWebSocket(org.glassfish.grizzly.websockets.WebSocket webSocket) {
        return this.webSockets.computeIfAbsent(webSocket, this::instantiateNewWebSocket);
    }

    public ServerEndpoint createFor(String destination, MetricsCollector metricsCollector) {
        String destinationForSubscription = destination.startsWith("/") ? destination : "/" + destination;
        String destinationForMetrics = destination.startsWith("/") ? destination.substring(1) : destination;
        StreamCreatingWebSocketApplication app = new StreamCreatingWebSocketApplication();
        WebSocketEngine.getEngine().register("", destinationForSubscription, (WebSocketApplication)app);
        Function webSocketCreator = this::createWebSocket;
        EndpointStreamSource endpointStreamSource = EndpointStreamSourceFactory.createStreamSourceFor(destinationForMetrics, webSocketCreator, app, metricsCollector);
        Disposable subscriptionConnections = app.connectingSockets().subscribeOn(lifecycleEventScheduler).subscribe(arg_0 -> ((Function)webSocketCreator).apply(arg_0));
        Disposable subscriptionDisconnections = app.closingSockets().subscribeOn(lifecycleEventScheduler).subscribe(pair -> this.webSockets.remove(pair._1));
        SendAction broadcastAction = new SendAction(){

            @Override
            public <T> void accept(T message) throws Exception {
                String messageAsString = (String)ServerEndpointFactory.this.messageTranscriber.getOutgoingMessageTranscriber(message).apply(message);
                Set allSockets = ServerEndpointFactory.this.webSockets.keySet();
                allSockets.stream().filter(socket -> {
                    try {
                        socket.broadcast((Iterable)allSockets, messageAsString);
                        return true;
                    }
                    catch (Exception exception) {
                        return false;
                    }
                }).findAny();
            }
        };
        Consumer<CloseReason> closeAction = closeReason -> {
            subscriptionConnections.dispose();
            subscriptionDisconnections.dispose();
            Set allSockets = this.webSockets.keySet();
            allSockets.forEach(s -> s.close(closeReason.code, closeReason.text));
            this.webSockets.clear();
            app.close();
        };
        return new ServerEndpoint(destination, endpointStreamSource, broadcastAction, closeAction, this.messageTranscriber, metricsCollector);
    }
}

