/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.client;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runners.Parameterized;

public abstract class AbstractTestAsyncTableRegionReplicasRead {
    protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
    protected static TableName TABLE_NAME = TableName.valueOf((String)"async");
    protected static byte[] FAMILY = Bytes.toBytes((String)"cf");
    protected static byte[] QUALIFIER = Bytes.toBytes((String)"cq");
    protected static byte[] ROW = Bytes.toBytes((String)"row");
    protected static byte[] VALUE = Bytes.toBytes((String)"value");
    protected static int REPLICA_COUNT = 3;
    protected static AsyncConnection ASYNC_CONN;
    @Rule
    public TestName testName = new TestName();
    @Parameterized.Parameter
    public Supplier<AsyncTable<?>> getTable;
    protected static volatile boolean FAIL_PRIMARY_GET;
    protected static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT;

    private static AsyncTable<?> getRawTable() {
        return ASYNC_CONN.getTable(TABLE_NAME);
    }

    private static AsyncTable<?> getTable() {
        return ASYNC_CONN.getTable(TABLE_NAME, (ExecutorService)ForkJoinPool.commonPool());
    }

    @Parameterized.Parameters
    public static List<Object[]> params() {
        return Arrays.asList({AbstractTestAsyncTableRegionReplicasRead::getRawTable}, {AbstractTestAsyncTableRegionReplicasRead::getTable});
    }

    private static boolean allReplicasHaveRow(byte[] row) throws IOException {
        for (JVMClusterUtil.RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
            for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
                if (!region.get(new Get(row), false).isEmpty()) continue;
                return false;
            }
        }
        return true;
    }

    protected static void startClusterAndCreateTable() throws Exception {
        TEST_UTIL.getConfiguration().setLong("hbase.rpc.read.timeout", TimeUnit.MINUTES.toMillis(10L));
        TEST_UTIL.getConfiguration().setLong("hbase.rpc.timeout", TimeUnit.MINUTES.toMillis(10L));
        TEST_UTIL.getConfiguration().setLong("hbase.client.scanner.timeout.period", TimeUnit.MINUTES.toMillis(10L));
        TEST_UTIL.getConfiguration().setLong("hbase.client.primaryCallTimeout.get", TimeUnit.SECONDS.toMicros(1L));
        TEST_UTIL.getConfiguration().setLong("hbase.client.replicaCallTimeout.scan", TimeUnit.SECONDS.toMicros(1L));
        TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10L);
        TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", Integer.MAX_VALUE);
        TEST_UTIL.startMiniCluster(3);
        TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder((TableName)TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder.of((byte[])FAMILY)).setRegionReplication(REPLICA_COUNT).setCoprocessor(FailPrimaryGetCP.class.getName()).build());
        TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
        ASYNC_CONN = (AsyncConnection)ConnectionFactory.createAsyncConnection((Configuration)TEST_UTIL.getConfiguration()).get();
    }

    protected static void waitUntilAllReplicasHaveRow(byte[] row) throws IOException {
        TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
        TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
        TEST_UTIL.waitFor(30000L, () -> AbstractTestAsyncTableRegionReplicasRead.allReplicasHaveRow(row));
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        IOUtils.closeQuietly((Closeable)ASYNC_CONN);
        TEST_UTIL.shutdownMiniCluster();
    }

    protected static int getSecondaryGetCount() {
        return REPLICA_ID_TO_COUNT.entrySet().stream().filter(e -> (Integer)e.getKey() != 0).mapToInt(e -> ((AtomicInteger)e.getValue()).get()).sum();
    }

    protected static int getPrimaryGetCount() {
        AtomicInteger primaryGetCount = (AtomicInteger)REPLICA_ID_TO_COUNT.get(0);
        return primaryGetCount != null ? primaryGetCount.get() : 0;
    }

    protected abstract void readAndCheck(AsyncTable<?> var1, int var2) throws Exception;

    @Test
    public void testNoReplicaRead() throws Exception {
        FAIL_PRIMARY_GET = false;
        REPLICA_ID_TO_COUNT.clear();
        AsyncTable<?> table = this.getTable.get();
        this.readAndCheck(table, -1);
        Thread.sleep(5000L);
        Assert.assertEquals((long)0L, (long)AbstractTestAsyncTableRegionReplicasRead.getSecondaryGetCount());
    }

    @Test
    public void testReplicaRead() throws Exception {
        FAIL_PRIMARY_GET = true;
        REPLICA_ID_TO_COUNT.clear();
        AsyncTable<?> table = this.getTable.get();
        this.readAndCheck(table, -1);
        Thread.sleep(5000L);
        int count = AbstractTestAsyncTableRegionReplicasRead.getPrimaryGetCount();
        Thread.sleep(10000L);
        Assert.assertEquals((long)count, (long)AbstractTestAsyncTableRegionReplicasRead.getPrimaryGetCount());
    }

    @Test
    public void testReadSpecificReplica() throws Exception {
        FAIL_PRIMARY_GET = false;
        REPLICA_ID_TO_COUNT.clear();
        AsyncTable<?> table = this.getTable.get();
        for (int replicaId = 0; replicaId < REPLICA_COUNT; ++replicaId) {
            this.readAndCheck(table, replicaId);
            Assert.assertEquals((long)1L, (long)((AtomicInteger)REPLICA_ID_TO_COUNT.get(replicaId)).get());
        }
    }

    static {
        FAIL_PRIMARY_GET = false;
        REPLICA_ID_TO_COUNT = new ConcurrentHashMap<Integer, AtomicInteger>();
    }

    public static final class FailPrimaryGetCP
    implements RegionObserver,
    RegionCoprocessor {
        public Optional<RegionObserver> getRegionObserver() {
            return Optional.of(this);
        }

        private void recordAndTryFail(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
            RegionInfo region = ((RegionCoprocessorEnvironment)c.getEnvironment()).getRegionInfo();
            if (!region.getTable().equals((Object)TABLE_NAME)) {
                return;
            }
            REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger()).incrementAndGet();
            if (region.getReplicaId() == 0 && FAIL_PRIMARY_GET) {
                throw new IOException("Inject error");
            }
        }

        public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result) throws IOException {
            this.recordAndTryFail(c);
        }

        public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan) throws IOException {
            this.recordAndTryFail(c);
        }
    }
}

