package io.scalecube.gateway.websocket;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.ReferenceCountUtil;
import io.scalecube.gateway.websocket.message.GatewayMessage;
import io.scalecube.gateway.websocket.message.GatewayMessageCodec;
import io.scalecube.gateway.websocket.message.Signal;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.BadRequestException;
import io.scalecube.services.exceptions.ExceptionProcessor;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/scalecube/gateway/websocket/WebsocketAcceptor.class */
public final class WebsocketAcceptor {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketAcceptor.class);
    private final ServiceCall serviceCall;
    private final GatewayMessageCodec gatewayMessageCodec = new GatewayMessageCodec();

    public WebsocketAcceptor(ServiceCall serviceCall) {
        this.serviceCall = serviceCall;
    }

    public Mono<Void> onConnect(WebsocketSession websocketSession) {
        LOGGER.info("Session connected: " + websocketSession);
        Mono<Void> send = websocketSession.send(websocketSession.receive().flatMap(webSocketFrame -> {
            return Flux.create(fluxSink -> {
                try {
                    GatewayMessage message = toMessage(webSocketFrame);
                    Long streamId = message.streamId();
                    if (streamId == null) {
                        LOGGER.error("Invalid gateway request: {}, sid is missing for session: {}", message, websocketSession);
                        throw new BadRequestException("sid is missing");
                    }
                    if (message.hasSignal(Signal.CANCEL)) {
                        if (!websocketSession.dispose(streamId)) {
                            LOGGER.error("CANCEL failed for gateway request: {}, sid={} is not contained in session: {}", message, streamId, websocketSession);
                            throw new BadRequestException("sid=" + streamId + " is not contained in session");
                        }
                        fluxSink.next(GatewayMessage.builder().streamId(streamId).signal(Signal.CANCEL).build());
                        fluxSink.complete();
                        return;
                    }
                    if (websocketSession.containsSid(streamId)) {
                        LOGGER.error("Failed gateway request: {}, sid={} is already registered on session: {}", message, websocketSession);
                        throw new BadRequestException("sid=" + streamId + " is already registered on session");
                    }
                    if (message.qualifier() == null) {
                        LOGGER.error("Failed gateway request: {}, q is missing for session: {}", message, websocketSession);
                        throw new BadRequestException("q is missing");
                    }
                    AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                    Flux<ServiceMessage> requestMany = this.serviceCall.requestMany(GatewayMessage.toServiceMessage(message));
                    if (message.inactivity() != null) {
                        requestMany = requestMany.timeout(Duration.ofMillis(message.inactivity().intValue()));
                    }
                    Flux doFinally = requestMany.map(serviceMessage -> {
                        GatewayMessage.Builder streamId2 = GatewayMessage.from(serviceMessage).streamId(streamId);
                        if (ExceptionProcessor.isError(serviceMessage)) {
                            atomicBoolean.set(true);
                            streamId2.signal(Signal.ERROR);
                        }
                        return streamId2.build();
                    }).concatWith(Flux.defer(() -> {
                        return atomicBoolean.get() ? Mono.empty() : Mono.just(GatewayMessage.builder().streamId(streamId).signal(Signal.COMPLETE).build());
                    })).onErrorResume(th -> {
                        return Mono.just(toErrorMessage(th, streamId));
                    }).doFinally(signalType -> {
                        websocketSession.dispose(streamId);
                    });
                    fluxSink.getClass();
                    Consumer consumer = (v1) -> {
                        r1.next(v1);
                    };
                    fluxSink.getClass();
                    Consumer<? super Throwable> consumer2 = fluxSink::error;
                    fluxSink.getClass();
                    websocketSession.register(streamId, doFinally.subscribe(consumer, consumer2, fluxSink::complete));
                } catch (Throwable th2) {
                    ReferenceCountUtil.safeRelease(webSocketFrame);
                    fluxSink.next(toErrorMessage(th2, null));
                    fluxSink.complete();
                }
            });
        }).flatMap(this::toByteBuf).doOnError(th -> {
            LOGGER.error("Unhandled exception occured: {}, session: {} will be closed", th, websocketSession, th);
        }));
        websocketSession.onClose(() -> {
            LOGGER.info("Session disconnected: " + websocketSession);
        });
        return send.then();
    }

    public Mono<Void> onDisconnect(WebsocketSession websocketSession) {
        LOGGER.info("Session disconnected: " + websocketSession);
        return Mono.empty();
    }

    private Mono<ByteBuf> toByteBuf(GatewayMessage gatewayMessage) {
        try {
            return Mono.just(this.gatewayMessageCodec.encode(gatewayMessage));
        } catch (Throwable th) {
            ReferenceCountUtil.safeRelease(gatewayMessage.data());
            return Mono.empty();
        }
    }

    private GatewayMessage toMessage(WebSocketFrame webSocketFrame) {
        try {
            return this.gatewayMessageCodec.decode(webSocketFrame.content());
        } catch (Throwable th) {
            throw new BadRequestException(th.getMessage());
        }
    }

    private GatewayMessage toErrorMessage(Throwable th, Long l) {
        return GatewayMessage.from(ExceptionProcessor.toMessage(th)).streamId(l).signal(Signal.ERROR).build();
    }
}
