/*
 * Decompiled with CFR 0.152.
 */
package gorsat;

import gorsat.BatchedPipeStepIteratorAdaptor;
import gorsat.BatchedReadSourceConfig;
import gorsat.BufferAdaptor;
import gorsat.Commands.Analysis;
import gorsat.RowBuffer;
import gorsat.TimeoutBufferAdaptor;
import java.time.Duration;
import java.util.Iterator;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.gorpipe.exceptions.GorSystemException;
import org.gorpipe.gor.model.GenomicIterator;
import org.gorpipe.gor.model.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReaderThread
extends Thread {
    private static final Logger log = LoggerFactory.getLogger(ReaderThread.class);
    private final BatchedPipeStepIteratorAdaptor batchedPipeStepIteratorAdaptor;
    private final Duration batchOfferTimeout;
    private final Duration timeout;
    private final Duration logInterval;
    private final Duration timeTriggerBufferFlush;
    private final SynchronousQueue<RowBuffer> rowQueue = new SynchronousQueue();
    final RowBuffer rowBuffer1 = new RowBuffer();
    final RowBuffer rowBuffer2 = new RowBuffer(this.rowBuffer1);
    Analysis bufferedPipeStep;
    BufferAdaptor bufferAdaptor;
    private boolean stopProcessing = false;
    boolean didStart = false;
    private long numberOfPollsBeforeLog;
    private long numberOfPollsBeforeTimeout;

    public ReaderThread(BatchedPipeStepIteratorAdaptor batchedPipeStepIteratorAdaptor, Analysis pipeStep, BufferAdaptor bufferAdaptor, BatchedReadSourceConfig brsConfig) {
        this.batchedPipeStepIteratorAdaptor = batchedPipeStepIteratorAdaptor;
        this.bufferAdaptor = bufferAdaptor;
        bufferAdaptor.setReaderThread(this);
        this.bufferedPipeStep = pipeStep;
        this.bufferedPipeStep.wantsNoMore_$eq(false);
        this.batchOfferTimeout = brsConfig.getBatchOfferTimeout();
        this.timeout = Duration.ofSeconds(Long.parseLong(System.getProperty("gor.timeout.rowsource", "1800000")));
        this.logInterval = brsConfig.getLogInterval();
        this.timeTriggerBufferFlush = brsConfig.getBufferFlushTimout();
        this.init();
    }

    public ReaderThread(BatchedReadSourceConfig brsConfig, BatchedPipeStepIteratorAdaptor batchedPipeStepIteratorAdaptor, Analysis pipeStep) {
        this.batchedPipeStepIteratorAdaptor = batchedPipeStepIteratorAdaptor;
        this.batchOfferTimeout = brsConfig.getBatchOfferTimeout();
        this.timeout = Duration.ofSeconds(Long.parseLong(System.getProperty("gor.timeout.rowsource", "1800000")));
        this.logInterval = brsConfig.getLogInterval();
        this.timeTriggerBufferFlush = brsConfig.getBufferFlushTimout();
        this.init();
        this.initPipeStep(pipeStep);
    }

    public double getAvgRowsPerMilliSecond() {
        return this.bufferAdaptor.avgRowsPerMilliSecond;
    }

    public double getAvgBasesPerMilliSecond() {
        return this.bufferAdaptor.avgBasesPerMilliSecond;
    }

    public double getAvgBatchSize() {
        return this.bufferAdaptor.avgBatchSize;
    }

    public double getAvgSeekTimeMilliSecond() {
        return this.bufferAdaptor.avgSeekTimeMilliSecond;
    }

    public void setAvgSeekTimeMilliSecond(double avgSeekTimeMilliSecond) {
        this.bufferAdaptor.avgSeekTimeMilliSecond = avgSeekTimeMilliSecond;
    }

    private void init() {
        this.setName(Thread.currentThread().getName() + "::ReaderThread");
        this.rowBuffer1.setNextRowBuffer(this.rowBuffer2);
        this.numberOfPollsBeforeLog = this.logInterval.toMillis() / this.batchOfferTimeout.toMillis();
        this.numberOfPollsBeforeTimeout = this.timeout.toMillis() / this.batchOfferTimeout.toMillis();
    }

    private void initPipeStep(Analysis pipeStep) {
        long timeTriggerBufferFlushNs = this.timeTriggerBufferFlush.getNano();
        this.bufferAdaptor = timeTriggerBufferFlushNs < 0L ? new BufferAdaptor(this) : new TimeoutBufferAdaptor(this, timeTriggerBufferFlushNs);
        this.bufferedPipeStep = pipeStep != null ? pipeStep.$bar((Analysis)this.bufferAdaptor) : this.bufferAdaptor;
        this.bufferedPipeStep.securedSetup(null);
    }

    public void stopProcessing(String message) {
        log.debug(message);
        this.stopProcessing = true;
        if (this.bufferedPipeStep != null) {
            this.bufferedPipeStep.wantsNoMore_$eq(true);
        }
        if (this.bufferAdaptor != null) {
            this.bufferAdaptor.reportWantsNoMore();
        }
    }

    public void finish() {
        try {
            this.bufferedPipeStep.securedFinish(this.batchedPipeStepIteratorAdaptor.getEx());
        }
        catch (Throwable e) {
            this.batchedPipeStepIteratorAdaptor.setEx(e);
            this.stopProcessing("Stop processing error in finish " + e.getMessage());
        }
    }

    private void closeSourceIterator() {
        Iterator<? extends Row> sourceIterator = this.batchedPipeStepIteratorAdaptor.sourceIterator;
        if (sourceIterator instanceof GenomicIterator) {
            ((GenomicIterator)sourceIterator).close();
        }
    }

    @Override
    public void run() {
        this.didStart = true;
        Row r = null;
        try {
            Iterator<? extends Row> sourceIterator = this.batchedPipeStepIteratorAdaptor.sourceIterator;
            while (sourceIterator.hasNext() && !this.bufferedPipeStep.wantsNoMore()) {
                r = sourceIterator.next();
                this.bufferedPipeStep.process(r);
            }
        }
        catch (Throwable e) {
            this.batchedPipeStepIteratorAdaptor.setEx(e);
            this.stopProcessing("Stop processinng cause error " + e.getMessage() + " last row " + r);
        }
        finally {
            this.finish();
            this.closeSourceIterator();
        }
    }

    public boolean offer(RowBuffer rowBuffer) {
        return this.rowQueue.offer(rowBuffer);
    }

    public RowBuffer poll() {
        return this.rowQueue.poll();
    }

    public void offerBatch(RowBuffer current) throws InterruptedException {
        int count = 0;
        while (this.didStart && !this.stopProcessing && !this.rowQueue.offer(current, this.batchOfferTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
            if ((long)count > this.numberOfPollsBeforeTimeout) {
                throw new GorSystemException("BatchedIteratorAdaptor polling for too long " + this.timeout.getSeconds(), null);
            }
            if ((long)count++ % this.numberOfPollsBeforeLog != 0L) continue;
            log.debug("Offering batch for {}, batch size {}, query {}", new Object[]{this.numberOfPollsBeforeLog * (long)count, current.size(), this});
        }
    }

    public RowBuffer pollBatch() throws InterruptedException {
        RowBuffer ret = this.rowQueue.poll(this.batchOfferTimeout.toMillis(), TimeUnit.MILLISECONDS);
        int count = 0;
        while (!this.stopProcessing && ret == null) {
            ret = this.rowQueue.poll(this.batchOfferTimeout.toMillis(), TimeUnit.MILLISECONDS);
            if ((long)count > this.numberOfPollsBeforeTimeout) {
                throw new GorSystemException("BatchedIteratorAdaptor polling for too long " + this.timeout.getSeconds(), null);
            }
            if ((long)count++ % this.numberOfPollsBeforeLog != 0L) continue;
            log.debug("Polling batch for {} time {}, query {}", new Object[]{this.numberOfPollsBeforeLog * (long)count, Thread.currentThread().getId(), this});
        }
        return ret;
    }
}

