package io.scalecube.services.gateway.ws;

import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.scalecube.net.Address;
import io.scalecube.services.gateway.Gateway;
import io.scalecube.services.gateway.GatewayOptions;
import io.scalecube.services.gateway.GatewaySessionHandler;
import io.scalecube.services.gateway.GatewayTemplate;
import io.scalecube.services.gateway.ReferenceCountUtil;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.StringJoiner;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.DisposableServer;
import reactor.netty.resources.LoopResources;

/* loaded from: input_file:io/scalecube/services/gateway/ws/WebsocketGateway.class */
public class WebsocketGateway extends GatewayTemplate {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketGateway.class);
    private final GatewaySessionHandler<GatewayMessage> gatewayHandler;
    private final Duration keepAliveInterval;
    private DisposableServer server;
    private LoopResources loopResources;

    public WebsocketGateway(GatewayOptions gatewayOptions) {
        this(gatewayOptions, Duration.ZERO, GatewaySessionHandler.DEFAULT_WS_INSTANCE);
    }

    public WebsocketGateway(GatewayOptions gatewayOptions, Duration duration) {
        this(gatewayOptions, duration, GatewaySessionHandler.DEFAULT_WS_INSTANCE);
    }

    public WebsocketGateway(GatewayOptions gatewayOptions, GatewaySessionHandler<GatewayMessage> gatewaySessionHandler) {
        this(gatewayOptions, Duration.ZERO, gatewaySessionHandler);
    }

    public WebsocketGateway(GatewayOptions gatewayOptions, Duration duration, GatewaySessionHandler<GatewayMessage> gatewaySessionHandler) {
        super(gatewayOptions);
        this.keepAliveInterval = duration;
        this.gatewayHandler = gatewaySessionHandler;
    }

    public Mono<Gateway> start() {
        return Mono.defer(() -> {
            WebsocketGatewayAcceptor websocketGatewayAcceptor = new WebsocketGatewayAcceptor(this.options.call().requestReleaser(ReferenceCountUtil::safestRelease), this.gatewayMetrics, this.gatewayHandler);
            this.loopResources = LoopResources.create("websocket-gateway");
            return prepareHttpServer(this.loopResources, this.options.port(), this.gatewayMetrics).tcpConfiguration(tcpServer -> {
                return tcpServer.doOnConnection(this::setupKeepAlive);
            }).handle(websocketGatewayAcceptor).bind().doOnSuccess(disposableServer -> {
                this.server = disposableServer;
            }).thenReturn(this);
        });
    }

    public Address address() {
        InetSocketAddress address = this.server.address();
        return Address.create(address.getHostString(), address.getPort());
    }

    public Mono<Void> stop() {
        return Flux.concatDelayError(new Publisher[]{shutdownServer(this.server), shutdownLoopResources(this.loopResources)}).then();
    }

    public String toString() {
        return new StringJoiner(", ", WebsocketGateway.class.getSimpleName() + "[", "]").add("server=" + this.server).add("loopResources=" + this.loopResources).add("options=" + this.options).toString();
    }

    private void setupKeepAlive(Connection connection) {
        if (this.keepAliveInterval != Duration.ZERO) {
            connection.onReadIdle(this.keepAliveInterval.toMillis(), () -> {
                onReadIdle(connection);
            }).onWriteIdle(this.keepAliveInterval.toMillis(), () -> {
                onWriteIdle(connection);
            });
        }
    }

    private void onWriteIdle(Connection connection) {
        LOGGER.debug("Sending keepalive on writeIdle");
        connection.outbound().sendObject(new PingWebSocketFrame()).then().subscribe((Consumer) null, th -> {
            LOGGER.warn("Can't send keepalive on writeIdle: " + th);
        });
    }

    private void onReadIdle(Connection connection) {
        LOGGER.debug("Sending keepalive on readIdle");
        connection.outbound().sendObject(new PingWebSocketFrame()).then().subscribe((Consumer) null, th -> {
            LOGGER.warn("Can't send keepalive on readIdle: " + th);
        });
    }
}
