/*
 * Decompiled with CFR 0.152.
 */
package org.pipecraft.pipes.sync.source;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import org.pipecraft.pipes.exceptions.PipeException;
import org.pipecraft.pipes.exceptions.QueuePipeException;
import org.pipecraft.pipes.sync.Pipe;

public class QueueReaderPipe<T>
implements Pipe<T> {
    private final BlockingQueue<T> queue;
    private final T successMarker;
    private final T errorMarker;
    private boolean complete;
    private T next;
    private PipeException exception;

    public QueueReaderPipe(BlockingQueue<T> queue, T successMarker, T errorMarker) {
        this.queue = queue;
        this.successMarker = successMarker;
        this.errorMarker = errorMarker;
    }

    @Override
    public void start() throws PipeException, InterruptedException {
        this.prepareNext();
    }

    @Override
    public float getProgress() {
        if (this.complete) {
            return 1.0f;
        }
        return 0.0f;
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public T next() throws PipeException, InterruptedException {
        if (this.exception != null) {
            throw this.exception;
        }
        T toReturn = this.next;
        this.prepareNext();
        return toReturn;
    }

    @Override
    public T peek() throws PipeException {
        if (this.exception != null) {
            throw this.exception;
        }
        return this.next;
    }

    public void prepareNext() throws InterruptedException {
        if (!this.complete) {
            T item = this.queue.take();
            if (item == this.successMarker) {
                this.complete = true;
                this.next = null;
            } else if (item == this.errorMarker) {
                this.complete = true;
                this.exception = new QueuePipeException("Error signaled by queue producer");
            } else {
                this.next = item;
            }
        }
    }
}

