/*
 * Decompiled with CFR 0.152.
 */
package org.janusgraph.diskstorage.keycolumnvalue.scan;

import com.google.common.base.Preconditions;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.commons.lang.StringUtils;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.configuration.Configuration;
import org.janusgraph.diskstorage.configuration.MergedConfiguration;
import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore;
import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction;
import org.janusgraph.diskstorage.keycolumnvalue.scan.ScanJob;
import org.janusgraph.diskstorage.keycolumnvalue.scan.ScanMetrics;
import org.janusgraph.diskstorage.keycolumnvalue.scan.StandardScannerExecutor;
import org.janusgraph.diskstorage.util.StandardBaseTransactionConfig;
import org.janusgraph.diskstorage.util.time.TimestampProvider;

/**
 * Entry point for running {@link ScanJob}s against the stores of a single
 * {@link KeyColumnValueStoreManager}.
 *
 * <p>Jobs are configured through a {@link Builder} obtained from {@link #build()} and started with
 * {@link Builder#execute()}. Every started job is registered under its job id so that it can be
 * looked up via {@link #getRunningJob(Object)} and cancelled by {@link #close()}.
 */
public class StandardScanner {
    private final KeyColumnValueStoreManager manager;
    // NOTE(review): this is a plain HashSet while the job registry below is concurrent; if
    // execute()/close() are expected to be called from several threads this set needs the same
    // treatment - confirm against callers.
    private final Set<KeyColumnValueStore> openStores;
    private final ConcurrentMap<Object, StandardScannerExecutor> runningJobs;
    private final AtomicLong jobCounter;

    /**
     * Creates a scanner on top of the given store manager.
     *
     * @param manager the store manager used to open stores and begin transactions; must not be
     *                {@code null} and must report scan support through its features
     * @throws NullPointerException     if {@code manager} is {@code null}
     * @throws IllegalArgumentException if the manager's features report no scan support
     */
    public StandardScanner(KeyColumnValueStoreManager manager) {
        Preconditions.checkNotNull(manager);
        Preconditions.checkArgument(manager.getFeatures().hasScan(), "Provided data store does not support scans: %s", manager);
        this.manager = manager;
        this.openStores = new HashSet<KeyColumnValueStore>(4);
        this.runningJobs = new ConcurrentHashMap<Object, StandardScannerExecutor>();
        this.jobCounter = new AtomicLong(0L);
    }

    /**
     * Returns a fresh {@link Builder}, pre-populated with a newly generated job id.
     */
    public Builder build() {
        return new Builder();
    }

    /**
     * Cancels every registered job that is neither done nor cancelled, then closes every store
     * that was opened by {@link Builder#execute()}.
     *
     * <p>All stores are attempted even if closing one of them fails; the first failure is thrown
     * afterwards with any later failures attached as suppressed exceptions.
     *
     * @throws BackendException if closing at least one store failed
     */
    public void close() throws BackendException {
        for (StandardScannerExecutor exe : this.runningJobs.values()) {
            if (exe.isCancelled() || exe.isDone()) {
                continue;
            }
            exe.cancel(true);
        }
        BackendException failure = null;
        for (KeyColumnValueStore kcvs : this.openStores) {
            try {
                kcvs.close();
            } catch (BackendException e) {
                // Keep going so one broken store does not leave the remaining ones open.
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Registers {@code executor} under {@code jobId}, first dropping registry entries whose
     * executor is already done or cancelled.
     *
     * @throws IllegalArgumentException if a job is still registered under {@code jobId}
     */
    private void addJob(Object jobId, StandardScannerExecutor executor) {
        for (Map.Entry<Object, StandardScannerExecutor> entry : this.runningJobs.entrySet()) {
            StandardScannerExecutor exe = entry.getValue();
            if (exe.isDone() || exe.isCancelled()) {
                // Two-argument remove: only evict if the id still maps to this finished executor.
                this.runningJobs.remove(entry.getKey(), exe);
            }
        }
        Preconditions.checkArgument(this.runningJobs.putIfAbsent(jobId, executor) == null, "Another job with the same id is already running: %s", jobId);
    }

    /**
     * Looks up the job registered under {@code jobId}.
     *
     * <p>Finished jobs are only evicted when a new job is added, so the returned future may
     * already be done or cancelled.
     *
     * @return the registered job, or {@code null} if none is registered under that id
     */
    public JanusGraphManagement.IndexJobFuture getRunningJob(Object jobId) {
        return (JanusGraphManagement.IndexJobFuture)this.runningJobs.get(jobId);
    }

    /**
     * Fluent configuration for a single scan job. A job, a store name and a timestamp provider
     * are mandatory before {@link #execute()} may be called; everything else has a default.
     */
    public class Builder {
        private static final int DEFAULT_WORKBLOCK_SIZE = 10000;
        private ScanJob job = null;
        private int numProcessingThreads = 1;
        private int workBlockSize = DEFAULT_WORKBLOCK_SIZE;
        private TimestampProvider times = null;
        private Configuration graphConfiguration = Configuration.EMPTY;
        private Configuration jobConfiguration = Configuration.EMPTY;
        private String dbName = null;
        private Consumer<ScanMetrics> finishJob;
        private Object jobId;

        private Builder() {
            // Default id is the next value of the scanner-wide counter; the default completion
            // callback does nothing.
            this.jobId = StandardScanner.this.jobCounter.incrementAndGet();
            this.finishJob = m -> {};
        }

        /**
         * Sets the number of processing threads handed to the executor.
         *
         * @throws IllegalArgumentException if {@code numThreads} is not positive
         */
        public Builder setNumProcessingThreads(int numThreads) {
            Preconditions.checkArgument(numThreads > 0, "Need to specify a positive number of processing threads: %s", numThreads);
            this.numProcessingThreads = numThreads;
            return this;
        }

        /**
         * Sets the work block size handed to the executor.
         *
         * @throws IllegalArgumentException if {@code size} is not positive
         */
        public Builder setWorkBlockSize(int size) {
            Preconditions.checkArgument(size > 0, "Need to specify a positive work block size: %s", size);
            this.workBlockSize = size;
            return this;
        }

        /**
         * Sets the timestamp provider used for the job's transaction (mandatory).
         *
         * @throws NullPointerException if {@code times} is {@code null}
         */
        public Builder setTimestampProvider(TimestampProvider times) {
            this.times = Preconditions.checkNotNull(times);
            return this;
        }

        /**
         * Sets the name of the store to open for the job (mandatory).
         *
         * @throws IllegalArgumentException if {@code name} is blank
         */
        public Builder setStoreName(String name) {
            Preconditions.checkArgument(StringUtils.isNotBlank(name), "Invalid name: %s", name);
            this.dbName = name;
            return this;
        }

        /**
         * Returns the id the job will be registered under.
         */
        public Object getJobId() {
            return this.jobId;
        }

        /**
         * Overrides the generated job id.
         *
         * @throws NullPointerException if {@code id} is {@code null}
         */
        public Builder setJobId(Object id) {
            this.jobId = Preconditions.checkNotNull(id, "Need to provide a valid id: %s", id);
            return this;
        }

        /**
         * Sets the job to run (mandatory).
         *
         * @throws NullPointerException if {@code job} is {@code null}
         */
        public Builder setJob(ScanJob job) {
            this.job = Preconditions.checkNotNull(job);
            return this;
        }

        /**
         * Sets the graph configuration. When it is not {@link Configuration#EMPTY} it is also
         * used (merged with the manager's scan transaction configuration, if any) as the custom
         * options of the job's transaction.
         *
         * @throws NullPointerException if {@code config} is {@code null}
         */
        public Builder setGraphConfiguration(Configuration config) {
            this.graphConfiguration = Preconditions.checkNotNull(config);
            return this;
        }

        /**
         * Sets the job-specific configuration handed to the executor.
         *
         * @throws NullPointerException if {@code config} is {@code null}
         */
        public Builder setJobConfiguration(Configuration config) {
            this.jobConfiguration = Preconditions.checkNotNull(config);
            return this;
        }

        /**
         * Returns the job-specific configuration currently set on this builder.
         */
        public Configuration getJobConfiguration() {
            return this.jobConfiguration;
        }

        /**
         * Sets the callback handed to the executor together with the job.
         *
         * @throws NullPointerException if {@code finishJob} is {@code null}
         */
        public Builder setFinishJob(Consumer<ScanMetrics> finishJob) {
            this.finishJob = Preconditions.checkNotNull(finishJob);
            return this;
        }

        /**
         * Begins a transaction, opens the configured store, registers the job under its id and
         * starts it on a new thread.
         *
         * <p>If anything fails after the transaction has been started, the transaction is rolled
         * back, a registration made by this call is undone, and the original failure is rethrown
         * (a failure of the rollback itself is attached as a suppressed exception).
         *
         * @return the future of the started job
         * @throws NullPointerException     if no job or timestamp provider was configured
         * @throws IllegalArgumentException if no store name was configured, or another job is
         *                                  still registered under the same job id
         * @throws BackendException         if the storage backend reports a failure
         */
        public JanusGraphManagement.IndexJobFuture execute() throws BackendException {
            Preconditions.checkNotNull(this.job, "Need to specify a job to execute");
            Preconditions.checkArgument(StringUtils.isNotBlank(this.dbName), "Need to specify a database to execute against");
            Preconditions.checkNotNull(this.times, "Need to configure the timestamp provider for this job");
            StandardBaseTransactionConfig.Builder txBuilder = new StandardBaseTransactionConfig.Builder();
            txBuilder.timestampProvider(this.times);
            Configuration scanConfig = StandardScanner.this.manager.getFeatures().getScanTxConfig();
            if (Configuration.EMPTY != this.graphConfiguration) {
                scanConfig = null == scanConfig ? this.graphConfiguration : new MergedConfiguration(this.graphConfiguration, scanConfig);
            }
            if (null != scanConfig) {
                txBuilder.customOptions(scanConfig);
            }
            StoreTransaction storeTx = StandardScanner.this.manager.beginTransaction(txBuilder.build());
            StandardScannerExecutor executor = null;
            try {
                // Opening the store happens inside the try so that a failure here also rolls
                // back the transaction that was just started.
                KeyColumnValueStore kcvs = StandardScanner.this.manager.openDatabase(this.dbName);
                StandardScanner.this.openStores.add(kcvs);
                executor = new StandardScannerExecutor(this.job, this.finishJob, kcvs, storeTx, StandardScanner.this.manager.getFeatures(), this.numProcessingThreads, this.workBlockSize, this.jobConfiguration, this.graphConfiguration);
                StandardScanner.this.addJob(this.jobId, executor);
                new Thread(executor).start();
                return executor;
            }
            catch (Throwable e) {
                if (executor != null) {
                    // Conditional remove: never evicts a different job that legitimately owns
                    // this id (e.g. when addJob rejected us as a duplicate).
                    StandardScanner.this.runningJobs.remove(this.jobId, executor);
                }
                try {
                    storeTx.rollback();
                }
                catch (Throwable rollbackFailure) {
                    // Do not let a failing rollback hide the root cause.
                    e.addSuppressed(rollbackFailure);
                }
                throw e;
            }
        }
    }
}

