/*
 * Decompiled with CFR 0.152.
 */
package org.pipecraft.pipes.terminal;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.pipecraft.infra.concurrent.FailableFunction;
import org.pipecraft.pipes.async.AsyncPipe;
import org.pipecraft.pipes.async.AsyncPipeListener;
import org.pipecraft.pipes.exceptions.PipeException;
import org.pipecraft.pipes.exceptions.ValidationPipeException;
import org.pipecraft.pipes.terminal.TerminalPipe;

/**
 * A terminal pipe that drains an {@link AsyncPipe} into a {@link Map}.
 *
 * <p>For every item delivered by the input pipe, a key and a value are derived using the
 * supplied extractor functions and stored in the output map via {@link Map#put(Object, Object)}.
 * {@link #start()} blocks until the input pipe signals completion or an error.
 *
 * <p>NOTE(review): the input pipe presumably invokes the listener from its own worker threads,
 * possibly concurrently — confirm against {@code AsyncPipe}. When supplying a custom output map,
 * make sure it tolerates whatever concurrency the input pipe uses. The default map is a
 * {@link ConcurrentHashMap}, which rejects {@code null} keys and values.
 *
 * @param <T> the type of items produced by the input pipe
 * @param <K> the type of keys in the output map
 * @param <V> the type of values in the output map
 */
public class AsyncMapWriterPipe<T, K, V>
extends TerminalPipe {
    private final AsyncPipe<T> input;
    private final Map<K, V> outputMap;
    /** Released once, when the input pipe reports either completion or an error. */
    private final CountDownLatch terminationLatch = new CountDownLatch(1);
    /**
     * The error reported by the input pipe, if any. Written by the listener before the latch is
     * counted down and read by {@link #start()} after {@code await()} returns, so the latch
     * provides the required happens-before ordering.
     */
    private PipeException exception;

    /**
     * Creates a pipe writing into a caller-supplied map.
     *
     * <p>The listener is registered on the input pipe here, at construction time.
     *
     * @param inputPipe the asynchronous pipe to consume
     * @param keyExtractor computes the map key for an item
     * @param valueExtractor computes the map value for an item
     * @param outputMap the map that receives the extracted entries; also returned by
     *     {@link #getItems()}
     */
    public AsyncMapWriterPipe(AsyncPipe<T> inputPipe, final FailableFunction<T, K, ValidationPipeException> keyExtractor, final FailableFunction<T, V, ValidationPipeException> valueExtractor, final Map<K, V> outputMap) {
        this.input = inputPipe;
        this.outputMap = outputMap;
        this.input.setListener(new AsyncPipeListener<T>(){

            @Override
            public void next(T item) throws PipeException {
                // Typed as K/V (not Object) so that the put below type-checks against Map<K, V>.
                K key = keyExtractor.apply(item);
                V value = valueExtractor.apply(item);
                outputMap.put(key, value);
            }

            @Override
            public void done() {
                AsyncMapWriterPipe.this.terminationLatch.countDown();
            }

            @Override
            public void error(PipeException e) {
                // Record the failure first, then release the waiter in start().
                AsyncMapWriterPipe.this.exception = e;
                AsyncMapWriterPipe.this.terminationLatch.countDown();
            }
        });
    }

    /**
     * Creates a pipe writing into a new {@link ConcurrentHashMap}.
     *
     * @param inputPipe the asynchronous pipe to consume
     * @param keyExtractor computes the map key for an item
     * @param valueExtractor computes the map value for an item
     */
    public AsyncMapWriterPipe(AsyncPipe<T> inputPipe, FailableFunction<T, K, ValidationPipeException> keyExtractor, FailableFunction<T, V, ValidationPipeException> valueExtractor) {
        this(inputPipe, keyExtractor, valueExtractor, new ConcurrentHashMap<>());
    }

    /**
     * Closes the input pipe.
     *
     * @throws IOException if closing the input pipe fails
     */
    @Override
    public void close() throws IOException {
        this.input.close();
    }

    /**
     * Starts the input pipe and blocks until it reports completion or an error.
     *
     * @throws PipeException if starting the input pipe fails, or if the input pipe reported an
     *     error through the listener
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void start() throws PipeException, InterruptedException {
        this.input.start();
        this.terminationLatch.await();
        if (this.exception != null) {
            throw this.exception;
        }
    }

    /**
     * Returns the map being populated by this pipe.
     *
     * <p>This is the live map instance (no copy is made); it is the same object passed to, or
     * created by, the constructor.
     *
     * @return the output map
     */
    public Map<K, V> getItems() {
        return this.outputMap;
    }
}

