/*
 * Decompiled with CFR 0.152.
 */
package org.pipecraft.pipes.terminal;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.pipecraft.infra.math.ArithmeticUtils;
import org.pipecraft.pipes.async.AsyncPipe;
import org.pipecraft.pipes.async.AsyncPipeListener;
import org.pipecraft.pipes.exceptions.InternalPipeException;
import org.pipecraft.pipes.exceptions.PipeException;
import org.pipecraft.pipes.terminal.TerminalPipe;

public class AsyncEnqueuingSharderPipe<T>
extends TerminalPipe {
    private final AsyncPipe<T> input;
    private final Function<? super T, Integer> selector;
    private final List<? extends BlockingQueue<T>> queues;
    private final AtomicInteger[] updatingCounts;
    private volatile int[] finalCounts;
    private final CountDownLatch terminationLatch = new CountDownLatch(1);
    private volatile PipeException error;
    private volatile boolean closed;
    private final T successMarker;
    private final T errorMarker;

    public AsyncEnqueuingSharderPipe(AsyncPipe<T> input, List<? extends BlockingQueue<T>> queues, Function<? super T, Integer> selectorFunction, T successMarker, T errorMarker) {
        this.input = input;
        this.queues = new ArrayList<BlockingQueue<T>>(queues);
        this.updatingCounts = new AtomicInteger[queues.size()];
        for (int i = 0; i < this.updatingCounts.length; ++i) {
            this.updatingCounts[i] = new AtomicInteger();
        }
        this.selector = selectorFunction;
        this.successMarker = successMarker;
        this.errorMarker = errorMarker;
    }

    public AsyncEnqueuingSharderPipe(AsyncPipe<T> input, List<? extends BlockingQueue<T>> queues, T successMarker, T errorMarker) {
        this(input, queues, v -> ArithmeticUtils.getShardByHash(v, queues.size()), successMarker, errorMarker);
    }

    @Override
    public void close() throws IOException {
        this.input.close();
        this.closed = true;
        this.terminationLatch.countDown();
    }

    @Override
    public void start() throws PipeException, InterruptedException {
        this.input.setListener(new AsyncPipeListener<T>(){

            @Override
            public void next(T item) throws PipeException, InterruptedException {
                int index = AsyncEnqueuingSharderPipe.this.selector.apply(item);
                AsyncEnqueuingSharderPipe.this.updatingCounts[index].incrementAndGet();
                AsyncEnqueuingSharderPipe.this.queues.get(index).put(item);
            }

            @Override
            public void done() throws InterruptedException {
                AsyncEnqueuingSharderPipe.this.terminationLatch.countDown();
            }

            @Override
            public void error(PipeException e) throws InterruptedException {
                AsyncEnqueuingSharderPipe.this.error = e;
                AsyncEnqueuingSharderPipe.this.terminationLatch.countDown();
            }
        });
        this.input.start();
        this.terminationLatch.await();
        if (!this.closed) {
            T marker = this.error == null ? this.successMarker : this.errorMarker;
            for (BlockingQueue<T> q : this.queues) {
                q.put(marker);
            }
        }
        if (this.error != null) {
            if (this.error instanceof InternalPipeException) {
                throw ((InternalPipeException)this.error).getRuntimeException();
            }
            throw this.error;
        }
        int[] counts = new int[this.queues.size()];
        for (int i = 0; i < counts.length; ++i) {
            counts[i] = this.updatingCounts[i].get();
        }
        this.finalCounts = counts;
    }

    public Future<Void> asyncStart() {
        ExecutorService ex = Executors.newSingleThreadExecutor();
        Future<Void> f = ex.submit(() -> {
            this.start();
            return null;
        });
        ex.shutdown();
        return f;
    }

    public int[] getShardSizes() {
        return this.finalCounts;
    }

    @Override
    public float getProgress() {
        return this.input.getProgress();
    }
}

