/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.bolt.transport;

import java.io.IOException;
import java.net.SocketException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.neo4j.bolt.runtime.DefaultBoltConnection;
import org.neo4j.bolt.testing.MessageConditions;
import org.neo4j.bolt.testing.TransportTestUtil;
import org.neo4j.bolt.testing.client.SecureSocketConnection;
import org.neo4j.bolt.testing.client.SocketConnection;
import org.neo4j.bolt.testing.client.TransportConnection;
import org.neo4j.bolt.transport.Neo4jWithSocket;
import org.neo4j.bolt.transport.Neo4jWithSocketExtension;
import org.neo4j.configuration.GraphDatabaseInternalSettings;
import org.neo4j.configuration.connectors.BoltConnector;
import org.neo4j.configuration.connectors.BoltConnectorInternalSettings;
import org.neo4j.graphdb.config.Setting;
import org.neo4j.internal.helpers.HostnamePort;
import org.neo4j.kernel.impl.util.ValueUtils;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.LogAssertions;
import org.neo4j.logging.LogProvider;
import org.neo4j.test.TestDatabaseManagementServiceBuilder;
import org.neo4j.test.extension.Inject;
import org.neo4j.test.extension.OtherThread;
import org.neo4j.test.extension.OtherThreadExtension;
import org.neo4j.test.extension.testdirectory.EphemeralTestDirectoryExtension;

@EphemeralTestDirectoryExtension
@Neo4jWithSocketExtension
@ExtendWith(value={OtherThreadExtension.class})
public class BoltThrottleMaxDurationIT {
    @Inject
    private Neo4jWithSocket server;
    @Inject
    private OtherThread otherThread;
    private AssertableLogProvider logProvider;
    private HostnamePort address;
    private TransportConnection client;
    private TransportTestUtil util;

    public static Stream<Arguments> factoryProvider() {
        return Stream.of(Arguments.of((Object[])new Object[]{SocketConnection.class}), Arguments.of((Object[])new Object[]{SecureSocketConnection.class}));
    }

    @BeforeEach
    public void setup(TestInfo testInfo) throws IOException {
        this.server.setGraphDatabaseFactory(this.getTestGraphDatabaseFactory());
        this.server.setConfigure(BoltThrottleMaxDurationIT.getSettingsFunction());
        this.server.init(testInfo);
        this.otherThread.set(5L, TimeUnit.MINUTES);
        this.address = this.server.lookupDefaultConnector();
        this.util = new TransportTestUtil();
    }

    @AfterEach
    public void cleanup() throws IOException {
        if (this.client != null) {
            this.client.disconnect();
        }
    }

    protected TestDatabaseManagementServiceBuilder getTestGraphDatabaseFactory() {
        TestDatabaseManagementServiceBuilder factory = new TestDatabaseManagementServiceBuilder();
        this.logProvider = new AssertableLogProvider();
        factory.setInternalLogProvider((LogProvider)this.logProvider);
        return factory;
    }

    protected static Consumer<Map<Setting<?>, Object>> getSettingsFunction() {
        return settings -> {
            settings.put(BoltConnectorInternalSettings.unsupported_bolt_unauth_connection_timeout, Duration.ofMinutes(5L));
            settings.put(GraphDatabaseInternalSettings.bolt_outbound_buffer_throttle_max_duration, Duration.ofSeconds(30L));
            settings.put(BoltConnector.encryption_level, BoltConnector.EncryptionLevel.OPTIONAL);
        };
    }

    @ParameterizedTest(name="{displayName} {index}")
    @MethodSource(value={"factoryProvider"})
    public void sendingButNotReceivingClientShouldBeKilledWhenWriteThrottleMaxDurationIsReached(Class<? extends TransportConnection> c) throws Exception {
        this.client = c.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        int numberOfRunDiscardPairs = 10000;
        String largeString = " ".repeat(8192);
        BoltThrottleMaxDurationIT boltThrottleMaxDurationIT = this;
        this.client.connect(this.address).send(boltThrottleMaxDurationIT.util.defaultAcceptedVersions()).send(this.util.defaultAuth());
        Assertions.assertThat((Object)this.client).satisfies(TransportTestUtil.eventuallyReceivesSelectedProtocolVersion());
        Assertions.assertThat((Object)this.client).satisfies(this.util.eventuallyReceives(new Consumer[]{MessageConditions.msgSuccess()}));
        Future sender = this.otherThread.execute(() -> {
            for (int i = 0; i < numberOfRunDiscardPairs; ++i) {
                this.client.send(this.util.defaultRunAutoCommitTx("RETURN $data as data", ValueUtils.asMapValue(Collections.singletonMap("data", largeString))));
            }
            return null;
        });
        ExecutionException e = (ExecutionException)org.junit.jupiter.api.Assertions.assertThrows(ExecutionException.class, () -> this.otherThread.get().awaitFuture(sender));
        Assertions.assertThat((Throwable)ExceptionUtils.getRootCause((Throwable)e)).isInstanceOf(SocketException.class);
        LogAssertions.assertThat((AssertableLogProvider)this.logProvider).forClass(DefaultBoltConnection.class).forLevel(AssertableLogProvider.Level.ERROR).assertExceptionForLogMessage("Unexpected error detected in bolt session").hasStackTraceContaining("will be closed because the client did not consume outgoing buffers for ");
    }
}

