/*
 * Decompiled with CFR 0.152.
 */
package org.pipecraft.pipes.async.inter;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import org.pipecraft.pipes.async.AsyncPipe;
import org.pipecraft.pipes.async.AsyncPipeListener;
import org.pipecraft.pipes.exceptions.PipeException;
import org.pipecraft.pipes.utils.PipeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An async pipe that forwards the items of several input async pipes to a single listener.
 *
 * <p>Every item produced by any input pipe is passed on as-is through {@code notifyNext}.
 * Completion is reported once all input pipes have reported completion. When any input pipe
 * reports an error, the remaining input pipes are closed (best effort) and the error is
 * forwarded through {@code notifyError}.
 *
 * @param <T> the type of items flowing through the input pipes and this pipe
 */
public class AsyncUnionPipe<T>
extends AsyncPipe<T> {
    private static final Logger logger = LoggerFactory.getLogger(AsyncUnionPipe.class);
    private final Collection<AsyncPipe<T>> inputPipes;
    // Number of input pipes that have reported successful completion so far.
    // Atomic because, presumably, different input pipes notify from different threads.
    private final AtomicInteger doneSuccessfulCounter = new AtomicInteger();

    /**
     * @param inputPipes the pipes whose output should be merged. The collection is stored by
     *                   reference (no copy is made).
     */
    public AsyncUnionPipe(Collection<AsyncPipe<T>> inputPipes) {
        this.inputPipes = inputPipes;
    }

    /**
     * Registers a listener on every input pipe and starts it, in the collection's iteration order.
     * When there are no input pipes at all, completion is reported immediately, since otherwise
     * no input pipe would ever trigger it.
     *
     * @throws PipeException        if starting one of the input pipes fails
     * @throws InterruptedException if the calling thread is interrupted
     */
    @Override
    public void start() throws PipeException, InterruptedException {
        if (this.inputPipes.isEmpty()) {
            // No input will ever call Listener.done(), so the "all inputs done" condition
            // must be signalled here or downstream would wait forever.
            this.notifyDone();
            return;
        }
        for (AsyncPipe<T> inputPipe : this.inputPipes) {
            inputPipe.setListener(new Listener(inputPipe));
            inputPipe.start();
        }
    }

    /**
     * @return the progress of this pipe, as computed by {@code PipeUtils.getAverageProgress}
     *         over the input pipes
     */
    @Override
    public float getProgress() {
        return PipeUtils.getAverageProgress(this.inputPipes);
    }

    /**
     * Closes this pipe and then the input pipes (via {@code PipeUtils.close}).
     *
     * @throws IOException if closing fails
     */
    @Override
    public void close() throws IOException {
        super.close();
        PipeUtils.close(this.inputPipes);
    }

    /**
     * Listener attached to a single input pipe; relays its notifications to the enclosing
     * union pipe.
     */
    private class Listener
    implements AsyncPipeListener<T> {
        // The input pipe this listener is attached to; used to skip it when aborting the others.
        private final AsyncPipe<T> inputPipe;

        public Listener(AsyncPipe<T> inputPipe) {
            this.inputPipe = inputPipe;
        }

        /** Forwards the item unchanged to the union pipe's listener. */
        @Override
        public void next(T item) throws PipeException, InterruptedException {
            AsyncUnionPipe.this.notifyNext(item);
        }

        /**
         * Counts this input pipe as done, and reports completion of the union pipe when the
         * count reaches the number of input pipes.
         */
        @Override
        public void done() throws InterruptedException {
            int completed = AsyncUnionPipe.this.doneSuccessfulCounter.incrementAndGet();
            if (completed == AsyncUnionPipe.this.inputPipes.size()) {
                AsyncUnionPipe.this.notifyDone();
            }
        }

        /**
         * Tries to close every input pipe other than the one that failed, then forwards the
         * error. A failure to close a pipe is logged and does not prevent the error notification.
         *
         * @param e the error reported by the input pipe this listener is attached to
         */
        @Override
        public void error(PipeException e) throws InterruptedException {
            for (AsyncPipe<T> pipe : AsyncUnionPipe.this.inputPipes) {
                if (pipe == this.inputPipe) {
                    continue;
                }
                try {
                    pipe.close();
                }
                catch (IOException closeFailure) {
                    // Log the close failure itself; the original error is delivered separately
                    // through notifyError below.
                    logger.error("Unable to close pipe. Abnormal termination with possible notifications during/after the error notification of the async pipe.", closeFailure);
                }
            }
            AsyncUnionPipe.this.notifyError(e);
        }
    }
}

