/*
 * Decompiled with CFR 0.152.
 */
package org.pipecraft.pipes.sync.inter;

import org.pipecraft.infra.concurrent.FailableConsumer;
import org.pipecraft.infra.concurrent.FailableRunnable;
import org.pipecraft.pipes.exceptions.PipeException;
import org.pipecraft.pipes.sync.Pipe;
import org.pipecraft.pipes.sync.inter.DelegatePipe;

public class CallbackPipe<T>
extends DelegatePipe<T> {
    private final FailableConsumer<? super T, ? extends PipeException> itemCallback;
    private final FailableRunnable<? extends PipeException> terminationCallback;
    private boolean terminationCallbackInvoked;

    public CallbackPipe(Pipe<T> input, FailableConsumer<? super T, ? extends PipeException> itemCallback, FailableRunnable<? extends PipeException> terminationCallback) {
        super(input);
        this.itemCallback = itemCallback;
        this.terminationCallback = terminationCallback;
    }

    public CallbackPipe(Pipe<T> input, FailableConsumer<? super T, ? extends PipeException> itemCallback) {
        this(input, itemCallback, () -> {});
    }

    public CallbackPipe(Pipe<T> input, FailableRunnable<? extends PipeException> terminationCallback) {
        this(input, r -> {}, terminationCallback);
    }

    @Override
    public T next() throws PipeException, InterruptedException {
        Object next = this.getOriginPipe().next();
        if (next != null) {
            this.itemCallback.accept(next);
        } else if (!this.terminationCallbackInvoked) {
            this.runTermination();
        }
        return next;
    }

    @Override
    public T peek() throws PipeException {
        Object next = this.getOriginPipe().peek();
        if (next == null && !this.terminationCallbackInvoked) {
            this.runTermination();
        }
        return next;
    }

    private void runTermination() throws PipeException {
        this.terminationCallbackInvoked = true;
        this.terminationCallback.run();
    }
}

