/*
 * Decompiled with CFR 0.152.
 */
package gorsat;

import gorsat.BatchedReadSourceConfig;
import gorsat.Commands.Analysis;
import gorsat.RowBuffer;
import java.time.Duration;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Spliterator;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.gorpipe.exceptions.GorException;
import org.gorpipe.exceptions.GorSystemException;
import org.gorpipe.gor.model.GenomicIterator;
import org.gorpipe.gor.model.Row;
import org.gorpipe.model.gor.RowObj;
import org.gorpipe.model.gor.iterators.RowSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchedPipeStepIteratorAdaptor
extends RowSource
implements Spliterator<Row> {
    private static final Logger log = LoggerFactory.getLogger(BatchedPipeStepIteratorAdaptor.class);
    // Upstream rows; read by ReaderThread.run(), forEachRemaining() and the fallback seek in setPosition().
    private final Iterator<? extends Row> sourceIterator;
    // Analysis chain every source row is pushed through. Several call sites null-check it (toString,
    // forEachRemaining, ReaderThread.initPipeStep), so null appears to be an accepted value.
    private final Analysis pipeStep;
    // Batch currently being consumed by hasNext()/next(); stays null until the first hasNext() call.
    private RowBuffer rowBuffer = null;
    // Marker row that BufferAdaptor.finish() appends to the last batch it hands over.
    private final Row endRow = RowObj.StoR((CharSequence)"chrN\t-1");
    // Taken from brsConfig.getBufferFlushTimout(); drives the choice and threshold of the buffer adaptor.
    private final Duration timeTriggerBufferFlush;
    // Wait used for each single offer/poll attempt on the hand-off queue.
    private final Duration batchOfferTimeout;
    // Overall limit on waiting for a hand-off; read from system property "gor.timeout.rowsource".
    private final Duration timeout;
    // Desired spacing between "still waiting" debug log lines.
    private final Duration logInterval;
    // Producer thread; created lazily by hasNext() and replaced by setPosition().
    private ReaderThread readerThread;
    // When true, close() rethrows a stored exception; hasNext() clears it once it has thrown that exception.
    private boolean throwOnExit = true;
    // Running mean of setPosition() wall time (written there; not read anywhere in this file).
    private double avgSeekTimeMilliSecond = 0.0;
    // Throughput statistics maintained by updateTimeMeasurement().
    private double avgBasesPerMilliSecond = 0.0;
    private double avgRowsPerMilliSecond = 0.0;
    private double avgBatchSize = 0.0;
    private int numberOfRowsRead = 0;
    private long totalTimeNs = 0L;
    // Number of batches folded into avgBatchSize.
    private int avgCount = 0;
    // Number of single-chromosome batches folded into avgBasesPerMilliSecond.
    private int bavgCount = 0;
    // Set by trySplit() once it declines to split; also read by SpliteratorAdaptor.process().
    private boolean nosplit = false;
    // Chromosome this adaptor is currently positioned on for splitting purposes.
    private String currentChrom;
    // Chromosome name -> name of the following chromosome; filled in by the static initializer.
    private static final Map<String, String> nextChromMap = new HashMap<String, String>();
    private final BatchedReadSourceConfig brsConfig;
    // When true, hasNext() calls close() as soon as it reports that no rows remain.
    private final boolean autoclose;
    // Number of completed setPosition() calls.
    int seekCount = 0;

    /**
     * Records the chromosome this adaptor is currently responsible for.
     *
     * @param chrom chromosome name, later consulted by {@code trySplit()} and the split-aware sink
     */
    public void setCurrentChrom(String chrom) {
        currentChrom = chrom;
    }

    /**
     * @return rows handed over per millisecond, as last computed by {@code updateTimeMeasurement}
     */
    public double getAvgRowsPerMilliSecond() {
        return avgRowsPerMilliSecond;
    }

    /**
     * @return running mean of positions covered per millisecond, as maintained by
     *         {@code updateTimeMeasurement}
     */
    public double getAvgBasesPerMilliSecond() {
        return avgBasesPerMilliSecond;
    }

    /**
     * @return running mean of the batch sizes seen by {@code updateTimeMeasurement}
     */
    public double getAvgBatchSize() {
        return avgBatchSize;
    }

    /**
     * @return {@code size()} of the batch currently being consumed; the batch is only assigned
     *         once {@code hasNext()} has been called
     */
    public int getCurrentBatchSize() {
        RowBuffer batch = this.rowBuffer;
        return batch.size();
    }

    /**
     * @return {@code getIndex()} of the batch currently being consumed
     */
    public int getCurrentBatchLoc() {
        RowBuffer batch = this.rowBuffer;
        return batch.getIndex();
    }

    /**
     * Looks up a row of the batch currently being consumed.
     *
     * @param i index passed straight to {@code RowBuffer.get}
     * @return the row stored at that index
     */
    public Row getCurrentBatchRow(int i) {
        RowBuffer batch = this.rowBuffer;
        return batch.get(i);
    }

    /**
     * Folds one handed-over batch into the running throughput statistics.
     *
     * <p>Batch count, row count, total time and mean batch size are always updated. The two rate
     * figures are only updated when the sample is usable: a non-positive elapsed time would turn
     * the division into Infinity/NaN, and a NaN would stick in the running bases-per-millisecond
     * mean for good. An empty batch has no first/last row to measure a span from.
     *
     * @param deltaTimeNs nanoseconds spent filling {@code current}
     * @param current     the batch that was just filled
     */
    public void updateTimeMeasurement(long deltaTimeNs, RowBuffer current) {
        int batchSize = current.size();
        ++this.avgCount;
        this.numberOfRowsRead += batchSize;
        this.totalTimeNs += deltaTimeNs;
        this.avgBatchSize = ((double)(this.avgCount - 1) * this.avgBatchSize + (double)batchSize) / (double)this.avgCount;
        if (this.totalTimeNs > 0L) {
            this.avgRowsPerMilliSecond = (double)this.numberOfRowsRead / ((double)this.totalTimeNs / 1000000.0);
        }
        if (batchSize == 0 || deltaTimeNs <= 0L) {
            // Nothing meaningful to add to the bases-per-millisecond mean for this sample.
            return;
        }
        Row firstRow = current.get(0);
        Row lastRow = current.get(batchSize - 1);
        // A span is only meaningful when the batch does not cross a chromosome boundary.
        if (firstRow.chr.equals(lastRow.chr)) {
            ++this.bavgCount;
            double basesPerMs = (double)(lastRow.pos - firstRow.pos) / ((double)deltaTimeNs / 1000000.0);
            this.avgBasesPerMilliSecond = ((double)(this.bavgCount - 1) * this.avgBasesPerMilliSecond + basesPerMs) / (double)this.bavgCount;
        }
    }

    /**
     * Convenience overload of {@link #getStream(boolean)} that requests a sequential stream.
     *
     * @return a sequential stream over this adaptor's rows
     */
    public Stream<Row> getStream() {
        final boolean parallel = false;
        return getStream(parallel);
    }

    /**
     * Exposes this adaptor (which is its own {@link Spliterator}) as a stream whose
     * {@code close()} also closes the adaptor.
     *
     * <p>{@code onClose} is specified to return the stream carrying the new handler, so that
     * returned stream is handed to the caller rather than the one it was invoked on.
     *
     * @param parallel whether a parallel stream is requested
     * @return a stream over this adaptor's rows with a close handler attached
     */
    public Stream<Row> getStream(boolean parallel) {
        return StreamSupport.stream(this, parallel).onClose(this::close);
    }

    /**
     * @return always {@code null}; no explicit comparator is supplied for this spliterator
     */
    @Override
    public Comparator<Row> getComparator() {
        final Comparator<Row> none = null;
        return none;
    }

    /**
     * Feeds the next row, if there is one, to {@code action}.
     *
     * @param action receiver of the row
     * @return {@code true} if a row was delivered, {@code false} when {@code hasNext()} reported none
     */
    @Override
    public boolean tryAdvance(Consumer<? super Row> action) {
        if (!this.hasNext()) {
            return false;
        }
        Row row = (Row)this.next();
        action.accept(row);
        return true;
    }

    /**
     * Creates an independent adaptor over a clone of the source iterator and a clone of the
     * pipe step, with the same header and batching configuration.
     *
     * <p>The cloned source iterator is what the new adaptor reads from; handing it the original
     * iterator would leave two adaptors pulling rows from one shared iterator.
     *
     * @return a new adaptor that does not share its source iterator or pipe step with this one
     * @throws CloneNotSupportedException if the source iterator or the pipe step cannot be cloned
     * @throws ClassCastException if the source iterator is not a {@code GenomicIterator}
     */
    public BatchedPipeStepIteratorAdaptor clone() throws CloneNotSupportedException {
        GenomicIterator sourceCopy = ((GenomicIterator)this.sourceIterator).clone();
        // pipeStep is treated as optional elsewhere in this class, so tolerate null here as well.
        Analysis stepCopy = this.pipeStep == null ? null : this.pipeStep.clone();
        return new BatchedPipeStepIteratorAdaptor(sourceCopy, stepCopy, this.getHeader(), this.brsConfig);
    }

    /**
     * Splits off an adaptor for the current chromosome when a successor chromosome is known.
     *
     * <p>When a split is possible the returned clone is positioned at the start of the current
     * chromosome and this adaptor moves on to the successor. Otherwise, the first time splitting
     * is declined, this adaptor is positioned at the start of its current chromosome and marked
     * as no longer splittable.
     *
     * @return the split-off adaptor, or {@code null} when no split is made
     */
    @Override
    public Spliterator<Row> trySplit() {
        boolean splittable = !this.nosplit
                && this.sourceIterator != null
                && nextChromMap.containsKey(this.currentChrom);
        if (!splittable) {
            if (!this.nosplit) {
                this.setPosition(this.currentChrom, 0);
                this.nosplit = true;
            }
            return null;
        }

        String successor = nextChromMap.get(this.currentChrom);
        BatchedPipeStepIteratorAdaptor piece;
        try {
            piece = this.clone();
        }
        catch (CloneNotSupportedException e) {
            throw new GorSystemException("Trying to clone non clonable object", (Throwable)e);
        }
        piece.setPosition(this.currentChrom, 0);
        this.currentChrom = successor;
        return piece;
    }

    /**
     * @return {@code Long.MAX_VALUE}, i.e. the number of remaining rows is not known
     */
    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    /**
     * @return the characteristic bits IMMUTABLE, NONNULL, ORDERED and SORTED (numerically 1300)
     */
    @Override
    public int characteristics() {
        // 0x400 | 0x100 | 0x10 | 0x4 == 1300
        return Spliterator.IMMUTABLE | Spliterator.NONNULL | Spliterator.ORDERED | Spliterator.SORTED;
    }

    /**
     * @return the source iterator's text, followed by {@code " | "} and the pipe step when one is set
     */
    public String toString() {
        if (this.pipeStep == null) {
            return this.sourceIterator.toString();
        }
        return this.sourceIterator + " | " + this.pipeStep;
    }

    /**
     * Creates an adaptor that does not close itself automatically when it runs out of rows.
     *
     * @param sourceIterator rows to read
     * @param pipeStep       analysis chain the rows are pushed through
     * @param theHeader      header to report for this source
     * @param brsConfig      batching configuration
     */
    public BatchedPipeStepIteratorAdaptor(Iterator<? extends Row> sourceIterator, Analysis pipeStep, String theHeader, BatchedReadSourceConfig brsConfig) {
        this(sourceIterator, pipeStep, /* autoclose */ false, theHeader, brsConfig);
    }

    public BatchedPipeStepIteratorAdaptor(Iterator<? extends Row> sourceIterator, Analysis pipeStep, boolean autoclose, String theHeader, BatchedReadSourceConfig brsConfig) {
        this.sourceIterator = sourceIterator;
        this.pipeStep = pipeStep;
        this.brsConfig = brsConfig;
        this.autoclose = autoclose;
        this.setHeader(theHeader);
        this.timeTriggerBufferFlush = brsConfig.getBufferFlushTimout();
        this.batchOfferTimeout = brsConfig.getBatchOfferTimeout();
        this.timeout = Duration.ofSeconds(Long.parseLong(System.getProperty("gor.timeout.rowsource", "1800000")));
        this.logInterval = brsConfig.getLogInterval();
    }

    /**
     * Reports whether another row can be returned by {@code next()}.
     *
     * <p>The first call starts the reader thread and waits for its first batch; later calls fetch
     * a new batch whenever the current one reports nothing available. An exception stored by the
     * reader is rethrown here (and then no longer rethrown by {@code close()}). With
     * {@code autoclose} set, the adaptor closes itself when no rows remain.
     *
     * @return {@code true} if a row is available; {@code false} when there is none or the
     *         waiting thread was interrupted (the interrupt flag is restored)
     */
    public boolean hasNext() {
        try {
            if (this.rowBuffer == null) {
                // First use: start the producer and wait for its first batch.
                this.readerThread = new ReaderThread();
                this.readerThread.setUncaughtExceptionHandler((thread, error) -> {});
                this.readerThread.start();
                this.rowBuffer = this.readerThread.pollBatch();
            } else if (!this.rowBuffer.available()) {
                this.rowBuffer = this.readerThread.pollBatch();
            }

            Throwable failure = this.getEx();
            if (failure != null) {
                this.throwOnExit = false;
                if (failure instanceof GorException) {
                    throw (GorException)failure;
                }
                throw new GorSystemException(failure);
            }

            boolean more = this.rowBuffer != null && this.rowBuffer.hasNext();
            if (this.autoclose && !more) {
                this.close();
            }
            return more;
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * @return the next row of the current batch; {@code hasNext()} must have been called first,
     *         since that is what assigns the batch
     */
    public Row next() {
        RowBuffer batch = this.rowBuffer;
        return batch.next();
    }

    /**
     * Does nothing; rows are not removed from the underlying source.
     */
    public void remove() {
        // intentionally empty
    }

    /**
     * Pushes every remaining source row through the pipe step into {@code action} on the calling
     * thread, bypassing the reader thread and batching entirely, then closes this adaptor.
     *
     * <p>An exception raised while processing is handed to {@code securedFinish}; whether it is
     * rethrown from there depends on {@code Analysis.securedFinish}, which is not visible here.
     *
     * @param action receiver of the rows that come out of the pipe step
     */
    @Override
    public void forEachRemaining(Consumer<? super Row> action) {
        SpliteratorAdaptor sink = new SpliteratorAdaptor(action);
        // The chained step is a general Analysis, not a SpliteratorAdaptor, so type it as such.
        Analysis chain = this.pipeStep != null ? this.pipeStep.$bar((Analysis)sink) : sink;
        Exception failure = null;
        chain.securedSetup(null);
        try {
            while (this.sourceIterator.hasNext() && !chain.wantsNoMore()) {
                Row row = this.sourceIterator.next();
                chain.process(row);
            }
        }
        catch (Exception e) {
            failure = e;
        }
        finally {
            chain.securedFinish(failure);
        }
        this.close();
    }

    /**
     * Repositions the source at {@code seekChr}/{@code seekPos} and restarts batching.
     *
     * <p>Only acts when a reader thread already exists: the running reader is told to stop and is
     * joined, the source is repositioned, and a fresh reader thread (reusing the previous pipe
     * step and buffer adaptor) is started. Without a reader thread only the seek timing
     * statistics are updated.
     *
     * @param seekChr chromosome to seek to
     * @param seekPos position on that chromosome
     * @throws GorSystemException if the calling thread is interrupted while joining the reader
     */
    public void setPosition(String seekChr, int seekPos) {
        long t = System.nanoTime();
        try {
            if (this.readerThread != null) {
                this.readerThread.stopProcessing("Stop processing seeking to " + seekChr + " " + seekPos);
                // Take (and discard) a batch the reader may be offering right now, then wait for it to exit.
                this.readerThread.poll();
                this.readerThread.join();
                // NOTE(review): ReaderThread.run() calls closeSourceIterator() in its finally block, so a
                // RowSource-backed source has had close() called by the time we get here - confirm it can
                // still be used after that.
                if (this.sourceIterator instanceof GenomicIterator) {
                    ((GenomicIterator)this.sourceIterator).seek(seekChr, seekPos);
                } else {
                    // NOTE(review): this loop has no exit other than hasNext() turning false, so it consumes
                    // the whole iterator; a stop at the first row at/after the target looks intended - confirm.
                    while (this.sourceIterator.hasNext()) {
                        Row next = this.sourceIterator.next();
                        if (next.chr.compareTo(seekChr) < 0 || next.chr.compareTo(seekChr) == 0 && next.pos < seekPos) continue;
                    }
                }
                // New thread, same pipe step and buffer adaptor as the one that was just stopped.
                this.readerThread = new ReaderThread(this.readerThread.bufferedPipeStep, this.readerThread.bufferAdaptor);
                // Swallow uncaught exceptions from the reader thread.
                this.readerThread.setUncaughtExceptionHandler((tt, e) -> {});
                this.rowBuffer = this.readerThread.rowBuffer1;
                this.readerThread.start();
            }
        }
        catch (InterruptedException e2) {
            throw new GorSystemException("rowQueue take interrupted on setPosition", (Throwable)e2);
        }
        // Incremental mean of the seek time in milliseconds.
        this.avgSeekTimeMilliSecond = ((double)this.seekCount * this.avgSeekTimeMilliSecond + (double)(System.nanoTime() - t) / 1000000.0) / (double)(this.seekCount + 1);
        ++this.seekCount;
    }

    /**
     * Closes the source iterator when it is a {@code RowSource}; other iterator types are left alone.
     */
    private void closeSourceIterator() {
        if (!(this.sourceIterator instanceof RowSource)) {
            return;
        }
        RowSource closable = (RowSource)this.sourceIterator;
        closable.close();
    }

    /**
     * Shuts this adaptor down.
     *
     * <p>If the reader thread has started it is asked to stop and given up to two seconds to
     * exit (it finishes the pipe step and closes the source itself). Otherwise the pipe step, if
     * there is one, is finished here and the source iterator is closed. A missing pipe step is
     * simply skipped rather than turned into a stored {@code NullPointerException}.
     *
     * @throws GorException if an exception was stored and has not already been thrown by
     *                      {@code hasNext()}; non-Gor exceptions are wrapped in a {@code GorSystemException}
     */
    public void close() {
        if (this.readerThread != null && this.readerThread.didStart) {
            this.readerThread.stopProcessing("Stop processing closing source");
            try {
                this.readerThread.join(2000L);
            }
            catch (InterruptedException ie) {
                log.warn("Reader thread join interrupted");
                Thread.currentThread().interrupt();
            }
        } else {
            try {
                if (this.pipeStep != null) {
                    this.pipeStep.securedFinish(null);
                }
            }
            catch (Exception e) {
                this.setEx(e);
            }
            finally {
                this.closeSourceIterator();
            }
        }

        if (!this.throwOnExit) {
            return;
        }
        Throwable exception = this.getEx();
        if (exception == null) {
            return;
        }
        if (exception instanceof GorException) {
            throw (GorException)exception;
        }
        throw new GorSystemException("Got exception in bufferedPipeStep process thread", exception);
    }

    /**
     * @return always {@code true}
     */
    public boolean isBuffered() {
        final boolean buffered = true;
        return buffered;
    }

    // Builds the chromosome -> following-chromosome table used by trySplit().
    // The names are listed in lexicographic order, which is the order the list already followed;
    // chr2 through chr9 were absent from it, so a split chain stepped straight from chr19 to chr20
    // and from chr22 to chrM. NOTE(review): confirm lexicographic is the ordering the data uses.
    static {
        String[] chroms = new String[]{
            "chr1", "chr10", "chr11", "chr12", "chr13", "chr14", "chr15", "chr16", "chr17", "chr18", "chr19",
            "chr2", "chr20", "chr21", "chr22",
            "chr3", "chr4", "chr5", "chr6", "chr7", "chr8", "chr9",
            "chrM", "chrX", "chrY"};
        // The last entry has no successor and therefore gets no key.
        for (int i = 0; i < chroms.length - 1; ++i) {
            nextChromMap.put(chroms[i], chroms[i + 1]);
        }
    }

    /**
     * Producer thread: pulls rows from the source iterator, pushes them through the pipe step
     * into a {@link BufferAdaptor}, and hands completed batches to the consuming thread through a
     * {@link SynchronousQueue}.
     */
    private class ReaderThread
    extends Thread {
        private final SynchronousQueue<RowBuffer> rowQueue = new SynchronousQueue<>();
        private final RowBuffer rowBuffer1 = new RowBuffer();
        private final RowBuffer rowBuffer2 = new RowBuffer(this.rowBuffer1);
        private Analysis bufferedPipeStep;
        private BufferAdaptor bufferAdaptor;
        // Both flags are written by one thread and read by the other (stopProcessing is set from
        // close()/setPosition() and polled in offerBatch(); didStart is set in run() and read in
        // close()), so they must be volatile for the writes to be reliably seen.
        private volatile boolean stopProcessing = false;
        private volatile boolean didStart = false;
        // Number of offer/poll attempts between two "still waiting" log lines.
        private long numberOfPollsBeforeLog;
        // Number of offer/poll attempts after which waiting is abandoned with an exception.
        private long numberOfPollsBeforeTimeout;

        /**
         * Creates a thread that continues with an existing pipe step and buffer adaptor (used
         * after a seek). The adaptor is re-pointed at this thread and the pipe step's
         * "wants no more" flag is cleared.
         */
        public ReaderThread(Analysis pipeStep, BufferAdaptor bufferAdaptor) {
            this.bufferAdaptor = bufferAdaptor;
            bufferAdaptor.setReaderThread(this);
            this.bufferedPipeStep = pipeStep;
            this.bufferedPipeStep.wantsNoMore_$eq(false);
            this.init();
        }

        /** Creates a thread with a freshly built buffer adaptor and pipe step chain. */
        public ReaderThread() {
            this.init();
            this.initPipeStep();
        }

        /** Names the thread, links the two buffers and derives the attempt counters. */
        private void init() {
            this.setName(Thread.currentThread().getName() + "::ReaderThread");
            this.rowBuffer1.setNextRowBuffer(this.rowBuffer2);
            // A sub-millisecond offer timeout would make the divisions below divide by zero, and a
            // log interval shorter than the offer timeout would make the "% numberOfPollsBeforeLog"
            // in offerBatch/pollBatch divide by zero; clamp both to at least 1.
            long offerMillis = Math.max(1L, BatchedPipeStepIteratorAdaptor.this.batchOfferTimeout.toMillis());
            this.numberOfPollsBeforeLog = Math.max(1L, BatchedPipeStepIteratorAdaptor.this.logInterval.toMillis() / offerMillis);
            this.numberOfPollsBeforeTimeout = BatchedPipeStepIteratorAdaptor.this.timeout.toMillis() / offerMillis;
        }

        /**
         * Builds the buffer adaptor and chains it behind the pipe step (or uses it alone when
         * there is no pipe step), then sets the chain up.
         */
        private void initPipeStep() {
            // Duration.getNano() is the nano-of-second part and is never negative, so a negative
            // flush duration has to be detected with isNegative().
            this.bufferAdaptor = BatchedPipeStepIteratorAdaptor.this.timeTriggerBufferFlush.isNegative() ? new BufferAdaptor(this) : new TimeoutBufferAdaptor(this);
            this.bufferedPipeStep = BatchedPipeStepIteratorAdaptor.this.pipeStep != null ? BatchedPipeStepIteratorAdaptor.this.pipeStep.$bar((Analysis)this.bufferAdaptor) : this.bufferAdaptor;
            this.bufferedPipeStep.securedSetup(null);
        }

        /**
         * Asks the thread to stop: raises the stop flag and tells the pipe step it wants no more rows.
         *
         * @param message reason, logged at debug level
         */
        public void stopProcessing(String message) {
            log.debug(message);
            this.stopProcessing = true;
            if (this.bufferedPipeStep != null) {
                this.bufferedPipeStep.wantsNoMore_$eq(true);
            }
        }

        /** Finishes the pipe step, storing any failure on the enclosing adaptor. */
        public void finish() {
            try {
                this.bufferedPipeStep.securedFinish(BatchedPipeStepIteratorAdaptor.this.getEx());
            }
            catch (Throwable e) {
                BatchedPipeStepIteratorAdaptor.this.setEx(e);
                this.stopProcessing("Stop processing error in finish " + e.getMessage());
            }
        }

        /**
         * Feeds source rows to the pipe step until the source is exhausted or the step wants no
         * more. Failures are stored on the enclosing adaptor; the pipe step is always finished
         * and the source iterator always closed.
         */
        @Override
        public void run() {
            this.didStart = true;
            Row r = null;
            try {
                while (BatchedPipeStepIteratorAdaptor.this.sourceIterator.hasNext() && !this.bufferedPipeStep.wantsNoMore()) {
                    r = BatchedPipeStepIteratorAdaptor.this.sourceIterator.next();
                    this.bufferedPipeStep.process(r);
                }
            }
            catch (Throwable e) {
                BatchedPipeStepIteratorAdaptor.this.setEx(e);
                this.stopProcessing("Stop processinng cause error " + e.getMessage() + " last row " + r);
            }
            finally {
                this.finish();
                BatchedPipeStepIteratorAdaptor.this.closeSourceIterator();
            }
        }

        /** Non-blocking hand-off attempt; succeeds only if a consumer is already waiting. */
        public boolean offer(RowBuffer rowBuffer) {
            return this.rowQueue.offer(rowBuffer);
        }

        /** Non-blocking take; returns {@code null} if no batch is being offered right now. */
        public RowBuffer poll() {
            return this.rowQueue.poll();
        }

        /**
         * Keeps offering {@code current} until a consumer takes it, the thread is told to stop,
         * or the attempt budget is used up.
         *
         * @throws GorSystemException   if the attempt budget is exceeded
         * @throws InterruptedException if interrupted while waiting
         */
        public void offerBatch(RowBuffer current) throws InterruptedException {
            int count = 0;
            while (this.didStart && !this.stopProcessing && !this.rowQueue.offer(current, BatchedPipeStepIteratorAdaptor.this.batchOfferTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                if ((long)count > this.numberOfPollsBeforeTimeout) {
                    throw new GorSystemException("BatchedIteratorAdaptor polling for too long " + BatchedPipeStepIteratorAdaptor.this.timeout.getSeconds(), null);
                }
                if ((long)count++ % this.numberOfPollsBeforeLog != 0L) continue;
                log.debug("Offering batch for {}, batch size {}, query {}", new Object[]{this.numberOfPollsBeforeLog * (long)count, current.size(), BatchedPipeStepIteratorAdaptor.this});
            }
        }

        /**
         * Waits for the next batch from the producer.
         *
         * @return the batch, or {@code null} if the thread was told to stop before one arrived
         * @throws GorSystemException   if the attempt budget is exceeded
         * @throws InterruptedException if interrupted while waiting
         */
        public RowBuffer pollBatch() throws InterruptedException {
            RowBuffer ret = this.rowQueue.poll(BatchedPipeStepIteratorAdaptor.this.batchOfferTimeout.toMillis(), TimeUnit.MILLISECONDS);
            int count = 0;
            while (!this.stopProcessing && ret == null) {
                ret = this.rowQueue.poll(BatchedPipeStepIteratorAdaptor.this.batchOfferTimeout.toMillis(), TimeUnit.MILLISECONDS);
                if ((long)count > this.numberOfPollsBeforeTimeout) {
                    throw new GorSystemException("BatchedIteratorAdaptor polling for too long " + BatchedPipeStepIteratorAdaptor.this.timeout.getSeconds(), null);
                }
                if ((long)count++ % this.numberOfPollsBeforeLog != 0L) continue;
                log.debug("Polling batch for {} time {}, query {}", new Object[]{this.numberOfPollsBeforeLog * (long)count, Thread.currentThread().getId(), BatchedPipeStepIteratorAdaptor.this});
            }
            return ret;
        }
    }

    /**
     * Terminal analysis step that collects rows into a {@code RowBuffer} and passes full buffers
     * to the consumer through the owning {@link ReaderThread}.
     */
    private class BufferAdaptor
    extends Analysis {
        // Buffer currently being filled.
        RowBuffer current;
        // Thread whose queue the buffers are handed over on.
        ReaderThread readerThread;
        // nanoTime stamp taken at construction (subclasses use it for timing).
        long t;

        public BufferAdaptor(ReaderThread readerThread) {
            this.readerThread = readerThread;
            this.current = readerThread.rowBuffer1;
            this.t = System.nanoTime();
        }

        /** Re-points this adaptor at another reader thread. */
        public void setReaderThread(ReaderThread rt) {
            this.readerThread = rt;
        }

        /**
         * Adds a row to the current buffer. When the buffer is full it is offered without
         * waiting; if nobody takes it the buffer is enlarged, and only when that is refused is
         * it handed over with a blocking offer.
         */
        public void process(Row r) {
            try {
                if (this.wantsNoMore()) {
                    this.readerThread.stopProcessing("Stop processing adaptor wantsNoMore");
                    return;
                }
                this.current.add(r);
                if (!this.current.isFull()) {
                    return;
                }
                if (this.readerThread.offer(this.current)) {
                    // Consumer was waiting: switch buffers and halve the new one.
                    RowBuffer following = this.current.nextRowBuffer();
                    this.current = following;
                    following.reduce(following.size() / 2);
                    return;
                }
                boolean enlarged = this.current.enlarge(this.current.size() * 8);
                if (!enlarged) {
                    this.readerThread.offerBatch(this.current);
                    this.current = this.current.nextRowBuffer();
                }
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Unless in error state, appends the end marker row (after first handing over a full
         * buffer) and hands the last buffer to the consumer.
         */
        public void finish() {
            if (this.isInErrorState()) {
                return;
            }
            try {
                if (this.current.isFull()) {
                    this.readerThread.offerBatch(this.current);
                    this.current = this.current.nextRowBuffer();
                }
                this.current.add(BatchedPipeStepIteratorAdaptor.this.endRow);
                this.readerThread.offerBatch(this.current);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Terminal analysis step used by {@code forEachRemaining}: forwards rows to a
     * {@link Consumer}, and once splitting has been switched off stops at the first row that is
     * not on the adaptor's current chromosome.
     */
    private class SpliteratorAdaptor
    extends Analysis
    implements Consumer<Row> {
        Consumer<? super Row> cns;

        SpliteratorAdaptor(Consumer<? super Row> cns) {
            this.cns = cns;
        }

        /** Forwards {@code r}, or reports "wants no more" when it lies outside the current chromosome. */
        public void process(Row r) {
            boolean forward = !BatchedPipeStepIteratorAdaptor.this.nosplit
                    || BatchedPipeStepIteratorAdaptor.this.currentChrom.equals(r.chr);
            if (forward) {
                this.cns.accept((Row)r);
                return;
            }
            this.reportWantsNoMore();
        }

        /** Consumer entry point; same as {@link #process(Row)}. */
        @Override
        public void accept(Row row) {
            this.process(row);
        }

        public void finish() {
            super.finish();
        }
    }

    /**
     * Buffer adaptor that also takes elapsed time into account: once a full buffer has been
     * filling for longer than the configured flush duration it is handed over (or grown only
     * modestly) instead of being grown aggressively. Every hand-over is reported to
     * {@code updateTimeMeasurement}.
     */
    private class TimeoutBufferAdaptor
    extends BufferAdaptor {
        // Flush threshold in nanoseconds.
        long timeTriggerBufferFlushNs;

        public TimeoutBufferAdaptor(ReaderThread readerThread) {
            super(readerThread);
            // toNanos() gives the whole duration; getNano() would give only the sub-second part,
            // turning e.g. a one-second threshold into zero.
            this.timeTriggerBufferFlushNs = BatchedPipeStepIteratorAdaptor.this.timeTriggerBufferFlush.toNanos();
        }

        /**
         * Adds a row; when the buffer is full, decides between handing it over and enlarging it
         * based on how long it has been filling.
         */
        @Override
        public void process(Row r) {
            try {
                if (this.wantsNoMore()) {
                    this.readerThread.stopProcessing("Stop processing adaptor wantsNoMore");
                    return;
                }
                this.current.add(r);
                if (!this.current.isFull()) {
                    return;
                }
                long nt = System.nanoTime();
                long elapsedNs = nt - this.t;
                if (elapsedNs > this.timeTriggerBufferFlushNs) {
                    if (this.readerThread.offer(this.current)) {
                        // Consumer was waiting: switch buffers and halve the new one.
                        BatchedPipeStepIteratorAdaptor.this.updateTimeMeasurement(elapsedNs, this.current);
                        this.current = this.current.nextRowBuffer();
                        this.current.reduce(this.current.size() / 2);
                        this.t = System.nanoTime();
                    } else if (!this.current.enlarge(this.current.size() * 2)) {
                        this.handOverBlocking(elapsedNs);
                    }
                } else if (!this.current.enlarge(this.current.size() * 8)) {
                    this.handOverBlocking(elapsedNs);
                }
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }

        /** Records the timing sample, waits for the consumer to take the buffer, then starts the next one. */
        private void handOverBlocking(long elapsedNs) throws InterruptedException {
            BatchedPipeStepIteratorAdaptor.this.updateTimeMeasurement(elapsedNs, this.current);
            this.readerThread.offerBatch(this.current);
            this.current = this.current.nextRowBuffer();
            this.t = System.nanoTime();
        }
    }
}

