/*
 * Decompiled with CFR 0.152.
 */
package org.pipecraft.pipes.terminal;

import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.pipecraft.infra.io.FileUtils;
import org.pipecraft.infra.io.FileWriteOptions;
import org.pipecraft.pipes.async.AsyncPipe;
import org.pipecraft.pipes.async.AsyncPipeListener;
import org.pipecraft.pipes.exceptions.IOPipeException;
import org.pipecraft.pipes.exceptions.InternalPipeException;
import org.pipecraft.pipes.exceptions.PipeException;
import org.pipecraft.pipes.serialization.EncoderFactory;
import org.pipecraft.pipes.serialization.ItemEncoder;
import org.pipecraft.pipes.terminal.TerminalPipe;

public class AsyncSharderPipe<T>
extends TerminalPipe {
    private final AsyncPipe<T> input;
    private final EncoderFactory<? super T> encoderFactory;
    private final Function<? super T, String> selector;
    private final File folder;
    private final FileWriteOptions writeOptions;
    private final Map<String, ShardWriter> shardWriters = new ConcurrentHashMap<String, ShardWriter>();
    private volatile Map<String, Integer> counts;
    private final CountDownLatch terminationLatch = new CountDownLatch(1);
    private volatile PipeException error;

    public AsyncSharderPipe(AsyncPipe<T> input, EncoderFactory<? super T> encoderFactory, Function<? super T, String> shardSelectorFunction, File folder, FileWriteOptions writeOptions) {
        this.input = input;
        this.encoderFactory = encoderFactory;
        this.selector = shardSelectorFunction;
        this.folder = folder;
        this.writeOptions = writeOptions;
    }

    public AsyncSharderPipe(AsyncPipe<T> input, EncoderFactory<? super T> encoderFactory, Function<? super T, String> shardSelectorFunction, File folder) {
        this(input, encoderFactory, shardSelectorFunction, folder, new FileWriteOptions());
    }

    @Override
    public void close() throws IOException {
        this.input.close();
        this.terminationLatch.countDown();
    }

    @Override
    public void start() throws PipeException, InterruptedException {
        this.input.setListener(new AsyncPipeListener<T>(){

            @Override
            public void next(T item) throws PipeException, InterruptedException {
                String shardId = AsyncSharderPipe.this.selector.apply(item);
                try {
                    AsyncSharderPipe.this.shardWriters.computeIfAbsent(shardId, k -> {
                        try {
                            return new ShardWriter((String)k);
                        }
                        catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    }).write(item);
                }
                catch (IOException | UncheckedIOException e) {
                    throw new IOPipeException(e);
                }
            }

            @Override
            public void done() throws InterruptedException {
                AsyncSharderPipe.this.terminationLatch.countDown();
            }

            @Override
            public void error(PipeException e) throws InterruptedException {
                AsyncSharderPipe.this.error = e;
                AsyncSharderPipe.this.terminationLatch.countDown();
            }
        });
        this.input.start();
        this.terminationLatch.await();
        try {
            FileUtils.close(this.shardWriters.values());
        }
        catch (IOException e) {
            throw new IOPipeException(e);
        }
        if (this.error != null) {
            if (this.error instanceof InternalPipeException) {
                throw ((InternalPipeException)this.error).getRuntimeException();
            }
            throw this.error;
        }
        this.counts = Collections.unmodifiableMap(this.shardWriters.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> ((ShardWriter)v.getValue()).getItemsWrittenCount())));
    }

    public Map<String, Integer> getShardSizes() {
        return this.counts;
    }

    @Override
    public float getProgress() {
        return this.input.getProgress();
    }

    private class ShardWriter
    implements Closeable {
        private final ItemEncoder<? super T> encoder;
        private int itemsWrittenCount;

        public ShardWriter(String shardId) throws IOException {
            FileOutputStream os = new FileOutputStream(new File(AsyncSharderPipe.this.folder, shardId), AsyncSharderPipe.this.writeOptions.isAppend());
            this.encoder = AsyncSharderPipe.this.encoderFactory.newEncoder(os, AsyncSharderPipe.this.writeOptions);
        }

        public synchronized void write(T item) throws IOException {
            this.encoder.encode(item);
            ++this.itemsWrittenCount;
        }

        @Override
        public synchronized void close() throws IOException {
            this.encoder.close();
        }

        public synchronized int getItemsWrittenCount() {
            return this.itemsWrittenCount;
        }
    }
}

