/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.bolt.runtime;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.neo4j.bolt.AbstractBoltTransportsTest;
import org.neo4j.bolt.messaging.RequestMessage;
import org.neo4j.bolt.packstream.Neo4jPack;
import org.neo4j.bolt.runtime.DefaultBoltConnection;
import org.neo4j.bolt.testing.MessageConditions;
import org.neo4j.bolt.testing.TransportTestUtil;
import org.neo4j.bolt.testing.client.TransportConnection;
import org.neo4j.bolt.transport.Neo4jWithSocket;
import org.neo4j.bolt.transport.Neo4jWithSocketExtension;
import org.neo4j.bolt.v4.messaging.BoltV4Messages;
import org.neo4j.configuration.connectors.BoltConnector;
import org.neo4j.configuration.helpers.SocketAddress;
import org.neo4j.graphdb.config.Setting;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.LogAssertions;
import org.neo4j.logging.LogProvider;
import org.neo4j.test.TestDatabaseManagementServiceBuilder;
import org.neo4j.test.extension.Inject;
import org.neo4j.test.extension.testdirectory.EphemeralTestDirectoryExtension;

@EphemeralTestDirectoryExtension
@Neo4jWithSocketExtension
class BoltSchedulerBusyIT
extends AbstractBoltTransportsTest {
    private final AssertableLogProvider internalLogProvider = new AssertableLogProvider();
    private final AssertableLogProvider userLogProvider = new AssertableLogProvider();
    @Inject
    private Neo4jWithSocket server;
    private TransportConnection connection1;
    private TransportConnection connection2;
    private TransportConnection connection3;
    private TransportConnection connection4;

    BoltSchedulerBusyIT() {
    }

    @BeforeEach
    public void setup(TestInfo testInfo) throws IOException {
        this.server.setGraphDatabaseFactory(this.getTestGraphDatabaseFactory());
        this.server.setConfigure(this.getSettingsFunction());
        this.server.init(testInfo);
        this.address = this.server.lookupDefaultConnector();
    }

    private TestDatabaseManagementServiceBuilder getTestGraphDatabaseFactory() {
        TestDatabaseManagementServiceBuilder factory = new TestDatabaseManagementServiceBuilder();
        factory.setInternalLogProvider((LogProvider)this.internalLogProvider);
        factory.setUserLogProvider((LogProvider)this.userLogProvider);
        return factory;
    }

    @Override
    protected Consumer<Map<Setting<?>, Object>> getSettingsFunction() {
        return settings -> {
            super.getSettingsFunction().accept((Map<Setting<?>, Object>)settings);
            settings.put(BoltConnector.listen_address, new SocketAddress("localhost", 0));
            settings.put(BoltConnector.thread_pool_min_size, 0);
            settings.put(BoltConnector.thread_pool_max_size, 2);
        };
    }

    @AfterEach
    public void cleanup() {
        BoltSchedulerBusyIT.close(this.connection1);
        BoltSchedulerBusyIT.close(this.connection2);
        BoltSchedulerBusyIT.close(this.connection3);
        BoltSchedulerBusyIT.close(this.connection4);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ParameterizedTest(name="{displayName} {2}")
    @MethodSource(value={"argumentsProvider"})
    public void shouldReportFailureWhenAllThreadsInThreadPoolAreBusy(Class<? extends TransportConnection> connectionClass, Neo4jPack neo4jPack, String name) throws Throwable {
        this.initParameters(connectionClass, neo4jPack, name);
        this.connection1 = this.enterStreaming();
        this.connection2 = this.enterStreaming();
        try {
            this.connection3 = this.connectAndPerformBoltHandshake(this.newConnection());
            this.connection3.send(this.util.defaultAuth());
            LogAssertions.assertThat((Object)this.connection3).satisfies(this.util.eventuallyReceives(new Consumer[]{MessageConditions.msgFailure((Status)Status.Request.NoThreadsAvailable, (String)"There are no available threads to serve this request at the moment")}));
            LogAssertions.assertThat((AssertableLogProvider)this.userLogProvider).containsMessages(new String[]{"since there are no available threads to serve it at the moment. You can retry at a later time"});
            LogAssertions.assertThat((AssertableLogProvider)this.internalLogProvider).forClass(DefaultBoltConnection.class).forLevel(AssertableLogProvider.Level.ERROR).assertExceptionForLogMessage("since there are no available threads to serve it at the moment. You can retry at a later time").isInstanceOf(RejectedExecutionException.class);
        }
        finally {
            this.exitStreaming(this.connection1);
            this.exitStreaming(this.connection2);
        }
    }

    @ParameterizedTest(name="{displayName} {2}")
    @MethodSource(value={"argumentsProvider"})
    public void shouldStopConnectionsWhenRelatedJobIsRejectedOnShutdown(Class<? extends TransportConnection> connectionClass, Neo4jPack neo4jPack, String name) throws Throwable {
        this.initParameters(connectionClass, neo4jPack, name);
        this.connection1 = this.enterStreaming();
        this.exitStreaming(this.connection1);
        this.connection2 = this.enterStreaming();
        this.exitStreaming(this.connection2);
        this.connection3 = this.enterStreaming();
        this.connection4 = this.enterStreaming();
        this.internalLogProvider.clear();
        this.server.shutdownDatabase();
        LogAssertions.assertThat((AssertableLogProvider)this.userLogProvider).doesNotContainMessage("since there are no available threads to serve it at the moment. You can retry at a later time");
        LogAssertions.assertThat((AssertableLogProvider)this.internalLogProvider).doesNotContainMessage("since there are no available threads to serve it at the moment. You can retry at a later time");
    }

    private TransportConnection enterStreaming() throws Throwable {
        TransportConnection connection = null;
        Throwable error = null;
        for (int i = 1; i <= 7; ++i) {
            try {
                connection = this.newConnection();
                this.enterStreaming(connection, i);
                error = null;
                return connection;
            }
            catch (Throwable t) {
                if (error == null) {
                    error = t;
                } else {
                    error.addSuppressed(t);
                }
                BoltSchedulerBusyIT.close(connection);
                TimeUnit.SECONDS.sleep(i);
                continue;
            }
        }
        if (error != null) {
            throw error;
        }
        throw new IllegalStateException("Unable to enter the streaming state");
    }

    private void enterStreaming(TransportConnection connection, int sleepSeconds) throws Exception {
        this.connectAndPerformBoltHandshake(connection);
        connection.send(this.util.defaultAuth());
        LogAssertions.assertThat((Object)connection).satisfies(this.util.eventuallyReceives(new Consumer[]{MessageConditions.msgSuccess()}));
        TimeUnit.SECONDS.sleep(sleepSeconds);
        connection.send(this.util.chunk(new RequestMessage[]{BoltV4Messages.run((String)"UNWIND RANGE (1, 100) AS x RETURN x")}));
        LogAssertions.assertThat((Object)connection).satisfies(this.util.eventuallyReceives(new Consumer[]{MessageConditions.msgSuccess()}));
    }

    private TransportConnection connectAndPerformBoltHandshake(TransportConnection connection) throws Exception {
        BoltSchedulerBusyIT boltSchedulerBusyIT = this;
        connection.connect(this.address).send(boltSchedulerBusyIT.util.defaultAcceptedVersions());
        LogAssertions.assertThat((Object)connection).satisfies(TransportTestUtil.eventuallyReceivesSelectedProtocolVersion());
        return connection;
    }

    private void exitStreaming(TransportConnection connection) throws Exception {
        connection.send(this.util.chunk(new RequestMessage[]{BoltV4Messages.discardAll()}));
        LogAssertions.assertThat((Object)connection).satisfies(this.util.eventuallyReceives(new Consumer[]{MessageConditions.msgSuccess()}));
    }

    private static void close(TransportConnection connection) {
        if (connection != null) {
            try {
                connection.disconnect();
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
    }
}

