/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.bolt;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.mockito.Mockito;
import org.neo4j.bolt.runtime.scheduling.ExecutorBoltScheduler;
import org.neo4j.bolt.testing.MessageConditions;
import org.neo4j.bolt.testing.StreamConditions;
import org.neo4j.bolt.testing.TransportTestUtil;
import org.neo4j.bolt.testing.client.SocketConnection;
import org.neo4j.bolt.testing.client.TransportConnection;
import org.neo4j.bolt.transport.Neo4jWithSocket;
import org.neo4j.configuration.connectors.BoltConnector;
import org.neo4j.configuration.helpers.SocketAddress;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.config.Setting;
import org.neo4j.internal.helpers.HostnamePort;
import org.neo4j.internal.helpers.collection.Pair;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.procedure.GlobalProcedures;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogAssertions;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.SpiedAssertableLogProvider;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Mode;
import org.neo4j.procedure.Procedure;
import org.neo4j.test.TestDatabaseManagementServiceBuilder;
import org.neo4j.test.rule.OtherThreadRule;
import org.neo4j.test.rule.SuppressOutput;
import org.neo4j.test.rule.TestDirectory;
import org.neo4j.test.rule.fs.EphemeralFileSystemRule;
import org.neo4j.values.storable.Values;

public class ShutdownSequenceIT {
    private static final Duration THREAD_POOL_SHUTDOWN_WAIT_TIME = Duration.ofSeconds(10L);
    private final AssertableLogProvider internalLogProvider = new SpiedAssertableLogProvider(ExecutorBoltScheduler.class);
    private final AssertableLogProvider userLogProvider = new AssertableLogProvider();
    private final EphemeralFileSystemRule fsRule = new EphemeralFileSystemRule();
    private final Neo4jWithSocket server = new Neo4jWithSocket(this.getTestGraphDatabaseFactory(), () -> TestDirectory.testDirectory(this.getClass(), (FileSystemAbstraction)this.fsRule.get()), ShutdownSequenceIT.getSettingsFunction());
    private final TransportTestUtil util = new TransportTestUtil();
    private HostnamePort address;
    private CountDownLatch txStarted;
    private CountDownLatch boltWorkerThreadPoolShuttingDown;
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule((TestRule)SuppressOutput.suppressAll()).around((TestRule)this.fsRule).around((TestRule)this.server);
    @Rule
    public OtherThreadRule<Void> otherThread = new OtherThreadRule();

    @Before
    public void setup() throws Exception {
        this.address = this.server.lookupDefaultConnector();
        this.txStarted = new CountDownLatch(1);
        this.boltWorkerThreadPoolShuttingDown = new CountDownLatch(1);
        GlobalProcedures procedures = (GlobalProcedures)((GraphDatabaseAPI)this.server.graphDatabaseService()).getDependencyResolver().resolveDependency(GlobalProcedures.class);
        procedures.registerComponent(Pair.class, context -> Pair.of((Object)this.txStarted, (Object)this.boltWorkerThreadPoolShuttingDown), true);
        procedures.registerProcedure(TestProcedures.class);
    }

    @After
    public void tearDown() {
        this.userLogProvider.print(System.out);
        this.internalLogProvider.print(System.out);
    }

    @Test
    public void shutdownShouldResultInFailureMessageForTransactionAwareConnections() throws Exception {
        TransportConnection connection = this.connectAndAuthenticate();
        connection.send(this.util.defaultRunAutoCommitTx("CALL test.stream.nodes()"));
        Assert.assertTrue((boolean)this.txStarted.await(1L, TimeUnit.MINUTES));
        Log schedulerLog = this.internalLogProvider.getLog(ExecutorBoltScheduler.class);
        ((Log)Mockito.doAnswer(invocation -> {
            invocation.callRealMethod();
            this.boltWorkerThreadPoolShuttingDown.countDown();
            return null;
        }).when((Object)schedulerLog)).debug("Shutting down thread pool");
        this.server.getManagementService().shutdown();
        Assertions.assertThat((Object)connection).satisfies(this.util.eventuallyReceives(new Consumer[]{MessageConditions.msgSuccess()}));
        Assertions.assertThat((Object)connection).satisfies(this.util.eventuallyReceives(new Consumer[]{MessageConditions.either((Consumer)MessageConditions.msgFailure((Status)Status.Transaction.Terminated, (String)"The transaction has been terminated"), (Consumer)MessageConditions.msgFailure((Status)Status.General.UnknownError, (String)"The transaction has been terminated"))}));
        Assertions.assertThat((Object)connection).satisfies(TransportTestUtil.eventuallyDisconnects());
        LogAssertions.assertThat((AssertableLogProvider)this.internalLogProvider).forClass(ExecutorBoltScheduler.class).forLevel(AssertableLogProvider.Level.DEBUG).containsMessages(new String[]{"Thread pool shut down"});
    }

    @Test
    public void shutdownShouldCloseIdleConnections() throws Exception {
        TransportConnection connection = this.connectAndAuthenticate();
        this.server.getManagementService().shutdown();
        Assertions.assertThat((Object)connection).satisfies(TransportTestUtil.eventuallyDisconnects());
        LogAssertions.assertThat((AssertableLogProvider)this.internalLogProvider).forClass(ExecutorBoltScheduler.class).forLevel(AssertableLogProvider.Level.DEBUG).containsMessages(new String[]{"Thread pool shut down"});
    }

    @Test
    public void shutdownShouldWaitForNonTransactionAwareConnections() throws Exception {
        TransportConnection connection = this.connectAndAuthenticate();
        connection.send(this.util.defaultRunAutoCommitTx("CALL test.stream.strings()"));
        Assert.assertTrue((boolean)this.txStarted.await(1L, TimeUnit.MINUTES));
        Log schedulerLog = this.internalLogProvider.getLog(ExecutorBoltScheduler.class);
        ((Log)Mockito.doAnswer(invocation -> {
            invocation.callRealMethod();
            this.boltWorkerThreadPoolShuttingDown.countDown();
            return null;
        }).when((Object)schedulerLog)).debug("Shutting down thread pool");
        this.server.getManagementService().shutdown();
        Assertions.assertThat((Object)connection).satisfies(this.util.eventuallyReceives(new Consumer[]{MessageConditions.msgSuccess()}));
        Condition equalRecord = new Condition(record -> record.equals((Object)Values.stringValue((String)"0")), "Equal record", new Object[0]);
        Assertions.assertThat((Object)connection).satisfies(this.util.eventuallyReceives(new Consumer[]{MessageConditions.msgRecord(StreamConditions.eqRecord(equalRecord))}));
        Assertions.assertThat((Object)connection).satisfies(this.util.eventuallyReceives(new Consumer[]{MessageConditions.either((Consumer)MessageConditions.msgFailure((Status)Status.Transaction.Terminated, (String)"The transaction has been terminated."), (Consumer)MessageConditions.msgFailure((Status)Status.General.UnknownError, (String)"The transaction has been terminated"))}));
        Assertions.assertThat((Object)connection).satisfies(TransportTestUtil.eventuallyDisconnects());
        LogAssertions.assertThat((AssertableLogProvider)this.internalLogProvider).forClass(ExecutorBoltScheduler.class).forLevel(AssertableLogProvider.Level.DEBUG).containsMessages(new String[]{"Thread pool shut down"});
    }

    private TransportConnection connectAndAuthenticate() throws Exception {
        SocketConnection connection = new SocketConnection();
        connection.connect(this.address).send(this.util.defaultAcceptedVersions());
        Assertions.assertThat((Object)connection).satisfies(this.util.eventuallyReceivesSelectedProtocolVersion());
        connection.send(this.util.defaultAuth());
        Assertions.assertThat((Object)connection).satisfies(this.util.eventuallyReceives(new Consumer[]{MessageConditions.msgSuccess()}));
        return connection;
    }

    private TestDatabaseManagementServiceBuilder getTestGraphDatabaseFactory() {
        TestDatabaseManagementServiceBuilder factory = new TestDatabaseManagementServiceBuilder();
        factory.setInternalLogProvider((LogProvider)this.internalLogProvider);
        factory.setUserLogProvider((LogProvider)this.userLogProvider);
        return factory;
    }

    private static Consumer<Map<Setting<?>, Object>> getSettingsFunction() {
        return settings -> {
            settings.put(BoltConnector.encryption_level, BoltConnector.EncryptionLevel.OPTIONAL);
            settings.put(BoltConnector.listen_address, new SocketAddress("localhost", 0));
            settings.put(BoltConnector.thread_pool_min_size, 0);
            settings.put(BoltConnector.thread_pool_max_size, 2);
            settings.put(BoltConnector.thread_pool_shutdown_wait_time, THREAD_POOL_SHUTDOWN_WAIT_TIME);
        };
    }

    public static class TestProcedures {
        @Context
        public GraphDatabaseService db;
        @Context
        public Pair<CountDownLatch, CountDownLatch> pair;
        @Context
        public Transaction tx;

        @Procedure(name="test.stream.strings", mode=Mode.READ)
        public Stream<Output> streamStrings() {
            ((CountDownLatch)this.pair.first()).countDown();
            try {
                Assert.assertTrue((boolean)((CountDownLatch)this.pair.other()).await(1L, TimeUnit.MINUTES));
            }
            catch (InterruptedException e) {
                Assert.fail((String)"Interrupted while waiting for bolt worker threads shut down.");
            }
            return Stream.of(new Output(String.valueOf(0)));
        }

        @Procedure(name="test.stream.nodes", mode=Mode.READ)
        public Stream<Output> streamNodes() {
            ((CountDownLatch)this.pair.first()).countDown();
            try {
                Assert.assertTrue((boolean)((CountDownLatch)this.pair.other()).await(1L, TimeUnit.MINUTES));
            }
            catch (InterruptedException e) {
                Assert.fail((String)"Interrupted while waiting for bolt worker threads shut down.");
            }
            this.tx.getNodeById(0L);
            return Stream.of(new Output(String.valueOf(0)));
        }

        public static class Output {
            public String out;

            Output(String value) {
                this.out = value;
            }
        }
    }
}

