/*
 * Decompiled with CFR 0.152.
 */
package org.pipecraft.pipes.utils.multi;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.pipecraft.infra.io.FileUtils;
import org.pipecraft.infra.io.SizedInputStream;
import org.pipecraft.infra.math.ArithmeticUtils;
import org.pipecraft.infra.math.StaticJobScheduler;
import org.pipecraft.infra.storage.Bucket;
import org.pipecraft.pipes.sync.Pipe;
import org.pipecraft.pipes.sync.inter.CallbackPipe;
import org.pipecraft.pipes.utils.PipeSupplier;
import org.pipecraft.pipes.utils.ShardSpecifier;
import org.pipecraft.pipes.utils.multi.LocalMultiFileReaderConfig;
import org.pipecraft.pipes.utils.multi.StorageMultiFileReaderConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for readers that consume many files at once, either from local folders
 * or from a storage {@link Bucket}. Covers selecting the files a reader is responsible for
 * (optionally restricted to one shard) and downloading remote files into local temp files.
 */
public class MultiFileReaderUtils {
    private static final Logger logger = LoggerFactory.getLogger(MultiFileReaderUtils.class);

    /**
     * Decides whether a file belongs to a shard, based on a hash of its path.
     *
     * @param filePath the file path; its UTF-8 bytes are the hash input
     * @param shardSpecifier the shard (index and total shard count) to test against
     * @return true when the shard computed from the path equals the specifier's shard index
     */
    public static boolean isOwned(String filePath, ShardSpecifier shardSpecifier) {
        byte[] pathAsBin = filePath.getBytes(StandardCharsets.UTF_8);
        return ArithmeticUtils.getShardByStrongHash(pathAsBin, shardSpecifier.getShardCount()) == shardSpecifier.getShardIndex();
    }

    /**
     * Resolves the local files a reader with the given configuration should read.
     * <p>
     * With balanced sharding, all files passing the configured filter are handed to a
     * {@link StaticJobScheduler} weighted by file length, and the partition at this shard's
     * index is returned. Otherwise files are selected by the configured filter, combined with
     * {@link #isOwned(String, ShardSpecifier)} on the absolute path when a shard is specified.
     *
     * @param config the reader configuration (paths, recursion flag, filter, sharding settings)
     * @return the files to read
     * @throws IOException if a configured path isn't a folder, or folder traversal fails
     */
    public static <T> Collection<File> getAllLocalFilesToRead(LocalMultiFileReaderConfig<T> config) throws IOException {
        ShardSpecifier shard = config.getShardSpecifier();
        Collection<File> filesToRead;
        if (config.isBalancedSharding()) {
            Collection<File> allFiles = filterLocalFiles(config.getPaths(), config.isRecursivePaths(), config.getFileFilter());
            StaticJobScheduler<File> balancer = new StaticJobScheduler<File>(allFiles, File::length);
            List<Collection<File>> filePartition = balancer.schedule(shard.getShardCount());
            filesToRead = filePartition.get(shard.getShardIndex());
        } else {
            Predicate<File> filePredicate = config.getFileFilter();
            if (shard != null) {
                filePredicate = filePredicate.and(f -> isOwned(f.getAbsolutePath(), shard));
            }
            filesToRead = filterLocalFiles(config.getPaths(), config.isRecursivePaths(), filePredicate);
        }
        // The total is needed only for the log line; skip the per-file length lookups otherwise.
        if (logger.isDebugEnabled()) {
            long totalVolumeBytes = filesToRead.stream().mapToLong(File::length).sum();
            logger.debug("Current pipe is about to read {} files, totalling in {} bytes", filesToRead.size(), totalVolumeBytes);
        }
        return filesToRead;
    }

    /**
     * Resolves the remote files a reader with the given configuration should read.
     * Mirrors {@link #getAllLocalFilesToRead(LocalMultiFileReaderConfig)}, using the
     * configured bucket for listing, lengths and paths.
     *
     * @param config the reader configuration (bucket, paths, recursion flag, filter, sharding settings)
     * @return the remote file objects to read
     * @throws IOException if listing the remote folders fails
     */
    public static <T, B> Collection<B> getAllRemoteFilesToRead(StorageMultiFileReaderConfig<T, B> config) throws IOException {
        Bucket<B> bucket = config.getBucket();
        ShardSpecifier shard = config.getShardSpecifier();
        Collection<B> filesToRead;
        if (config.isBalancedSharding()) {
            Collection<B> allFiles = filterRemoteFiles(bucket, config.getPaths(), config.isRecursivePaths(), config.getFileFilter());
            StaticJobScheduler<B> balancer = new StaticJobScheduler<B>(allFiles, f -> bucket.getLength(f));
            List<Collection<B>> filePartition = balancer.schedule(shard.getShardCount());
            filesToRead = filePartition.get(shard.getShardIndex());
        } else {
            Predicate<B> filePredicate = config.getFileFilter();
            if (shard != null) {
                filePredicate = filePredicate.and(f -> isOwned(bucket.getPath(f), shard));
            }
            filesToRead = filterRemoteFiles(bucket, config.getPaths(), config.isRecursivePaths(), filePredicate);
        }
        // The total is needed only for the log line; skip the per-file length lookups otherwise.
        if (logger.isDebugEnabled()) {
            long totalVolumeBytes = filesToRead.stream().mapToLong(f -> bucket.getLength(f)).sum();
            logger.debug("Current pipe is about to read {} remote files, totalling in {} bytes", filesToRead.size(), totalVolumeBytes);
        }
        return filesToRead;
    }

    /**
     * Downloads the given remote files into a fresh temp folder and returns, per file, a
     * supplier of a pipe reading the downloaded copy. Each returned pipe is wrapped in a
     * {@link CallbackPipe} that is given a callback deleting the local copy.
     * <p>
     * Local file names are the remote paths with every {@code /} replaced by {@code __}.
     * NOTE(review): two remote paths differing only by {@code /} vs {@code __} would map to
     * the same local name - confirm this can't happen for the buckets in use.
     *
     * @param files the remote files to download
     * @param bucket the bucket the files live in
     * @param config supplies the temp folder, download thread count and the per-file pipe factory
     * @return one pipe supplier per input file, in the iteration order of {@code files}
     * @throws IOException if creating the temp folder or downloading fails
     * @throws InterruptedException if the download is interrupted
     */
    public static <T, B> List<PipeSupplier<T>> downloadAndGetReadPipes(Collection<B> files, Bucket<B> bucket, StorageMultiFileReaderConfig<T, B> config) throws IOException, InterruptedException {
        List<PipeSupplier<T>> pipeSuppliers = new ArrayList<>(files.size());
        Map<String, String> remotePathToLocalName = new HashMap<>();
        for (B file : files) {
            String remotePath = bucket.getPath(file);
            // Flatten the remote path so that every download lands directly in the temp folder.
            remotePathToLocalName.put(remotePath, remotePath.replace("/", "__"));
        }
        File downloadFolder = FileUtils.createTempFolder("download", config.getTmpFolder());
        logger.debug("Downloading {} files...", files.size());
        bucket.getAllRegularFilesByMetaInterruptibly(files, downloadFolder, remotePathToLocalName::get, config.getThreadNum());
        logger.debug("Done downloading {} files.", files.size());
        for (B remoteFile : files) {
            String localName = remotePathToLocalName.get(bucket.getPath(remoteFile));
            File f = new File(downloadFolder, localName);
            pipeSuppliers.add(() -> {
                Pipe<T> input = config.getPipeSupplier().get(new SizedInputStream(new FileInputStream(f), f.length()), remoteFile);
                return new CallbackPipe<T>(input, f::delete);
            });
        }
        return pipeSuppliers;
    }

    /**
     * Collects the non-directory files under the given folders that pass the predicate.
     *
     * @param folderPaths the folders to scan
     * @param isRecursive true to descend into sub-folders, false to scan direct children only
     * @param filePathPredicate the filter each candidate file must pass
     * @return the matching files, in traversal order
     * @throws IOException if a path isn't a folder, or traversal fails
     */
    private static Collection<File> filterLocalFiles(Collection<String> folderPaths, boolean isRecursive, Predicate<File> filePathPredicate) throws IOException {
        int maxDepth = isRecursive ? Integer.MAX_VALUE : 1;
        try {
            List<File> res = new ArrayList<>();
            for (String folderPath : folderPaths) {
                File folder = new File(folderPath);
                if (!folder.isDirectory()) {
                    throw new IOException("'" + folder.getAbsolutePath() + "' isn't a folder or can't be read");
                }
                // Files.walk holds open directory handles until the stream is closed.
                try (Stream<Path> walk = Files.walk(folder.toPath(), maxDepth)) {
                    walk.map(Path::toFile)
                        .filter(f -> !f.isDirectory())
                        .filter(filePathPredicate)
                        .forEach(res::add);
                }
            }
            return res;
        }
        catch (UncheckedIOException e) {
            // Traversal errors surface wrapped during stream iteration; expose the IOException itself.
            throw e.getCause();
        }
    }

    /**
     * Collects the remote files under the given bucket folders that pass the predicate.
     *
     * @param bucket the bucket to list
     * @param folderPaths the remote folders to list
     * @param isRecursive true to use the bucket's recursive listing, false for the plain listing
     * @param filePathPredicate the filter each listed file must pass
     * @return the matching remote file objects, in listing order
     * @throws IOException if listing fails
     */
    private static <B> Collection<B> filterRemoteFiles(Bucket<B> bucket, Collection<String> folderPaths, boolean isRecursive, Predicate<B> filePathPredicate) throws IOException {
        try {
            List<B> res = new ArrayList<>();
            for (String folderPath : folderPaths) {
                Iterator<B> it = isRecursive ? bucket.listFilesRecursive(folderPath) : bucket.listFiles(folderPath);
                while (it.hasNext()) {
                    B blob = it.next();
                    if (filePathPredicate.test(blob)) {
                        res.add(blob);
                    }
                }
            }
            return res;
        }
        catch (UncheckedIOException e) {
            // Listing errors may surface wrapped during iteration; expose the IOException itself.
            throw e.getCause();
        }
    }
}

