package io.scalecube.ipc;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.scalecube.ipc.netty.NettyServerTransport;
import io.scalecube.transport.Address;
import java.util.function.Consumer;
import rx.Observable;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;

/* loaded from: input_file:io/scalecube/ipc/ListeningServerStream.class */
public final class ListeningServerStream implements EventStream {
    private final Config config;
    private final Subject<Address, Address> bindSubject = ReplaySubject.create(1).toSerialized();
    private final Subject<Address, Address> unbindSubject = ReplaySubject.create(1).toSerialized();
    private final ServerStream serverStream = ServerStream.newServerStream();

    /* loaded from: input_file:io/scalecube/ipc/ListeningServerStream$Config.class */
    public static class Config {
        private static final int DEFAULT_PORT = 5801;
        private static final int DEFAULT_PORT_COUNT = 100;
        private static final boolean DEFAULT_PORT_AUTO_INCREMENT = true;
        private String listenAddress;
        private String listenInterface;
        private boolean preferIPv6;
        private int port;
        private int portCount;
        private boolean portAutoIncrement;
        private ServerBootstrap serverBootstrap;
        private static final String DEFAULT_LISTEN_ADDRESS = null;
        private static final String DEFAULT_LISTEN_INTERFACE = null;
        private static final boolean DEFAULT_PREFER_IP6 = false;
        private static final ServerBootstrap DEFAULT_SERVER_BOOTSTRAP = new ServerBootstrap().group(new NioEventLoopGroup(1), new NioEventLoopGroup(DEFAULT_PREFER_IP6)).channel(NioServerSocketChannel.class).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.SO_REUSEADDR, true);

        private Config() {
            this.listenAddress = DEFAULT_LISTEN_ADDRESS;
            this.listenInterface = DEFAULT_LISTEN_INTERFACE;
            this.preferIPv6 = false;
            this.port = DEFAULT_PORT;
            this.portCount = DEFAULT_PORT_COUNT;
            this.portAutoIncrement = true;
            this.serverBootstrap = DEFAULT_SERVER_BOOTSTRAP;
        }

        private Config(Config config, Consumer<Config> consumer) {
            this.listenAddress = DEFAULT_LISTEN_ADDRESS;
            this.listenInterface = DEFAULT_LISTEN_INTERFACE;
            this.preferIPv6 = false;
            this.port = DEFAULT_PORT;
            this.portCount = DEFAULT_PORT_COUNT;
            this.portAutoIncrement = true;
            this.serverBootstrap = DEFAULT_SERVER_BOOTSTRAP;
            this.listenAddress = config.listenAddress;
            this.listenInterface = config.listenInterface;
            this.preferIPv6 = config.preferIPv6;
            this.port = config.port;
            this.portCount = config.portCount;
            this.portAutoIncrement = config.portAutoIncrement;
            this.serverBootstrap = config.serverBootstrap;
            consumer.accept(this);
        }

        public String getListenAddress() {
            return this.listenAddress;
        }

        public Config setListenAddress(String str) {
            return new Config(this, config -> {
                config.listenAddress = str;
            });
        }

        public String getListenInterface() {
            return this.listenInterface;
        }

        public Config setListenInterface(String str) {
            return new Config(this, config -> {
                config.listenInterface = str;
            });
        }

        public boolean isPreferIPv6() {
            return this.preferIPv6;
        }

        public Config setPreferIPv6(boolean z) {
            return new Config(this, config -> {
                config.preferIPv6 = z;
            });
        }

        public int getPort() {
            return this.port;
        }

        public Config setPort(int i) {
            return new Config(this, config -> {
                config.port = i;
            });
        }

        public int getPortCount() {
            return this.portCount;
        }

        public Config setPortCount(int i) {
            return new Config(this, config -> {
                config.portCount = i;
            });
        }

        public boolean isPortAutoIncrement() {
            return this.portAutoIncrement;
        }

        public Config setPortAutoIncrement(boolean z) {
            return new Config(this, config -> {
                config.portAutoIncrement = z;
            });
        }

        public ServerBootstrap getServerBootstrap() {
            return this.serverBootstrap;
        }

        public Config setServerBootstrap(ServerBootstrap serverBootstrap) {
            return new Config(this, config -> {
                config.serverBootstrap = serverBootstrap;
            });
        }
    }

    private ListeningServerStream(Config config) {
        this.config = config;
    }

    public static ListeningServerStream newServerStream() {
        return new ListeningServerStream(new Config());
    }

    public ListeningServerStream withListenAddress(String str) {
        return new ListeningServerStream(this.config.setListenAddress(str));
    }

    public ListeningServerStream withListenInterface(String str) {
        return new ListeningServerStream(this.config.setListenInterface(str));
    }

    public ListeningServerStream withPreferIPv6(boolean z) {
        return new ListeningServerStream(this.config.setPreferIPv6(z));
    }

    public ListeningServerStream withPort(int i) {
        return new ListeningServerStream(this.config.setPort(i));
    }

    public ListeningServerStream withPortCount(int i) {
        return new ListeningServerStream(this.config.setPortCount(i));
    }

    public ListeningServerStream withPortAutoIncrement(boolean z) {
        return new ListeningServerStream(this.config.setPortAutoIncrement(z));
    }

    public ListeningServerStream withServerBootstrap(ServerBootstrap serverBootstrap) {
        return new ListeningServerStream(this.config.setServerBootstrap(serverBootstrap));
    }

    @Override // io.scalecube.ipc.EventStream
    public void subscribe(ChannelContext channelContext) {
        this.serverStream.subscribe(channelContext);
    }

    @Override // io.scalecube.ipc.EventStream
    public Observable<Event> listen() {
        return this.serverStream.listen();
    }

    public void send(ServiceMessage serviceMessage) {
        this.serverStream.send(serviceMessage);
    }

    @Override // io.scalecube.ipc.EventStream
    public void close() {
        this.serverStream.close();
    }

    public Observable<Address> listenBind() {
        return this.bindSubject.onBackpressureLatest().asObservable();
    }

    public Observable<Address> listenUnbind() {
        return this.unbindSubject.onBackpressureLatest().asObservable();
    }

    public ListeningServerStream bind() {
        ListeningServerStream listeningServerStream = new ListeningServerStream(this.config);
        Config config = this.config;
        listeningServerStream.getClass();
        new NettyServerTransport(config, listeningServerStream::subscribe).bind().whenComplete((nettyServerTransport, th) -> {
            onBind(listeningServerStream, nettyServerTransport, th);
        });
        return listeningServerStream;
    }

    private void onBind(ListeningServerStream listeningServerStream, NettyServerTransport nettyServerTransport, Throwable th) {
        if (nettyServerTransport != null) {
            listeningServerStream.listen().subscribe(event -> {
            }, th2 -> {
                unbindTransport(nettyServerTransport);
            }, () -> {
                unbindTransport(nettyServerTransport);
            });
            nettyServerTransport.getAddress().ifPresent(address -> {
                listeningServerStream.bindSubject.onNext(address);
                listeningServerStream.bindSubject.onCompleted();
            });
        }
        if (th != null) {
            listeningServerStream.bindSubject.onError(th);
        }
    }

    private void unbindTransport(NettyServerTransport nettyServerTransport) {
        nettyServerTransport.unbind().whenComplete((nettyServerTransport2, th) -> {
            if (nettyServerTransport2 != null) {
                nettyServerTransport2.getAddress().ifPresent(address -> {
                    this.unbindSubject.onNext(address);
                    this.unbindSubject.onCompleted();
                });
            }
            if (th != null) {
                this.unbindSubject.onError(th);
            }
        });
    }
}
