/*
 * Decompiled with CFR 0.152.
 */
package org.pipecraft.pipes.sync.inter;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import org.pipecraft.pipes.async.AsyncPipe;
import org.pipecraft.pipes.async.AsyncPipeListener;
import org.pipecraft.pipes.exceptions.PipeException;
import org.pipecraft.pipes.sync.Pipe;

/**
 * Exposes an {@link AsyncPipe} through the synchronous {@link Pipe} interface.
 *
 * <p>Items pushed by the asynchronous pipe are placed on a {@link BlockingQueue} by an internal
 * listener and handed out one at a time by {@link #next()}. End of data and failure are signalled
 * through the same queue using two dedicated marker objects, which are recognized by reference
 * identity ({@code ==}).
 *
 * <p>NOTE(review): the two markers are obtained by calling the supplied factory twice; this class
 * relies on the two results being distinct, non-null instances that never appear as real items.
 * Confirm against callers.
 *
 * @param <T> the item type
 */
public class AsyncToSyncPipe<T>
implements Pipe<T> {
    /** How long {@link #peek()} sleeps between polls while the queue is empty. */
    private static final long PEEK_POLL_INTERVAL_MS = 10L;

    private final AsyncPipe<T> inputPipe;
    private final BlockingQueue<T> queue;
    /** Queued by the listener when the input pipe reports successful completion. */
    private final T successfulEndMarker;
    /** Queued by the listener when the input pipe reports an error. */
    private final T errorEndMarker;
    /** The reported error; written by the listener before the error marker is queued. */
    private volatile PipeException error;
    /** Set once an end marker (success or error) has been consumed by {@link #next()}. */
    private boolean done;

    /**
     * Creates the adapter on top of a caller-provided queue.
     *
     * @param inputPipe the asynchronous pipe to read from
     * @param queue the queue used to hand items over to the consumer
     * @param markerFactory invoked twice to create the success and error end markers
     */
    public AsyncToSyncPipe(AsyncPipe<T> inputPipe, BlockingQueue<T> queue, Supplier<T> markerFactory) {
        this.inputPipe = inputPipe;
        this.queue = queue;
        this.successfulEndMarker = markerFactory.get();
        this.errorEndMarker = markerFactory.get();
    }

    /**
     * Creates the adapter backed by a {@link LinkedBlockingQueue} of the given capacity.
     *
     * @param inputPipe the asynchronous pipe to read from
     * @param queueCapacity the capacity of the internal queue
     * @param markerFactory invoked twice to create the success and error end markers
     */
    public AsyncToSyncPipe(AsyncPipe<T> inputPipe, int queueCapacity, Supplier<T> markerFactory) {
        this(inputPipe, new LinkedBlockingQueue<>(queueCapacity), markerFactory);
    }

    /**
     * Registers the internal listener on the input pipe and starts it.
     */
    @Override
    public void start() throws PipeException, InterruptedException {
        this.inputPipe.setListener(new Listener());
        this.inputPipe.start();
    }

    /**
     * Returns the progress reported by the input pipe.
     */
    @Override
    public float getProgress() {
        return this.inputPipe.getProgress();
    }

    /**
     * Closes the input pipe.
     */
    @Override
    public void close() throws IOException {
        this.inputPipe.close();
    }

    /**
     * Blocks until the next item is available and returns it.
     *
     * @return the next item, or {@code null} once the end of data has been reached (including
     *     every call made after an end marker has been consumed)
     * @throws PipeException the error reported by the input pipe, thrown when the error marker is
     *     dequeued
     * @throws InterruptedException if interrupted while waiting on the queue
     */
    @Override
    public T next() throws PipeException, InterruptedException {
        if (this.done) {
            return null;
        }
        T item = this.queue.take();
        if (item == this.successfulEndMarker) {
            this.done = true;
            return null;
        }
        if (item == this.errorEndMarker) {
            this.done = true;
            throw this.error;
        }
        return item;
    }

    /**
     * Returns the item at the head of the queue without removing it, polling until one arrives.
     *
     * @return the next item, or {@code null} if the pipe has already terminated, if an end marker
     *     (success or error) is at the head of the queue, or if the calling thread is interrupted
     *     while waiting (the interrupt flag is restored in that case)
     */
    @Override
    public T peek() throws PipeException {
        // Once next() has consumed an end marker nothing more will be queued; without this guard
        // the polling loop below would never terminate.
        if (this.done) {
            return null;
        }
        try {
            T head;
            while ((head = this.queue.peek()) == null) {
                Thread.sleep(PEEK_POLL_INTERVAL_MS);
            }
            if (head == this.successfulEndMarker || head == this.errorEndMarker) {
                return null;
            }
            return head;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Listener that forwards the input pipe's events into the hand-over queue.
     */
    private class Listener
    implements AsyncPipeListener<T> {
        private Listener() {
        }

        /** Queues the item, blocking while the queue has no free space. */
        @Override
        public void next(T item) throws PipeException, InterruptedException {
            AsyncToSyncPipe.this.queue.put(item);
        }

        /** Queues the success marker so the consumer sees the end of data. */
        @Override
        public void done() throws InterruptedException {
            AsyncToSyncPipe.this.queue.put(AsyncToSyncPipe.this.successfulEndMarker);
        }

        /** Records the error, then queues the error marker so that next() rethrows it. */
        @Override
        public void error(PipeException e) throws InterruptedException {
            // The error must be stored before the marker is queued so the consumer can read it.
            AsyncToSyncPipe.this.error = e;
            AsyncToSyncPipe.this.queue.put(AsyncToSyncPipe.this.errorEndMarker);
        }
    }
}

