/*
 * Decompiled with CFR 0.152.
 */
package org.pipecraft.pipes.terminal;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.pipecraft.infra.concurrent.FailableConsumer;
import org.pipecraft.pipes.async.AsyncPipe;
import org.pipecraft.pipes.async.AsyncPipeListener;
import org.pipecraft.pipes.exceptions.InternalPipeException;
import org.pipecraft.pipes.exceptions.PipeException;
import org.pipecraft.pipes.terminal.TerminalPipe;

public class AsyncConsumerPipe<T>
extends TerminalPipe {
    private final AsyncPipe<T> inputPipe;
    private volatile PipeException error;
    private final CountDownLatch endLatch = new CountDownLatch(1);

    public AsyncConsumerPipe(AsyncPipe<T> inputPipe, final FailableConsumer<? super T, PipeException> itemAction, final Runnable terminationAction) {
        this.inputPipe = inputPipe;
        inputPipe.setListener(new AsyncPipeListener<T>(){

            @Override
            public void next(T item) throws PipeException, InterruptedException {
                itemAction.accept(item);
            }

            @Override
            public void done() throws InterruptedException {
                terminationAction.run();
                AsyncConsumerPipe.this.endLatch.countDown();
            }

            @Override
            public void error(PipeException e) throws InterruptedException {
                AsyncConsumerPipe.this.error = e;
                AsyncConsumerPipe.this.endLatch.countDown();
            }
        });
    }

    public AsyncConsumerPipe(AsyncPipe<T> inputPipe, FailableConsumer<? super T, PipeException> itemAction) {
        this(inputPipe, itemAction, () -> {});
    }

    public AsyncConsumerPipe(AsyncPipe<T> inputPipe) {
        this(inputPipe, item -> {}, () -> {});
    }

    @Override
    public void start() throws PipeException, InterruptedException {
        this.inputPipe.start();
        this.endLatch.await();
        if (this.error != null) {
            if (this.error instanceof InternalPipeException) {
                throw ((InternalPipeException)this.error).getRuntimeException();
            }
            throw this.error;
        }
    }

    @Override
    public float getProgress() {
        return this.inputPipe.getProgress();
    }

    @Override
    public void close() throws IOException {
        this.endLatch.countDown();
        this.inputPipe.close();
    }
}

