package io.servicetalk.tcp.netty.internal;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop;
import io.netty.resolver.AbstractAddressResolver;
import io.netty.resolver.AddressResolver;
import io.netty.resolver.AddressResolverGroup;
import io.netty.resolver.NoopAddressResolver;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.servicetalk.client.api.RetryableConnectException;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.api.internal.SubscribableSingle;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.api.FileDescriptorSocketAddress;
import io.servicetalk.transport.api.TransportObserver;
import io.servicetalk.transport.netty.internal.BuilderUtils;
import io.servicetalk.transport.netty.internal.ChannelCloseUtils;
import io.servicetalk.transport.netty.internal.CopyByteBufHandlerChannelInitializer;
import io.servicetalk.transport.netty.internal.EventLoopAwareNettyIoExecutors;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/servicetalk/tcp/netty/internal/TcpConnector.class */
public final class TcpConnector {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/servicetalk/tcp/netty/internal/TcpConnector$ConnectHandler.class */
    public static final class ConnectHandler<C extends ListenableAsyncCloseable> implements Consumer<Channel> {
        private static final Logger LOGGER = LoggerFactory.getLogger(ConnectHandler.class);
        private static final AtomicIntegerFieldUpdater<ConnectHandler> terminatedUpdater = AtomicIntegerFieldUpdater.newUpdater(ConnectHandler.class, "terminated");
        private final DelayedCancellable futureCancellable = new DelayedCancellable();
        private final DelayedCancellable flatMapCancellable = new DelayedCancellable();
        private final SingleSource.Subscriber<? super C> target;
        private final BiFunction<Channel, ConnectionObserver, Single<? extends C>> connectionFactory;
        private final ConnectionObserver connectionObserver;
        private volatile int terminated;

        ConnectHandler(SingleSource.Subscriber<? super C> subscriber, BiFunction<Channel, ConnectionObserver, Single<? extends C>> biFunction, ConnectionObserver connectionObserver) {
            this.target = subscriber;
            this.connectionFactory = biFunction;
            subscriber.onSubscribe(() -> {
                try {
                    this.futureCancellable.cancel();
                } finally {
                    this.flatMapCancellable.cancel();
                }
            });
            this.connectionObserver = connectionObserver;
        }

        @Override // java.util.function.Consumer
        public void accept(final Channel channel) {
            SourceAdapters.toSource(this.connectionFactory.apply(channel, this.connectionObserver).shareContextOnSubscribe()).subscribe(new SingleSource.Subscriber<C>() { // from class: io.servicetalk.tcp.netty.internal.TcpConnector.ConnectHandler.1
                public void onSubscribe(Cancellable cancellable) {
                    ConnectHandler.this.flatMapCancellable.delayedCancellable(cancellable);
                }

                public void onSuccess(@Nullable C c) {
                    if (ConnectHandler.terminatedUpdater.compareAndSet(ConnectHandler.this, 0, 1)) {
                        ConnectHandler.this.target.onSuccess(c);
                        return;
                    }
                    ConnectHandler.LOGGER.debug("Connection {} created for a channel: {} but connect failed previously. Closing connection.", c, channel);
                    if (c != null) {
                        c.closeAsync().subscribe();
                    }
                }

                public void onError(Throwable th) {
                    if (ConnectHandler.terminatedUpdater.compareAndSet(ConnectHandler.this, 0, 1)) {
                        ConnectHandler.this.target.onError(th);
                    } else {
                        ConnectHandler.LOGGER.debug("Ignored duplicate connect failure for channel: {}.", channel, th);
                    }
                }
            });
        }

        void connectFuture(Future<?> future) {
            this.futureCancellable.delayedCancellable(() -> {
                future.cancel(false);
            });
        }

        void connectFailed(Throwable th) {
            if (terminatedUpdater.compareAndSet(this, 0, 1)) {
                this.target.onError(th);
            }
        }

        void unexpectedFailure(Throwable th) {
            if (terminatedUpdater.compareAndSet(this, 0, 1)) {
                this.target.onError(th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/servicetalk/tcp/netty/internal/TcpConnector$NoopNettyAddressResolverGroup.class */
    public static final class NoopNettyAddressResolverGroup extends AddressResolverGroup<SocketAddress> {
        static final AddressResolverGroup<SocketAddress> INSTANCE = new NoopNettyAddressResolverGroup();
        private static final AbstractAddressResolver<SocketAddress> NOOP_ADDRESS_RESOLVER = new NoopAddressResolver(ImmediateEventExecutor.INSTANCE);

        private NoopNettyAddressResolverGroup() {
        }

        protected AddressResolver<SocketAddress> newResolver(EventExecutor eventExecutor) {
            return NOOP_ADDRESS_RESOLVER;
        }

        public AddressResolver<SocketAddress> getResolver(EventExecutor eventExecutor) {
            return NOOP_ADDRESS_RESOLVER;
        }
    }

    private TcpConnector() {
    }

    public static <C extends ListenableAsyncCloseable> Single<C> connect(@Nullable final SocketAddress socketAddress, final Object obj, final ReadOnlyTcpClientConfig readOnlyTcpClientConfig, final boolean z, final ExecutionContext<?> executionContext, final BiFunction<Channel, ConnectionObserver, Single<? extends C>> biFunction, final TransportObserver transportObserver) {
        Objects.requireNonNull(obj);
        Objects.requireNonNull(readOnlyTcpClientConfig);
        Objects.requireNonNull(executionContext);
        Objects.requireNonNull(biFunction);
        Objects.requireNonNull(transportObserver);
        return new SubscribableSingle<C>() { // from class: io.servicetalk.tcp.netty.internal.TcpConnector.1
            protected void handleSubscribe(SingleSource.Subscriber<? super C> subscriber) {
                ConnectHandler connectHandler = new ConnectHandler(subscriber, biFunction, transportObserver.onNewConnection(socketAddress, obj));
                try {
                    Future<?> connect0 = TcpConnector.connect0(socketAddress, obj, readOnlyTcpClientConfig, z, executionContext, connectHandler);
                    connectHandler.connectFuture(connect0);
                    Object obj2 = obj;
                    SocketAddress socketAddress2 = socketAddress;
                    connect0.addListener(future -> {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            if (cause instanceof ConnectTimeoutException) {
                                cause = new io.servicetalk.client.api.ConnectTimeoutException(obj2 instanceof FileDescriptorSocketAddress ? "Failed to register: " + obj2 : "Failed to connect: " + obj2 + " (localAddress: " + socketAddress2 + ")", cause);
                            } else if (cause instanceof ConnectException) {
                                cause = new RetryableConnectException((ConnectException) cause);
                            }
                            if (future instanceof ChannelFuture) {
                                ChannelCloseUtils.assignConnectionError(((ChannelFuture) future).channel(), cause);
                            }
                            connectHandler.connectFailed(cause);
                        }
                    });
                } catch (Throwable th) {
                    connectHandler.unexpectedFailure(th);
                }
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Future<?> connect0(@Nullable SocketAddress socketAddress, Object obj, ReadOnlyTcpClientConfig readOnlyTcpClientConfig, boolean z, ExecutionContext<?> executionContext, final Consumer<? super Channel> consumer) {
        ChannelInitializer<Channel> channelInitializer = new ChannelInitializer<Channel>() { // from class: io.servicetalk.tcp.netty.internal.TcpConnector.2
            protected void initChannel(Channel channel) {
                consumer.accept(channel);
            }
        };
        EventLoop next = EventLoopAwareNettyIoExecutors.toEventLoopAwareNettyIoExecutor(executionContext.ioExecutor()).eventLoopGroup().next();
        if (!(obj instanceof FileDescriptorSocketAddress)) {
            return connectWithBootstrap(socketAddress, obj, readOnlyTcpClientConfig, z, next, channelInitializer);
        }
        if (socketAddress != null) {
            return next.newFailedFuture(new IllegalArgumentException("local address cannot be specified when " + FileDescriptorSocketAddress.class.getSimpleName() + " is used"));
        }
        Channel socketChannel = BuilderUtils.socketChannel(next, (FileDescriptorSocketAddress) obj);
        return socketChannel == null ? next.newFailedFuture(new IllegalArgumentException(FileDescriptorSocketAddress.class.getSimpleName() + " not supported")) : initFileDescriptorBasedChannel(readOnlyTcpClientConfig, z, next, socketChannel, channelInitializer);
    }

    private static ChannelFuture connectWithBootstrap(@Nullable SocketAddress socketAddress, Object obj, ReadOnlyTcpClientConfig readOnlyTcpClientConfig, boolean z, EventLoop eventLoop, ChannelHandler channelHandler) {
        SocketAddress nettyAddress = BuilderUtils.toNettyAddress(obj);
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.resolver(NoopNettyAddressResolverGroup.INSTANCE);
        bootstrap.group(eventLoop);
        bootstrap.channel(BuilderUtils.socketChannel(eventLoop, nettyAddress.getClass()));
        bootstrap.handler(channelHandler);
        for (Map.Entry<ChannelOption, Object> entry : readOnlyTcpClientConfig.options().entrySet()) {
            bootstrap.option(entry.getKey(), entry.getValue());
        }
        bootstrap.option(ChannelOption.AUTO_READ, Boolean.valueOf(z));
        bootstrap.option(ChannelOption.ALLOCATOR, CopyByteBufHandlerChannelInitializer.POOLED_ALLOCATOR);
        return bootstrap.connect(nettyAddress, socketAddress);
    }

    private static ChannelFuture initFileDescriptorBasedChannel(ReadOnlyTcpClientConfig readOnlyTcpClientConfig, boolean z, EventLoop eventLoop, Channel channel, ChannelHandler channelHandler) {
        for (Map.Entry<ChannelOption, Object> entry : readOnlyTcpClientConfig.options().entrySet()) {
            channel.config().setOption(entry.getKey(), entry.getValue());
        }
        channel.config().setOption(ChannelOption.AUTO_READ, Boolean.valueOf(z));
        channel.config().setAllocator(CopyByteBufHandlerChannelInitializer.POOLED_ALLOCATOR);
        channel.pipeline().addLast(new ChannelHandler[]{channelHandler});
        return eventLoop.register(channel);
    }
}
