/*
 * Decompiled with CFR 0.152.
 */
package org.pipecraft.pipes.terminal;

import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.pipecraft.infra.concurrent.FailableFunction;
import org.pipecraft.infra.io.FileUtils;
import org.pipecraft.infra.io.FileWriteOptions;
import org.pipecraft.pipes.exceptions.IOPipeException;
import org.pipecraft.pipes.exceptions.PipeException;
import org.pipecraft.pipes.serialization.EncoderFactory;
import org.pipecraft.pipes.serialization.ItemEncoder;
import org.pipecraft.pipes.sync.Pipe;
import org.pipecraft.pipes.terminal.TerminalPipe;

public class SharderByItemPipe<T>
extends TerminalPipe {
    private final Pipe<T> input;
    private final EncoderFactory<? super T> encoderFactory;
    private final FailableFunction<? super T, String, PipeException> selector;
    private final File folder;
    private final FileWriteOptions writeOptions;
    private volatile Map<String, Integer> counts;

    public SharderByItemPipe(Pipe<T> input, EncoderFactory<? super T> encoderFactory, FailableFunction<? super T, String, PipeException> shardSelectorFunction, File folder, FileWriteOptions writeOptions) {
        this.input = input;
        this.encoderFactory = encoderFactory;
        this.selector = shardSelectorFunction;
        this.folder = folder;
        this.writeOptions = writeOptions;
    }

    public SharderByItemPipe(Pipe<T> input, EncoderFactory<? super T> encoderFactory, FailableFunction<? super T, String, PipeException> shardSelectorFunction, File folder) {
        this(input, encoderFactory, shardSelectorFunction, folder, new FileWriteOptions());
    }

    @Override
    public void close() throws IOException {
        this.input.close();
    }

    @Override
    public void start() throws PipeException, InterruptedException {
        this.input.start();
        HashMap<String, ShardWriter> shardWriters = new HashMap<String, ShardWriter>();
        try {
            T item;
            while ((item = this.input.next()) != null) {
                String shardId = this.selector.apply(item);
                ShardWriter w = (ShardWriter)shardWriters.get(shardId);
                if (w == null) {
                    w = new ShardWriter(shardId);
                    shardWriters.put(shardId, w);
                }
                w.write(item);
            }
            this.counts = Collections.unmodifiableMap(shardWriters.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> ((ShardWriter)v.getValue()).getItemsWrittenCount())));
        }
        catch (IOException e) {
            throw new IOPipeException(e);
        }
        finally {
            FileUtils.closeSilently(shardWriters.values());
        }
    }

    public Map<String, Integer> getShardSizes() {
        return this.counts;
    }

    private class ShardWriter
    implements Closeable {
        private final ItemEncoder<? super T> encoder;
        private int itemsWrittenCount;

        public ShardWriter(String shardId) throws IOException {
            File shardFile = new File(SharderByItemPipe.this.folder, shardId);
            FileOutputStream os = new FileOutputStream(shardFile, SharderByItemPipe.this.writeOptions.isAppend());
            if (SharderByItemPipe.this.writeOptions.isTemp()) {
                shardFile.deleteOnExit();
            }
            this.encoder = SharderByItemPipe.this.encoderFactory.newEncoder(os, SharderByItemPipe.this.writeOptions);
        }

        public void write(T item) throws IOException {
            this.encoder.encode(item);
            ++this.itemsWrittenCount;
        }

        @Override
        public void close() throws IOException {
            this.encoder.close();
        }

        public int getItemsWrittenCount() {
            return this.itemsWrittenCount;
        }
    }
}

