/*
 * Decompiled with CFR 0.152.
 */
package org.pipecraft.pipes.sync.inter;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.pipecraft.infra.io.Compression;
import org.pipecraft.infra.io.FileReadOptions;
import org.pipecraft.infra.io.FileUtils;
import org.pipecraft.infra.io.FileWriteOptions;
import org.pipecraft.pipes.exceptions.IOPipeException;
import org.pipecraft.pipes.exceptions.PipeException;
import org.pipecraft.pipes.serialization.CodecFactory;
import org.pipecraft.pipes.serialization.DecoderFactory;
import org.pipecraft.pipes.serialization.EncoderFactory;
import org.pipecraft.pipes.sync.Pipe;
import org.pipecraft.pipes.sync.inter.SortedMergePipe;
import org.pipecraft.pipes.sync.source.BinInputReaderPipe;
import org.pipecraft.pipes.sync.source.CollectionReaderPipe;
import org.pipecraft.pipes.terminal.BinFileWriterPipe;

/**
 * A pipe that emits the items of its input pipe ordered by a given comparator.
 *
 * <p>During {@link #start()} the input is consumed in chunks of at most {@code maxItemsInMemory}
 * items. Each chunk is sorted in memory. If the whole input fits into a single chunk, it is served
 * directly from memory. Otherwise every sorted chunk is written to a temporary file, and the
 * output is produced by merging those files with a {@link SortedMergePipe}.
 *
 * <p>{@link #next()} and {@link #peek()} delegate to the output pipe created by {@link #start()},
 * so they must only be called after {@link #start()} has completed.
 *
 * @param <T> the item type
 */
public class SortPipe<T>
implements Pipe<T> {
    private final Pipe<T> input;
    private final File tmpFolder;
    private final Comparator<T> comparator;
    private final int maxItemsInMemory;
    private final Compression tempFilesCompression;
    // Buffer holding the chunk currently being collected and sorted.
    private final List<T> currentChunk;
    // Temporary files holding the sorted chunks spilled so far; deleted by close().
    private final List<File> sortedFiles;
    private final EncoderFactory<? super T> encoderFactory;
    private final DecoderFactory<T> decoderFactory;
    // Null until start() has decided between the in-memory and the merge-from-disk output.
    private volatile Pipe<T> finalOutputPipe;

    /**
     * Creates a sort pipe with full control over all settings.
     *
     * @param input the pipe supplying the items to sort
     * @param maxItemsInMemory the maximum number of items held in memory per chunk; must be positive
     * @param tmpFolder the folder in which temporary chunk files are created
     * @param encoderFactory used for writing items to the temporary chunk files
     * @param decoderFactory used for reading items back from the temporary chunk files
     * @param comparator defines the output order
     * @param tempFilesCompression the compression applied to the temporary chunk files
     * @throws IllegalArgumentException if {@code maxItemsInMemory} is not positive
     */
    public SortPipe(Pipe<T> input, int maxItemsInMemory, File tmpFolder, EncoderFactory<? super T> encoderFactory, DecoderFactory<T> decoderFactory, Comparator<T> comparator, Compression tempFilesCompression) {
        // A zero limit would make start() spill empty chunks forever without consuming any input.
        if (maxItemsInMemory <= 0) {
            throw new IllegalArgumentException("maxItemsInMemory must be positive, got " + maxItemsInMemory);
        }
        this.input = input;
        // Integer.MAX_VALUE means "no limit", so start from a modest capacity instead of pre-allocating.
        this.currentChunk = new ArrayList<>(maxItemsInMemory == Integer.MAX_VALUE ? 1024 : maxItemsInMemory);
        this.tmpFolder = tmpFolder;
        this.encoderFactory = encoderFactory;
        this.decoderFactory = decoderFactory;
        this.comparator = comparator;
        this.sortedFiles = new ArrayList<>();
        this.maxItemsInMemory = maxItemsInMemory;
        this.tempFilesCompression = tempFilesCompression;
    }

    /**
     * Creates a sort pipe whose temporary files are compressed with {@link Compression#ZSTD}.
     *
     * @param input the pipe supplying the items to sort
     * @param maxItemsInMemory the maximum number of items held in memory per chunk; must be positive
     * @param tmpFolder the folder in which temporary chunk files are created
     * @param encoderFactory used for writing items to the temporary chunk files
     * @param decoderFactory used for reading items back from the temporary chunk files
     * @param comparator defines the output order
     */
    public SortPipe(Pipe<T> input, int maxItemsInMemory, File tmpFolder, EncoderFactory<? super T> encoderFactory, DecoderFactory<T> decoderFactory, Comparator<T> comparator) {
        this(input, maxItemsInMemory, tmpFolder, encoderFactory, decoderFactory, comparator, Compression.ZSTD);
    }

    /**
     * Creates a sort pipe using a single codec factory for both encoding and decoding, with
     * temporary files compressed with {@link Compression#ZSTD}.
     *
     * @param input the pipe supplying the items to sort
     * @param maxItemsInMemory the maximum number of items held in memory per chunk; must be positive
     * @param tmpFolder the folder in which temporary chunk files are created
     * @param codecFactory used for writing and reading the temporary chunk files
     * @param comparator defines the output order
     */
    public SortPipe(Pipe<T> input, int maxItemsInMemory, File tmpFolder, CodecFactory<T> codecFactory, Comparator<T> comparator) {
        this(input, maxItemsInMemory, tmpFolder, codecFactory, codecFactory, comparator, Compression.ZSTD);
    }

    /**
     * Creates a sort pipe that places its temporary files in the folder returned by
     * {@link FileUtils#getSystemDefaultTmpFolder()}, compressed with {@link Compression#ZSTD}.
     *
     * @param input the pipe supplying the items to sort
     * @param maxItemsInMemory the maximum number of items held in memory per chunk; must be positive
     * @param codecFactory used for writing and reading the temporary chunk files
     * @param comparator defines the output order
     */
    public SortPipe(Pipe<T> input, int maxItemsInMemory, CodecFactory<T> codecFactory, Comparator<T> comparator) {
        this(input, maxItemsInMemory, FileUtils.getSystemDefaultTmpFolder(), codecFactory, codecFactory, comparator, Compression.ZSTD);
    }

    /**
     * Creates a sort pipe with a chunk limit of {@link Integer#MAX_VALUE} items. No temporary
     * folder or codecs are configured (they are passed as {@code null}).
     *
     * @param input the pipe supplying the items to sort
     * @param comparator defines the output order
     */
    public SortPipe(Pipe<T> input, Comparator<T> comparator) {
        this(input, Integer.MAX_VALUE, null, null, null, comparator, Compression.ZSTD);
    }

    /**
     * Closes the input pipe and the output pipe, then deletes the temporary chunk files.
     *
     * @throws IOException if closing one of the pipes fails; the temporary files are deleted
     *     regardless
     */
    @Override
    public void close() throws IOException {
        try {
            FileUtils.close(this.input, this.finalOutputPipe);
        } finally {
            // Best-effort cleanup: runs even when closing the pipes failed, so that chunk files
            // are not left behind in the temp folder.
            for (File f : this.sortedFiles) {
                f.delete();
            }
        }
    }

    /**
     * Returns the next item in sorted order by delegating to the output pipe.
     */
    @Override
    public T next() throws PipeException, InterruptedException {
        return this.finalOutputPipe.next();
    }

    /**
     * Returns the result of {@code peek()} on the output pipe.
     */
    @Override
    public T peek() throws PipeException {
        return this.finalOutputPipe.peek();
    }

    /**
     * Starts the input pipe, consumes it entirely, and prepares the sorted output pipe.
     *
     * @throws PipeException if reading the input fails, or (as an {@link IOPipeException}) if an
     *     I/O error occurs while writing or opening the temporary chunk files
     * @throws InterruptedException if thrown by one of the underlying pipes
     */
    @Override
    public void start() throws PipeException, InterruptedException {
        this.input.start();
        while (true) {
            // Collect items until the chunk is full or the input runs dry.
            while (this.input.peek() != null && this.currentChunk.size() < this.maxItemsInMemory) {
                this.currentChunk.add(this.input.next());
            }
            this.currentChunk.sort(this.comparator);

            boolean inputExhausted = this.input.peek() == null;
            if (inputExhausted && this.sortedFiles.isEmpty()) {
                // Everything fit into one chunk: serve it straight from memory.
                this.finalOutputPipe = new CollectionReaderPipe<T>(this.currentChunk);
                this.finalOutputPipe.start();
                return;
            }
            try {
                spillCurrentChunk();
                if (inputExhausted) {
                    this.finalOutputPipe = openMergePipe();
                    this.finalOutputPipe.start();
                    return;
                }
            }
            catch (IOException e) {
                throw new IOPipeException(e);
            }
        }
    }

    /**
     * Writes the (already sorted) current chunk to a new temporary file, then empties the chunk.
     */
    private void spillCurrentChunk() throws IOException, PipeException, InterruptedException {
        CollectionReaderPipe<T> sortedPipe = new CollectionReaderPipe<T>(this.currentChunk);
        File sortedFile = FileUtils.createTempFile("sort-chunk" + this.sortedFiles.size(), this.tempFilesCompression.getFileExtension(), this.tmpFolder);
        // Register the file before writing, so that close() deletes it even if the write fails.
        this.sortedFiles.add(sortedFile);
        try (BinFileWriterPipe<T> w = new BinFileWriterPipe<T>(sortedPipe, sortedFile, new FileWriteOptions().setCompression(this.tempFilesCompression), this.encoderFactory);){
            w.start();
        }
        // The chunk is on disk now. Clear it so the last chunk is not retained during the merge.
        this.currentChunk.clear();
    }

    /**
     * Builds a merge pipe reading all temporary chunk files. The returned pipe is not started.
     */
    private Pipe<T> openMergePipe() throws IOException, PipeException, InterruptedException {
        ArrayList<BinInputReaderPipe<T>> sortedPipes = new ArrayList<BinInputReaderPipe<T>>();
        for (File f : this.sortedFiles) {
            BinInputReaderPipe<T> p = new BinInputReaderPipe<T>(f, new FileReadOptions().setCompression(this.tempFilesCompression), this.decoderFactory);
            sortedPipes.add(p);
        }
        return new SortedMergePipe<T>(sortedPipes, this.comparator);
    }

    /**
     * Returns the progress reported by the output pipe, or 0 while no output pipe exists yet
     * (i.e. before {@link #start()} has finished preparing the sorted output).
     */
    @Override
    public float getProgress() {
        // Read the volatile field once so the null check and the call see the same reference.
        Pipe<T> output = this.finalOutputPipe;
        return output == null ? 0.0f : output.getProgress();
    }
}

