/*
 * Decompiled with CFR 0.152.
 */
package org.gorpipe.spark.redis;

import com.fasterxml.jackson.databind.ObjectMapper;
import gorsat.Commands.CommandParseUtilities;
import gorsat.Commands.Processor;
import gorsat.process.GenericGorRunner;
import gorsat.process.PipeInstance;
import gorsat.process.PipeOptions;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.gorpipe.exceptions.ExceptionUtilities;
import org.gorpipe.gor.session.GorContext;
import org.gorpipe.gor.session.GorSession;
import org.gorpipe.gor.session.ProjectContext;
import org.gorpipe.gor.session.SystemContext;
import org.gorpipe.model.gor.iterators.RowSource;
import org.gorpipe.spark.GorQueryRDD;
import org.gorpipe.spark.GorSparkSession;
import org.gorpipe.spark.GorSparkUtilities;
import org.gorpipe.spark.platform.JedisURIHelper;
import org.gorpipe.spark.platform.SharedRedisPools;
import py4j.Base64;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class GorSparkRedisRunner
implements Callable<String> {
    public static GorSparkRedisRunner instance;
    private SparkSession sparkSession;
    private String redisUri;
    private JedisPool jedisPool;
    private Map<String, Future<List<String>>> futureActionSet;

    public GorSparkRedisRunner(GorSparkSession sparkSession) {
        this.init(sparkSession.getSparkSession());
    }

    public GorSparkRedisRunner() {
        this.init(GorSparkUtilities.getSparkSession());
    }

    public void init(SparkSession sparkSession) {
        instance = this;
        this.sparkSession = sparkSession;
        this.redisUri = GorSparkUtilities.getSparkGorRedisUri();
        this.futureActionSet = new ConcurrentHashMap<String, Future<List<String>>>();
        try {
            this.jedisPool = SharedRedisPools.getJedisPool(JedisURIHelper.create(this.redisUri));
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public String call() throws Exception {
        ExecutorService es = Executors.newWorkStealingPool(4);
        MonitorThread mont = new MonitorThread();
        es.submit(mont);
        GorSparkSession gss = new GorSparkSession("");
        gss.setSparkSession(this.sparkSession);
        gss.redisUri_$eq(this.redisUri);
        SystemContext.Builder systemContextBuilder = new SystemContext.Builder();
        SystemContext sysctx = systemContextBuilder.setServer(true).setStartTime(System.currentTimeMillis()).build();
        StructField[] fields = new StructField[]{StructField.apply((String)"_id", (DataType)DataTypes.StringType, (boolean)true, (Metadata)Metadata.empty()), StructField.apply((String)"job", (DataType)DataTypes.StringType, (boolean)true, (Metadata)Metadata.empty()), StructField.apply((String)"field", (DataType)DataTypes.StringType, (boolean)true, (Metadata)Metadata.empty()), StructField.apply((String)"value", (DataType)DataTypes.StringType, (boolean)true, (Metadata)Metadata.empty())};
        StructType schema = new StructType(fields);
        StreamingQuery query = this.sparkSession.readStream().format("redis").option("stream.keys", "resque").schema(schema).load().writeStream().outputMode("update").foreachBatch((VoidFunction2 & Serializable)(v1, v2) -> {
            List rr = v1.collectAsList();
            List lstr = rr.stream().filter(r -> r.getString(2).equals("payload")).map(r -> {
                String jobid = r.getString(1);
                String value = r.getString(3);
                ObjectMapper om = new ObjectMapper();
                try {
                    String tmpcacheFile;
                    String mvalue = value.substring(1, value.length() - 1);
                    Map map = (Map)om.readValue(mvalue, Map.class);
                    String gorquerybase = (String)map.get("query");
                    String fingerprint = (String)map.get("fingerprint");
                    String projectRoot = (String)map.get("projectRoot");
                    String requestId = (String)map.get("request-id");
                    String gorquery = new String(Base64.decode((String)gorquerybase));
                    String cachefile = "result_cache/" + fingerprint + CommandParseUtilities.getExtensionForQuery((String)gorquery, (boolean)false);
                    if (map.containsKey("cachefile") && (tmpcacheFile = (String)map.get("cachefile")) != null) {
                        cachefile = tmpcacheFile;
                    }
                    return new String[]{gorquery, fingerprint, projectRoot, requestId, jobid, cachefile};
                }
                catch (IOException e) {
                    e.printStackTrace();
                    return new String[0];
                }
            }).collect(Collectors.toList());
            Optional<String> projectDir = lstr.stream().map(l -> l[2]).findFirst();
            if (projectDir.isPresent()) {
                String projectDirStr = projectDir.get();
                String[] queries = (String[])lstr.stream().map(l -> l[0]).toArray(String[]::new);
                String[] fingerprints = (String[])lstr.stream().map(l -> l[1]).toArray(String[]::new);
                String[] cachefiles = (String[])lstr.stream().map(l -> l[5]).toArray(String[]::new);
                String[] jobIds = (String[])lstr.stream().map(l -> l[4]).toArray(String[]::new);
                String jobIdStr = lstr.stream().map(l -> l[4]).collect(Collectors.joining(","));
                this.setValue(jobIds, "status", "RUNNING");
                TreeSet<Integer> gorJobs = new TreeSet<Integer>();
                for (int i = 0; i < queries.length; ++i) {
                    String cmd = queries[i];
                    String commandUpper = cmd.toUpperCase();
                    if (commandUpper.startsWith("SELECT ") || commandUpper.startsWith("SPARK ") || commandUpper.startsWith("GORSPARK ") || commandUpper.startsWith("NORSPARK ")) {
                        String jobId = jobIds[i];
                        String shortJobId = jobId.substring(jobId.lastIndexOf(58) + 1);
                        int firstSpace = cmd.indexOf(32);
                        cmd = cmd.substring(0, firstSpace + 1) + "-j " + shortJobId + cmd.substring(firstSpace);
                        String[] args = new String[]{cmd, "-queryhandler", "spark"};
                        PipeOptions options = new PipeOptions();
                        options.parseOptions(args);
                        ProjectContext.Builder projectContextBuilder = new ProjectContext.Builder();
                        ProjectContext prjctx = projectContextBuilder.setRoot(projectDirStr).setCacheDir("result_cache").setConfigFile(null).build();
                        gss.init(prjctx, sysctx, null);
                        GorContext context = new GorContext((GorSession)gss);
                        PipeInstance pi = new PipeInstance(context);
                        String cacheFile = cachefiles[i];
                        pi.init(cmd, false, "");
                        pi.theInputSource().pushdownWrite(cacheFile);
                        SparkGorQuery sgq = new SparkGorQuery(pi.getIterator(), (Processor)pi.getPipeStep(), cacheFile);
                        Future<List<String>> fut = es.submit(sgq);
                        this.futureActionSet.put(jobId, fut);
                        continue;
                    }
                    gorJobs.add(i);
                }
                if (gorJobs.size() > 0) {
                    String[] newCommands = new String[gorJobs.size()];
                    String[] newFingerprints = new String[gorJobs.size()];
                    String[] newCacheFiles = new String[gorJobs.size()];
                    String[] newJobIds = new String[gorJobs.size()];
                    int k = 0;
                    Iterator firstSpace = gorJobs.iterator();
                    while (firstSpace.hasNext()) {
                        int i = (Integer)firstSpace.next();
                        newCommands[k] = queries[i];
                        newFingerprints[k] = fingerprints[i];
                        newJobIds[k] = jobIds[i].substring(jobIds[i].lastIndexOf(58) + 1);
                        newCacheFiles[k] = cachefiles[i];
                        ++k;
                    }
                    String configFile = gss.getProjectContext() != null ? gss.getProjectContext().getGorConfigFile() : null;
                    String aliasFile = gss.getProjectContext() != null ? gss.getProjectContext().getGorAliasFile() : null;
                    GorQueryRDD gorQueryRDD = new GorQueryRDD(this.sparkSession, newCommands, newFingerprints, newCacheFiles, projectDirStr, "result_cache", configFile, aliasFile, newJobIds, this.redisUri);
                    JavaFutureAction fut = gorQueryRDD.toJavaRDD().collectAsync();
                    this.futureActionSet.put(jobIdStr, (Future<List<String>>)fut);
                }
            }
        }).start();
        query.awaitTermination();
        mont.stopRunning();
        es.shutdown();
        return "";
    }

    public void setValue(String[] jobIds, String field, String value) {
        try (Jedis jedis = this.jedisPool.getResource();){
            for (String jobId : jobIds) {
                jedis.hset(jobId, field, value);
                jedis.expire(jobId, (int)this.getJobExpiration().getSeconds());
            }
        }
    }

    public void setValues(String[] jobIds, String field, String[] values) {
        try (Jedis jedis = this.jedisPool.getResource();){
            for (int i = 0; i < jobIds.length; ++i) {
                String jobId = jobIds[i];
                String value = values[i];
                jedis.hset(jobId, field, value);
                jedis.expire(jobId, (int)this.getJobExpiration().getSeconds());
            }
        }
    }

    public Duration getJobExpiration() {
        return Duration.ofMinutes(20L);
    }

    public static void main(String[] args) {
        GorSparkRedisRunner grr = new GorSparkRedisRunner();
        try {
            grr.call();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    class SparkGorQuery
    implements Callable<List<String>> {
        GenericGorRunner genericGorRunner = new GenericGorRunner();
        RowSource iterator;
        Processor processor;
        String cachefile;

        public SparkGorQuery(RowSource iterator, Processor processor, String cachefile) {
            this.iterator = iterator;
            this.processor = processor;
            this.cachefile = cachefile;
        }

        @Override
        public List<String> call() throws Exception {
            this.genericGorRunner.run(this.iterator, this.processor);
            return Collections.singletonList("a\tb\t" + this.cachefile);
        }
    }

    class MonitorThread
    implements Runnable {
        boolean running = true;

        MonitorThread() {
        }

        public void stopRunning() {
            this.running = false;
        }

        @Override
        public void run() {
            try {
                String reskey = null;
                while (this.running) {
                    for (String key : GorSparkRedisRunner.this.futureActionSet.keySet()) {
                        Future fut = (Future)GorSparkRedisRunner.this.futureActionSet.get(key);
                        String[] jobIds = key.split(",");
                        try {
                            List res = (List)fut.get(500L, TimeUnit.MILLISECONDS);
                            reskey = key;
                            String[] cacheFiles = (String[])res.stream().map(s -> s.split("\t")).map(s -> s[2]).toArray(String[]::new);
                            GorSparkRedisRunner.this.setValues(jobIds, "result", cacheFiles);
                            GorSparkRedisRunner.this.setValue(jobIds, "status", "DONE");
                            break;
                        }
                        catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        catch (ExecutionException e) {
                            reskey = key;
                            GorSparkRedisRunner.this.setValue(jobIds, "error", ExceptionUtilities.gorExceptionToJson((Throwable)e.getCause()));
                            GorSparkRedisRunner.this.setValue(jobIds, "status", "FAILED");
                            break;
                        }
                        catch (TimeoutException timeoutException) {
                        }
                    }
                    if (reskey != null) {
                        GorSparkRedisRunner.this.futureActionSet.remove(reskey);
                        reskey = null;
                    }
                    if (!GorSparkRedisRunner.this.futureActionSet.isEmpty()) continue;
                    Thread.sleep(500L);
                }
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

