/*
 * Decompiled with CFR 0.152.
 */
package org.icij.extract.queue;

import java.nio.file.Path;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.icij.concurrent.ExecutorProxy;
import org.icij.concurrent.SealableLatch;
import org.icij.extract.queue.DocumentQueue;
import org.icij.task.Options;
import org.icij.task.annotation.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Drains a {@link DocumentQueue} by polling paths from it and handing each one to a
 * {@link Consumer}.
 *
 * <p>Draining runs as a task submitted to the single-thread executor passed to the
 * {@link ExecutorProxy} superclass. How the task waits for new elements is controlled by,
 * in order of precedence:
 * <ol>
 *   <li>a {@link SealableLatch}, if one is set: the task awaits the latch between polls until
 *       the latch reports that it is sealed;</li>
 *   <li>a {@code null} poll timeout: the task blocks on {@code take()};</li>
 *   <li>a positive poll timeout: the task waits up to that long for an element;</li>
 *   <li>a zero (the default) or negative poll timeout: the task polls without waiting.</li>
 * </ol>
 *
 * <p>The latch and the poll timeout may be changed while a drain is in progress; each poll reads
 * the values that are current when it starts.
 */
@Option(name="queuePoll", description="Time to wait when polling the queue e.g. \"5s\" or \"1m\". Defaults to 0.", parameter="duration")
public class DocumentQueueDrainer
extends ExecutorProxy {
    private static final Duration DEFAULT_TIMEOUT = Duration.ZERO;
    private static final Logger logger = LoggerFactory.getLogger(DocumentQueueDrainer.class);

    private final DocumentQueue queue;
    private final Consumer<Path> consumer;

    // Both fields are written by callers through the setters and read by the draining task on the
    // executor thread, so they are volatile to make updates visible to a running drain.
    private volatile SealableLatch latch = null;
    private volatile Duration pollTimeout = DEFAULT_TIMEOUT;

    /**
     * Creates a drainer backed by a new single-thread executor.
     *
     * @param queue the queue to poll paths from
     * @param consumer receives every path polled from the queue
     */
    public DocumentQueueDrainer(DocumentQueue queue, Consumer<Path> consumer) {
        super(Executors.newSingleThreadExecutor());
        this.queue = queue;
        this.consumer = consumer;
    }

    /**
     * Applies the {@code queuePoll} option, if present, as the poll timeout.
     *
     * @param options the options to read from
     * @return this drainer, for chaining
     */
    public DocumentQueueDrainer configure(Options<String> options) {
        options.get("queuePoll").parse().asDuration().ifPresent(this::setPollTimeout);
        return this;
    }

    /**
     * Sets how long each poll waits for an element when no latch is set.
     *
     * @param pollTimeout the time to wait; zero or negative means do not wait, {@code null} means
     *                    wait indefinitely
     */
    public void setPollTimeout(Duration pollTimeout) {
        this.pollTimeout = pollTimeout;
    }

    /**
     * Removes the poll timeout so that polling waits indefinitely when no latch is set.
     */
    public void clearPollTimeout() {
        this.pollTimeout = null;
    }

    /**
     * @return the current poll timeout, or {@code null} if polling waits indefinitely
     */
    public Duration getPollTimeout() {
        return this.pollTimeout;
    }

    /**
     * Sets the latch awaited between polls. While a latch is set, the poll timeout is not used.
     *
     * @param latch the latch to await, or {@code null} for none
     */
    public void setLatch(SealableLatch latch) {
        this.latch = latch;
    }

    /**
     * @return the latch awaited between polls, or {@code null} if none is set
     */
    public SealableLatch getLatch() {
        return this.latch;
    }

    /**
     * Removes the latch so that polling falls back to the poll timeout.
     */
    public void clearLatch() {
        this.latch = null;
    }

    /**
     * Submits a task that drains the queue until a poll yields no element.
     *
     * @return a future holding the number of paths passed to the consumer
     */
    public Future<Long> drain() {
        return this.executor.submit(new DrainingTask());
    }

    /**
     * Submits a task that drains the queue until a poll yields no element or yields a path equal
     * to {@code poison}. The poison path itself is not passed to the consumer.
     *
     * @param poison the path that stops the drain when polled
     * @return a future holding the number of paths passed to the consumer
     */
    public Future<Long> drain(Path poison) {
        return this.executor.submit(new DrainingTask(poison));
    }

    /**
     * The task that performs the drain on the executor thread.
     */
    private class DrainingTask
    implements Callable<Long> {
        /** Path that terminates the drain when polled, or {@code null} for none. */
        private final Path poison;

        DrainingTask() {
            this.poison = null;
        }

        DrainingTask(Path poison) {
            this.poison = poison;
        }

        /**
         * Polls the queue once, waiting according to the drainer's latch or poll timeout.
         *
         * @return the polled path, or {@code null} if none became available
         * @throws InterruptedException if interrupted while waiting
         */
        private Path poll() throws InterruptedException {
            // Snapshot both settings so that a concurrent setter cannot change them mid-poll.
            final Duration timeout = DocumentQueueDrainer.this.getPollTimeout();
            final SealableLatch currentLatch = DocumentQueueDrainer.this.getLatch();

            if (null != currentLatch) {
                return this.pollWithLatch(currentLatch);
            }

            if (null == timeout) {
                logger.info("Polling the queue, waiting indefinitely.");
                return (Path)DocumentQueueDrainer.this.queue.take();
            }

            if (timeout.isZero() || timeout.isNegative()) {
                logger.debug("Polling the queue without waiting.");
                return (Path)DocumentQueueDrainer.this.queue.poll();
            }

            logger.debug("Polling the queue, waiting up to \"{}\".", timeout);

            // Millisecond resolution so that sub-second timeouts (e.g. 500ms) actually wait
            // instead of being truncated to a non-blocking poll.
            return (Path)DocumentQueueDrainer.this.queue.poll(toMillisSaturated(timeout), TimeUnit.MILLISECONDS);
        }

        /**
         * Polls the queue, awaiting the given latch between attempts while it is not sealed.
         *
         * @param currentLatch the latch to await
         * @return the polled path, or {@code null} if the queue was empty once the latch was sealed
         * @throws InterruptedException if interrupted while awaiting the latch
         */
        private Path pollWithLatch(SealableLatch currentLatch) throws InterruptedException {
            Path path = (Path)DocumentQueueDrainer.this.queue.poll();
            while (null == path && !currentLatch.isSealed()) {
                currentLatch.await();
                path = (Path)DocumentQueueDrainer.this.queue.poll();
            }

            if (null == path) {
                // The latch may have been sealed between the last empty poll and the isSealed()
                // check, after an element was queued. Poll one last time so that such an element
                // is not left behind. (Presumes sealing follows the final enqueue.)
                path = (Path)DocumentQueueDrainer.this.queue.poll();
            }

            return path;
        }

        /**
         * Converts a duration to milliseconds, clamping to {@link Long#MAX_VALUE} on overflow.
         */
        private long toMillisSaturated(Duration duration) {
            try {
                return duration.toMillis();
            } catch (ArithmeticException overflow) {
                // Only reachable for a positive duration too large for a long of milliseconds.
                return Long.MAX_VALUE;
            }
        }

        /**
         * Polls and consumes paths until a poll yields {@code null} or the poison path.
         *
         * @return the number of paths passed to the consumer
         * @throws InterruptedException if interrupted while polling
         */
        private long drain() throws InterruptedException {
            long consumed = 0L;
            Path path = this.poll();
            while (null != path && (null == this.poison || !path.equals(this.poison))) {
                DocumentQueueDrainer.this.consumer.accept(path);
                ++consumed;
                path = this.poll();
            }
            return consumed;
        }

        @Override
        public Long call() throws Exception {
            logger.info("Draining to consumer until stopped or interrupted.");
            long consumed = this.drain();
            logger.info("Continuous draining stopped.");
            return consumed;
        }
    }
}

