/*
 * Decompiled with CFR 0.152.
 */
package ch.squaredesk.nova.comm.websockets.server;

import ch.squaredesk.nova.comm.websockets.CloseReason;
import ch.squaredesk.nova.comm.websockets.StreamCreatingEndpointWrapper;
import ch.squaredesk.nova.tuples.Pair;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import org.glassfish.grizzly.websockets.ClosingFrame;
import org.glassfish.grizzly.websockets.DataFrame;
import org.glassfish.grizzly.websockets.WebSocket;
import org.glassfish.grizzly.websockets.WebSocketApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Grizzly {@link WebSocketApplication} that republishes the container's
 * per-socket callbacks (connect, close, error, text message) as RxJava streams.
 *
 * <p>Every stream is backed by a {@link PublishSubject}, so events are only
 * delivered to subscribers that are subscribed at the time the event is
 * published; nothing is replayed to late subscribers.
 *
 * <p>NOTE(review): {@link #onConnect} and {@link #onClose} do not delegate to
 * the superclass implementation - confirm that bypassing the base class'
 * handling is intended.
 */
public class StreamCreatingWebSocketApplication
        extends WebSocketApplication
        implements StreamCreatingEndpointWrapper<WebSocket> {
    private static final Logger logger = LoggerFactory.getLogger(StreamCreatingWebSocketApplication.class);

    // All subjects are serialized: a Subject requires onNext/onComplete calls to be
    // serialized, and the callbacks below are presumably invoked from several
    // container threads (one per active connection) - TODO confirm threading model.
    private final Subject<Pair<WebSocket, String>> messages =
            PublishSubject.<Pair<WebSocket, String>>create().toSerialized();
    private final Subject<WebSocket> connectedSockets =
            PublishSubject.<WebSocket>create().toSerialized();
    private final Subject<Pair<WebSocket, CloseReason>> closedSockets =
            PublishSubject.<Pair<WebSocket, CloseReason>>create().toSerialized();
    private final Subject<Pair<WebSocket, Throwable>> errors =
            PublishSubject.<Pair<WebSocket, Throwable>>create().toSerialized();

    StreamCreatingWebSocketApplication() {
    }

    /**
     * Publishes the closed socket together with the reason derived from the
     * closing frame on the {@link #closingSockets()} stream.
     *
     * @param socket the socket that was closed
     * @param frame  the frame that closed the socket; if it is {@code null}, not a
     *               {@link ClosingFrame}, or carries a code that cannot be mapped,
     *               {@link CloseReason#UNEXPECTED_CONDITION} is reported
     */
    @Override
    public void onClose(WebSocket socket, DataFrame frame) {
        this.closedSockets.onNext(new Pair<>(socket, closeReasonFor(frame)));
    }

    /**
     * Maps a closing frame to a {@link CloseReason} without ever throwing, so that
     * subscribers are always notified about the closed socket.
     */
    private static CloseReason closeReasonFor(DataFrame frame) {
        if (!(frame instanceof ClosingFrame)) {
            // Also covers frame == null; there is no close code to look up.
            logger.error("Closing dataFrame {} carries no close code", frame);
            return CloseReason.UNEXPECTED_CONDITION;
        }
        int code = ((ClosingFrame) frame).getCode();
        try {
            return CloseReason.forCloseCode(code);
        } catch (Exception e) {
            // Trailing throwable argument makes SLF4J log the stack trace as well.
            logger.error("Unexpected close code {} in closing dataFrame {}", code, frame, e);
            return CloseReason.UNEXPECTED_CONDITION;
        }
    }

    /**
     * Publishes the newly connected socket on the {@link #connectingSockets()} stream.
     *
     * @param socket the socket that connected
     */
    @Override
    public void onConnect(WebSocket socket) {
        this.connectedSockets.onNext(socket);
    }

    /**
     * Publishes the failing socket and the error on the {@link #errors()} stream.
     *
     * @param socket the socket on which the error occurred
     * @param t      the error
     * @return always {@code true}; presumably tells Grizzly to close the socket -
     *         TODO confirm against the WebSocketApplication contract
     */
    @Override
    protected boolean onError(WebSocket socket, Throwable t) {
        this.errors.onNext(new Pair<>(socket, t));
        return true;
    }

    /**
     * Publishes a received text message together with its originating socket on
     * the {@link #messages()} stream.
     *
     * @param socket the socket the message was received on
     * @param text   the message payload
     */
    @Override
    public void onMessage(WebSocket socket, String text) {
        this.messages.onNext(new Pair<>(socket, text));
    }

    /**
     * @return stream of (socket, text) pairs for every received text message,
     *         buffered for slow consumers
     */
    @Override
    public Flowable<Pair<WebSocket, String>> messages() {
        return this.messages.toFlowable(BackpressureStrategy.BUFFER);
    }

    /**
     * @return stream of sockets as they connect, buffered for slow consumers
     */
    @Override
    public Flowable<WebSocket> connectingSockets() {
        return this.connectedSockets.toFlowable(BackpressureStrategy.BUFFER);
    }

    /**
     * @return stream of (socket, close reason) pairs as sockets close, buffered
     *         for slow consumers
     */
    @Override
    public Flowable<Pair<WebSocket, CloseReason>> closingSockets() {
        return this.closedSockets.toFlowable(BackpressureStrategy.BUFFER);
    }

    /**
     * @return stream of (socket, error) pairs for every reported error, buffered
     *         for slow consumers
     */
    @Override
    public Flowable<Pair<WebSocket, Throwable>> errors() {
        return this.errors.toFlowable(BackpressureStrategy.BUFFER);
    }

    /**
     * Completes all four streams, signalling their subscribers that no further
     * events will be published.
     */
    void close() {
        this.messages.onComplete();
        this.connectedSockets.onComplete();
        this.closedSockets.onComplete();
        this.errors.onComplete();
    }
}

