/*
 * Decompiled with CFR 0.152.
 */
package org.yetiz.utils.hbase;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.slf4j.LoggerFactory;
import org.yetiz.utils.hbase.HBaseAdmin;
import org.yetiz.utils.hbase.HBaseTable;
import org.yetiz.utils.hbase.TableName;
import org.yetiz.utils.hbase.exception.DataSourceException;
import org.yetiz.utils.hbase.exception.UnHandledException;
import org.yetiz.utils.hbase.exception.YHBaseException;

public class HBaseClient {
    public static final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
    public static final int DEFAULT_INVOKER_COUNT = 1;
    private static final int MAX_FAST_BATCH_SIZE = 5000;
    private static final int MAX_ASYNC_BATCH_SIZE = 5000;
    protected final ConcurrentHashMap<TableName, LinkedBlockingQueue<Row>> fastCollection = new ConcurrentHashMap();
    protected final ConcurrentHashMap<TableName, LinkedBlockingQueue<HBaseTable.Async.AsyncPackage>> asyncPackages = new ConcurrentHashMap();
    private final int invokerCount;
    private boolean closed = false;
    private Connection connection;
    private Configuration configuration = HBaseConfiguration.create();

    private HBaseClient(int invokerCount) {
        this.invokerCount = invokerCount;
    }

    public static final byte[] bytes(String string) {
        return string.getBytes(DEFAULT_CHARSET);
    }

    private void init() {
        this.connection = this.newConnection();
        for (int i = 0; i < this.invokerCount; ++i) {
            new MiniConsumer(this).start();
        }
    }

    private Connection newConnection() {
        try {
            Connection connection = ConnectionFactory.createConnection((Configuration)this.configuration);
            return connection;
        }
        catch (Exception e) {
            throw new DataSourceException(e);
        }
    }

    public HBaseAdmin admin() {
        try {
            return new HBaseAdmin(this.connection.getAdmin());
        }
        catch (Exception e) {
            throw new DataSourceException(e);
        }
    }

    public void close() {
        this.closed = true;
    }

    public boolean isClosed() {
        return this.closed;
    }

    public HBaseTable table(TableName tableName) {
        try {
            return new HBaseTable(tableName, this.connection().getTable(tableName.get()), this.asyncPackages, this.fastCollection);
        }
        catch (Throwable throwable) {
            throw this.convertedException(throwable);
        }
    }

    protected Connection connection() {
        return this.connection;
    }

    private YHBaseException convertedException(Throwable throwable) {
        if (throwable instanceof YHBaseException) {
            return (YHBaseException)throwable;
        }
        return new UnHandledException(throwable);
    }

    public long rowCount(TableName tableName, byte[] family) {
        AggregationClient aggregationClient = new AggregationClient(this.configuration());
        Scan scan = new Scan();
        scan.addFamily(family);
        long count = 0L;
        try {
            count = aggregationClient.rowCount(tableName.get(), null, scan);
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        return count;
    }

    public Configuration configuration() {
        return this.configuration;
    }

    public static class Builder {
        private HBaseClient hBaseClient;

        public static final Builder create() {
            return Builder.create(1);
        }

        public static final Builder create(int invokerCount) {
            Builder builder = new Builder();
            builder.hBaseClient = new HBaseClient(invokerCount);
            return builder;
        }

        public final Builder configuration(Configuration configuration) {
            this.hBaseClient.configuration = configuration;
            return this;
        }

        public final Builder set(Parameter key, String value) {
            this.hBaseClient.configuration.set(key.name(), value);
            return this;
        }

        public final Builder setLong(Parameter key, Long value) {
            this.hBaseClient.configuration.setLong(key.name(), value.longValue());
            return this;
        }

        public final HBaseClient build() {
            this.hBaseClient.init();
            return this.hBaseClient;
        }
    }

    public static class Parameter {
        public static final Parameter ZK_QUORUM = new Parameter("hbase.zookeeper.quorum");
        public static final Parameter ZK_PROPERTY_CLIENT_PORT = new Parameter("hbase.zookeeper.property.clientPort");
        public static final Parameter CLIENT_SCANNER_CACHING = new Parameter("hbase.client.scanner.caching");
        public static final Parameter RPC_TIMEOUT = new Parameter("hbase.rpc.timeout");
        public static final Parameter ZK_SESSION_TIMEOUT = new Parameter("zookeeper.session.timeout");
        private String name;

        private Parameter(String name) {
            this.name = name;
        }

        public String name() {
            return this.name;
        }
    }

    public static class MiniConsumer
    extends Thread {
        private HBaseClient client;

        public MiniConsumer(HBaseClient client) {
            this.client = client;
        }

        @Override
        public void run() {
            while (!this.client.isClosed()) {
                try {
                    this.drainFast();
                }
                catch (Throwable throwable) {
                    LoggerFactory.getLogger(MiniConsumer.class).error("drainFast exception occur.", throwable);
                }
                try {
                    this.drainAsync();
                    Thread.sleep(100L);
                }
                catch (InterruptedException throwable) {
                }
                catch (Throwable throwable) {
                    LoggerFactory.getLogger(MiniConsumer.class).error("drainAsync exception occur.", throwable);
                }
            }
        }

        private void drainFast() {
            this.client.fastCollection.entrySet().parallelStream().forEach(pair -> {
                ArrayList rows = new ArrayList();
                ((LinkedBlockingQueue)pair.getValue()).drainTo(rows, 5000);
                if (rows.isEmpty()) {
                    return;
                }
                Object[] results = new Object[rows.size()];
                Object connection = null;
                try {
                    this.client.table((TableName)pair.getKey()).batch(rows, results);
                }
                catch (Throwable throwable) {
                    throw this.client.convertedException(throwable);
                }
                finally {
                    try {
                        connection.close();
                    }
                    catch (Throwable throwable) {}
                }
            });
        }

        private void drainAsync() {
            this.client.asyncPackages.entrySet().parallelStream().forEach(pair -> {
                ArrayList packages = new ArrayList();
                ArrayList rows = new ArrayList();
                ((LinkedBlockingQueue)pair.getValue()).drainTo(packages, 5000);
                if (packages.isEmpty()) {
                    return;
                }
                packages.stream().forEach(asyncPackage -> rows.add(asyncPackage.action));
                Object[] results = new Object[packages.size()];
                HBaseTable.Async.AsyncPackage[] packageArray = new HBaseTable.Async.AsyncPackage[packages.size()];
                packages.toArray(packageArray);
                Connection connection = null;
                try {
                    connection = this.client.newConnection();
                    this.client.table((TableName)pair.getKey()).batch(rows, results);
                }
                catch (Throwable throwable) {
                    throw this.client.convertedException(throwable);
                }
                finally {
                    try {
                        connection.close();
                    }
                    catch (Throwable throwable) {}
                }
                HashMap<HBaseTable.Async.AsyncPackage, Object> invokes = new HashMap<HBaseTable.Async.AsyncPackage, Object>();
                for (int i = 0; i < results.length; ++i) {
                    invokes.put(packageArray[i], results[i]);
                }
                invokes.entrySet().parallelStream().forEach(entry -> {
                    HBaseTable.Task task = ((HBaseTable.Async.AsyncPackage)entry.getKey()).callback;
                    if (task == null) {
                        return;
                    }
                    if (task instanceof HBaseTable.ResultTask) {
                        ((HBaseTable.ResultTask)task).callback((Result)entry.getValue());
                    }
                    if (task instanceof HBaseTable.CallbackTask) {
                        ((HBaseTable.CallbackTask)task).callback();
                    }
                });
            });
        }
    }
}

