/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.bolt;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.neo4j.bolt.BoltServer;
import org.neo4j.bolt.test.annotation.BoltTestExtension;
import org.neo4j.bolt.test.annotation.connection.initializer.Authenticated;
import org.neo4j.bolt.test.annotation.connection.initializer.VersionSelected;
import org.neo4j.bolt.test.annotation.connection.transport.ExcludeTransport;
import org.neo4j.bolt.test.annotation.connection.transport.IncludeTransport;
import org.neo4j.bolt.test.annotation.connection.transport.UseTransport;
import org.neo4j.bolt.test.annotation.setup.FactoryFunction;
import org.neo4j.bolt.test.annotation.setup.SettingsFunction;
import org.neo4j.bolt.test.annotation.test.TransportTest;
import org.neo4j.bolt.test.provider.ConnectionProvider;
import org.neo4j.bolt.test.util.ServerUtil;
import org.neo4j.bolt.testing.assertions.BoltConnectionAssertions;
import org.neo4j.bolt.testing.client.BoltTestConnection;
import org.neo4j.bolt.testing.client.TransportType;
import org.neo4j.bolt.testing.messages.BoltWire;
import org.neo4j.bolt.transport.Neo4jWithSocket;
import org.neo4j.bolt.transport.Neo4jWithSocketExtension;
import org.neo4j.configuration.connectors.BoltConnector;
import org.neo4j.configuration.connectors.BoltConnectorInternalSettings;
import org.neo4j.graphdb.config.Setting;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.InternalLogProvider;
import org.neo4j.logging.LogAssertions;
import org.neo4j.logging.LogProvider;
import org.neo4j.test.TestDatabaseManagementServiceBuilder;
import org.neo4j.test.extension.Inject;
import org.neo4j.test.extension.testdirectory.EphemeralTestDirectoryExtension;

@EphemeralTestDirectoryExtension
@Neo4jWithSocketExtension
@BoltTestExtension
@DisabledOnOs(value={OS.WINDOWS})
public class UnixDomainSocketSchedulerIT {
    private final AssertableLogProvider internalLogProvider = new AssertableLogProvider();
    private final AssertableLogProvider userLogProvider = new AssertableLogProvider();
    @Inject
    private Neo4jWithSocket server;

    private BoltServer boltServer() {
        GraphDatabaseAPI gdb = (GraphDatabaseAPI)this.server.graphDatabaseService();
        return (BoltServer)gdb.getDependencyResolver().resolveDependency(BoltServer.class);
    }

    @FactoryFunction
    void customizeDatabase(TestDatabaseManagementServiceBuilder factory) {
        factory.setInternalLogProvider((InternalLogProvider)this.internalLogProvider);
        factory.setUserLogProvider((LogProvider)this.userLogProvider);
    }

    @SettingsFunction
    static void customizeSettings(Map<Setting<?>, Object> settings) {
        settings.put(BoltConnector.thread_pool_min_size, 0);
        settings.put(BoltConnector.thread_pool_max_size, 2);
        settings.put(BoltConnectorInternalSettings.enable_unix_socket_user_database_access, true);
        settings.put(BoltConnector.unix_socket_use_dedicated_thread_pool, true);
        settings.put(BoltConnector.unix_socket_dedicated_thread_pool_min_size, 0);
        settings.put(BoltConnector.unix_socket_dedicated_thread_pool_max_size, 1);
    }

    @TransportTest
    @ExcludeTransport(value={TransportType.UNIX})
    void shouldProvideDedicatedPoolForUnixDomainSocket(BoltWire wire, @Authenticated BoltTestConnection standardConnection, @Authenticated @UseTransport(value=TransportType.UNIX) ConnectionProvider unixConnectionProvider) throws Exception {
        UnixDomainSocketSchedulerIT.enterStreaming(wire, standardConnection);
        ServerUtil.awaitPrimaryThreadPoolSaturation(this.boltServer(), 1);
        try (BoltTestConnection unixConnection = unixConnectionProvider.create();){
            unixConnection.send(wire.run("RETURN 1"));
            BoltConnectionAssertions.assertThat((BoltTestConnection)unixConnection).receivesSuccess();
        }
        UnixDomainSocketSchedulerIT.exitStreaming(wire, standardConnection);
        ServerUtil.awaitPrimaryThreadPoolSaturation(this.boltServer(), 0);
    }

    @TransportTest
    @IncludeTransport(value={TransportType.UNIX})
    void shouldNotSubmitToPrimaryPool(BoltWire wire, @Authenticated BoltTestConnection connection) throws IOException, InterruptedException {
        UnixDomainSocketSchedulerIT.enterStreaming(wire, connection);
        ThreadPoolExecutor executor = (ThreadPoolExecutor)this.boltServer().getPrimaryExecutorService();
        ServerUtil.awaitDomainSocketThreadPoolSaturation(this.boltServer(), 1);
        int i = 0;
        do {
            Thread.sleep(100L);
            Assertions.assertThat((int)executor.getActiveCount()).isEqualTo(0);
        } while (i++ < 10);
    }

    @TransportTest
    @IncludeTransport(value={TransportType.UNIX})
    void shouldAdhereToConfiguredThreadLimits(BoltWire wire, @Authenticated BoltTestConnection connection1, @VersionSelected ConnectionProvider connectionProvider) throws IOException {
        UnixDomainSocketSchedulerIT.enterStreaming(wire, connection1);
        Awaitility.await().atMost(2L, TimeUnit.MINUTES).pollInSameThread().pollInterval(100L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
            try (BoltTestConnection connection3 = connectionProvider.create();){
                connection3.send(wire.hello());
                BoltConnectionAssertions.assertThat((BoltTestConnection)connection3).receivesFailureFuzzyV40((Status)Status.Request.NoThreadsAvailable, "There are no available threads to serve this request at the moment");
                BoltConnectionAssertions.assertThat((BoltTestConnection)connection3).isEventuallyTerminated();
            }
        });
        LogAssertions.assertThat((AssertableLogProvider)this.userLogProvider).forLevel(AssertableLogProvider.Level.DEBUG).containsMessages(new String[]{"since there are no available threads to serve it at the moment."});
        LogAssertions.assertThat((AssertableLogProvider)this.internalLogProvider).forLevel(AssertableLogProvider.Level.DEBUG).containsMessages(new String[]{"since there are no available threads to serve it at the moment."});
    }

    private static void enterStreaming(BoltWire wire, BoltTestConnection connection) throws IOException {
        connection.send(wire.run("UNWIND RANGE (1, 100) AS x RETURN x"));
        BoltConnectionAssertions.assertThat((BoltTestConnection)connection).receivesSuccess();
    }

    private static void exitStreaming(BoltWire wire, BoltTestConnection connection) throws IOException {
        connection.send(wire.discard());
        BoltConnectionAssertions.assertThat((BoltTestConnection)connection).receivesSuccess();
    }
}

