/*
 * Decompiled with CFR 0.152.
 */
package org.calrissian.accumulorecipes.commons.iterators;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.OptionDescriber;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.log4j.Logger;

public class ReadAheadIterator
implements SortedKeyValueIterator<Key, Value>,
OptionDescriber {
    public static final String QUEUE_SIZE = "queue.size";
    public static final String TIMEOUT = "timeout";
    private static final QueueElement noMoreDataElement = new QueueElement();
    private static Logger log = Logger.getLogger(ReadAheadIterator.class);
    private int queueSize = 5;
    private int timeout = 60;
    private SortedKeyValueIterator<Key, Value> source;
    private ArrayBlockingQueue<QueueElement> queue = null;
    private QueueElement currentElement = new QueueElement();
    private ProducerThread thread = null;
    private Thread t = null;

    protected ReadAheadIterator(ReadAheadIterator other, IteratorEnvironment env) {
        this.source = other.source.deepCopy(env);
    }

    public ReadAheadIterator() {
    }

    public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
        return new ReadAheadIterator(this, env);
    }

    public Key getTopKey() {
        return this.currentElement.getKey();
    }

    public Value getTopValue() {
        return this.currentElement.getValue();
    }

    public boolean hasTop() {
        if (this.currentElement == noMoreDataElement) {
            return false;
        }
        return this.currentElement != null || this.queue.size() > 0 || this.source.hasTop();
    }

    public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
        this.validateOptions(options);
        this.source = source;
        this.queue = new ArrayBlockingQueue(this.queueSize);
        this.thread = new ProducerThread(this.source);
        this.t = new Thread((Runnable)this.thread, "ReadAheadIterator-SourceThread");
        this.t.start();
    }

    public void next() throws IOException {
        while (this.t.getState().equals((Object)Thread.State.NEW)) {
            try {
                Thread.sleep(1L);
            }
            catch (InterruptedException e) {}
        }
        if (this.t.getState().equals((Object)Thread.State.TERMINATED) && this.thread.hasError()) {
            throw new IOException("Background thread has died", this.thread.getError());
        }
        try {
            if (this.thread.hasError()) {
                throw new IOException("background thread has error", this.thread.getError());
            }
            QueueElement nextElement = null;
            while (null == nextElement) {
                try {
                    nextElement = this.queue.poll(1L, TimeUnit.SECONDS);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                if (null != nextElement || !this.thread.hasError()) continue;
                throw new IOException("background thread has error", this.thread.getError());
            }
            this.currentElement = nextElement;
        }
        catch (IOException e) {
            throw new IOException("Error getting element from source iterator", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
        if (this.t.isAlive()) {
            if (this.thread.hasError()) {
                throw new IOException("background thread has error", this.thread.getError());
            }
            try {
                this.thread.lock();
                this.queue.clear();
                this.currentElement = null;
                this.source.seek(range, columnFamilies, inclusive);
            }
            finally {
                this.thread.unlock();
            }
        } else {
            throw new IOException("source iterator thread has died.");
        }
        this.next();
    }

    public OptionDescriber.IteratorOptions describeOptions() {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(QUEUE_SIZE, "read ahead queue size");
        options.put(TIMEOUT, "timeout in seconds before background thread thinks that the client has aborted");
        return new OptionDescriber.IteratorOptions(this.getClass().getSimpleName(), "Iterator that puts the source in another thread", options, null);
    }

    public boolean validateOptions(Map<String, String> options) {
        if (options.containsKey(QUEUE_SIZE)) {
            this.queueSize = Integer.parseInt(options.get(QUEUE_SIZE));
        }
        if (options.containsKey(TIMEOUT)) {
            this.timeout = Integer.parseInt(options.get(TIMEOUT));
        }
        return true;
    }

    static /* synthetic */ int access$000(ReadAheadIterator x0) {
        return x0.timeout;
    }

    class ProducerThread
    extends ReentrantLock
    implements Runnable {
        private static final long serialVersionUID = 1L;
        private Exception e = null;
        private SortedKeyValueIterator<Key, Value> sourceIter = null;
        private int waitTime = ReadAheadIterator.access$000(ReadAheadIterator.this);

        public ProducerThread(SortedKeyValueIterator<Key, Value> source) {
            this.sourceIter = source;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            boolean hasMoreData = true;
            while (hasMoreData || ReadAheadIterator.this.queue.size() > 0) {
                block14: {
                    block13: {
                        try {
                            this.lock();
                            hasMoreData = this.sourceIter.hasTop();
                            if (hasMoreData) break block13;
                        }
                        catch (Throwable throwable) {
                            this.unlock();
                            throw throwable;
                        }
                        this.unlock();
                        continue;
                    }
                    QueueElement e = new QueueElement((Key)this.sourceIter.getTopKey(), (Value)this.sourceIter.getTopValue());
                    boolean inserted = false;
                    try {
                        inserted = ReadAheadIterator.this.queue.offer(e, this.waitTime, TimeUnit.SECONDS);
                    }
                    catch (InterruptedException ie) {
                        this.e = ie;
                        this.unlock();
                        break;
                    }
                    if (inserted) break block14;
                    this.e = new TimeoutException("Background thread has exceeded wait time of " + this.waitTime + " seconds, aborting...");
                    this.unlock();
                    break;
                }
                try {
                    this.sourceIter.next();
                }
                catch (Exception e) {
                    this.e = e;
                    log.error((Object)"Error calling next on source iterator", (Throwable)e);
                    this.unlock();
                    break;
                }
                this.unlock();
            }
            if (!this.hasError()) {
                try {
                    ReadAheadIterator.this.queue.put(noMoreDataElement);
                }
                catch (InterruptedException e) {
                    this.e = e;
                    log.error((Object)"Error putting End of Data marker onto queue");
                }
            }
        }

        public boolean hasError() {
            return this.e != null;
        }

        public Exception getError() {
            return this.e;
        }
    }

    static class QueueElement {
        Key key = null;
        Value value = null;

        public QueueElement() {
        }

        public QueueElement(Key key, Value value) {
            this.key = new Key(key);
            this.value = new Value(value.get(), true);
        }

        public Key getKey() {
            return this.key;
        }

        public Value getValue() {
            return this.value;
        }
    }
}

