package io.scalecube.services.gateway.ws;

import io.scalecube.net.Address;
import io.scalecube.services.gateway.Gateway;
import io.scalecube.services.gateway.GatewayLoopResources;
import io.scalecube.services.gateway.GatewayOptions;
import io.scalecube.services.gateway.GatewayTemplate;
import io.scalecube.services.gateway.ReferenceCountUtil;
import java.net.InetSocketAddress;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.resources.LoopResources;

/* loaded from: input_file:io/scalecube/services/gateway/ws/WebsocketGateway.class */
/**
 * Gateway whose server is built by {@code prepareHttpServer} and handled by a
 * {@link WebsocketGatewayAcceptor}.
 *
 * <p>Lifecycle: {@link #start()} binds the server, {@link #address()} reports the bound address,
 * and {@link #stop()} shuts down the server followed by the loop resources.
 */
public class WebsocketGateway extends GatewayTemplate {
    // Callbacks are forwarded to the acceptor as-is; all three are null when the
    // single-argument constructor is used.
    private final BiFunction<WebsocketSession, GatewayMessage, GatewayMessage> onMessage;
    private final Consumer<WebsocketSession> onOpen;
    private final Consumer<WebsocketSession> onClose;

    // Assigned while start() runs; both stay null until then.
    private DisposableServer server;
    private LoopResources loopResources;

    /**
     * Creates a gateway without any session callbacks (all callbacks are {@code null}).
     *
     * @param gatewayOptions options passed to the {@link GatewayTemplate} constructor
     */
    public WebsocketGateway(GatewayOptions gatewayOptions) {
        this(gatewayOptions, null, null, null);
    }

    /**
     * Creates a gateway with session callbacks that are passed to the
     * {@link WebsocketGatewayAcceptor} created in {@link #start()}.
     *
     * @param gatewayOptions options passed to the {@link GatewayTemplate} constructor
     * @param biFunction stored as the {@code onMessage} callback (presumably invoked per incoming
     *     message; confirm against {@link WebsocketGatewayAcceptor})
     * @param consumer stored as the {@code onOpen} callback
     * @param consumer2 stored as the {@code onClose} callback
     */
    public WebsocketGateway(GatewayOptions gatewayOptions, BiFunction<WebsocketSession, GatewayMessage, GatewayMessage> biFunction, Consumer<WebsocketSession> consumer, Consumer<WebsocketSession> consumer2) {
        super(gatewayOptions);
        this.onMessage = biFunction;
        this.onOpen = consumer;
        this.onClose = consumer2;
    }

    /**
     * Binds the gateway server. Nothing happens until the returned {@link Mono} is subscribed to.
     *
     * @return a {@link Mono} that emits this gateway once the server is bound
     */
    public Mono<Gateway> start() {
        return Mono.defer(() -> {
            WebsocketGatewayAcceptor acceptor = new WebsocketGatewayAcceptor(
                    this.options.call().requestReleaser(ReferenceCountUtil::safestRelease),
                    this.gatewayMetrics,
                    this.onMessage,
                    this.onOpen,
                    this.onClose);

            // Wrap the configured worker pool when one is present; otherwise create
            // dedicated loop resources for this gateway.
            this.loopResources = this.options.workerPool() != null
                    ? new GatewayLoopResources(this.options.workerPool())
                    : LoopResources.create("websocket-gateway");

            return prepareHttpServer(this.loopResources, this.options.port(), this.gatewayMetrics)
                    .handle(acceptor)
                    .bind()
                    // Remember the bound server so address() and stop() can use it.
                    .doOnSuccess(boundServer -> this.server = boundServer)
                    .thenReturn(this);
        });
    }

    /**
     * Returns the address the server is bound to.
     *
     * @return host string and port of the bound server
     * @throws IllegalStateException if the server has not been bound yet, i.e. the {@link Mono}
     *     returned by {@link #start()} has not completed successfully
     */
    public Address address() {
        DisposableServer boundServer = this.server;
        if (boundServer == null) {
            // Fail with a clear message instead of an anonymous NullPointerException.
            throw new IllegalStateException("WebsocketGateway is not started: no bound server address available");
        }
        InetSocketAddress socketAddress = boundServer.address();
        return Address.create(socketAddress.getHostString(), socketAddress.getPort());
    }

    /**
     * Shuts down the server and then the loop resources. The two steps are concatenated with
     * error delaying, so the loop resources are still shut down when the server shutdown fails.
     *
     * <p>NOTE(review): behavior before {@link #start()} (null server / loop resources) depends on
     * the inherited {@code shutdownServer} / {@code shutdownLoopResources} helpers.
     *
     * @return a {@link Mono} that completes when both shutdown steps have finished
     */
    public Mono<Void> stop() {
        // Varargs overload keeps the element type (Void) instead of a raw Publisher[] array.
        return Flux.concatDelayError(shutdownServer(this.server), shutdownLoopResources(this.loopResources)).then();
    }
}
