/*
 * Decompiled with CFR 0.152.
 */
package gorsat;

import gorsat.BatchedReadSourceConfig;
import gorsat.RowBuffer;
import java.time.Duration;
import java.util.Iterator;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.gorpipe.exceptions.GorException;
import org.gorpipe.exceptions.GorSystemException;
import org.gorpipe.gor.model.GenomicIterator;
import org.gorpipe.gor.model.GenomicIteratorBase;
import org.gorpipe.gor.model.Row;
import org.gorpipe.gor.monitor.GorMonitor;
import org.gorpipe.model.gor.RowObj;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Row source that reads ahead from a wrapped iterator on a background thread and hands the
 * rows to the consumer in batches ({@link RowBuffer}) through a {@link SynchronousQueue}.
 *
 * <p>The reader thread is started lazily by the first {@link #hasNext()} call. It fills a
 * buffer, offers it to the queue and continues with the buffer returned by
 * {@code RowBuffer.nextRowBuffer()}. After the last source row it appends {@code endRow}
 * and offers the final batch. Failures on the reader thread are stored through
 * {@link #setEx(Throwable)} and rethrown on the consumer side by {@link #hasNext()} and
 * {@link #close()}.
 *
 * <p>The throughput statistics are written by the reader thread without synchronization;
 * NOTE(review): values returned by the statistics getters should be treated as approximate
 * when read from another thread.
 */
public class BatchedReadSource
extends GenomicIteratorBase {
    private static final Logger log = LoggerFactory.getLogger(BatchedReadSource.class);
    /** Marker row the reader thread appends to the last batch it offers. */
    private final Row endRow = RowObj.StoR((CharSequence)"chrN\t-1");
    private final Iterator<? extends Row> sourceIterator;
    private PollingThread readerThread;
    /** Minimum age of a full batch before it is handed over; a negative value disables the timing. */
    private final Duration timeTriggerBufferFlush;
    /** Timeout of a single queue offer/poll attempt. */
    private final Duration batchOfferTimeout;
    /** Total time the consumer is willing to poll for one batch. */
    private final Duration timeout;
    /** Interval between debug log lines while waiting on the queue. */
    private final Duration logInterval;
    private double avgSeekTimeMilliSecond = 0.0;
    private double avgBasesPerMilliSecond = 0.0;
    private double avgRowsPerMilliSecond = 0.0;
    private double avgBatchSize = 0.0;
    private int numberOfRowsRead = 0;
    private long totalTimeNs = 0L;
    private int avgCount;
    private int bavgCount = 0;
    private final GorMonitor gorMonitor;
    /** Written by the reader thread, read by the consumer thread; volatile for visibility. */
    private volatile Throwable ex = null;
    private RowBuffer rowBuffer = null;
    int seekCount;

    /**
     * Records a failure, or clears the recorded one.
     *
     * @param throwable the failure to record; ignored when a failure is already recorded.
     *                  Passing {@code null} clears the recorded failure.
     */
    public void setEx(Throwable throwable) {
        if (this.ex == null || throwable == null) {
            this.ex = throwable;
        }
    }

    /**
     * @return the recorded failure, or {@code null} when there is none
     */
    public Throwable getEx() {
        return this.ex;
    }

    /**
     * Folds one completed batch into the running throughput statistics.
     *
     * @param deltaTimeNs time spent filling the batch, in nanoseconds
     * @param current     the batch that was filled; its first and last rows are read, so it
     *                    is expected to be non-empty
     */
    public void updateTimeMeasurement(long deltaTimeNs, RowBuffer current) {
        ++this.avgCount;
        this.numberOfRowsRead += current.size();
        this.totalTimeNs += deltaTimeNs;
        // Incremental mean over all batches seen so far.
        this.avgBatchSize = ((double)(this.avgCount - 1) * this.avgBatchSize + (double)current.size()) / (double)this.avgCount;
        if (this.totalTimeNs != 0L) {
            this.avgRowsPerMilliSecond = (double)this.numberOfRowsRead / ((double)this.totalTimeNs / 1000000.0);
        }
        Row firstRow = current.get(0);
        Row lastRow = current.get(current.size() - 1);
        // A base-pair rate is only computed for batches whose first and last row share a chromosome.
        if (firstRow.chr.equals(lastRow.chr)) {
            ++this.bavgCount;
            if (deltaTimeNs != 0L) {
                double basesPerMs = (double)(lastRow.pos - firstRow.pos) / ((double)deltaTimeNs / 1000000.0);
                this.avgBasesPerMilliSecond = ((double)(this.bavgCount - 1) * this.avgBasesPerMilliSecond + basesPerMs) / (double)this.bavgCount;
            }
        }
    }

    /**
     * Creates a source without a header and without a monitor.
     *
     * @param sourceIterator the iterator to read ahead from
     * @param brsConfig      timing configuration
     */
    public BatchedReadSource(GenomicIterator sourceIterator, BatchedReadSourceConfig brsConfig) {
        this((Iterator<? extends Row>)sourceIterator, brsConfig, null, null);
    }

    /**
     * Creates a source. The reader thread is not started until {@link #hasNext()} is called.
     *
     * @param sourceIterator the iterator to read ahead from
     * @param brsConfig      timing configuration (flush timeout, offer timeout, log interval)
     * @param header         header passed to {@code setHeader}; may be {@code null}
     * @param gorMonitor     monitor polled for cancellation; may be {@code null}
     */
    public BatchedReadSource(Iterator<? extends Row> sourceIterator, BatchedReadSourceConfig brsConfig, String header, GorMonitor gorMonitor) {
        this.sourceIterator = sourceIterator;
        this.gorMonitor = gorMonitor;
        this.setHeader(header);
        this.timeTriggerBufferFlush = brsConfig.getBufferFlushTimout();
        this.batchOfferTimeout = brsConfig.getBatchOfferTimeout();
        // NOTE(review): the property is interpreted as seconds, so the default of 1800000 is
        // roughly 20.8 days; the value looks like it was meant as milliseconds (30 minutes).
        // Left unchanged because existing deployments may rely on the current unit - confirm.
        this.timeout = Duration.ofSeconds(Long.parseLong(System.getProperty("gor.timeout.rowsource", "1800000")));
        this.logInterval = brsConfig.getLogInterval();
    }

    /** @return running average of rows read per millisecond */
    public double getAvgRowsPerMilliSecond() {
        return this.avgRowsPerMilliSecond;
    }

    /** @return running average of positions covered per millisecond within single-chromosome batches */
    public double getAvgBasesPerMilliSecond() {
        return this.avgBasesPerMilliSecond;
    }

    /** @return running average duration of {@link #seek(String, int)} in milliseconds */
    public double getAvgSeekTimeMilliSecond() {
        return this.avgSeekTimeMilliSecond;
    }

    /** @return running average number of rows per measured batch */
    public double getAvgBatchSize() {
        return this.avgBatchSize;
    }

    /** @return size of the batch currently being consumed; requires a current batch */
    public int getCurrentBatchSize() {
        return this.rowBuffer.size();
    }

    /** @return index reported by the batch currently being consumed; requires a current batch */
    public int getCurrentBatchLoc() {
        return this.rowBuffer.getIndex();
    }

    /**
     * @param i index into the batch currently being consumed
     * @return the row at that index; requires a current batch
     */
    public Row getCurrentBatchRow(int i) {
        return this.rowBuffer.get(i);
    }

    /**
     * Starts the reader thread on first use, fetches the next batch when the current one is
     * used up, and rethrows any failure recorded by the reader thread.
     *
     * @return {@code true} when the current batch reports another row
     * @throws GorException       when the reader thread recorded one
     * @throws GorSystemException when the reader thread recorded any other failure, or when
     *                            waiting for a batch was interrupted
     */
    public boolean hasNext() {
        try {
            if (this.rowBuffer == null) {
                this.readerThread = new ReaderThread();
                // Assigned before polling so the source counts as started even if the poll below throws.
                this.rowBuffer = this.readerThread.rowBuffer1;
                this.readerThread.start();
                this.rowBuffer = this.readerThread.pollBatch();
            } else if (!this.rowBuffer.available()) {
                this.rowBuffer = this.readerThread.pollBatch();
            }
            this.rethrowPendingException();
            return this.rowBuffer != null && this.rowBuffer.hasNext();
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new GorSystemException("rowQueue take interrupted", (Throwable)e);
        }
    }

    /** @return the next row of the current batch */
    public Row next() {
        return this.rowBuffer.next();
    }

    /**
     * Repositions the wrapped iterator (when it is a {@link GenomicIterator}), stops the
     * running reader thread and discards the current batch so that the next
     * {@link #hasNext()} starts a fresh reader thread.
     *
     * <p>NOTE(review): the wrapped iterator is repositioned while the reader thread may still
     * be reading from it, and the reader thread closes the wrapped iterator when it exits.
     * Whether the iterator is usable afterwards depends on its implementation - confirm.
     *
     * @param seekChr chromosome to seek to
     * @param seekPos position to seek to
     * @return always {@code true}
     * @throws GorSystemException when waiting for the reader thread was interrupted
     */
    public boolean seek(String seekChr, int seekPos) {
        long t = System.nanoTime();
        try {
            if (this.sourceIterator instanceof GenomicIterator) {
                ((GenomicIterator)this.sourceIterator).seek(seekChr, seekPos);
            }
            if (this.readerThread != null) {
                this.readerThread.stopProcessing();
                // Take a batch the reader may be trying to hand over so it can finish promptly.
                this.readerThread.poll();
                this.readerThread.join();
                this.readerThread = null;
                // Drop rows buffered before the seek. Keeping the old buffer would serve stale
                // rows and then dereference the cleared reader thread in hasNext().
                this.rowBuffer = null;
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new GorSystemException("rowQueue take interrupted on seek", (Throwable)e);
        }
        this.avgSeekTimeMilliSecond = ((double)this.seekCount * this.avgSeekTimeMilliSecond + (double)(System.nanoTime() - t) / 1000000.0) / (double)(this.seekCount + 1);
        ++this.seekCount;
        return true;
    }

    /**
     * @return {@code true} when a monitor was supplied and it reports cancellation;
     *         {@code false} when no monitor was supplied
     */
    public boolean isCancelled() {
        return this.gorMonitor != null && this.gorMonitor.isCancelled();
    }

    /** Closes the wrapped iterator when it is a {@link GenomicIterator}. */
    private void closeSourceIterator() {
        if (this.sourceIterator instanceof GenomicIterator) {
            ((GenomicIterator)this.sourceIterator).close();
        }
    }

    /**
     * Rethrows the failure recorded through {@link #setEx(Throwable)}, if any:
     * a {@link GorException} as is, anything else wrapped in a {@link GorSystemException}.
     */
    private void rethrowPendingException() {
        Throwable pending = this.getEx();
        if (pending == null) {
            return;
        }
        if (pending instanceof GorException) {
            throw (GorException)pending;
        }
        throw new GorSystemException(pending);
    }

    /**
     * Asks a started reader thread to stop (it closes the wrapped iterator when it exits);
     * otherwise closes the wrapped iterator directly. Rethrows any recorded failure.
     *
     * @throws GorException       when the reader thread recorded one
     * @throws GorSystemException when the reader thread recorded any other failure
     */
    public void close() {
        if (this.readerThread != null && this.readerThread.didStart) {
            this.readerThread.stopProcessing();
        } else {
            this.closeSourceIterator();
        }
        this.rethrowPendingException();
    }

    /** @return always {@code true} */
    public boolean isBuffered() {
        return true;
    }

    /**
     * Converts a duration to nanoseconds, returning {@link Long#MAX_VALUE} instead of
     * failing when the value does not fit in a {@code long}.
     */
    private static long toNanosSaturated(Duration duration) {
        try {
            return duration.toNanos();
        }
        catch (ArithmeticException overflow) {
            return Long.MAX_VALUE;
        }
    }

    /**
     * Thread that drains the wrapped iterator into row buffers and offers them to the queue.
     */
    private class ReaderThread
    extends PollingThread {
        ReaderThread() {
        }

        /**
         * Reads rows until the wrapped iterator is exhausted or processing is stopped.
         * A full buffer is offered without blocking when possible; when nobody takes it the
         * buffer is enlarged, and only when it cannot grow any further does the thread block
         * in {@code offerBatch}. With a non-negative flush interval a full buffer is also
         * kept growing until the interval has elapsed since the previous hand-over.
         * The wrapped iterator is always closed on exit.
         */
        @Override
        public void run() {
            super.run();
            try {
                RowBuffer current = this.rowBuffer1;
                // Duration.getNano() is only the nanosecond-of-second part and is never
                // negative, so the sign and the total length are queried explicitly.
                if (BatchedReadSource.this.timeTriggerBufferFlush.isNegative()) {
                    while (!this.stopProcessingThread && BatchedReadSource.this.sourceIterator.hasNext()) {
                        Row next = BatchedReadSource.this.sourceIterator.next();
                        current.add(next);
                        if (!current.isFull()) {
                            continue;
                        }
                        if (this.rowQueue.offer(current)) {
                            // Consumer was waiting: switch buffers and shrink the next one.
                            current = current.nextRowBuffer();
                            current.reduce(current.getCapacity() / 2);
                            continue;
                        }
                        if (current.enlarge(current.getCapacity() * 8)) {
                            continue;
                        }
                        this.offerBatch(current, BatchedReadSource.this.batchOfferTimeout);
                        current = current.nextRowBuffer();
                    }
                } else {
                    long t = System.nanoTime();
                    long timeTriggerBufferFlushNs = BatchedReadSource.toNanosSaturated(BatchedReadSource.this.timeTriggerBufferFlush);
                    while (!this.stopProcessingThread && BatchedReadSource.this.sourceIterator.hasNext()) {
                        Row next = BatchedReadSource.this.sourceIterator.next();
                        current.add(next);
                        if (!current.isFull()) {
                            continue;
                        }
                        long nt = System.nanoTime();
                        if (nt - t > timeTriggerBufferFlushNs) {
                            if (this.rowQueue.offer(current)) {
                                BatchedReadSource.this.updateTimeMeasurement(nt - t, current);
                                current = current.nextRowBuffer();
                                current.reduce(current.getCapacity() / 2);
                                t = System.nanoTime();
                                continue;
                            }
                            if (current.enlarge(current.getCapacity() * 2)) {
                                continue;
                            }
                            BatchedReadSource.this.updateTimeMeasurement(nt - t, current);
                            this.offerBatch(current, BatchedReadSource.this.batchOfferTimeout);
                            current = current.nextRowBuffer();
                            t = System.nanoTime();
                            continue;
                        }
                        // Flush interval not reached yet: keep accumulating while the buffer can grow.
                        if (current.enlarge(current.getCapacity() * 8)) {
                            continue;
                        }
                        BatchedReadSource.this.updateTimeMeasurement(nt - t, current);
                        this.offerBatch(current, BatchedReadSource.this.batchOfferTimeout);
                        current = current.nextRowBuffer();
                        t = System.nanoTime();
                    }
                }
                // Make room for the end marker when the last buffer is already full.
                if (current.isFull()) {
                    this.offerBatch(current, BatchedReadSource.this.batchOfferTimeout);
                    current = current.nextRowBuffer();
                }
                current.add(BatchedReadSource.this.endRow);
                this.offerBatch(current, BatchedReadSource.this.batchOfferTimeout);
            }
            catch (InterruptedException e) {
                log.error("rowQueue put interrupted", (Throwable)e);
                BatchedReadSource.this.setEx(e);
                // Release a consumer waiting in pollBatch(); without this it would keep
                // polling until the overall timeout although no batch will ever arrive.
                // The interrupt status is not restored here: the thread is about to end and
                // the wrapped iterator is still closed in the finally block below.
                this.stopProcessing();
            }
            catch (Throwable e) {
                BatchedReadSource.this.setEx(e);
                this.stopProcessing();
            }
            finally {
                BatchedReadSource.this.closeSourceIterator();
            }
        }
    }

    /**
     * Base thread holding the hand-over queue, the two linked row buffers and the
     * timed offer/poll loops used by the reader and the consumer.
     */
    private class PollingThread
    extends Thread {
        SynchronousQueue<RowBuffer> rowQueue = new SynchronousQueue<>();
        RowBuffer rowBuffer1 = new RowBuffer();
        RowBuffer rowBuffer2 = new RowBuffer(this.rowBuffer1);
        /** Set by either side to end both the reading loop and the waiting loops. */
        volatile boolean stopProcessingThread = false;
        /** Set once {@link #run()} has been entered. */
        volatile boolean didStart = false;
        long numberOfPollsBeforeLog;
        long numberOfPollsBeforeTimeout;
        int lastCount = 0;
        int pollCount = 0;

        PollingThread() {
            this.rowBuffer1.setNextRowBuffer(this.rowBuffer2);
            // Clamp to 1 so a sub-millisecond offer timeout, or a log interval shorter than
            // the offer timeout, cannot cause a division or modulo by zero.
            long attemptMillis = Math.max(1L, BatchedReadSource.this.batchOfferTimeout.toMillis());
            this.numberOfPollsBeforeLog = Math.max(1L, BatchedReadSource.this.logInterval.toMillis() / attemptMillis);
            this.numberOfPollsBeforeTimeout = BatchedReadSource.this.timeout.toMillis() / attemptMillis;
        }

        /** Requests that reading and waiting stop. */
        void stopProcessing() {
            this.stopProcessingThread = true;
        }

        /** @return a batch if one is being offered right now, otherwise {@code null}; never blocks */
        RowBuffer poll() {
            return this.rowQueue.poll();
        }

        @Override
        public void run() {
            this.didStart = true;
        }

        /**
         * Offers a batch repeatedly, one timed attempt at a time, until it is taken or
         * processing is stopped. Logs at debug level every {@code numberOfPollsBeforeLog}
         * failed attempts.
         *
         * @param current           the batch to hand over
         * @param batchOfferTimeout timeout of a single attempt
         * @throws InterruptedException when interrupted while waiting
         */
        void offerBatch(RowBuffer current, Duration batchOfferTimeout) throws InterruptedException {
            long attemptMillis = batchOfferTimeout.toMillis();
            int count = 0;
            while (!this.stopProcessingThread && !this.rowQueue.offer(current, attemptMillis, TimeUnit.MILLISECONDS)) {
                boolean shouldLog = (long)count++ % this.numberOfPollsBeforeLog == 0L;
                if (shouldLog) {
                    log.debug("Offering batch for {} ms, batchsize {} threadid: {}", attemptMillis * (long)count, current.size(), Thread.currentThread().getId());
                }
            }
        }

        /**
         * Waits for the next batch, one timed attempt at a time, until one arrives or
         * processing is stopped. Every 200 or so polls the monitor (when present) is asked
         * whether the query was cancelled, and processing is stopped if so.
         *
         * @return the received batch, or {@code null} when processing was stopped first
         * @throws InterruptedException when interrupted while waiting
         * @throws RuntimeException     when more than {@code numberOfPollsBeforeTimeout}
         *                              attempts passed without a batch
         */
        RowBuffer pollBatch() throws InterruptedException {
            long attemptMillis = BatchedReadSource.this.batchOfferTimeout.toMillis();
            RowBuffer rowBuffer = this.rowQueue.poll(attemptMillis, TimeUnit.MILLISECONDS);
            int count = 0;
            while (!this.stopProcessingThread && rowBuffer == null) {
                if ((long)count > this.numberOfPollsBeforeTimeout) {
                    throw new RuntimeException("BatchedReadSource polling for too long " + BatchedReadSource.this.timeout.getSeconds());
                }
                if ((long)count++ % this.numberOfPollsBeforeLog == 0L) {
                    log.debug("BatchedReadSource polling for {} ms, threadid: {}", attemptMillis * (long)count, Thread.currentThread().getId());
                }
                rowBuffer = this.rowQueue.poll(attemptMillis, TimeUnit.MILLISECONDS);
            }
            this.pollCount += count + 1;
            if (BatchedReadSource.this.gorMonitor != null && rowBuffer != null && this.pollCount - this.lastCount > 200) {
                if (BatchedReadSource.this.isCancelled()) {
                    this.stopProcessing();
                }
                this.lastCount = this.pollCount;
            }
            return rowBuffer;
        }
    }
}

