/*
 * Decompiled with CFR 0.152.
 */
package org.gorpipe.spark.redis;

import com.fasterxml.jackson.databind.ObjectMapper;
import gorsat.Commands.CommandParseUtilities;
import gorsat.process.PipeOptions;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.gorpipe.gor.session.GorContext;
import org.gorpipe.gor.session.GorSession;
import org.gorpipe.gor.session.ProjectContext;
import org.gorpipe.gor.session.SystemContext;
import org.gorpipe.spark.GorQueryRDD;
import org.gorpipe.spark.GorSparkSession;
import org.gorpipe.spark.redis.MonitorThread;
import org.gorpipe.spark.redis.SparkGorQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.Base64;

public class RedisBatchConsumer
implements VoidFunction2<Dataset<Row>, Long>,
AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(RedisBatchConsumer.class);
    GorSparkSession gss;
    SystemContext sysctx;
    MonitorThread mont;
    ExecutorService es;

    public RedisBatchConsumer(SparkSession sparkSession, String redisUri) {
        log.info("Starting RedisBatchConsumer on redisUri " + redisUri);
        this.gss = new GorSparkSession("");
        this.gss.setSparkSession(sparkSession);
        this.gss.redisUri_$eq(redisUri);
        SystemContext.Builder systemContextBuilder = new SystemContext.Builder();
        this.sysctx = systemContextBuilder.setServer(true).setStartTime(System.currentTimeMillis()).build();
        this.es = Executors.newWorkStealingPool(4);
        log.info("Starting monitorthread");
        this.mont = new MonitorThread(redisUri);
        this.es.submit(this.mont);
        log.info("Monitorthread submitted");
    }

    @Override
    public void close() {
        log.info("Closing RedisBatchConsumer");
        this.mont.stopRunning();
        this.es.shutdown();
    }

    public void runGorJobs(String projectDirStr, Set<Integer> gorJobs, String[] queries, String[] fingerprints, String[] jobIds, String[] cachefiles) {
        String jobIdStr = String.join((CharSequence)",", jobIds);
        String[] newCommands = new String[gorJobs.size()];
        String[] newFingerprints = new String[gorJobs.size()];
        String[] newCacheFiles = new String[gorJobs.size()];
        String[] newJobIds = new String[gorJobs.size()];
        int k = 0;
        for (int i : gorJobs) {
            newCommands[k] = queries[i];
            newFingerprints[k] = fingerprints[i];
            newJobIds[k] = jobIds[i].substring(jobIds[i].lastIndexOf(58) + 1);
            newCacheFiles[k] = cachefiles[i];
            ++k;
        }
        String configFile = this.gss.getProjectContext() != null ? this.gss.getProjectContext().getGorConfigFile() : null;
        String aliasFile = this.gss.getProjectContext() != null ? this.gss.getProjectContext().getGorAliasFile() : null;
        GorQueryRDD gorQueryRDD = new GorQueryRDD(this.gss.sparkSession(), newCommands, newFingerprints, newCacheFiles, projectDirStr, "result_cache", configFile, aliasFile, newJobIds, this.gss.redisUri());
        JavaFutureAction fut = gorQueryRDD.toJavaRDD().collectAsync();
        this.mont.addJob(jobIdStr, (Future<List<String>>)fut);
    }

    public void runSparkJob(String projectDirStr, String cmd, String jobId, String cacheFile) {
        log.info("Running spark job " + jobId + ": " + cmd);
        String shortJobId = jobId.substring(jobId.lastIndexOf(58) + 1);
        int firstSpace = cmd.indexOf(32);
        cmd = cmd.substring(0, firstSpace + 1) + "-j " + shortJobId + cmd.substring(firstSpace);
        String[] args = new String[]{cmd, "-queryhandler", "spark"};
        PipeOptions options = new PipeOptions();
        options.parseOptions(args);
        ProjectContext.Builder projectContextBuilder = new ProjectContext.Builder();
        ProjectContext prjctx = projectContextBuilder.setRoot(projectDirStr).setCacheDir("result_cache").setConfigFile(null).build();
        this.gss.init(prjctx, this.sysctx, null);
        GorContext context = new GorContext((GorSession)this.gss);
        SparkGorQuery sgq = new SparkGorQuery(context, cmd, cacheFile);
        Future<List<String>> fut = this.es.submit(sgq);
        this.mont.addJob(jobId, fut);
    }

    public void call(Dataset<Row> v1, Long v2) {
        List rr = v1.collectAsList();
        log.info("Received batch of " + rr.size());
        List lstr = rr.stream().filter(r -> r.getString(2).equals("payload")).map(r -> {
            String jobid = r.getString(1);
            String value = r.getString(3);
            ObjectMapper om = new ObjectMapper();
            try {
                String tmpcacheFile;
                String mvalue = value.substring(1, value.length() - 1);
                Map map = (Map)om.readValue(mvalue, Map.class);
                String gorquerybase = (String)map.get("query");
                String fingerprint = (String)map.get("fingerprint");
                String projectRoot = (String)map.get("projectRoot");
                String requestId = (String)map.get("request-id");
                String gorquery = new String(Base64.decode((String)gorquerybase));
                String cachefile = "result_cache/" + fingerprint + CommandParseUtilities.getExtensionForQuery((String)gorquery, (boolean)false);
                if (map.containsKey("cachefile") && (tmpcacheFile = (String)map.get("cachefile")) != null) {
                    cachefile = tmpcacheFile;
                }
                return new String[]{gorquery, fingerprint, projectRoot, requestId, jobid, cachefile};
            }
            catch (IOException e) {
                log.error("Error when parsing redis json", (Throwable)e);
                return new String[0];
            }
        }).collect(Collectors.toList());
        Optional<String> projectDir = lstr.stream().map(l -> l[2]).findFirst();
        if (projectDir.isPresent()) {
            String projectDirStr = projectDir.get();
            String[] queries = (String[])lstr.stream().map(l -> l[0]).toArray(String[]::new);
            String[] fingerprints = (String[])lstr.stream().map(l -> l[1]).toArray(String[]::new);
            String[] cachefiles = (String[])lstr.stream().map(l -> l[5]).toArray(String[]::new);
            String[] jobIds = (String[])lstr.stream().map(l -> l[4]).toArray(String[]::new);
            this.mont.setValue(jobIds, "status", "RUNNING");
            TreeSet<Integer> gorJobs = new TreeSet<Integer>();
            for (int i = 0; i < queries.length; ++i) {
                String cmd = queries[i];
                String commandUpper = cmd.toUpperCase();
                if (commandUpper.startsWith("SELECT ") || commandUpper.startsWith("SPARK ") || commandUpper.startsWith("GORSPARK ") || commandUpper.startsWith("NORSPARK ")) {
                    this.runSparkJob(projectDirStr, cmd, jobIds[i], cachefiles[i]);
                    continue;
                }
                gorJobs.add(i);
            }
            if (gorJobs.size() > 0) {
                this.runGorJobs(projectDirStr, gorJobs, queries, fingerprints, jobIds, cachefiles);
            }
        }
    }
}

