/*
 * Decompiled with CFR 0.152.
 */
package org.gorpipe.spark;

import gorsat.Commands.CommandParseUtilities;
import gorsat.Commands.Processor;
import gorsat.process.PipeOptions;
import gorsat.process.SparkPipeInstance;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.spark.sql.SparkSession;
import org.gorpipe.gor.model.GorParallelQueryHandler;
import org.gorpipe.gor.monitor.GorMonitor;
import org.gorpipe.gor.session.GorRunner;
import org.gorpipe.spark.GorQueryRDD;
import org.gorpipe.spark.GorSparkSession;

public class GeneralSparkQueryHandler
implements GorParallelQueryHandler {
    GorSparkSession gpSession;
    boolean force = false;
    public static final String queue = "GOR_CLUSTER";
    String requestID;

    public GeneralSparkQueryHandler(GorSparkSession gorPipeSession) {
        if (gorPipeSession != null) {
            this.init(gorPipeSession);
        }
    }

    public GeneralSparkQueryHandler() {
    }

    public void init(GorSparkSession gorPipeSession) {
        this.gpSession = gorPipeSession;
        this.requestID = gorPipeSession.getRequestId();
    }

    private static boolean isSparkQuery(String lastQuery) {
        String lastQueryLower = lastQuery.toLowerCase();
        return lastQueryLower.startsWith("select ") || lastQueryLower.startsWith("spark ") || lastQueryLower.startsWith("gorspark ") || lastQueryLower.startsWith("norspark ") || lastQuery.contains("/*+");
    }

    public static String[] executeSparkBatch(GorSparkSession session, String projectDir, String cacheDir, String[] fingerprints, String[] commandsToExecute, String[] jobIds, String[] cacheFiles, String[] securityContext, Boolean[] allowToFail) {
        SparkSession sparkSession = session.getSparkSession();
        String redisUri = session.getRedisUri();
        String redisKey = session.streamKey();
        TreeSet sparkJobs = new TreeSet();
        TreeSet gorJobs = new TreeSet();
        Path root = Paths.get(projectDir, new String[0]);
        IntStream.range(0, commandsToExecute.length).forEach(i -> {
            Path cachePath = Paths.get(cacheFiles[i], new String[0]);
            if (!Files.exists(root.resolve(cachePath), new LinkOption[0])) {
                String command = commandsToExecute[i];
                String[] split = CommandParseUtilities.quoteSafeSplit((String)command, (char)';');
                if (split.length > 1 || GeneralSparkQueryHandler.isSparkQuery(command)) {
                    sparkJobs.add(i);
                } else {
                    gorJobs.add(i);
                }
            }
        });
        Callable<String[]> sparkRes = () -> {
            List ret = sparkJobs.parallelStream().map(i -> {
                Object cmd = commandsToExecute[i];
                String[] split = CommandParseUtilities.quoteSafeSplit((String)cmd, (char)';');
                String jobId = jobIds[i];
                Object lastCmd = split[split.length - 1];
                int firstSpace = ((String)lastCmd).indexOf(32);
                lastCmd = ((String)lastCmd).substring(0, firstSpace + 1) + "-j " + jobId + ((String)lastCmd).substring(firstSpace);
                cmd = split.length == 1 ? lastCmd : String.join((CharSequence)";", Arrays.copyOfRange(split, 0, split.length - 1)) + ";" + (String)lastCmd;
                String[] args = new String[]{cmd, "-queryhandler", "spark"};
                PipeOptions options = new PipeOptions();
                options.parseOptions(args);
                String cacheFile = cacheFiles[i];
                Path cachePath = Paths.get(cacheFile, new String[0]);
                if (!cachePath.isAbsolute()) {
                    cachePath = root.resolve(cacheFile);
                }
                SparkPipeInstance pi = new SparkPipeInstance(session.getGorContext(), cachePath.toString());
                try {
                    pi.subProcessArguments(options);
                    pi.theInputSource().pushdownWrite(cacheFile);
                    GorRunner runner = (GorRunner)session.getSystemContext().getRunnerFactory().create();
                    try {
                        runner.run(pi.getIterator(), (Processor)pi.getPipeStep());
                    }
                    catch (Exception e) {
                        block17: {
                            try {
                                if (!Files.exists(cachePath, new LinkOption[0])) break block17;
                                try (Stream<Path> fwalk = Files.walk(cachePath, new FileVisitOption[0]).sorted(Comparator.reverseOrder());){
                                    fwalk.forEach(path -> {
                                        try {
                                            Files.delete(path);
                                        }
                                        catch (IOException iOException) {
                                            // empty catch block
                                        }
                                    });
                                }
                            }
                            catch (IOException iOException) {
                                // empty catch block
                            }
                        }
                        Exception exception = e;
                        pi.close();
                        return exception;
                    }
                    String string = cacheFile;
                    return string;
                }
                finally {
                    try {
                        pi.close();
                    }
                    catch (Throwable throwable) {
                        Throwable throwable2;
                        throwable2.addSuppressed(throwable);
                    }
                }
            }).collect(Collectors.toList());
            Optional<Exception> oe = ret.stream().filter(o -> o instanceof Exception).map(o -> (Exception)o).findFirst();
            if (oe.isPresent()) {
                throw oe.get();
            }
            return (String[])ret.stream().map(s -> (String)s).toArray(String[]::new);
        };
        Callable<String[]> otherRes = () -> {
            String[] newCommands = new String[gorJobs.size()];
            String[] newFingerprints = new String[gorJobs.size()];
            String[] newCacheFiles = new String[gorJobs.size()];
            String[] newJobIds = new String[gorJobs.size()];
            String[] newSecCtx = new String[gorJobs.size()];
            Boolean[] newAllow = new Boolean[gorJobs.size()];
            int k = 0;
            Iterator iterator = gorJobs.iterator();
            while (iterator.hasNext()) {
                int i = (Integer)iterator.next();
                newCommands[k] = commandsToExecute[i];
                newFingerprints[k] = fingerprints[i];
                newJobIds[k] = jobIds[i];
                newCacheFiles[k] = cacheFiles[i];
                newSecCtx[k] = securityContext[i];
                newAllow[k] = allowToFail[i];
                ++k;
            }
            GorQueryRDD queryRDD = new GorQueryRDD(sparkSession, newCommands, newFingerprints, newCacheFiles, projectDir, cacheDir, session.getProjectContext().getGorConfigFile(), session.getProjectContext().getGorAliasFile(), newJobIds, newSecCtx, newAllow, redisUri, redisKey);
            return queryRDD.collect();
        };
        try {
            String cmds = String.join((CharSequence)" ", commandsToExecute);
            sparkSession.sparkContext().setJobDescription(cmds);
            if (sparkJobs.size() == 0 && gorJobs.size() > 0) {
                otherRes.call();
            } else if (gorJobs.size() == 0 && sparkJobs.size() > 0) {
                sparkRes.call();
            } else if (sparkJobs.size() > 0) {
                ExecutorService executor = Executors.newFixedThreadPool(2);
                List<Callable> callables = Arrays.asList(sparkRes, otherRes);
                executor.invokeAll(callables).forEach(future -> {
                    try {
                        future.get();
                    }
                    catch (Exception e) {
                        throw new IllegalStateException(e);
                    }
                });
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        return (String[])IntStream.range(0, fingerprints.length).mapToObj(i -> jobIds[i] + "\t" + fingerprints[i] + "\t" + cacheFiles[i]).toArray(String[]::new);
    }

    public String[] executeBatch(String[] fingerprints, String[] commandsToExecute, String[] batchGroupNames, String[] cacheFiles, GorMonitor mon) {
        String projectDir = this.gpSession.getProjectContext().getProjectRoot();
        String cacheDir = this.gpSession.getProjectContext().getCacheDir();
        String secCtx = this.gpSession.getProjectContext().getFileReader().getSecurityContext();
        ArrayList securityContext = new ArrayList();
        ArrayList cacheFileList = new ArrayList();
        ArrayList allowToFail = new ArrayList();
        IntStream.range(0, commandsToExecute.length).forEach(i -> {
            Object cachePath;
            String command = commandsToExecute[i];
            String[] cmdsplit = CommandParseUtilities.quoteSafeSplit((String)command, (char)'|');
            String lastCmd = cmdsplit[cmdsplit.length - 1].trim();
            if (lastCmd.toLowerCase().startsWith("write ")) {
                String[] lastCmdSplit = lastCmd.split(" ");
                cachePath = lastCmdSplit[lastCmdSplit.length - 1];
            } else {
                cachePath = cacheDir + "/" + fingerprints[i] + CommandParseUtilities.getExtensionForQuery((String)command, (boolean)false);
            }
            cacheFileList.add(cachePath);
            securityContext.add(secCtx);
            allowToFail.add(batchGroupNames[i].contains("_af"));
        });
        String[] jobIds = Arrays.copyOf(fingerprints, fingerprints.length);
        String[] res = GeneralSparkQueryHandler.executeSparkBatch(this.gpSession, projectDir, cacheDir, fingerprints, commandsToExecute, jobIds, cacheFileList.toArray(new String[0]), securityContext.toArray(new String[0]), allowToFail.toArray(new Boolean[0]));
        return (String[])Arrays.stream(res).map(s -> s.split("\t")[2]).toArray(String[]::new);
    }

    public void setForce(boolean force) {
        this.force = force;
    }

    public void setQueryTime(Long time) {
        throw new UnsupportedOperationException("setQueryTime not supported");
    }

    public long getWaitTime() {
        return 0L;
    }
}

