/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.bolt.runtime;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.neo4j.bolt.AbstractBoltTransportsTest;
import org.neo4j.bolt.messaging.RequestMessage;
import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.testing.MessageMatchers;
import org.neo4j.bolt.testing.client.TransportConnection;
import org.neo4j.bolt.transport.Neo4jWithSocket;
import org.neo4j.bolt.v4.messaging.BoltV4Messages;
import org.neo4j.configuration.connectors.BoltConnector;
import org.neo4j.configuration.helpers.SocketAddress;
import org.neo4j.graphdb.config.Setting;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.LogProvider;
import org.neo4j.test.TestDatabaseManagementServiceBuilder;
import org.neo4j.test.rule.fs.EphemeralFileSystemRule;

@RunWith(value=Parameterized.class)
public class BoltSchedulerBusyIT
extends AbstractBoltTransportsTest {
    private AssertableLogProvider internalLogProvider = new AssertableLogProvider();
    private AssertableLogProvider userLogProvider = new AssertableLogProvider();
    private EphemeralFileSystemRule fsRule = new EphemeralFileSystemRule();
    private Neo4jWithSocket server = new Neo4jWithSocket(this.getClass(), this.getTestGraphDatabaseFactory(), (Supplier)this.fsRule, this.getSettingsFunction());
    private TransportConnection connection1;
    private TransportConnection connection2;
    private TransportConnection connection3;
    private TransportConnection connection4;
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule((TestRule)this.fsRule).around((TestRule)this.server);

    private TestDatabaseManagementServiceBuilder getTestGraphDatabaseFactory() {
        TestDatabaseManagementServiceBuilder factory = new TestDatabaseManagementServiceBuilder();
        factory.setInternalLogProvider((LogProvider)this.internalLogProvider);
        factory.setUserLogProvider((LogProvider)this.userLogProvider);
        return factory;
    }

    @Override
    protected Consumer<Map<Setting<?>, Object>> getSettingsFunction() {
        return settings -> {
            super.getSettingsFunction().accept((Map<Setting<?>, Object>)settings);
            settings.put(BoltConnector.listen_address, new SocketAddress("localhost", 0));
            settings.put(BoltConnector.thread_pool_min_size, 0);
            settings.put(BoltConnector.thread_pool_max_size, 2);
        };
    }

    @Before
    public void setup() throws Exception {
        this.address = this.server.lookupDefaultConnector();
    }

    @After
    public void cleanup() throws Exception {
        this.close(this.connection1);
        this.close(this.connection2);
        this.close(this.connection3);
        this.close(this.connection4);
    }

    @Test
    public void shouldReportFailureWhenAllThreadsInThreadPoolAreBusy() throws Throwable {
        this.connection1 = this.enterStreaming();
        this.connection2 = this.enterStreaming();
        try {
            this.connection3 = this.connectAndPerformBoltHandshake(this.newConnection());
            this.connection3.send(this.util.defaultAuth());
            MatcherAssert.assertThat((Object)this.connection3, (Matcher)this.util.eventuallyReceives(new Matcher[]{MessageMatchers.msgFailure((Status)Status.Request.NoThreadsAvailable, (String)"There are no available threads to serve this request at the moment")}));
            this.userLogProvider.rawMessageMatcher().assertContains("since there are no available threads to serve it at the moment. You can retry at a later time");
            this.internalLogProvider.assertAtLeastOnce(new AssertableLogProvider.LogMatcher[]{AssertableLogProvider.inLog((Matcher)CoreMatchers.startsWith((String)BoltConnection.class.getPackage().getName())).error(CoreMatchers.containsString((String)"since there are no available threads to serve it at the moment. You can retry at a later time"), CoreMatchers.isA(RejectedExecutionException.class))});
        }
        finally {
            this.exitStreaming(this.connection1);
            this.exitStreaming(this.connection2);
        }
    }

    @Test
    public void shouldStopConnectionsWhenRelatedJobIsRejectedOnShutdown() throws Throwable {
        this.connection1 = this.enterStreaming();
        this.exitStreaming(this.connection1);
        this.connection2 = this.enterStreaming();
        this.exitStreaming(this.connection2);
        this.connection3 = this.enterStreaming();
        this.connection4 = this.enterStreaming();
        this.internalLogProvider.clear();
        this.server.shutdownDatabase();
        this.userLogProvider.rawMessageMatcher().assertNotContains("since there are no available threads to serve it at the moment. You can retry at a later time");
        this.internalLogProvider.rawMessageMatcher().assertNotContains("since there are no available threads to serve it at the moment. You can retry at a later time");
    }

    private TransportConnection enterStreaming() throws Throwable {
        TransportConnection connection = null;
        Throwable error = null;
        for (int i = 1; i <= 7; ++i) {
            try {
                connection = this.newConnection();
                this.enterStreaming(connection, i);
                error = null;
                return connection;
            }
            catch (Throwable t) {
                if (error == null) {
                    error = t;
                } else {
                    error.addSuppressed(t);
                }
                this.close(connection);
                TimeUnit.SECONDS.sleep(i);
                continue;
            }
        }
        if (error != null) {
            throw error;
        }
        throw new IllegalStateException("Unable to enter the streaming state");
    }

    private void enterStreaming(TransportConnection connection, int sleepSeconds) throws Exception {
        this.connectAndPerformBoltHandshake(connection);
        connection.send(this.util.defaultAuth());
        MatcherAssert.assertThat((Object)connection, (Matcher)this.util.eventuallyReceives(new Matcher[]{MessageMatchers.msgSuccess()}));
        TimeUnit.SECONDS.sleep(sleepSeconds);
        connection.send(this.util.chunk(new RequestMessage[]{BoltV4Messages.run((String)"UNWIND RANGE (1, 100) AS x RETURN x")}));
        MatcherAssert.assertThat((Object)connection, (Matcher)this.util.eventuallyReceives(new Matcher[]{MessageMatchers.msgSuccess()}));
    }

    private TransportConnection connectAndPerformBoltHandshake(TransportConnection connection) throws Exception {
        connection.connect(this.address).send(this.util.defaultAcceptedVersions());
        MatcherAssert.assertThat((Object)connection, (Matcher)this.util.eventuallyReceivesSelectedProtocolVersion());
        return connection;
    }

    private void exitStreaming(TransportConnection connection) throws Exception {
        connection.send(this.util.chunk(new RequestMessage[]{BoltV4Messages.discardAll()}));
        MatcherAssert.assertThat((Object)connection, (Matcher)this.util.eventuallyReceives(new Matcher[]{MessageMatchers.msgSuccess()}));
    }

    private void close(TransportConnection connection) {
        if (connection != null) {
            try {
                connection.disconnect();
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
    }
}

