package io.scalecube.services.gateway.rsocket;

import io.netty.util.concurrent.DefaultThreadFactory;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.WebsocketServerTransport;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.gateway.Gateway;
import io.scalecube.services.gateway.GatewayConfig;
import io.scalecube.services.gateway.GatewayMetrics;
import io.scalecube.services.gateway.GatewayTemplate;
import io.scalecube.services.metrics.Metrics;
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/scalecube/services/gateway/rsocket/RSocketGateway.class */
/**
 * Gateway implementation that serves RSocket over a websocket server transport.
 *
 * <p>{@link #start} binds the server and remembers the resulting channel; {@link #stop} disposes
 * that channel and then invokes the inherited {@code shutdownBossGroup()}.
 */
public class RSocketGateway extends GatewayTemplate {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketGateway.class);

    /** Thread factory handed to {@code prepareLoopResources}; creates daemon threads named "rsws-boss". */
    private static final DefaultThreadFactory BOSS_THREAD_FACTORY = new DefaultThreadFactory("rsws-boss", true);

    /** Channel of the running server; stays {@code null} until {@link #start} has succeeded. */
    private CloseableChannel server;

    /**
     * Starts the RSocket websocket server described by the given configuration.
     *
     * <p>Nothing happens until the returned {@code Mono} is subscribed to, because the whole
     * start-up sequence is wrapped in {@code Mono.defer}.
     *
     * @param gatewayConfig supplies the gateway name (used for metrics) and the port to bind
     * @param executor forwarded to {@code prepareLoopResources}
     * @param z forwarded as the first argument of {@code prepareLoopResources}
     *     (NOTE(review): presumably a "prefer native transport" flag; confirm against GatewayTemplate)
     * @param call factory for the {@code ServiceCall} given to the acceptor
     * @param metrics metrics registry wrapped into a {@code GatewayMetrics}
     * @return a {@code Mono} emitting this gateway once the server has started
     */
    public Mono<Gateway> start(GatewayConfig gatewayConfig, Executor executor, boolean z, ServiceCall.Call call, Metrics metrics) {
        return Mono.defer(() -> {
            LOGGER.info("Starting gateway with {}", gatewayConfig);
            GatewayMetrics gatewayMetrics = new GatewayMetrics(gatewayConfig.name(), metrics);
            RSocketGatewayAcceptor acceptor = new RSocketGatewayAcceptor(call.create(), gatewayMetrics);
            return RSocketFactory.receive()
                    // Retain the data and metadata slices so the payload stays valid beyond the frame.
                    .frameDecoder(frame -> ByteBufPayload.create(frame.sliceData().retain(), frame.sliceMetadata().retain()))
                    .acceptor(acceptor)
                    .transport(WebsocketServerTransport.create(prepareHttpServer(prepareLoopResources(z, BOSS_THREAD_FACTORY, gatewayConfig, executor), gatewayConfig.port(), gatewayMetrics)))
                    .start()
                    .doOnSuccess(channel -> {
                        // Remember the channel first so address() and stop() can use it.
                        this.server = channel;
                        LOGGER.info("Rsocket Gateway has been started successfully on {}", channel.address());
                    })
                    .then(Mono.just(this));
        });
    }

    /**
     * Returns the address of the running server.
     *
     * @return the address reported by the server channel
     * @throws NullPointerException if no server channel has been stored yet, i.e. {@link #start}
     *     has not completed successfully
     */
    public InetSocketAddress address() {
        return this.server.address();
    }

    /**
     * Stops the gateway: closes the server channel (if any) and then runs {@code shutdownBossGroup()}.
     *
     * @return a {@code Mono} completing when both shutdown steps have finished
     */
    public Mono<Void> stop() {
        return shutdownServer(this.server).then(shutdownBossGroup());
    }

    /**
     * Disposes the given channel and waits for it to close.
     *
     * <p>A close failure is logged as a warning and then suppressed, so the returned {@code Mono}
     * never errors because of it.
     *
     * @param closeableChannel the channel to close; may be {@code null}, in which case nothing is done
     * @return a {@code Mono} completing once the channel is closed (or immediately when there is none)
     */
    private Mono<Void> shutdownServer(CloseableChannel closeableChannel) {
        return Mono.defer(() -> {
            if (closeableChannel == null) {
                return Mono.empty();
            }
            closeableChannel.dispose();
            return closeableChannel.onClose()
                    .doOnError(th -> LOGGER.warn("Failed to close server: {}", th.toString()))
                    .onErrorResume(th -> Mono.empty());
        });
    }
}
