/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.bolt.v1.transport.integration;

import java.net.SocketException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.neo4j.bolt.messaging.Neo4jPack;
import org.neo4j.bolt.messaging.RequestMessage;
import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.v1.messaging.Neo4jPackV1;
import org.neo4j.bolt.v1.messaging.request.InitMessage;
import org.neo4j.bolt.v1.messaging.request.PullAllMessage;
import org.neo4j.bolt.v1.messaging.request.RunMessage;
import org.neo4j.bolt.v1.messaging.util.MessageMatchers;
import org.neo4j.bolt.v1.transport.integration.Neo4jWithSocket;
import org.neo4j.bolt.v1.transport.integration.TransportTestUtil;
import org.neo4j.bolt.v1.transport.socket.client.SecureSocketConnection;
import org.neo4j.bolt.v1.transport.socket.client.SocketConnection;
import org.neo4j.bolt.v1.transport.socket.client.TransportConnection;
import org.neo4j.function.Factory;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.HostnamePort;
import org.neo4j.kernel.impl.util.ValueUtils;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.LogProvider;
import org.neo4j.test.TestGraphDatabaseFactory;
import org.neo4j.test.matchers.CommonMatchers;
import org.neo4j.test.rule.concurrent.OtherThreadRule;
import org.neo4j.test.rule.fs.EphemeralFileSystemRule;

@RunWith(value=Parameterized.class)
public class BoltThrottleMaxDurationIT {
    private AssertableLogProvider logProvider;
    private EphemeralFileSystemRule fsRule = new EphemeralFileSystemRule();
    private Neo4jWithSocket server = new Neo4jWithSocket(this.getClass(), this.getTestGraphDatabaseFactory(), (Supplier)this.fsRule, this.getSettingsFunction());
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule((TestRule)this.fsRule).around((TestRule)this.server);
    @Rule
    public OtherThreadRule<Void> otherThread = new OtherThreadRule(5L, TimeUnit.MINUTES);
    @Parameterized.Parameter
    public Factory<TransportConnection> cf;
    private HostnamePort address;
    private TransportConnection client;
    private TransportTestUtil util;

    @Parameterized.Parameters
    public static Collection<Factory<TransportConnection>> transports() {
        return Arrays.asList(SocketConnection::new, SecureSocketConnection::new);
    }

    protected TestGraphDatabaseFactory getTestGraphDatabaseFactory() {
        TestGraphDatabaseFactory factory = new TestGraphDatabaseFactory();
        this.logProvider = new AssertableLogProvider();
        factory.setInternalLogProvider((LogProvider)this.logProvider);
        return factory;
    }

    protected Consumer<Map<String, String>> getSettingsFunction() {
        return settings -> {
            settings.put(GraphDatabaseSettings.auth_enabled.name(), "false");
            settings.put(GraphDatabaseSettings.bolt_outbound_buffer_throttle_max_duration.name(), "30s");
        };
    }

    @Before
    public void setup() {
        this.client = (TransportConnection)this.cf.newInstance();
        this.address = this.server.lookupDefaultConnector();
        this.util = new TransportTestUtil((Neo4jPack)new Neo4jPackV1());
    }

    @After
    public void after() throws Exception {
        if (this.client != null) {
            this.client.disconnect();
        }
    }

    @Test
    public void sendingButNotReceivingClientShouldBeKilledWhenWriteThrottleMaxDurationIsReached() throws Exception {
        int numberOfRunDiscardPairs = 10000;
        String largeString = StringUtils.repeat((String)" ", (int)8192);
        this.client.connect(this.address).send(this.util.acceptedVersions(1L, 0L, 0L, 0L)).send(this.util.chunk(new RequestMessage[]{new InitMessage("TestClient/1.1", Collections.emptyMap())}));
        MatcherAssert.assertThat((Object)this.client, (Matcher)TransportTestUtil.eventuallyReceives((byte[])new byte[]{0, 0, 0, 1}));
        MatcherAssert.assertThat((Object)this.client, (Matcher)this.util.eventuallyReceives(new Matcher[]{MessageMatchers.msgSuccess()}));
        Future sender = this.otherThread.execute(state -> {
            for (int i = 0; i < numberOfRunDiscardPairs; ++i) {
                this.client.send(this.util.chunk(new RequestMessage[]{new RunMessage("RETURN $data as data", ValueUtils.asMapValue(Collections.singletonMap("data", largeString))), PullAllMessage.INSTANCE}));
            }
            return null;
        });
        try {
            this.otherThread.get().awaitFuture(sender);
            Assert.fail((String)"should throw ExecutionException instead");
        }
        catch (ExecutionException e) {
            MatcherAssert.assertThat((Object)Exceptions.rootCause((Throwable)e), (Matcher)Matchers.instanceOf(SocketException.class));
        }
        this.logProvider.assertAtLeastOnce(new AssertableLogProvider.LogMatcher[]{AssertableLogProvider.inLog((Matcher)Matchers.containsString((String)BoltConnection.class.getPackage().getName())).error(CoreMatchers.startsWith((String)"Unexpected error detected in bolt session"), Matchers.hasProperty((String)"cause", (Matcher)CommonMatchers.matchesExceptionMessage((Matcher)CoreMatchers.containsString((String)"will be closed because the client did not consume outgoing buffers for "))))});
    }
}

