/*
 * Decompiled with CFR 0.152.
 */
package ch.squaredesk.nova.comm.websockets.server;

import ch.squaredesk.nova.comm.retrieving.MessageUnmarshaller;
import ch.squaredesk.nova.comm.sending.MessageMarshaller;
import ch.squaredesk.nova.comm.websockets.CloseReason;
import ch.squaredesk.nova.comm.websockets.EndpointStreamSource;
import ch.squaredesk.nova.comm.websockets.EndpointStreamSourceFactory;
import ch.squaredesk.nova.comm.websockets.MetricsCollector;
import ch.squaredesk.nova.comm.websockets.WebSocket;
import ch.squaredesk.nova.comm.websockets.server.ServerEndpoint;
import ch.squaredesk.nova.comm.websockets.server.StreamCreatingWebSocketApplication;
import io.reactivex.Scheduler;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import org.glassfish.grizzly.websockets.WebSocketEngine;

/**
 * Creates {@link ServerEndpoint}s for WebSocket destinations and registers the backing
 * application with the Grizzly {@link WebSocketEngine}.
 *
 * <p>The factory keeps one map from Grizzly sockets to their {@link WebSocket} wrappers. That map
 * is an instance field, so it is shared by every endpoint created through the same factory.
 *
 * <p>NOTE(review): because the map is shared, the broadcast and close actions handed to each
 * {@link ServerEndpoint} operate on <em>all</em> sockets tracked by this factory, not only on the
 * sockets of the endpoint's own destination. Confirm that this is intended.
 */
public class ServerEndpointFactory {
    /** Scheduler on which the connecting / closing socket streams are subscribed. */
    private static final Scheduler lifecycleEventScheduler = Schedulers.io();
    /** Grizzly socket to wrapper mapping, shared by all endpoints created by this factory instance. */
    private final ConcurrentHashMap<org.glassfish.grizzly.websockets.WebSocket, WebSocket<?>> webSockets = new ConcurrentHashMap<>();

    /**
     * Wraps a Grizzly socket in a new {@link WebSocket}.
     *
     * @param webSocket the Grizzly socket that messages are sent to and that is closed
     * @param messageMarshaller converts outgoing messages to their String representation
     * @return a wrapper whose send action marshals the message and sends it as text, and whose
     *         close action closes the Grizzly socket
     */
    private <MessageType> WebSocket<MessageType> instantiateNewWebSocket(org.glassfish.grizzly.websockets.WebSocket webSocket, MessageMarshaller<MessageType, String> messageMarshaller) {
        return new WebSocket<MessageType>(
                message -> {
                    String messageAsString = marshal(message, messageMarshaller);
                    webSocket.send(messageAsString);
                },
                () -> webSocket.close());
    }

    /**
     * Returns the wrapper registered for the given Grizzly socket, creating and registering it
     * if the socket is not yet known.
     *
     * @param webSocket the Grizzly socket to look up
     * @param messageMarshaller marshaller used if a new wrapper has to be created
     * @return the wrapper stored in the socket map for {@code webSocket}
     */
    private <MessageType> WebSocket<MessageType> createWebSocket(org.glassfish.grizzly.websockets.WebSocket webSocket, MessageMarshaller<MessageType, String> messageMarshaller) {
        // The map stores WebSocket<?>, so the message type cannot be checked here; the cast relies on
        // a given Grizzly socket always being requested with the same message type.
        @SuppressWarnings("unchecked")
        WebSocket<MessageType> retVal = (WebSocket<MessageType>) webSockets.computeIfAbsent(webSocket, key -> instantiateNewWebSocket(key, messageMarshaller));
        return retVal;
    }

    /**
     * Marshals an outgoing message to its String representation.
     *
     * @param message the message to marshal
     * @param messageMarshaller the marshaller to apply
     * @return the marshalled message
     * @throws RuntimeException wrapping any exception thrown by the marshaller
     */
    private <MessageType> String marshal(MessageType message, MessageMarshaller<MessageType, String> messageMarshaller) {
        try {
            return messageMarshaller.marshal(message);
        }
        catch (Exception e) {
            throw new RuntimeException("Unable to marshal message " + message, e);
        }
    }

    /**
     * Unmarshals an incoming text message.
     *
     * @param destination destination name reported to the metrics collector on failure
     * @param message the raw incoming text
     * @param messageUnmarshaller the unmarshaller to apply
     * @param metricsCollector notified of unparsable messages; skipped when {@code null}
     * @return the unmarshalled message
     * @throws RuntimeException wrapping any exception thrown by the unmarshaller
     */
    private <MessageType> MessageType unmarshal(String destination, String message, MessageUnmarshaller<String, MessageType> messageUnmarshaller, MetricsCollector metricsCollector) {
        try {
            return messageUnmarshaller.unmarshal(message);
        }
        catch (Exception e) {
            if (metricsCollector != null) {
                metricsCollector.unparsableMessageReceived(destination);
            }
            throw new RuntimeException("Unable to unmarshal incoming message " + message + " on destination " + destination, e);
        }
    }

    /**
     * Creates a server endpoint for the given destination and registers its application with the
     * Grizzly {@link WebSocketEngine}.
     *
     * @param destination the destination; a leading "/" is added for the engine registration and
     *        stripped for the name used with the stream source and metrics
     * @param messageMarshaller converts outgoing messages to Strings
     * @param messageUnmarshaller converts incoming text to messages
     * @param metricsCollector passed on to the stream source and used to report unparsable
     *        messages; this class null-checks it before its own use, whether the stream source
     *        accepts {@code null} is not visible here
     * @return an endpoint wired with the message streams, a broadcast action and a close action
     */
    public <MessageType> ServerEndpoint<MessageType> createFor(String destination, MessageMarshaller<MessageType, String> messageMarshaller, MessageUnmarshaller<String, MessageType> messageUnmarshaller, MetricsCollector metricsCollector) {
        String destinationForSubscription = destination.startsWith("/") ? destination : "/" + destination;
        String destinationForMetrics = destination.startsWith("/") ? destination.substring(1) : destination;

        StreamCreatingWebSocketApplication<MessageType> app = new StreamCreatingWebSocketApplication<>(
                text -> unmarshal(destinationForMetrics, text, messageUnmarshaller, metricsCollector));
        WebSocketEngine.getEngine().register("", destinationForSubscription, app);

        Function<org.glassfish.grizzly.websockets.WebSocket, WebSocket<MessageType>> webSocketCreator =
                socket -> createWebSocket(socket, messageMarshaller);
        EndpointStreamSource<MessageType> endpointStreamSource =
                EndpointStreamSourceFactory.createStreamSourceFor(destinationForMetrics, webSocketCreator, app, metricsCollector);

        // Track every connecting socket in the shared map ...
        Disposable subscriptionConnections = app.connectingSockets().subscribeOn(lifecycleEventScheduler).subscribe(webSocketCreator::apply);
        // ... and forget it again once it closes.
        Disposable subscriptionDisconnections = app.closingSockets().subscribeOn(lifecycleEventScheduler).subscribe(pair -> webSockets.remove(pair._1));

        Consumer<MessageType> broadcastAction = message -> {
            String messageAsString = marshal(message, messageMarshaller);
            Set<org.glassfish.grizzly.websockets.WebSocket> allSockets = webSockets.keySet();
            // broadcast(...) is invoked on one socket with the full recipient set; sockets are tried
            // in turn and the loop ends with the first call that does not throw.
            for (org.glassfish.grizzly.websockets.WebSocket socket : allSockets) {
                try {
                    socket.broadcast(allSockets, messageAsString);
                    break;
                }
                catch (Exception ignored) {
                    // best effort: fall through and retry via the next socket
                }
            }
        };

        Consumer<CloseReason> closeAction = closeReason -> {
            // Stop tracking lifecycle events before tearing the sockets down.
            subscriptionConnections.dispose();
            subscriptionDisconnections.dispose();
            Set<org.glassfish.grizzly.websockets.WebSocket> allSockets = webSockets.keySet();
            allSockets.forEach(s -> s.close(closeReason.code, closeReason.text));
            webSockets.clear();
            app.close();
        };

        return new ServerEndpoint<>(endpointStreamSource, broadcastAction, closeAction);
    }
}

