/*
 * Decompiled with CFR 0.152.
 */
package org.janusgraph.diskstorage.cql;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.QueryExecutionException;
import com.datastax.driver.core.exceptions.QueryValidationException;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.schemabuilder.SchemaBuilder;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.vavr.API;
import io.vavr.Tuple;
import io.vavr.collection.Array;
import io.vavr.collection.HashMap;
import io.vavr.collection.Iterator;
import io.vavr.collection.Traversable;
import io.vavr.concurrent.Future;
import io.vavr.control.Option;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Resource;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.BaseTransactionConfig;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.PermanentBackendException;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.StoreMetaData;
import org.janusgraph.diskstorage.common.DistributedStoreManager;
import org.janusgraph.diskstorage.configuration.Configuration;
import org.janusgraph.diskstorage.configuration.ModifiableConfiguration;
import org.janusgraph.diskstorage.cql.CQLConfigOptions;
import org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore;
import org.janusgraph.diskstorage.cql.CQLTransaction;
import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation;
import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore;
import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
import org.janusgraph.diskstorage.keycolumnvalue.KeyRange;
import org.janusgraph.diskstorage.keycolumnvalue.StandardStoreFeatures;
import org.janusgraph.diskstorage.keycolumnvalue.StoreFeatures;
import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction;
import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
import org.janusgraph.util.system.NetworkUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CQLStoreManager
extends DistributedStoreManager
implements KeyColumnValueStoreManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(CQLStoreManager.class);
    static final String CONSISTENCY_LOCAL_QUORUM = "LOCAL_QUORUM";
    static final String CONSISTENCY_QUORUM = "QUORUM";
    private static final int DEFAULT_PORT = 9042;
    private final String keyspace;
    private final int batchSize;
    private final boolean atomicBatch;
    private final boolean allowCompactStorage;
    final ExecutorService executorService;
    @Resource
    private Cluster cluster;
    @Resource
    private Session session;
    private final StoreFeatures storeFeatures;
    private final Map<String, CQLKeyColumnValueStore> openStores;
    private final DistributedStoreManager.Deployment deployment;

    public CQLStoreManager(Configuration configuration) throws BackendException {
        super(configuration, 9042);
        this.keyspace = this.determineKeyspaceName(configuration);
        this.batchSize = configuration.get(CQLConfigOptions.BATCH_STATEMENT_SIZE, new String[0]);
        this.atomicBatch = configuration.get(CQLConfigOptions.ATOMIC_BATCH_MUTATE, new String[0]);
        this.executorService = new ThreadPoolExecutor(10, 100, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CQLStoreManager[%02d]").build());
        this.cluster = this.initializeCluster();
        this.session = this.initializeSession(this.keyspace);
        this.allowCompactStorage = this.initializeCompactStorage();
        ModifiableConfiguration global = GraphDatabaseConfiguration.buildGraphConfiguration().set(CQLConfigOptions.READ_CONSISTENCY, CONSISTENCY_QUORUM, new String[0]).set(CQLConfigOptions.WRITE_CONSISTENCY, CONSISTENCY_QUORUM, new String[0]).set(GraphDatabaseConfiguration.METRICS_PREFIX, "org.janusgraph.sys", new String[0]);
        ModifiableConfiguration local = GraphDatabaseConfiguration.buildGraphConfiguration().set(CQLConfigOptions.READ_CONSISTENCY, CONSISTENCY_LOCAL_QUORUM, new String[0]).set(CQLConfigOptions.WRITE_CONSISTENCY, CONSISTENCY_LOCAL_QUORUM, new String[0]).set(GraphDatabaseConfiguration.METRICS_PREFIX, "org.janusgraph.sys", new String[0]);
        Boolean onlyUseLocalConsistency = configuration.get(CQLConfigOptions.ONLY_USE_LOCAL_CONSISTENCY_FOR_SYSTEM_OPERATIONS, new String[0]);
        Boolean useExternalLocking = configuration.get(CQLConfigOptions.USE_EXTERNAL_LOCKING, new String[0]);
        StandardStoreFeatures.Builder fb = new StandardStoreFeatures.Builder();
        fb.batchMutation(true).distributed(true);
        fb.timestamps(true).cellTTL(true);
        fb.keyConsistent(onlyUseLocalConsistency != false ? local : global, local);
        fb.locking(useExternalLocking);
        fb.optimisticLocking(true);
        fb.multiQuery(false);
        String partitioner = this.cluster.getMetadata().getPartitioner();
        switch (partitioner.substring(partitioner.lastIndexOf(46) + 1)) {
            case "RandomPartitioner": 
            case "Murmur3Partitioner": {
                fb.keyOrdered(false).orderedScan(false).unorderedScan(true);
                this.deployment = DistributedStoreManager.Deployment.REMOTE;
                break;
            }
            case "ByteOrderedPartitioner": {
                fb.keyOrdered(true).orderedScan(true).unorderedScan(false);
                this.deployment = this.hostnames.length == 1 ? (NetworkUtil.isLocalConnection(this.hostnames[0]) ? DistributedStoreManager.Deployment.LOCAL : DistributedStoreManager.Deployment.REMOTE) : DistributedStoreManager.Deployment.REMOTE;
                break;
            }
            default: {
                throw new IllegalArgumentException("Unrecognized partitioner: " + partitioner);
            }
        }
        this.storeFeatures = fb.build();
        this.openStores = new ConcurrentHashMap<String, CQLKeyColumnValueStore>();
    }

    Cluster initializeCluster() throws PermanentBackendException {
        List<InetSocketAddress> contactPoints;
        Configuration configuration = this.getStorageConfig();
        try {
            contactPoints = ((Array)((Array)Array.of(this.hostnames).map(hostName -> hostName.split(":"))).map(array -> Tuple.of(array[0], ((String[])array).length == 2 ? Integer.parseInt(array[1]) : this.port))).map(tuple -> new InetSocketAddress((String)tuple._1, (int)((Integer)tuple._2))).toJavaList();
        }
        catch (ArrayIndexOutOfBoundsException | NumberFormatException | SecurityException e) {
            throw new PermanentBackendException("Error initialising cluster contact points", e);
        }
        Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(contactPoints).withClusterName(configuration.get(CQLConfigOptions.CLUSTER_NAME, new String[0]));
        if (configuration.get(CQLConfigOptions.PROTOCOL_VERSION, new String[0]) != 0) {
            builder.withProtocolVersion(ProtocolVersion.fromInt(configuration.get(CQLConfigOptions.PROTOCOL_VERSION, new String[0])));
        }
        if (configuration.has(GraphDatabaseConfiguration.AUTH_USERNAME, new String[0]) && configuration.has(GraphDatabaseConfiguration.AUTH_PASSWORD, new String[0])) {
            builder.withCredentials(configuration.get(GraphDatabaseConfiguration.AUTH_USERNAME, new String[0]), configuration.get(GraphDatabaseConfiguration.AUTH_PASSWORD, new String[0]));
        }
        if (configuration.has(CQLConfigOptions.LOCAL_DATACENTER, new String[0])) {
            builder.withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().withLocalDc(configuration.get(CQLConfigOptions.LOCAL_DATACENTER, new String[0])).build()));
        }
        if (configuration.get(CQLConfigOptions.SSL_ENABLED, new String[0]).booleanValue()) {
            try {
                TrustManager[] trustManagers;
                try (FileInputStream keyStoreStream = new FileInputStream(configuration.get(CQLConfigOptions.SSL_TRUSTSTORE_LOCATION, new String[0]));){
                    KeyStore keystore = KeyStore.getInstance("jks");
                    keystore.load(keyStoreStream, configuration.get(CQLConfigOptions.SSL_TRUSTSTORE_PASSWORD, new String[0]).toCharArray());
                    TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
                    trustManagerFactory.init(keystore);
                    trustManagers = trustManagerFactory.getTrustManagers();
                }
                SSLContext sslContext = SSLContext.getInstance("TLS");
                sslContext.init(null, trustManagers, null);
                JdkSSLOptions sslOptions = JdkSSLOptions.builder().withSSLContext(sslContext).build();
                builder.withSSL(sslOptions);
            }
            catch (IOException | KeyManagementException | KeyStoreException | NoSuchAlgorithmException | CertificateException e) {
                throw new PermanentBackendException("Error initialising SSL connection properties", e);
            }
        }
        PoolingOptions poolingOptions = new PoolingOptions();
        poolingOptions.setMaxRequestsPerConnection(HostDistance.LOCAL, configuration.get(CQLConfigOptions.LOCAL_MAX_REQUESTS_PER_CONNECTION, new String[0])).setMaxRequestsPerConnection(HostDistance.REMOTE, configuration.get(CQLConfigOptions.REMOTE_MAX_REQUESTS_PER_CONNECTION, new String[0]));
        poolingOptions.setConnectionsPerHost(HostDistance.LOCAL, configuration.get(CQLConfigOptions.LOCAL_CORE_CONNECTIONS_PER_HOST, new String[0]), configuration.get(CQLConfigOptions.LOCAL_MAX_CONNECTIONS_PER_HOST, new String[0])).setConnectionsPerHost(HostDistance.REMOTE, configuration.get(CQLConfigOptions.REMOTE_CORE_CONNECTIONS_PER_HOST, new String[0]), configuration.get(CQLConfigOptions.REMOTE_MAX_CONNECTIONS_PER_HOST, new String[0]));
        return builder.withPoolingOptions(poolingOptions).build();
    }

    Session initializeSession(String keyspaceName) {
        Session s = this.cluster.connect();
        if (this.cluster.getMetadata().getKeyspace(keyspaceName) != null) {
            return s;
        }
        Configuration configuration = this.getStorageConfig();
        Map replication = ((HashMap)API.Match(configuration.get(CQLConfigOptions.REPLICATION_STRATEGY, new String[0])).of(API.Case(API.$("SimpleStrategy"), strategy -> HashMap.of("class", strategy, "replication_factor", configuration.get(CQLConfigOptions.REPLICATION_FACTOR, new String[0]))), API.Case(API.$("NetworkTopologyStrategy"), strategy -> HashMap.of("class", strategy).merge(Array.of((Object[])configuration.get(CQLConfigOptions.REPLICATION_OPTIONS, new String[0])).grouped(2).toMap(array -> Tuple.of(array.get(0), Integer.parseInt((String)array.get(1)))))))).toJavaMap();
        s.execute(SchemaBuilder.createKeyspace(keyspaceName).ifNotExists().with().replication(replication));
        return s;
    }

    boolean initializeCompactStorage() throws PermanentBackendException {
        try {
            ResultSet versionResultSet = this.session.execute(QueryBuilder.select().column("release_version").from("system", "local"));
            String version = versionResultSet.one().getString(0);
            int major = Integer.parseInt(version.substring(0, version.indexOf(".")));
            return major < 3;
        }
        catch (NoHostAvailableException | QueryExecutionException | QueryValidationException | NumberFormatException e) {
            throw new PermanentBackendException("Error determining Cassandra version", e);
        }
    }

    boolean isCompactStorageAllowed() {
        return this.allowCompactStorage;
    }

    ExecutorService getExecutorService() {
        return this.executorService;
    }

    Session getSession() {
        return this.session;
    }

    String getKeyspaceName() {
        return this.keyspace;
    }

    Map<String, String> getCompressionOptions(String name) throws BackendException {
        KeyspaceMetadata keyspaceMetadata = Option.of(this.cluster.getMetadata().getKeyspace(this.keyspace)).getOrElseThrow(() -> new PermanentBackendException(String.format("Unknown keyspace '%s'", this.keyspace)));
        return (Map)Option.of(keyspaceMetadata.getTable(name)).map(tableMetadata -> tableMetadata.getOptions().getCompression()).getOrElseThrow(() -> new PermanentBackendException(String.format("Unknown table '%s'", name)));
    }

    TableMetadata getTableMetadata(String name) throws BackendException {
        KeyspaceMetadata keyspaceMetadata = Option.of(this.cluster.getMetadata().getKeyspace(this.keyspace)).getOrElseThrow(() -> new PermanentBackendException(String.format("Unknown keyspace '%s'", this.keyspace)));
        return Option.of(keyspaceMetadata.getTable(name)).getOrElseThrow(() -> new PermanentBackendException(String.format("Unknown table '%s'", name)));
    }

    @Override
    public void close() throws BackendException {
        try {
            this.session.close();
        }
        finally {
            try {
                this.cluster.close();
            }
            finally {
                this.executorService.shutdownNow();
            }
        }
    }

    @Override
    public String getName() {
        return String.format("%s.%s", this.getClass().getSimpleName(), this.keyspace);
    }

    @Override
    public DistributedStoreManager.Deployment getDeployment() {
        return this.deployment;
    }

    @Override
    public StoreFeatures getFeatures() {
        return this.storeFeatures;
    }

    @Override
    public KeyColumnValueStore openDatabase(String name, StoreMetaData.Container metaData) throws BackendException {
        Supplier<Boolean> initializeTable = () -> Optional.ofNullable(this.cluster.getMetadata().getKeyspace(this.keyspace)).map(k -> k.getTable(name) == null).orElse(true);
        return this.openStores.computeIfAbsent(name, n -> new CQLKeyColumnValueStore(this, (String)n, this.getStorageConfig(), () -> this.openStores.remove(n), this.allowCompactStorage, initializeTable));
    }

    @Override
    public StoreTransaction beginTransaction(BaseTransactionConfig config) throws BackendException {
        return new CQLTransaction(config);
    }

    @Override
    public void clearStorage() throws BackendException {
        if (this.storageConfig.get(GraphDatabaseConfiguration.DROP_ON_CLEAR, new String[0]).booleanValue()) {
            this.session.execute(SchemaBuilder.dropKeyspace(this.keyspace));
        } else if (this.exists()) {
            Future result = Future.sequence(Iterator.ofAll(this.cluster.getMetadata().getKeyspace(this.keyspace).getTables()).map(table -> Future.fromJavaFuture(this.session.executeAsync(QueryBuilder.truncate(this.keyspace, table.getName())))));
            result.await();
        } else {
            LOGGER.info("Keyspace {} does not exist in the cluster", (Object)this.keyspace);
        }
    }

    @Override
    public boolean exists() throws BackendException {
        return this.cluster.getMetadata().getKeyspace(this.keyspace) != null;
    }

    @Override
    public List<KeyRange> getLocalKeyPartition() throws BackendException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
        if (this.atomicBatch) {
            this.mutateManyLogged(mutations, txh);
        } else {
            this.mutateManyUnlogged(mutations, txh);
        }
    }

    private void mutateManyLogged(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
        DistributedStoreManager.MaskedTimestamp commitTime = new DistributedStoreManager.MaskedTimestamp(txh);
        BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.LOGGED);
        batchStatement.setConsistencyLevel(CQLTransaction.getTransaction(txh).getWriteConsistencyLevel());
        batchStatement.addAll(Iterator.ofAll(mutations.entrySet()).flatMap(tableNameAndMutations -> {
            String tableName = (String)tableNameAndMutations.getKey();
            Map tableMutations = (Map)tableNameAndMutations.getValue();
            CQLKeyColumnValueStore columnValueStore = Option.of(this.openStores.get(tableName)).getOrElseThrow(() -> new IllegalStateException("Store cannot be found: " + tableName));
            return Iterator.ofAll(tableMutations.entrySet()).flatMap(keyAndMutations -> {
                StaticBuffer key = (StaticBuffer)keyAndMutations.getKey();
                KCVMutation keyMutations = (KCVMutation)keyAndMutations.getValue();
                Traversable deletions = Iterator.of(Long.valueOf(commitTime.getDeletionTime(this.times))).flatMap(deleteTime -> Iterator.ofAll(keyMutations.getDeletions()).map(deletion -> columnValueStore.deleteColumn(key, (StaticBuffer)deletion, (long)deleteTime)));
                Traversable additions = Iterator.of(Long.valueOf(commitTime.getAdditionTime(this.times))).flatMap(addTime -> Iterator.ofAll(keyMutations.getAdditions()).map(addition -> columnValueStore.insertColumn(key, (Entry)addition, (long)addTime)));
                return Iterator.concat(deletions, additions);
            });
        }));
        Future<ResultSet> result = Future.fromJavaFuture(this.executorService, this.session.executeAsync(batchStatement));
        result.await();
        if (result.isFailure()) {
            throw CQLKeyColumnValueStore.EXCEPTION_MAPPER.apply(result.getCause().get());
        }
        this.sleepAfterWrite(txh, commitTime);
    }

    private void mutateManyUnlogged(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
        DistributedStoreManager.MaskedTimestamp commitTime = new DistributedStoreManager.MaskedTimestamp(txh);
        Future result = Future.sequence(this.executorService, Iterator.ofAll(mutations.entrySet()).flatMap(tableNameAndMutations -> {
            String tableName = (String)tableNameAndMutations.getKey();
            Map tableMutations = (Map)tableNameAndMutations.getValue();
            CQLKeyColumnValueStore columnValueStore = Option.of(this.openStores.get(tableName)).getOrElseThrow(() -> new IllegalStateException("Store cannot be found: " + tableName));
            return Iterator.ofAll(tableMutations.entrySet()).flatMap(keyAndMutations -> {
                StaticBuffer key = (StaticBuffer)keyAndMutations.getKey();
                KCVMutation keyMutations = (KCVMutation)keyAndMutations.getValue();
                Traversable deletions = Iterator.of(Long.valueOf(commitTime.getDeletionTime(this.times))).flatMap(deleteTime -> Iterator.ofAll(keyMutations.getDeletions()).map(deletion -> columnValueStore.deleteColumn(key, (StaticBuffer)deletion, (long)deleteTime)));
                Traversable additions = Iterator.of(Long.valueOf(commitTime.getAdditionTime(this.times))).flatMap(addTime -> Iterator.ofAll(keyMutations.getAdditions()).map(addition -> columnValueStore.insertColumn(key, (Entry)addition, (long)addTime)));
                return Iterator.concat(deletions, additions).grouped(this.batchSize).map(group -> Future.fromJavaFuture(this.executorService, this.session.executeAsync(new BatchStatement(BatchStatement.Type.UNLOGGED).addAll((Iterable<? extends Statement>)group).setConsistencyLevel(CQLTransaction.getTransaction(txh).getWriteConsistencyLevel()))));
            });
        }));
        result.await();
        if (result.isFailure()) {
            throw CQLKeyColumnValueStore.EXCEPTION_MAPPER.apply(result.getCause().get());
        }
        this.sleepAfterWrite(txh, commitTime);
    }

    private String determineKeyspaceName(Configuration config) {
        if (!config.has(CQLConfigOptions.KEYSPACE, new String[0]) && config.has(GraphDatabaseConfiguration.GRAPH_NAME, new String[0])) {
            return config.get(GraphDatabaseConfiguration.GRAPH_NAME, new String[0]);
        }
        return config.get(CQLConfigOptions.KEYSPACE, new String[0]);
    }
}

