/*
 * Decompiled with CFR 0.152.
 */
package org.janusgraph.diskstorage.keycolumnvalue.scan;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractFuture;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.EntryList;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.TemporaryBackendException;
import org.janusgraph.diskstorage.configuration.Configuration;
import org.janusgraph.diskstorage.keycolumnvalue.KCVSUtil;
import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore;
import org.janusgraph.diskstorage.keycolumnvalue.KeyIterator;
import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery;
import org.janusgraph.diskstorage.keycolumnvalue.StoreFeatures;
import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction;
import org.janusgraph.diskstorage.keycolumnvalue.scan.ScanJob;
import org.janusgraph.diskstorage.keycolumnvalue.scan.ScanMetrics;
import org.janusgraph.diskstorage.keycolumnvalue.scan.StandardScanMetrics;
import org.janusgraph.diskstorage.util.BufferUtil;
import org.janusgraph.diskstorage.util.RecordIterator;
import org.janusgraph.diskstorage.util.StaticArrayEntry;
import org.janusgraph.diskstorage.util.StaticArrayEntryList;
import org.janusgraph.util.system.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class StandardScannerExecutor
extends AbstractFuture<ScanMetrics>
implements JanusGraphManagement.IndexJobFuture,
Runnable {
    private static final Logger log = LoggerFactory.getLogger(StandardScannerExecutor.class);
    private static final int QUEUE_SIZE = 1000;
    private static final int TIMEOUT_MS = 180000;
    private static final int TIME_PER_TRY = 10;
    private static final int MAX_KEY_LENGTH = 128;
    private final ScanJob job;
    private final Consumer<ScanMetrics> finishJob;
    private final StoreFeatures storeFeatures;
    private final StoreTransaction storeTx;
    private final KeyColumnValueStore store;
    private final int numProcessors;
    private final int workBlockSize;
    private final Configuration jobConfiguration;
    private final Configuration graphConfiguration;
    private final ScanMetrics metrics;
    private boolean hasCompleted = false;
    private boolean interrupted = false;
    private List<BlockingQueue<SliceResult>> dataQueues;
    private DataPuller[] pullThreads;

    /**
     * Creates an executor for a single scan job. The constructor only records its
     * collaborators and allocates a fresh {@link StandardScanMetrics}; no thread is
     * started and the store is not touched until {@link #run()} is invoked.
     *
     * @param job                the scan job to execute
     * @param finishJob          callback that receives the metrics once the scan has completed normally
     * @param store              the store whose keys are scanned
     * @param storeTx            the transaction used for the scan; rolled back during cleanup
     * @param storeFeatures      features of the store, handed to the key lookup
     * @param numProcessors      number of processor threads started by {@link #run()}
     * @param workBlockSize      number of rows a processor handles before it starts a new worker iteration
     * @param jobConfiguration   configuration passed to the job at every worker iteration start
     * @param graphConfiguration graph configuration passed to the job at every worker iteration start
     * @throws BackendException declared for callers; nothing in this constructor throws it
     */
    StandardScannerExecutor(ScanJob job,
                            Consumer<ScanMetrics> finishJob,
                            KeyColumnValueStore store,
                            StoreTransaction storeTx,
                            StoreFeatures storeFeatures,
                            int numProcessors,
                            int workBlockSize,
                            Configuration jobConfiguration,
                            Configuration graphConfiguration) throws BackendException {
        // Storage handles.
        this.store = store;
        this.storeTx = storeTx;
        this.storeFeatures = storeFeatures;

        // Job, its completion callback and its configuration.
        this.job = job;
        this.finishJob = finishJob;
        this.jobConfiguration = jobConfiguration;
        this.graphConfiguration = graphConfiguration;

        // Sizing of the processing stage.
        this.numProcessors = numProcessors;
        this.workBlockSize = workBlockSize;

        this.metrics = new StandardScanMetrics();
    }

    /**
     * Creates and starts the {@link DataPuller} thread for one slice query.
     * <p>
     * A bounded queue of capacity {@code QUEUE_SIZE} is registered in {@code dataQueues}
     * before the key iterator is opened, so the queue for the n-th query is always found
     * at index n of {@code dataQueues}.
     *
     * @param sq  the slice query the puller evaluates for every key
     * @param stx the transaction used to open the key iterator
     * @return the already started puller thread
     * @throws BackendException if the key iterator cannot be obtained from the store
     */
    private DataPuller addDataPuller(SliceQuery sq, StoreTransaction stx) throws BackendException {
        BlockingQueue<SliceResult> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
        this.dataQueues.add(queue);
        KeyIterator keys = KCVSUtil.getKeys(this.store, sq, this.storeFeatures, MAX_KEY_LENGTH, stx);
        DataPuller dp = new DataPuller(sq, queue, keys, this.job.getKeyFilter());
        dp.start();
        return dp;
    }

    /**
     * Executes the scan job and completes this future with its outcome.
     * <ol>
     *   <li>Setup: starts the job's worker iteration, validates the queries and starts one
     *       {@link DataPuller} per query. Any failure here is logged, resources are cleaned
     *       up and the future fails with that error.</li>
     *   <li>Starts {@code numProcessors} {@link Processor} threads, each working on its own
     *       clone of the job and all reading from one shared bounded row queue.</li>
     *   <li>Merge loop: keeps at most one pending result per query. The key of the first
     *       query's pending result defines the next row; every other query contributes its
     *       pending result only when it carries the same key, otherwise an empty entry list.
     *       The loop ends when the first query yields no further result or when the task has
     *       been interrupted. (This presumably relies on all pullers returning keys in the
     *       same order - TODO confirm against the store contract.)</li>
     *   <li>Shutdown: stops pullers, lets the processors drain the row queue, rolls the
     *       transaction back and either sets the metrics as the result or fails the future.</li>
     * </ol>
     * The future is always completed on the failure paths, even if ending the worker
     * iteration itself throws.
     */
    @Override
    public void run() {
        int numQueries;
        List<SliceQuery> queries;
        try {
            this.job.workerIterationStart(this.jobConfiguration, this.graphConfiguration, this.metrics);
            queries = this.job.getQueries();
            numQueries = queries.size();
            Preconditions.checkArgument(numQueries > 0, "Must at least specify one query for job: %s", this.job);
            if (numQueries > 1) {
                // With several queries the first one drives the iteration, so it has to span the full column range.
                SliceQuery ground = queries.get(0);
                StaticBuffer start = ground.getSliceStart();
                Preconditions.checkArgument(start.equals(BufferUtil.zeroBuffer(1)), "Expected start of first query to be a single 0s: %s", start);
                StaticBuffer end = ground.getSliceEnd();
                Preconditions.checkArgument(end.equals(BufferUtil.oneBuffer(end.length())), "Expected end of first query to be all 1s: %s", end);
            }
            this.dataQueues = new ArrayList<BlockingQueue<SliceResult>>(numQueries);
            this.pullThreads = new DataPuller[numQueries];
            for (int pos = 0; pos < numQueries; ++pos) {
                this.pullThreads[pos] = this.addDataPuller(queries.get(pos), this.storeTx);
            }
        }
        catch (Throwable e) {
            log.error("Exception trying to setup the job:", e);
            try {
                this.cleanupSilent();
                this.endWorkerIterationQuietly();
            }
            finally {
                // Must run even if cleanup fails unexpectedly, otherwise callers block on get() forever.
                this.setException(e);
            }
            return;
        }
        BlockingQueue<Row> processorQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
        Processor[] processors = new Processor[this.numProcessors];
        for (int i = 0; i < processors.length; ++i) {
            processors[i] = new Processor(this.job.clone(), processorQueue);
            processors[i].start();
        }
        try {
            SliceResult[] currentResults = new SliceResult[numQueries];
            while (!this.interrupted) {
                // Refill every slot whose result was consumed by the previous row.
                for (int i = 0; i < numQueries; ++i) {
                    if (currentResults[i] == null) {
                        currentResults[i] = this.pollNextResult(i);
                    }
                }
                SliceResult conditionQuery = currentResults[0];
                if (conditionQuery == null) {
                    // The first query is exhausted: nothing left to emit.
                    break;
                }
                StaticBuffer key = conditionQuery.key;
                Map<SliceQuery, EntryList> queryResults = new HashMap<>(numQueries);
                for (int i = 0; i < currentResults.length; ++i) {
                    SliceQuery query = queries.get(i);
                    EntryList entries = EntryList.EMPTY_LIST;
                    if (currentResults[i] != null && currentResults[i].key.equals(key)) {
                        assert query.equals(currentResults[i].query);
                        entries = currentResults[i].entries;
                        // Consumed; the slot is refilled on the next pass.
                        currentResults[i] = null;
                    }
                    queryResults.put(query, entries);
                }
                processorQueue.put(new Row(key, queryResults));
            }
            for (int i = 0; i < this.pullThreads.length; ++i) {
                this.pullThreads[i].join(TIME_PER_TRY);
                if (this.pullThreads[i].isAlive()) {
                    log.warn("Data pulling thread [{}] did not terminate. Forcing termination", i);
                    this.pullThreads[i].interrupt();
                }
            }
            for (Processor processor : processors) {
                processor.finish();
            }
            if (!Threads.waitForCompletion(processors, TIMEOUT_MS)) {
                log.error("Processor did not terminate in time");
            }
            this.cleanup();
            try {
                this.job.workerIterationEnd(this.metrics);
            }
            catch (IllegalArgumentException e) {
                log.warn("Exception occurred processing worker iteration end. See PR 891.", e);
            }
            if (this.interrupted) {
                this.setException(new InterruptedException("Scanner got interrupted"));
            } else {
                this.finishJob.accept(this.metrics);
                this.set(this.metrics);
            }
        }
        catch (Throwable e) {
            log.error("Exception occurred during job execution", e);
            this.endWorkerIterationQuietly();
            this.setException(e);
        }
        finally {
            Threads.terminate(processors);
            this.cleanupSilent();
        }
    }

    /**
     * Fetches the next result of the query at the given position.
     * <p>
     * Polls the query's queue in slices of {@code TIME_PER_TRY} milliseconds for at most
     * roughly {@code TIMEOUT_MS} milliseconds, as long as the puller has not reported
     * completion. Once the puller is finished the queue is drained one final time without
     * blocking: the puller sets its finished flag only after its last {@code put}, so a
     * result enqueued between a timed-out poll and the finished check would otherwise be lost.
     *
     * @param i index of the query in {@code dataQueues} / {@code pullThreads}
     * @return the next result, or {@code null} if the puller finished and its queue is empty
     * @throws InterruptedException      if interrupted while waiting on the queue
     * @throws TemporaryBackendException if no result arrived in time and the puller has not finished
     */
    private SliceResult pollNextResult(int i) throws InterruptedException, TemporaryBackendException {
        BlockingQueue<SliceResult> queue = this.dataQueues.get(i);
        DataPuller puller = this.pullThreads[i];
        int maxTries = TIMEOUT_MS / TIME_PER_TRY;
        SliceResult qr = queue.poll(TIME_PER_TRY, TimeUnit.MILLISECONDS);
        for (int retryCount = 0; qr == null && !puller.isFinished() && retryCount < maxTries; ++retryCount) {
            qr = queue.poll(TIME_PER_TRY, TimeUnit.MILLISECONDS);
        }
        if (qr == null) {
            if (!puller.isFinished()) {
                throw new TemporaryBackendException("Timed out waiting for next row data - storage error likely");
            }
            // Finished was observed after the last poll: pick up a result enqueued in between.
            qr = queue.poll();
        }
        return qr;
    }

    /**
     * Ends the job's worker iteration on a failure path. Anything thrown by the job is
     * logged and suppressed so that the original error can still be reported on the future.
     */
    private void endWorkerIterationQuietly() {
        try {
            this.job.workerIterationEnd(this.metrics);
        }
        catch (Throwable t) {
            log.error("Exception occurred while ending the worker iteration after a failure", t);
        }
    }

    /**
     * Records that the scan was asked to stop. {@link #run()} checks the flag at the top of
     * its merge loop and, once the flag is set, fails the future with an
     * {@link InterruptedException} after shutting down.
     * <p>
     * NOTE(review): {@code interrupted} is declared as a plain, non-volatile field. If this
     * callback runs on a different thread than {@code run()} (presumably the thread that
     * cancels the future - confirm), the write is not guaranteed to become visible to the
     * scanning thread; consider declaring the field {@code volatile}.
     */
    @Override
    protected void interruptTask() {
        interrupted = true;
    }

    /**
     * Releases the resources of this scan: interrupts every data puller that is still
     * alive and rolls back the store transaction. Only the first invocation does any work;
     * later calls return immediately.
     * <p>
     * {@code pullThreads} is allocated before the individual pullers are created, so after
     * a failed setup it can contain {@code null} slots. These are skipped; otherwise the
     * resulting {@link NullPointerException} would prevent the rollback.
     *
     * @throws BackendException if rolling back the transaction fails
     */
    private void cleanup() throws BackendException {
        if (this.hasCompleted) {
            return;
        }
        this.hasCompleted = true;
        if (this.pullThreads != null) {
            for (DataPuller pullThread : this.pullThreads) {
                if (pullThread != null && pullThread.isAlive()) {
                    pullThread.interrupt();
                }
            }
        }
        this.storeTx.rollback();
    }

    /**
     * Variant of {@link #cleanup()} for failure and {@code finally} paths: a
     * {@link BackendException} raised during cleanup is logged instead of propagated.
     * Unchecked exceptions are not intercepted.
     */
    private void cleanupSilent() {
        try {
            cleanup();
        }
        catch (BackendException cleanupFailure) {
            log.error("Encountered exception when trying to clean up after failure", cleanupFailure);
        }
    }

    /**
     * Returns the metrics object of this scan. It is the same instance that is handed to
     * the job and its processors, so it can be read while the scan is still running.
     *
     * @return the metrics collected so far
     */
    @Override
    public ScanMetrics getIntermediateResult() {
        return metrics;
    }

    /**
     * Immutable holder for what one slice query returned for one key. Instances travel
     * from a {@link DataPuller} to the merge loop through that puller's queue.
     */
    private static class SliceResult {
        /** The query that produced {@link #entries}. */
        final SliceQuery query;
        /** The key the entries belong to. */
        final StaticBuffer key;
        /** The entries read for {@link #key}. */
        final EntryList entries;

        private SliceResult(SliceQuery query, StaticBuffer key, EntryList entries) {
            this.key = key;
            this.query = query;
            this.entries = entries;
        }
    }

    private static class DataPuller
    extends Thread {
        private final BlockingQueue<SliceResult> queue;
        private final KeyIterator keyIterator;
        private final SliceQuery query;
        private final Predicate<StaticBuffer> keyFilter;
        private volatile boolean finished;

        private DataPuller(SliceQuery query, BlockingQueue<SliceResult> queue, KeyIterator keyIterator, Predicate<StaticBuffer> keyFilter) {
            this.query = query;
            this.queue = queue;
            this.keyIterator = keyIterator;
            this.keyFilter = keyFilter;
            this.finished = false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                while (this.keyIterator.hasNext()) {
                    StaticBuffer key = (StaticBuffer)this.keyIterator.next();
                    RecordIterator<Entry> entries = this.keyIterator.getEntries();
                    if (!this.keyFilter.test(key)) continue;
                    EntryList entryList = StaticArrayEntryList.ofStaticBuffer(entries, StaticArrayEntry.ENTRY_GETTER);
                    this.queue.put(new SliceResult(this.query, key, entryList));
                }
                this.finished = true;
            }
            catch (InterruptedException e) {
                log.error("Data-pulling thread interrupted while waiting on queue or data", e);
            }
            catch (Throwable e) {
                log.error("Could not load data from storage: {}", e);
            }
            finally {
                try {
                    this.keyIterator.close();
                }
                catch (IOException e) {
                    log.warn("Could not close storage iterator ", e);
                }
            }
        }

        public boolean isFinished() {
            return this.finished;
        }
    }

    private class Processor
    extends Thread {
        private ScanJob job;
        private final BlockingQueue<Row> processorQueue;
        private volatile boolean finished;
        private int numProcessed;

        private Processor(ScanJob job, BlockingQueue<Row> processorQueue) {
            this.job = job;
            this.processorQueue = processorQueue;
            this.finished = false;
            this.numProcessed = 0;
        }

        @Override
        public void run() {
            try {
                this.job.workerIterationStart(StandardScannerExecutor.this.jobConfiguration, StandardScannerExecutor.this.graphConfiguration, StandardScannerExecutor.this.metrics);
                while (!this.finished || !this.processorQueue.isEmpty()) {
                    Row row;
                    while ((row = this.processorQueue.poll(100L, TimeUnit.MILLISECONDS)) != null) {
                        if (this.numProcessed >= StandardScannerExecutor.this.workBlockSize) {
                            this.job.workerIterationEnd(StandardScannerExecutor.this.metrics);
                            this.job = this.job.clone();
                            this.job.workerIterationStart(StandardScannerExecutor.this.jobConfiguration, StandardScannerExecutor.this.graphConfiguration, StandardScannerExecutor.this.metrics);
                            this.numProcessed = 0;
                        }
                        try {
                            this.job.process(row.key, row.entries, StandardScannerExecutor.this.metrics);
                            StandardScannerExecutor.this.metrics.increment(ScanMetrics.Metric.SUCCESS);
                        }
                        catch (Throwable ex) {
                            log.error("Exception processing row [" + row.key + "]: ", ex);
                            StandardScannerExecutor.this.metrics.increment(ScanMetrics.Metric.FAILURE);
                        }
                        ++this.numProcessed;
                    }
                }
            }
            catch (InterruptedException e) {
                log.error("Processing thread interrupted while waiting on queue or processing data", e);
            }
            catch (Throwable e) {
                log.error("Unexpected error processing data: {}", e);
            }
            finally {
                this.job.workerIterationEnd(StandardScannerExecutor.this.metrics);
            }
        }

        public void finish() {
            this.finished = true;
        }
    }

    /**
     * Immutable unit of work for a {@link Processor}: one key together with the entry
     * list of every slice query for that key.
     */
    private static class Row {
        /** The key this row describes. */
        final StaticBuffer key;
        /** Entry list per slice query for {@link #key}. */
        final Map<SliceQuery, EntryList> entries;

        private Row(StaticBuffer key, Map<SliceQuery, EntryList> entries) {
            this.entries = entries;
            this.key = key;
        }
    }
}

