/*
 * Decompiled with CFR 0.152.
 */
package org.pipecraft.pipes.sync.inter;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.pipecraft.pipes.exceptions.PipeException;
import org.pipecraft.pipes.sync.Pipe;
import org.pipecraft.pipes.sync.inter.CompoundPipe;
import org.pipecraft.pipes.sync.source.QueueReaderPipe;
import org.pipecraft.pipes.terminal.QueueWriterPipe;
import org.pipecraft.pipes.utils.PipeUtils;
import org.pipecraft.pipes.utils.QueueItem;

/**
 * A compound pipe that places a blocking queue between an input pipe and its consumer.
 *
 * <p>When the pipeline is started, a dedicated producer thread starts a
 * {@link QueueWriterPipe} built over the input pipe and the queue, while the pipeline
 * handed back to {@link CompoundPipe} is a {@link QueueReaderPipe} over the same queue.
 * Once the writer's {@code start()} returns normally, the producer thread appends
 * {@link QueueItem#end()} to the queue.
 *
 * @param <T> the type of the items flowing through the pipe
 */
public class QueuePipe<T> extends CompoundPipe<T> {
    private final Pipe<T> inputPipe;
    private final BlockingQueue<QueueItem<T>> queue;

    /**
     * Creates a queue pipe on top of a caller-supplied queue.
     *
     * @param inputPipe the pipe handed to the {@link QueueWriterPipe} run by the producer thread
     * @param queue the queue shared between the producer thread and the reader side
     */
    public QueuePipe(Pipe<T> inputPipe, BlockingQueue<QueueItem<T>> queue) {
        this.inputPipe = inputPipe;
        this.queue = queue;
    }

    /**
     * Creates a queue pipe backed by a new {@link LinkedBlockingQueue} of the given capacity.
     *
     * @param inputPipe the pipe handed to the {@link QueueWriterPipe} run by the producer thread
     * @param queueCapacity the capacity of the queue to create
     * @throws IllegalArgumentException if {@code queueCapacity} is not greater than zero
     *         (raised by the {@link LinkedBlockingQueue} constructor)
     */
    public QueuePipe(Pipe<T> inputPipe, int queueCapacity) {
        this(inputPipe, new LinkedBlockingQueue<QueueItem<T>>(queueCapacity));
    }

    /**
     * Builds the reader side of the queue and wires the producer thread's lifecycle into it.
     *
     * <p>The returned pipe spawns the producer thread in its {@code start()} and interrupts
     * that thread in its {@code close()}.
     *
     * @return a {@link QueueReaderPipe} over the shared queue
     */
    @Override
    protected Pipe<T> createPipeline() throws PipeException, InterruptedException {
        final QueueWriterPipe<T> qWriter = new QueueWriterPipe<T>(this.inputPipe, this.queue);
        return new QueueReaderPipe<T>(this.queue) {
            // Assigned in start(); remains null if this pipe is closed without being started.
            private Thread producerThread;

            @Override
            public void close() throws IOException {
                try {
                    super.close();
                } finally {
                    // Always signal the producer, even when closing the reader failed;
                    // otherwise a producer blocked on a full queue would never terminate.
                    if (this.producerThread != null) {
                        this.producerThread.interrupt();
                    }
                }
            }

            @Override
            public void start() throws PipeException, InterruptedException {
                this.producerThread = new Thread(() -> {
                    try {
                        qWriter.start();
                        // Reached only when the writer completed without throwing.
                        QueuePipe.this.queue.put(QueueItem.end());
                    } catch (InterruptedException | PipeException ignored) {
                        // Deliberately not propagated from the producer thread.
                        // NOTE(review): presumably QueueWriterPipe reports failures to the
                        // consumer through the queue - confirm against QueueWriterPipe.
                    } finally {
                        // Single cleanup point for success, handled failure and unexpected errors.
                        PipeUtils.close(qWriter);
                    }
                });
                this.producerThread.start();
                super.start();
            }
        };
    }
}

