/*
 * Decompiled with CFR 0.152.
 */
package org.gorpipe.spark.redis;

import com.fasterxml.jackson.databind.ObjectMapper;
import gorsat.Commands.CommandParseUtilities;
import gorsat.process.FreemarkerReportBuilder;
import gorsat.process.GenericRunnerFactory;
import gorsat.process.GorSessionCacheManager;
import gorsat.process.PipeOptions;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.gorpipe.client.FileCache;
import org.gorpipe.gor.clients.LocalFileCacheClient;
import org.gorpipe.gor.model.DriverBackedFileReader;
import org.gorpipe.gor.model.FileReader;
import org.gorpipe.gor.model.GorParallelQueryHandler;
import org.gorpipe.gor.session.GorContext;
import org.gorpipe.gor.session.GorReportBuilder;
import org.gorpipe.gor.session.GorRunnerFactory;
import org.gorpipe.gor.session.GorSession;
import org.gorpipe.gor.session.GorSessionCache;
import org.gorpipe.gor.session.ProjectContext;
import org.gorpipe.gor.session.SystemContext;
import org.gorpipe.spark.GeneralSparkQueryHandler;
import org.gorpipe.spark.GorQueryRDD;
import org.gorpipe.spark.GorSparkSession;
import org.gorpipe.spark.redis.MonitorThread;
import org.gorpipe.spark.redis.SparkGorQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.Base64;

/**
 * Consumes batches of job rows and dispatches each contained query either as a
 * Spark job (submitted to the internal executor) or as part of a single
 * {@link GorQueryRDD} covering all remaining queries of the batch. The futures
 * of the started jobs are registered with a {@link MonitorThread}.
 *
 * <p>Throughout this class a job is described by a {@code String[]} with the layout
 * {@code {query, fingerprint, projectRoot, requestId, jobId, cacheFile, securityContext}}.
 */
public class RedisBatchConsumer implements VoidFunction2<Dataset<Row>, Long>, AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(RedisBatchConsumer.class);
    private static final String DEFAULT_CACHE_DIR = "result_cache";
    /** Shared JSON mapper; only used for reading, so one instance serves every row. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    /** Marker returned by {@link #parsePayload(Row)} for rows that must be skipped. */
    private static final String[] NO_JOB = new String[0];

    GorSparkSession gss;
    SystemContext sysctx;
    MonitorThread mont;
    ExecutorService es;

    /**
     * Creates the GOR spark session wrapper, the system context and the executor,
     * and submits a {@link MonitorThread} for the given redis uri to that executor.
     *
     * @param sparkSession spark session used by all jobs started from this consumer
     * @param redisUri     redis uri handed to the session and to the monitor thread
     */
    public RedisBatchConsumer(SparkSession sparkSession, String redisUri) {
        log.info("Starting RedisBatchConsumer on redisUri {}", redisUri);
        this.gss = new GorSparkSession("", 0);
        this.gss.setSparkSession(sparkSession);
        this.gss.redisUri_$eq(redisUri);
        this.sysctx = new SystemContext.Builder()
                .setReportBuilder((GorReportBuilder) new FreemarkerReportBuilder((GorSession) this.gss))
                .setRunnerFactory((GorRunnerFactory) new GenericRunnerFactory())
                .setServer(true)
                .setStartTime(System.currentTimeMillis())
                .build();
        this.es = Executors.newWorkStealingPool(4);
        log.info("Starting monitorthread");
        this.mont = new MonitorThread(redisUri);
        this.es.submit(this.mont);
        log.info("Monitorthread submitted");
    }

    /** Asks the monitor thread to stop and shuts the executor down (without waiting). */
    @Override
    public void close() {
        log.info("Closing RedisBatchConsumer");
        this.mont.stopRunning();
        this.es.shutdown();
    }

    /**
     * Runs the selected queries of a batch through one {@link GorQueryRDD}.
     *
     * @param projectDirStr project directory passed to the RDD
     * @param gorJobs       indices into the parallel arrays below that should be run
     * @param queries       all queries of the batch
     * @param fingerprints  fingerprint per query
     * @param jobIds        job id per query; only the part after the last ':' is passed on
     * @param cachefiles    cache file per query
     * @param secCtxs       security context per query
     * @return future of the asynchronously collected RDD result
     */
    public Future<List<String>> runGorJobs(String projectDirStr, Set<Integer> gorJobs, String[] queries, String[] fingerprints, String[] jobIds, String[] cachefiles, String[] secCtxs) {
        int jobCount = gorJobs.size();
        String[] newCommands = new String[jobCount];
        String[] newFingerprints = new String[jobCount];
        String[] newCacheFiles = new String[jobCount];
        String[] newJobIds = new String[jobCount];
        String[] newSecCtxs = new String[jobCount];
        int k = 0;
        for (int i : gorJobs) {
            newCommands[k] = queries[i];
            newFingerprints[k] = fingerprints[i];
            newJobIds[k] = shortJobId(jobIds[i]);
            newCacheFiles[k] = cachefiles[i];
            newSecCtxs[k] = secCtxs[i];
            ++k;
        }
        // The session only has a project context after it has been initialised.
        ProjectContext projectContext = this.gss.getProjectContext();
        String configFile = projectContext != null ? projectContext.getGorConfigFile() : null;
        String aliasFile = projectContext != null ? projectContext.getGorAliasFile() : null;
        GorQueryRDD gorQueryRDD = new GorQueryRDD(this.gss.sparkSession(), newCommands, newFingerprints, newCacheFiles, projectDirStr, DEFAULT_CACHE_DIR, configFile, aliasFile, newJobIds, newSecCtxs, this.gss.redisUri());
        return gorQueryRDD.toJavaRDD().collectAsync();
    }

    /**
     * Initialises the shared session for the given project and submits the command
     * as a {@link SparkGorQuery} to the executor.
     *
     * @param projectDirStr   project directory; config and alias files are resolved against it
     * @param creates         statements preceding the command; joined with ';' in front of it
     * @param cmd             the command to run; a {@code -j <jobId>} option is inserted after
     *                        its first word, so it must contain a space (commands selected by
     *                        {@link #runJobBatch(List)} always do)
     * @param jobId           job id; only the part after the last ':' is inserted into the command
     * @param cacheFile       cache file handed to the query
     * @param securityContext security context used for the project root and file reader
     * @return future of the submitted query
     */
    public Future<List<String>> runSparkJob(String projectDirStr, String[] creates, String cmd, String jobId, String cacheFile, String securityContext) {
        log.info("Running spark job {}: {}", jobId, cmd);
        String shortJobId = shortJobId(jobId);

        Path projectPath = Paths.get(projectDirStr);
        String configFile = resolveProjectFile(projectPath, System.getProperty("gor.project.config.path", "config/gor_config.txt"));
        String aliasFile = resolveProjectFile(projectPath, System.getProperty("gor.project.alias.path", "config/gor_standard_aliases.txt"));

        GeneralSparkQueryHandler queryHandler = new GeneralSparkQueryHandler(this.gss, this.gss.redisUri());
        ProjectContext prjctx = new ProjectContext.Builder()
                .setRoot(projectDirStr + securityContext)
                .setCacheDir(DEFAULT_CACHE_DIR)
                .setFileReader((FileReader) new DriverBackedFileReader(securityContext, projectDirStr, null))
                .setConfigFile(configFile)
                .setAliasFile(aliasFile)
                .setQueryHandler((GorParallelQueryHandler) queryHandler)
                .setFileCache((FileCache) new LocalFileCacheClient(projectPath.resolve(DEFAULT_CACHE_DIR)))
                .build();
        GorSessionCache cache = GorSessionCacheManager.getCache(this.gss.getRequestId());
        this.gss.init(prjctx, this.sysctx, cache);
        GorContext context = new GorContext((GorSession) this.gss);

        // Insert "-j <jobId>" directly after the command name.
        int firstSpace = cmd.indexOf(' ');
        String cmdWithJobId = cmd.substring(0, firstSpace + 1) + "-j " + shortJobId + cmd.substring(firstSpace);
        String query = creates.length > 0 ? String.join(";", creates) + ";" + cmdWithJobId : cmdWithJobId;
        String resolvedQuery = this.gss.replaceAliases(query);

        // The parsed options are not used afterwards; the call is kept for whatever
        // checks parseOptions performs on the argument list.
        String[] args = new String[]{resolvedQuery, "-queryhandler", "spark"};
        PipeOptions options = new PipeOptions();
        options.parseOptions(args);

        SparkGorQuery sgq = new SparkGorQuery(context, resolvedQuery, cacheFile);
        return this.es.submit(sgq);
    }

    /**
     * Starts all jobs of a batch. Queries whose last ';'-separated statement starts with
     * SELECT, SPARK, GORSPARK or NORSPARK are run individually via {@link #runSparkJob};
     * all others are run together via {@link #runGorJobs}. Every job id is marked as
     * RUNNING on the monitor thread first.
     *
     * @param lstr job descriptions, each with the seven-element layout described on the
     *             class; the project directory is taken from the first entry
     * @return started futures, keyed by job id for spark jobs and by the comma-joined
     *         job ids for the combined gor job; empty when {@code lstr} is empty
     */
    public Map<String, Future<List<String>>> runJobBatch(List<String[]> lstr) {
        Map<String, Future<List<String>>> futList = new HashMap<>();
        Optional<String> projectDir = lstr.stream().map(l -> l[2]).findFirst();
        if (!projectDir.isPresent()) {
            return futList;
        }
        String projectDirStr = projectDir.get();
        String[] queries = column(lstr, 0);
        String[] fingerprints = column(lstr, 1);
        String[] cachefiles = column(lstr, 5);
        String[] jobIds = column(lstr, 4);
        String[] securityCtxs = column(lstr, 6);
        this.mont.setValue(jobIds, "status", "RUNNING");

        Set<Integer> gorJobs = new TreeSet<>();
        for (int i = 0; i < queries.length; ++i) {
            String[] cmdSplit = CommandParseUtilities.quoteSafeSplit(queries[i], ';');
            String lastCommand = cmdSplit[cmdSplit.length - 1].trim();
            if (isSparkCommand(lastCommand)) {
                String[] creates = Arrays.copyOfRange(cmdSplit, 0, cmdSplit.length - 1);
                Future<List<String>> fut = this.runSparkJob(projectDirStr, creates, lastCommand, jobIds[i], cachefiles[i], securityCtxs[i]);
                futList.put(jobIds[i], fut);
            } else {
                gorJobs.add(i);
            }
        }
        if (!gorJobs.isEmpty()) {
            Future<List<String>> fut = this.runGorJobs(projectDirStr, gorJobs, queries, fingerprints, jobIds, cachefiles, securityCtxs);
            String jobIdStr = gorJobs.stream().map(i -> jobIds[i]).collect(Collectors.joining(","));
            futList.put(jobIdStr, fut);
        }
        return futList;
    }

    /**
     * Handles one batch: collects the rows, turns every row whose column 2 equals
     * "payload" into a job description, starts the jobs and registers the resulting
     * futures with the monitor thread. Rows whose payload cannot be parsed are logged
     * and skipped so that they do not fail the rest of the batch.
     *
     * @param v1 batch rows; columns 1, 2, 3 and 4 are read as job id, field name,
     *           field value and security context
     * @param v2 not used
     */
    @Override
    public void call(Dataset<Row> v1, Long v2) {
        List<Row> rows = v1.collectAsList();
        log.info("Received batch of {}", rows.size());
        List<String[]> lstr = rows.stream()
                .filter(r -> "payload".equals(r.getString(2)))
                .map(RedisBatchConsumer::parsePayload)
                .filter(fields -> fields.length > 0)
                .collect(Collectors.toList());
        Map<String, Future<List<String>>> futMap = this.runJobBatch(lstr);
        futMap.forEach((key, value) -> this.mont.addJob(key, value));
    }

    /**
     * Command line entry point running one batch of jobs and waiting for all of them.
     *
     * @param args redisUrl, requestId, projectDir, queries (separated by ";;"),
     *             fingerprints, cachefiles and jobids (each separated by ";")
     */
    public static void main(String[] args) {
        if (args.length < 7) {
            throw new IllegalArgumentException("Usage: RedisBatchConsumer <redisUrl> <requestId> <projectDir> <queries> <fingerprints> <cachefiles> <jobids>");
        }
        String redisUrl = args[0];
        String requestId = args[1];
        String projectDir = args[2];
        String queries = args[3];
        String fingerprints = args[4];
        String cachefiles = args[5];
        String jobids = args[6];
        SparkSession.Builder sb = new SparkSession.Builder();
        try (SparkSession sparkSession = sb.getOrCreate();
             RedisBatchConsumer redisBatchConsumer = new RedisBatchConsumer(sparkSession, redisUrl)) {
            String[] querySplit = queries.split(";;");
            String[] fingerprintSplit = fingerprints.split(";");
            String[] cachefileSplit = cachefiles.split(";");
            String[] jobidSplit = jobids.split(";");
            // runJobBatch reads seven fields per job; the command line carries no
            // security context, so an empty one is supplied as the last field.
            List<String[]> lstr = IntStream.range(0, fingerprintSplit.length)
                    .mapToObj(i -> new String[]{querySplit[i], fingerprintSplit[i], projectDir, requestId, jobidSplit[i], cachefileSplit[i], ""})
                    .collect(Collectors.toList());
            Map<String, Future<List<String>>> futMap = redisBatchConsumer.runJobBatch(lstr);
            log.info("Number of batches {}", futMap.size());
            for (Future<List<String>> f : futMap.values()) {
                f.get();
            }
            log.info("Finised running all batches");
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever runs this thread.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns the part of the job id following its last ':' (the whole id if there is none). */
    private static String shortJobId(String jobId) {
        return jobId.substring(jobId.lastIndexOf(':') + 1);
    }

    /** Resolves a file against the project directory as an absolute, normalized path string. */
    private static String resolveProjectFile(Path projectPath, String file) {
        return projectPath.resolve(file).toAbsolutePath().normalize().toString();
    }

    /** Extracts one field of every job description into an array. */
    private static String[] column(List<String[]> jobs, int index) {
        return jobs.stream().map(job -> job[index]).toArray(String[]::new);
    }

    /** Tells whether the (trimmed) command is one that {@link #runSparkJob} should run. */
    private static boolean isSparkCommand(String command) {
        String upper = command.toUpperCase();
        return upper.startsWith("SELECT ") || upper.startsWith("SPARK ") || upper.startsWith("GORSPARK ") || upper.startsWith("NORSPARK ");
    }

    /**
     * Builds a job description from a payload row. The value in column 3 is JSON
     * wrapped in one extra leading and trailing character; its "query" field is
     * Base64 encoded. The cache file defaults to
     * {@code result_cache/<fingerprint><extension>} unless a non-null "outfile" is given.
     *
     * @return the seven-element job description, or an empty array when the row
     *         cannot be turned into a job
     */
    private static String[] parsePayload(Row row) {
        String jobid = row.getString(1);
        String value = row.getString(3);
        String securityContext = row.getString(4);
        if (value == null || value.length() < 2) {
            log.error("Skipping job {}: payload is missing or too short", jobid);
            return NO_JOB;
        }
        try {
            String json = value.substring(1, value.length() - 1);
            Map<?, ?> map = OBJECT_MAPPER.readValue(json, Map.class);
            String encodedQuery = (String) map.get("query");
            if (encodedQuery == null) {
                log.error("Skipping job {}: payload has no query", jobid);
                return NO_JOB;
            }
            String fingerprint = (String) map.get("fingerprint");
            String projectRoot = (String) map.get("projectRoot");
            String requestId = (String) map.get("request-id");
            // Decode with a fixed charset so the query does not depend on the platform default.
            // NOTE(review): assumes submitters encode the query as UTF-8 - confirm.
            String gorquery = new String(Base64.decode(encodedQuery), StandardCharsets.UTF_8);
            String cachefile = DEFAULT_CACHE_DIR + "/" + fingerprint + CommandParseUtilities.getExtensionForQuery(gorquery, false);
            Object outfile = map.get("outfile");
            if (outfile != null) {
                cachefile = (String) outfile;
            }
            return new String[]{gorquery, fingerprint, projectRoot, requestId, jobid, cachefile, securityContext};
        } catch (IOException e) {
            log.error("Error when parsing redis json", e);
            return NO_JOB;
        }
    }
}

