package io.questdb.cutlass.http;

import io.questdb.cairo.CairoEngine;
import io.questdb.cairo.EntryUnavailableException;
import io.questdb.cairo.TableWriter;
import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cutlass.http.processors.TextImportProcessor;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.network.NetworkFacade;
import io.questdb.network.NetworkFacadeImpl;
import io.questdb.network.ServerDisconnectException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.jetbrains.annotations.NotNull;
import org.junit.Assert;
import org.junit.ComparisonFailure;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:io/questdb/cutlass/http/RetryIODispatcherTest.class */
public class RetryIODispatcherTest {
    private static final Log LOG = LogFactory.getLog(RetryIODispatcherTest.class);
    private static final String ValidImportRequest = "POST /upload HTTP/1.1\r\nHost: localhost:9001\r\nUser-Agent: curl/7.64.0\r\nAccept: */*\r\nContent-Length: 437760673\r\nContent-Type: multipart/form-data; boundary=------------------------27d997ca93d2689d\r\nExpect: 100-continue\r\n\r\n--------------------------27d997ca93d2689d\r\nContent-Disposition: form-data; name=\"schema\"; filename=\"schema.json\"\r\nContent-Type: application/octet-stream\r\n\r\n[\r\n  {\r\n    \"name\": \"date\",\r\n    \"type\": \"DATE\",\r\n    \"pattern\": \"d MMMM y.\",\r\n    \"locale\": \"ru-RU\"\r\n  }\r\n]\r\n\r\n--------------------------27d997ca93d2689d\r\nContent-Disposition: form-data; name=\"data\"; filename=\"fhv_tripdata_2017-02.csv\"\r\nContent-Type: application/octet-stream\r\n\r\nDispatching_base_num,Pickup_DateTime,DropOff_datetime,PUlocationID,DOlocationID\r\nB00008,2017-02-01 00:30:00,,,\r\nB00008,2017-02-01 00:40:00,,,\r\nB00009,2017-02-01 00:30:00,,,\r\nB00013,2017-02-01 00:11:00,,,\r\nB00013,2017-02-01 00:41:00,,,\r\nB00013,2017-02-01 00:00:00,,,\r\nB00013,2017-02-01 00:53:00,,,\r\nB00013,2017-02-01 00:44:00,,,\r\nB00013,2017-02-01 00:05:00,,,\r\nB00013,2017-02-01 00:54:00,,,\r\nB00014,2017-02-01 00:45:00,,,\r\nB00014,2017-02-01 00:45:00,,,\r\nB00014,2017-02-01 00:46:00,,,\r\nB00014,2017-02-01 00:54:00,,,\r\nB00014,2017-02-01 00:45:00,,,\r\nB00014,2017-02-01 00:45:00,,,\r\nB00014,2017-02-01 00:45:00,,,\r\nB00014,2017-02-01 00:26:00,,,\r\nB00014,2017-02-01 00:55:00,,,\r\nB00014,2017-02-01 00:47:00,,,\r\nB00014,2017-02-01 00:05:00,,,\r\nB00014,2017-02-01 00:58:00,,,\r\nB00014,2017-02-01 00:33:00,,,\r\nB00014,2017-02-01 00:45:00,,,\r\n\r\n--------------------------27d997ca93d2689d--";
    private final String ValidImportResponse = "HTTP/1.1 200 OK\r\nServer: questDB/1.0\r\nDate: Thu, 1 Jan 1970 00:00:00 GMT\r\nTransfer-Encoding: chunked\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n0666\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|      Location:  |                          fhv_tripdata_2017-02.csv  |        Pattern  | Locale  |      Errors  |\r\n|   Partition by  |                                              NONE  |                 |         |              |\r\n|      Timestamp  |                                              NONE  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|   Rows handled  |                                                24  |                 |         |              |\r\n|  Rows imported  |                                                24  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|              0  |                              Dispatching_base_num  |                   STRING  |           0  |\r\n|              1  |                                   Pickup_DateTime  |                     DATE  |           0  |\r\n|              2  |                                  DropOff_datetime  |                   STRING  |           0  |\r\n|              3  |                                      PUlocationID  |                   STRING  |           0  |\r\n|              4  |                                      DOlocationID  |                   STRING  |           0  |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n\r\n00\r\n\r\n";

    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    @Test
    public void testImportProcessedWhenClientDisconnectedLoop() throws Exception {
        for (int i = 0; i < 10; i++) {
            System.out.println("*************************************************************************************");
            System.out.println("**************************         Run " + i + "            ********************************");
            System.out.println("*************************************************************************************");
            assertImportProcessedWhenClientDisconnected();
            this.temp.delete();
            this.temp.create();
        }
    }

    @Test
    public void testInsertWaitsExceedsRerunProcessingQueueSizeLoop() throws Exception {
        for (int i = 0; i < 5; i++) {
            System.out.println("*************************************************************************************");
            System.out.println("**************************         Run " + i + "            ********************************");
            System.out.println("*************************************************************************************");
            assertInsertWaitsExceedsRerunProcessingQueueSize();
            this.temp.delete();
            this.temp.create();
        }
    }

    @Test
    public void testImportWaitsWhenWriterLockedLoop() throws Exception {
        for (int i = 0; i < 5; i++) {
            System.out.println("*************************************************************************************");
            System.out.println("**************************         Run " + i + "            ********************************");
            System.out.println("*************************************************************************************");
            testImportWaitsWhenWriterLocked(new HttpQueryTestBuilder().withTempFolder(this.temp).withWorkerCount(4).withHttpServerConfigBuilder(new HttpServerConfigurationBuilder().withNetwork(getSendDelayNetworkFacade(500)).withMultipartIdleSpinCount(10L)), 500, ValidImportRequest, "HTTP/1.1 200 OK\r\nServer: questDB/1.0\r\nDate: Thu, 1 Jan 1970 00:00:00 GMT\r\nTransfer-Encoding: chunked\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n0666\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|      Location:  |                          fhv_tripdata_2017-02.csv  |        Pattern  | Locale  |      Errors  |\r\n|   Partition by  |                                              NONE  |                 |         |              |\r\n|      Timestamp  |                                              NONE  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|   Rows handled  |                                                24  |                 |         |              |\r\n|  Rows imported  |                                                24  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|              0  |                              Dispatching_base_num  |                   STRING  |           0  |\r\n|              1  |                                   Pickup_DateTime  |                     DATE  |           0  |\r\n|              2  |                                  DropOff_datetime  |                   STRING  |           0  |\r\n|              3  |                                      PUlocationID  |                   STRING  |           0  |\r\n|              4  |                                      DOlocationID  |                   STRING  |           0  |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n\r\n00\r\n\r\n", true, false);
            this.temp.delete();
            this.temp.create();
        }
    }

    @Test
    public void testInsertWaitsWhenWriterLockedLoop() throws Exception {
        for (int i = 0; i < 10; i++) {
            System.out.println("*************************************************************************************");
            System.out.println("**************************         Run " + i + "            ********************************");
            System.out.println("*************************************************************************************");
            assertInsertWaitsWhenWriterLocked();
            this.temp.delete();
            this.temp.create();
        }
    }

    @Test
    public void testInsertsIsPerformedWhenWriterLockedAndDisconnectedLoop() throws Exception {
        for (int i = 0; i < 10; i++) {
            System.out.println("*************************************************************************************");
            System.out.println("**************************         Run " + i + "            ********************************");
            System.out.println("*************************************************************************************");
            assertInsertsIsPerformedWhenWriterLockedAndDisconnected();
            this.temp.delete();
            this.temp.create();
        }
    }

    @Test
    public void testImportWaitsWhenWriterLockedWithSlowPeerLoop() throws Exception {
        for (int i = 0; i < 10; i++) {
            System.out.println("*************************************************************************************");
            System.out.println("**************************         Run " + i + "            ********************************");
            System.out.println("*************************************************************************************");
            testImportWaitsWhenWriterLocked(new HttpQueryTestBuilder().withTempFolder(this.temp).withWorkerCount(2).withHttpServerConfigBuilder(new HttpServerConfigurationBuilder().withNetwork(getSendDelayNetworkFacade(500))), 0, ValidImportRequest, "HTTP/1.1 200 OK\r\nServer: questDB/1.0\r\nDate: Thu, 1 Jan 1970 00:00:00 GMT\r\nTransfer-Encoding: chunked\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n0666\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|      Location:  |                          fhv_tripdata_2017-02.csv  |        Pattern  | Locale  |      Errors  |\r\n|   Partition by  |                                              NONE  |                 |         |              |\r\n|      Timestamp  |                                              NONE  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|   Rows handled  |                                                24  |                 |         |              |\r\n|  Rows imported  |                                                24  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|              0  |                              Dispatching_base_num  |                   STRING  |           0  |\r\n|              1  |                                   Pickup_DateTime  |                     DATE  |           0  |\r\n|              2  |                                  DropOff_datetime  |                   STRING  |           0  |\r\n|              3  |                                      PUlocationID  |                   STRING  |           0  |\r\n|              4  |                                      DOlocationID  |                   STRING  |           0  |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n\r\n00\r\n\r\n", true, true);
            this.temp.delete();
            this.temp.create();
        }
    }

    @Test
    public void testFailsWhenInvalidDataImportedLoop() throws Exception {
        for (int i = 0; i < 5; i++) {
            System.out.println("*************************************************************************************");
            System.out.println("**************************         Run " + i + "            ********************************");
            System.out.println("*************************************************************************************");
            testImportWaitsWhenWriterLocked(new HttpQueryTestBuilder().withTempFolder(this.temp).withWorkerCount(2).withHttpServerConfigBuilder(new HttpServerConfigurationBuilder()).withCustomTextImportProcessor((jsonQueryProcessorConfiguration, cairoEngine, messageBus, i2) -> {
                return new TextImportProcessor(cairoEngine) { // from class: io.questdb.cutlass.http.RetryIODispatcherTest.1
                    public void onRequestRetry(HttpConnectionContext httpConnectionContext) throws ServerDisconnectException {
                        throw ServerDisconnectException.INSTANCE;
                    }
                };
            }), 0, ValidImportRequest, "HTTP/1.1 200 OK\r\nServer: questDB/1.0\r\nDate: Thu, 1 Jan 1970 00:00:00 GMT\r\nTransfer-Encoding: chunked\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n0666\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|      Location:  |                          fhv_tripdata_2017-02.csv  |        Pattern  | Locale  |      Errors  |\r\n|   Partition by  |                                              NONE  |                 |         |              |\r\n|      Timestamp  |                                              NONE  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|   Rows handled  |                                                24  |                 |         |              |\r\n|  Rows imported  |                                                24  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|              0  |                              Dispatching_base_num  |                   STRING  |           0  |\r\n|              1  |                                   Pickup_DateTime  |                     DATE  |           0  |\r\n|              2  |                                  DropOff_datetime  |                   STRING  |           0  |\r\n|              3  |                                      PUlocationID  |                   STRING  |           0  |\r\n|              4  |                                      DOlocationID  |                   STRING  |           0  |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n\r\n00\r\n\r\n", false, true);
            this.temp.delete();
            this.temp.create();
        }
    }

    @Test
    public void testImportsWhenReceiveBufferIsSmallAndSenderSlow() throws Exception {
        for (int i = 0; i < 10; i++) {
            System.out.println("*************************************************************************************");
            System.out.println("**************************         Run " + i + "            ********************************");
            System.out.println("*************************************************************************************");
            testImportWaitsWhenWriterLocked(new HttpQueryTestBuilder().withTempFolder(this.temp).withWorkerCount(2).withHttpServerConfigBuilder(new HttpServerConfigurationBuilder().withReceiveBufferSize(256)), 200, ValidImportRequest, "HTTP/1.1 200 OK\r\nServer: questDB/1.0\r\nDate: Thu, 1 Jan 1970 00:00:00 GMT\r\nTransfer-Encoding: chunked\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n0666\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|      Location:  |                          fhv_tripdata_2017-02.csv  |        Pattern  | Locale  |      Errors  |\r\n|   Partition by  |                                              NONE  |                 |         |              |\r\n|      Timestamp  |                                              NONE  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|   Rows handled  |                                                24  |                 |         |              |\r\n|  Rows imported  |                                                24  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|              0  |                              Dispatching_base_num  |                   STRING  |           0  |\r\n|              1  |                                   Pickup_DateTime  |                     DATE  |           0  |\r\n|              2  |                                  DropOff_datetime  |                   STRING  |           0  |\r\n|              3  |                                      PUlocationID  |                   STRING  |           0  |\r\n|              4  |                                      DOlocationID  |                   STRING  |           0  |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n\r\n00\r\n\r\n", false, true);
            this.temp.delete();
            this.temp.create();
        }
    }

    @Test
    public void testImportsHeaderIsNotFullyReceivedIntoReceiveBuffer() throws Exception {
        new HttpQueryTestBuilder().withTempFolder(this.temp).withWorkerCount(1).withHttpServerConfigBuilder(new HttpServerConfigurationBuilder().withReceiveBufferSize(50)).run(cairoEngine -> {
            new SendAndReceiveRequestBuilder().withExpectDisconnect(true).execute(ValidImportRequest, "HTTP/1.1 200 OK\r\nServer: questDB/1.0\r\nDate: Thu, 1 Jan 1970 00:00:00 GMT\r\nTransfer-Encoding: chunked\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n58\r\ncannot parse import because of receive buffer is not big enough to parse table structure\r\n00\r\n\r\n");
        });
    }

    @Test
    public void testImportRerunsExceedsRerunProcessingQueueSizeLoop() throws Exception {
        for (int i = 0; i < 10; i++) {
            System.out.println("*************************************************************************************");
            System.out.println("**************************         Run " + i + "            ********************************");
            System.out.println("*************************************************************************************");
            testImportRerunsExceedsRerunProcessingQueueSize(1000);
            this.temp.delete();
            this.temp.create();
        }
    }

    @Test
    public void testImportsCreateAsSelectAndDrop() throws Exception {
        new HttpQueryTestBuilder().withTempFolder(this.temp).withWorkerCount(4).withHttpServerConfigBuilder(new HttpServerConfigurationBuilder()).withTelemetry(false).run(cairoEngine -> {
            for (int i = 0; i < 10; i++) {
                System.out.println("*************************************************************************************");
                System.out.println("**************************         Run " + i + "            ********************************");
                System.out.println("*************************************************************************************");
                new SendAndReceiveRequestBuilder().withNetworkFacade(getSendDelayNetworkFacade(0)).withCompareLength("HTTP/1.1 200 OK\r\nServer: questDB/1.0\r\nDate: Thu, 1 Jan 1970 00:00:00 GMT\r\nTransfer-Encoding: chunked\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n0666\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|      Location:  |                          fhv_tripdata_2017-02.csv  |        Pattern  | Locale  |      Errors  |\r\n|   Partition by  |                                              NONE  |                 |         |              |\r\n|      Timestamp  |                                              NONE  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|   Rows handled  |                                                24  |                 |         |              |\r\n|  Rows imported  |                                                24  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|              0  |                              Dispatching_base_num  |                   STRING  |           0  |\r\n|              1  |                                   Pickup_DateTime  |                     DATE  |           0  |\r\n|              2  |                                  DropOff_datetime  |                   STRING  |           0  |\r\n|              3  |                                      PUlocationID  |                   STRING  |           0  |\r\n|              4  |                                      DOlocationID  |                   STRING  |           0  |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n\r\n00\r\n\r\n".length()).execute(ValidImportRequest, "HTTP/1.1 200 OK\r\nServer: questDB/1.0\r\nDate: Thu, 1 Jan 1970 00:00:00 GMT\r\nTransfer-Encoding: chunked\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n0666\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|      Location:  |                          fhv_tripdata_2017-02.csv  |        Pattern  | Locale  |      Errors  |\r\n|   Partition by  |                                              NONE  |                 |         |              |\r\n|      Timestamp  |                                              NONE  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|   Rows handled  |                                                24  |                 |         |              |\r\n|  Rows imported  |                                                24  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|              0  |                              Dispatching_base_num  |                   STRING  |           0  |\r\n|              1  |                                   Pickup_DateTime  |                     DATE  |           0  |\r\n|              2  |                                  DropOff_datetime  |                   STRING  |           0  |\r\n|              3  |                                      PUlocationID  |                   STRING  |           0  |\r\n|              4  |                                      DOlocationID  |                   STRING  |           0  |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n\r\n00\r\n\r\n");
                if (i == 0) {
                    new SendAndReceiveRequestBuilder().executeWithStandardHeaders("GET /query?query=create+table+copy+as+(select+*+from+%22fhv_tripdata_2017-02.csv%22)&count=true HTTP/1.1\r\n", "0c\r\n{\"ddl\":\"OK\"}\r\n00\r\n\r\n");
                } else {
                    new SendAndReceiveRequestBuilder().executeWithStandardHeaders("GET /query?query=insert+into+copy+select+*+from+%22fhv_tripdata_2017-02.csv%22&count=true HTTP/1.1\r\n", "0c\r\n{\"ddl\":\"OK\"}\r\n00\r\n\r\n");
                }
                new SendAndReceiveRequestBuilder().executeWithStandardHeaders("GET /query?query=drop+table+%22fhv_tripdata_2017-02.csv%22&count=true HTTP/1.1\r\n", "0c\r\n{\"ddl\":\"OK\"}\r\n00\r\n\r\n");
            }
        });
    }

    private void assertImportProcessedWhenClientDisconnected() throws Exception {
        new HttpQueryTestBuilder().withTempFolder(this.temp).withWorkerCount(2).withHttpServerConfigBuilder(new HttpServerConfigurationBuilder()).withTelemetry(false).run(cairoEngine -> {
            new SendAndReceiveRequestBuilder().execute(ValidImportRequest, "HTTP/1.1 200 OK\r\nServer: questDB/1.0\r\nDate: Thu, 1 Jan 1970 00:00:00 GMT\r\nTransfer-Encoding: chunked\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n0666\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|      Location:  |                          fhv_tripdata_2017-02.csv  |        Pattern  | Locale  |      Errors  |\r\n|   Partition by  |                                              NONE  |                 |         |              |\r\n|      Timestamp  |                                              NONE  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|   Rows handled  |                                                24  |                 |         |              |\r\n|  Rows imported  |                                                24  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|              0  |                              Dispatching_base_num  |                   STRING  |           0  |\r\n|              1  |                                   Pickup_DateTime  |                     DATE  |           0  |\r\n|              2  |                                  DropOff_datetime  |                   STRING  |           0  |\r\n|              3  |                                      PUlocationID  |                   STRING  |           0  |\r\n|              4  |                                      DOlocationID  |                   STRING  |           0  |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n\r\n00\r\n\r\n");
            TableWriter lockWriter = lockWriter(cairoEngine, "fhv_tripdata_2017-02.csv");
            CountDownLatch countDownLatch = new CountDownLatch(2);
            long[] jArr = new long[2];
            Arrays.fill(jArr, -1L);
            for (int i = 0; i < 2; i++) {
                int i2 = i;
                new Thread(() -> {
                    for (int i3 = 0; i3 < 1; i3++) {
                        try {
                            try {
                                jArr[(i2 * 1) + i3] = new SendAndReceiveRequestBuilder().connectAndSendRequest(ValidImportRequest);
                            } catch (Exception e) {
                                LOG.error().$("Failed execute insert http request. Server error ").$(e).$();
                            }
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                    LOG.info().$("Stopped thread ").$(i2).$();
                }).start();
            }
            countDownLatch.await();
            assertNRowsInserted(24);
            for (int i3 = 0; i3 < jArr.length; i3++) {
                Assert.assertNotEquals(jArr[i3], -1L);
                NetworkFacadeImpl.INSTANCE.close(jArr[i3]);
            }
            lockWriter.close();
            for (int i4 = 0; i4 < 20; i4++) {
                try {
                    assertNRowsInserted(72);
                    return;
                } catch (ComparisonFailure e) {
                    if (i4 >= 9) {
                        throw e;
                    }
                    Thread.sleep(50L);
                }
            }
        });
    }

    protected void assertNRowsInserted(int i) throws InterruptedException {
        new SendAndReceiveRequestBuilder().executeWithStandardHeaders("GET /query?query=select+count(*)+from+%22fhv_tripdata_2017-02.csv%22&count=true HTTP/1.1\r\n", "83\r\n{\"query\":\"select count(*) from \\\"fhv_tripdata_2017-02.csv\\\"\",\"columns\":[{\"name\":\"count\",\"type\":\"LONG\"}],\"dataset\":[[" + i + "]],\"count\":1}\r\n00\r\n\r\n");
    }

    private void assertInsertWaitsExceedsRerunProcessingQueueSize() throws Exception {
        new HttpQueryTestBuilder().withTempFolder(this.temp).withWorkerCount(4).withHttpServerConfigBuilder(new HttpServerConfigurationBuilder().withRerunProcessingQueueSize(1)).withTelemetry(false).run(cairoEngine -> {
            new SendAndReceiveRequestBuilder().executeWithStandardHeaders("GET /query?query=%0A%0A%0Acreate+table+balances_x+(%0A%09cust_id+int%2C+%0A%09balance_ccy+symbol%2C+%0A%09balance+double%2C+%0A%09status+byte%2C+%0A%09timestamp+timestamp%0A)&limit=0%2C1000&count=true HTTP/1.1\r\n", "0c\r\n{\"ddl\":\"OK\"}\r\n00\r\n\r\n");
            TableWriter lockWriter = lockWriter(cairoEngine, "balances_x");
            CountDownLatch countDownLatch = new CountDownLatch(4);
            AtomicInteger atomicInteger = new AtomicInteger();
            for (int i = 0; i < 4; i++) {
                new Thread(() -> {
                    for (int i2 = 0; i2 < 10; i2++) {
                        try {
                            try {
                                new SendAndReceiveRequestBuilder().executeWithStandardHeaders("GET /query?query=%0A%0Ainsert+into+balances_x+(cust_id%2C+balance_ccy%2C+balance%2C+timestamp)+values+(1%2C+%27USD%27%2C+1500.00%2C+6000000001)&limit=0%2C1000&count=true HTTP/1.1\r\n", "0c\r\n{\"ddl\":\"OK\"}\r\n00\r\n\r\n");
                            } catch (AssertionError e) {
                                atomicInteger.incrementAndGet();
                            } catch (Exception e2) {
                                LOG.error().$("Failed execute insert http request. Server error ").$(e2);
                            }
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                }).start();
            }
            Assert.assertFalse(countDownLatch.await(200L, TimeUnit.MILLISECONDS));
            lockWriter.close();
            if (!countDownLatch.await(5000L, TimeUnit.MILLISECONDS)) {
                Assert.fail("Wait to process retries exceeded timeout");
            }
            new SendAndReceiveRequestBuilder().executeWithStandardHeaders("GET /query?query=select+count(*)+from+balances_x&count=true HTTP/1.1\r\n", "71\r\n{\"query\":\"select count(*) from balances_x\",\"columns\":[{\"name\":\"count\",\"type\":\"LONG\"}],\"dataset\":[[" + (40 - atomicInteger.get()) + "]],\"count\":1}\r\n00\r\n\r\n");
        });
    }

    private void assertInsertWaitsWhenWriterLocked() throws Exception {
        new HttpQueryTestBuilder().withTempFolder(this.temp).withWorkerCount(2).withHttpServerConfigBuilder(new HttpServerConfigurationBuilder()).withTelemetry(false).run(cairoEngine -> {
            new SendAndReceiveRequestBuilder().executeWithStandardHeaders("GET /query?query=%0A%0A%0Acreate+table+balances_x+(%0A%09cust_id+int%2C+%0A%09balance_ccy+symbol%2C+%0A%09balance+double%2C+%0A%09status+byte%2C+%0A%09timestamp+timestamp%0A)&limit=0%2C1000&count=true HTTP/1.1\r\n", "0c\r\n{\"ddl\":\"OK\"}\r\n00\r\n\r\n");
            TableWriter lockWriter = lockWriter(cairoEngine, "balances_x");
            CountDownLatch countDownLatch = new CountDownLatch(2);
            for (int i = 0; i < 2; i++) {
                new Thread(() -> {
                    for (int i2 = 0; i2 < 10; i2++) {
                        try {
                            try {
                                new SendAndReceiveRequestBuilder().executeWithStandardHeaders("GET /query?query=%0A%0Ainsert+into+balances_x+(cust_id%2C+balance_ccy%2C+balance%2C+timestamp)+values+(1%2C+%27USD%27%2C+1500.00%2C+6000000001)&limit=0%2C1000&count=true HTTP/1.1\r\n", "0c\r\n{\"ddl\":\"OK\"}\r\n00\r\n\r\n");
                            } catch (Exception e) {
                                LOG.error().$("Failed execute insert http request. Server error ").$(e).$();
                            }
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                }).start();
            }
            Assert.assertFalse(countDownLatch.await(200L, TimeUnit.MILLISECONDS));
            lockWriter.close();
            countDownLatch.await();
            new SendAndReceiveRequestBuilder().executeWithStandardHeaders("GET /query?query=select+count(*)+from+balances_x&count=true HTTP/1.1\r\n", "71\r\n{\"query\":\"select count(*) from balances_x\",\"columns\":[{\"name\":\"count\",\"type\":\"LONG\"}],\"dataset\":[[20]],\"count\":1}\r\n00\r\n\r\n");
        });
    }

    public void testImportWaitsWhenWriterLocked(HttpQueryTestBuilder httpQueryTestBuilder, int i, String str, String str2, boolean z, boolean z2) throws Exception {
        int workerCount = httpQueryTestBuilder.getWorkerCount();
        httpQueryTestBuilder.run(cairoEngine -> {
            new SendAndReceiveRequestBuilder().execute(ValidImportRequest, "HTTP/1.1 200 OK\r\nServer: questDB/1.0\r\nDate: Thu, 1 Jan 1970 00:00:00 GMT\r\nTransfer-Encoding: chunked\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n0666\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|      Location:  |                          fhv_tripdata_2017-02.csv  |        Pattern  | Locale  |      Errors  |\r\n|   Partition by  |                                              NONE  |                 |         |              |\r\n|      Timestamp  |                                              NONE  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|   Rows handled  |                                                24  |                 |         |              |\r\n|  Rows imported  |                                                24  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|              0  |                              Dispatching_base_num  |                   STRING  |           0  |\r\n|              1  |                                   Pickup_DateTime  |                     DATE  |           0  |\r\n|              2  |                                  DropOff_datetime  |                   STRING  |           0  |\r\n|              3  |                                      PUlocationID  |                   STRING  |           0  |\r\n|              4  |                                      DOlocationID  |                   STRING  |           0  |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n\r\n00\r\n\r\n");
            TableWriter lockWriter = lockWriter(cairoEngine, "fhv_tripdata_2017-02.csv");
            CountDownLatch countDownLatch = new CountDownLatch(workerCount);
            AtomicInteger atomicInteger = new AtomicInteger();
            for (int i2 = 0; i2 < workerCount; i2++) {
                int i3 = i2;
                new Thread(() -> {
                    for (int i4 = 0; i4 < 1; i4++) {
                        try {
                            try {
                                new SendAndReceiveRequestBuilder().withNetworkFacade(getSendDelayNetworkFacade(i)).withCompareLength(str2.length()).execute(str, str2);
                                atomicInteger.incrementAndGet();
                            } catch (AssertionError e) {
                                if (z2) {
                                    LOG.info().$("Failed execute insert http request. Comparison failed").$();
                                } else {
                                    LOG.error().$("Failed execute insert http request. Comparison failed").$(e).$();
                                }
                            } catch (Exception e2) {
                                LOG.error().$("Failed execute insert http request.").$(e2).$();
                            }
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                    LOG.info().$("Stopped thread ").$(i3).$();
                }).start();
            }
            boolean await = countDownLatch.await(100L, TimeUnit.MILLISECONDS);
            if (z) {
                Assert.assertFalse(await);
            }
            lockWriter.close();
            if (!countDownLatch.await(50000L, TimeUnit.MILLISECONDS)) {
                Assert.fail("Imports did not finish within reasonable time");
            }
            if (!z2) {
                Assert.assertEquals(workerCount, atomicInteger.get());
            }
            LOG.info().$("Requesting row count").$();
            int i4 = (atomicInteger.get() + 1) * 24;
            new SendAndReceiveRequestBuilder().executeWithStandardHeaders("GET /query?query=select+count(*)+from+%22fhv_tripdata_2017-02.csv%22&count=true HTTP/1.1\r\n", (i4 < 100 ? "83" : "84") + "\r\n{\"query\":\"select count(*) from \\\"fhv_tripdata_2017-02.csv\\\"\",\"columns\":[{\"name\":\"count\",\"type\":\"LONG\"}],\"dataset\":[[" + i4 + "]],\"count\":1}\r\n00\r\n\r\n");
        });
    }

    private void assertInsertsIsPerformedWhenWriterLockedAndDisconnected() throws Exception {
        new HttpQueryTestBuilder().withTempFolder(this.temp).withWorkerCount(4).withHttpServerConfigBuilder(new HttpServerConfigurationBuilder()).withTelemetry(false).run(cairoEngine -> {
            new SendAndReceiveRequestBuilder().executeWithStandardHeaders("GET /query?query=%0A%0A%0Acreate+table+balances_x+(%0A%09cust_id+int%2C+%0A%09balance_ccy+symbol%2C+%0A%09balance+double%2C+%0A%09status+byte%2C+%0A%09timestamp+timestamp%0A)&limit=0%2C1000&count=true HTTP/1.1\r\n", "0c\r\n{\"ddl\":\"OK\"}\r\n00\r\n\r\n");
            TableWriter lockWriter = lockWriter(cairoEngine, "balances_x");
            CountDownLatch countDownLatch = new CountDownLatch(4);
            long[] jArr = new long[4];
            Arrays.fill(jArr, -1L);
            Thread[] threadArr = new Thread[4];
            for (int i = 0; i < 4; i++) {
                int i2 = i;
                threadArr[i] = new Thread(() -> {
                    try {
                        try {
                            Thread.sleep(i2 * 5);
                            jArr[i2] = new SendAndReceiveRequestBuilder().connectAndSendRequest("GET /query?query=%0A%0Ainsert+into+balances_x+(cust_id%2C+balance_ccy%2C+balance%2C+timestamp)+values+(" + i2 + "%2C+%27USD%27%2C+1500.00%2C+6000000001)&limit=0%2C1000&count=true HTTP/1.1\r\n" + SendAndReceiveRequestBuilder.RequestHeaders);
                        } catch (Exception e) {
                            LOG.error().$("Failed execute insert http request. Server error ").$(e);
                        }
                    } finally {
                        countDownLatch.countDown();
                    }
                });
                threadArr[i].start();
            }
            countDownLatch.await();
            new SendAndReceiveRequestBuilder().executeWithStandardHeaders("GET /query?query=SELECT+1 HTTP/1.1\r\n", "54\r\n{\"query\":\"SELECT 1\",\"columns\":[{\"name\":\"1\",\"type\":\"INT\"}],\"dataset\":[[1]],\"count\":1}\r\n00\r\n\r\n");
            for (int i3 = 0; i3 < jArr.length; i3++) {
                Assert.assertNotEquals(jArr[i3], -1L);
                NetworkFacadeImpl.INSTANCE.close(jArr[i3]);
            }
            lockWriter.close();
            for (int i4 = 0; i4 < 80; i4++) {
                try {
                    new SendAndReceiveRequestBuilder().executeWithStandardHeaders("GET /query?query=select+count()+from+balances_x&count=true HTTP/1.1\r\n", "6f\r\n{\"query\":\"select count() from balances_x\",\"columns\":[{\"name\":\"count\",\"type\":\"LONG\"}],\"dataset\":[[4]],\"count\":1}\r\n00\r\n\r\n");
                    return;
                } catch (ComparisonFailure e) {
                    if (i4 >= 80 - 1) {
                        throw e;
                    }
                    Thread.sleep(50L);
                }
            }
        });
    }

    public void testImportRerunsExceedsRerunProcessingQueueSize(int i) throws Exception {
        new HttpQueryTestBuilder().withTempFolder(this.temp).withWorkerCount(2).withHttpServerConfigBuilder(new HttpServerConfigurationBuilder().withNetwork(getSendDelayNetworkFacade(i)).withRerunProcessingQueueSize(1)).run(cairoEngine -> {
            new SendAndReceiveRequestBuilder().execute(ValidImportRequest, "HTTP/1.1 200 OK\r\nServer: questDB/1.0\r\nDate: Thu, 1 Jan 1970 00:00:00 GMT\r\nTransfer-Encoding: chunked\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n0666\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|      Location:  |                          fhv_tripdata_2017-02.csv  |        Pattern  | Locale  |      Errors  |\r\n|   Partition by  |                                              NONE  |                 |         |              |\r\n|      Timestamp  |                                              NONE  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|   Rows handled  |                                                24  |                 |         |              |\r\n|  Rows imported  |                                                24  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|              0  |                              Dispatching_base_num  |                   STRING  |           0  |\r\n|              1  |                                   Pickup_DateTime  |                     DATE  |           0  |\r\n|              2  |                                  DropOff_datetime  |                   STRING  |           0  |\r\n|              3  |                                      PUlocationID  |                   STRING  |           0  |\r\n|              4  |                                      DOlocationID  |                   STRING  |           0  |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n\r\n00\r\n\r\n");
            TableWriter lockWriter = lockWriter(cairoEngine, "fhv_tripdata_2017-02.csv");
            AtomicInteger atomicInteger = new AtomicInteger();
            CountDownLatch countDownLatch = new CountDownLatch(4);
            for (int i2 = 0; i2 < 4; i2++) {
                int i3 = i2;
                new Thread(() -> {
                    for (int i4 = 0; i4 < 4; i4++) {
                        try {
                            try {
                                new SendAndReceiveRequestBuilder().execute(ValidImportRequest, "HTTP/1.1 200 OK\r\nServer: questDB/1.0\r\nDate: Thu, 1 Jan 1970 00:00:00 GMT\r\nTransfer-Encoding: chunked\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n0666\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|      Location:  |                          fhv_tripdata_2017-02.csv  |        Pattern  | Locale  |      Errors  |\r\n|   Partition by  |                                              NONE  |                 |         |              |\r\n|      Timestamp  |                                              NONE  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|   Rows handled  |                                                24  |                 |         |              |\r\n|  Rows imported  |                                                24  |                 |         |              |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n|              0  |                              Dispatching_base_num  |                   STRING  |           0  |\r\n|              1  |                                   Pickup_DateTime  |                     DATE  |           0  |\r\n|              2  |                                  DropOff_datetime  |                   STRING  |           0  |\r\n|              3  |                                      PUlocationID  |                   STRING  |           0  |\r\n|              4  |                                      DOlocationID  |                   STRING  |           0  |\r\n+-----------------------------------------------------------------------------------------------------------------+\r\n\r\n00\r\n\r\n");
                            } catch (AssertionError e) {
                                LOG.info().$("Server call succeeded but response is different from the expected one").$();
                                atomicInteger.incrementAndGet();
                            } catch (Exception e2) {
                                LOG.error().$("Failed execute insert http request. Server error ").$(e2).$();
                            }
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                    LOG.info().$("Stopped thread ").$(i3).$();
                }).start();
            }
            Assert.assertFalse(countDownLatch.await(100L, TimeUnit.MILLISECONDS));
            lockWriter.close();
            if (!countDownLatch.await(5000L, TimeUnit.MILLISECONDS)) {
                Assert.fail("Imports did not finish within reasonable time");
            }
            LOG.info().$("Requesting row count").$();
            new SendAndReceiveRequestBuilder().executeWithStandardHeaders("GET /query?query=select+count(*)+from+%22fhv_tripdata_2017-02.csv%22&count=true HTTP/1.1\r\n", "84\r\n{\"query\":\"select count(*) from \\\"fhv_tripdata_2017-02.csv\\\"\",\"columns\":[{\"name\":\"count\",\"type\":\"LONG\"}],\"dataset\":[[" + ((17 - atomicInteger.get()) * 24) + "]],\"count\":1}\r\n00\r\n\r\n");
        });
    }

    @Test
    public void testRenameWaitsWhenWriterLocked() throws Exception {
        new HttpQueryTestBuilder().withTempFolder(this.temp).withWorkerCount(2).withHttpServerConfigBuilder(new HttpServerConfigurationBuilder()).withTelemetry(false).run(cairoEngine -> {
            new SendAndReceiveRequestBuilder().executeWithStandardHeaders("GET /query?query=%0A%0A%0Acreate+table+balances_x+(%0A%09cust_id+int%2C+%0A%09balance_ccy+symbol%2C+%0A%09balance+double%2C+%0A%09status+byte%2C+%0A%09timestamp+timestamp%0A)&limit=0%2C1000&count=true HTTP/1.1\r\n", "0c\r\n{\"ddl\":\"OK\"}\r\n00\r\n\r\n");
            TableWriter lockWriter = lockWriter(cairoEngine, "balances_x");
            CountDownLatch countDownLatch = new CountDownLatch(1);
            new Thread(() -> {
                try {
                    try {
                        new SendAndReceiveRequestBuilder().executeWithStandardHeaders("GET /query?query=rename+table+%27balances_x%27+to+%27balances_y%27&limit=0%2C1000&count=true HTTP/1.1\r\n", "0c\r\n{\"ddl\":\"OK\"}\r\n00\r\n\r\n");
                    } catch (Exception e) {
                        LOG.error().$("Failed execute insert http request. Server error ").$(e).$();
                    }
                } finally {
                    countDownLatch.countDown();
                }
            }).start();
            Assert.assertFalse(countDownLatch.await(200L, TimeUnit.MILLISECONDS));
            lockWriter.close();
            Assert.assertTrue("Table rename did not complete within timeout after writer is released", countDownLatch.await(500L, TimeUnit.MILLISECONDS));
        });
    }

    @NotNull
    private NetworkFacade getSendDelayNetworkFacade(final int i) {
        return new NetworkFacadeImpl() { // from class: io.questdb.cutlass.http.RetryIODispatcherTest.2
            final AtomicInteger totalSent = new AtomicInteger();

            public int send(long j, long j2, int i2) {
                if (i == 0) {
                    return super.send(j, j2, i2);
                }
                int i3 = this.totalSent.get();
                if (i2 <= 0) {
                    return 0;
                }
                if (i3 >= i) {
                    this.totalSent.set(0);
                    return 0;
                }
                int send = super.send(j, j2, Math.min(i2, i - i3));
                this.totalSent.addAndGet(send);
                return send;
            }
        };
    }

    @NotNull
    private TableWriter lockWriter(CairoEngine cairoEngine, String str) throws InterruptedException {
        TableWriter tableWriter = null;
        for (int i = 0; i < 10; i++) {
            try {
                tableWriter = cairoEngine.getWriter(AllowAllCairoSecurityContext.INSTANCE, str);
                break;
            } catch (EntryUnavailableException e) {
                Thread.sleep(10L);
            }
        }
        if (tableWriter == null) {
            Assert.fail("Cannot lock writer in a reasonable time");
        }
        return tableWriter;
    }
}
