package io.scalecube.services.gateway.ws;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.gateway.GatewaySession;
import io.scalecube.services.gateway.GatewaySessionHandler;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.jctools.maps.NonBlockingHashMapLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;
import reactor.util.context.Context;

/* loaded from: input_file:io/scalecube/services/gateway/ws/WebsocketGatewaySession.class */
/**
 * Gateway session bound to a single websocket connection.
 *
 * <p>The session encodes outgoing {@link ServiceMessage}s into text websocket frames, notifies the
 * {@link GatewaySessionHandler} about responses and send errors, and keeps a registry of
 * per-stream subscriptions (keyed by stream id, "sid") which are all disposed when the underlying
 * connection is disposed.
 */
public final class WebsocketGatewaySession implements GatewaySession {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketGatewaySession.class);

    /** Predicate handed to {@code sendObject(Publisher, Predicate)}; it matches every frame. */
    private static final Predicate<Object> SEND_PREDICATE = frame -> true;

    /** Active subscriptions of this session, keyed by stream id. */
    private final Map<Long, Disposable> subscriptions = new NonBlockingHashMapLong<>(1024);

    private final GatewaySessionHandler gatewayHandler;
    private final WebsocketInbound inbound;
    private final WebsocketOutbound outbound;
    private final WebsocketServiceMessageCodec codec;
    private final long sessionId;
    private final Map<String, String> headers;

    /**
     * Creates a session and arranges for all registered subscriptions to be cleared when the
     * connection behind {@code inbound} is disposed.
     *
     * @param sessionId identifier of this session
     * @param codec codec used to encode outgoing messages
     * @param headers session headers; copied into an unmodifiable map
     * @param inbound websocket inbound
     * @param outbound websocket outbound
     * @param gatewayHandler handler notified about responses and send errors
     */
    public WebsocketGatewaySession(
            long sessionId,
            WebsocketServiceMessageCodec codec,
            Map<String, String> headers,
            WebsocketInbound inbound,
            WebsocketOutbound outbound,
            GatewaySessionHandler gatewayHandler) {
        this.sessionId = sessionId;
        this.codec = codec;
        // Defensive copy: later changes to the caller's map must not leak into the session.
        this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        this.inbound =
                inbound.withConnection(connection -> connection.onDispose(this::clearSubscriptions));
        this.outbound = outbound;
        this.gatewayHandler = gatewayHandler;
    }

    @Override // io.scalecube.services.gateway.GatewaySession
    public long sessionId() {
        return this.sessionId;
    }

    @Override // io.scalecube.services.gateway.GatewaySession
    public Map<String, String> headers() {
        return this.headers;
    }

    /**
     * Returns the stream of inbound websocket payloads.
     *
     * <p>{@code retain()} is applied to the inbound flux, so consumers are presumably expected to
     * release each buffer once done with it (confirm against the callers).
     *
     * @return flux of received buffers
     */
    public Flux<ByteBuf> receive() {
        return this.inbound.receive().retain();
    }

    /**
     * Encodes a single message into a text frame and sends it.
     *
     * <p>The handler's {@code onResponse} is invoked before the frame is written; {@code onError}
     * is invoked if the send fails. Both receive the subscriber context.
     *
     * @param serviceMessage message to send
     * @return mono completing when the send completes
     */
    public Mono<Void> send(ServiceMessage serviceMessage) {
        return Mono.deferContextual(contextView -> {
            final TextWebSocketFrame frame =
                    new TextWebSocketFrame(this.codec.encode(serviceMessage));
            this.gatewayHandler.onResponse(
                    this, frame.content(), serviceMessage, (Context) contextView);
            return this.outbound
                    .sendObject(frame)
                    .then()
                    .doOnError(th -> this.gatewayHandler.onError(this, th, (Context) contextView));
        });
    }

    /**
     * Encodes every message of the given stream into a text frame and sends it.
     *
     * <p>The handler's {@code onResponse} is invoked for each message as it is encoded; {@code
     * onError} is invoked if the send fails.
     *
     * @param flux stream of messages to send
     * @return mono completing when the send completes
     */
    public Mono<Void> send(Flux<ServiceMessage> flux) {
        return Mono.deferContextual(contextView -> {
            final Flux<TextWebSocketFrame> frames = flux.map(serviceMessage -> {
                final TextWebSocketFrame frame =
                        new TextWebSocketFrame(this.codec.encode(serviceMessage));
                this.gatewayHandler.onResponse(
                        this, frame.content(), serviceMessage, (Context) contextView);
                return frame;
            });
            return this.outbound
                    .sendObject(frames, SEND_PREDICATE)
                    .then()
                    .doOnError(th -> this.gatewayHandler.onError(this, th, (Context) contextView));
        });
    }

    /**
     * Sends a websocket close frame.
     *
     * @return mono completing when the close frame has been sent
     */
    public Mono<Void> close() {
        return this.outbound.sendClose().then();
    }

    /**
     * Sends a websocket close frame with status code 1000 and the given reason.
     *
     * @param reason reason text placed into the close frame
     * @return mono completing when the close frame has been sent
     */
    public Mono<Void> close(String reason) {
        return this.outbound.sendClose(1000, reason).then();
    }

    /**
     * Registers {@code disposable} to be disposed together with the connection and returns a mono
     * that terminates when the connection terminates.
     *
     * @param disposable resource to dispose on connection disposal
     * @return mono completing (or failing) when the connection's {@code onTerminate()} signal fires
     */
    public Mono<Void> onClose(Disposable disposable) {
        return Mono.create(sink ->
                this.inbound.withConnection(connection ->
                        connection
                                .onDispose(disposable)
                                .onTerminate()
                                // Propagate value, error and plain completion into the sink.
                                .subscribe(sink::success, sink::error, sink::success)));
    }

    /**
     * Removes and disposes the subscription registered under the given stream id.
     *
     * @param streamId stream id; may be {@code null}, in which case nothing happens
     * @return {@code true} if a subscription was found and disposed, {@code false} otherwise
     */
    public boolean dispose(Long streamId) {
        if (streamId == null) {
            return false;
        }
        final Disposable removed = this.subscriptions.remove(streamId);
        if (removed == null) {
            return false;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Dispose subscription by sid={}, session={}", streamId, this.sessionId);
        }
        removed.dispose();
        return true;
    }

    /**
     * Checks whether a subscription is registered under the given stream id.
     *
     * @param streamId stream id; may be {@code null}
     * @return {@code true} if {@code streamId} is non-null and registered
     */
    public boolean containsSid(Long streamId) {
        return streamId != null && this.subscriptions.containsKey(streamId);
    }

    /**
     * Registers a subscription under the given stream id.
     *
     * <p>Nothing is stored if the disposable is already disposed or if the stream id is already
     * taken (the existing entry is kept).
     *
     * @param streamId stream id to register under
     * @param disposable subscription handle
     */
    public void register(Long streamId, Disposable disposable) {
        if (disposable.isDisposed()) {
            return;
        }
        final boolean registered = this.subscriptions.putIfAbsent(streamId, disposable) == null;
        if (registered && LOGGER.isDebugEnabled()) {
            LOGGER.debug(
                    "Registered subscription with sid={}, session={}", streamId, this.sessionId);
        }
    }

    /** Disposes every registered subscription and empties the registry. */
    private void clearSubscriptions() {
        if (LOGGER.isDebugEnabled()) {
            final int count = this.subscriptions.size();
            if (count > 1) {
                LOGGER.debug("Clear all {} subscriptions on session={}", count, this.sessionId);
            } else if (count == 1) {
                LOGGER.debug("Clear 1 subscription on session={}", this.sessionId);
            }
        }
        this.subscriptions.forEach((sid, disposable) -> disposable.dispose());
        this.subscriptions.clear();
    }

    @Override
    public String toString() {
        return "WebsocketGatewaySession[" + this.sessionId + ']';
    }
}
