/*
 * Decompiled with CFR 0.152.
 */
package org.gorpipe.spark;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.SparkConf;
import org.apache.spark.api.python.Py4JServer;
import org.apache.spark.api.r.RAuthHelper;
import org.apache.spark.api.r.RBackend;
import org.apache.spark.ml.linalg.SQLDataTypes;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.gorpipe.gor.monitor.GorMonitor;
import org.gorpipe.spark.SparkGorMonitor;
import org.gorpipe.spark.SparkGorMonitorFactory;
import org.gorpipe.spark.SparkGorRow;
import org.gorpipe.spark.SparkMonitorFactory;
import org.gorpipe.spark.udfs.CharToDoubleArray;
import org.gorpipe.spark.udfs.CharToDoubleArrayParallel;
import org.gorpipe.spark.udfs.CommaToDoubleArray;
import org.gorpipe.spark.udfs.CommaToDoubleMatrix;
import org.gorpipe.spark.udfs.CommaToIntArray;
import org.gorpipe.util.standalone.GorStandalone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class GorSparkUtilities {
    private static final Logger log = LoggerFactory.getLogger(GorSparkUtilities.class);
    private static SparkSession spark;
    private static Py4JServer py4jServer;
    private static RBackend rBackend;
    private static Optional<Process> jupyterProcess;
    private static Optional<String> jupyterPath;
    private static Optional<String> rPath;
    private static ExecutorService es;

    private GorSparkUtilities() {
    }

    public static Py4JServer getPyServer() {
        return py4jServer;
    }

    public static RBackend getRBackend() {
        return rBackend;
    }

    public static int getPyServerPort() {
        return py4jServer != null ? py4jServer.getListeningPort() : 0;
    }

    public static String getPyServerSecret() {
        return py4jServer != null ? py4jServer.secret() : "";
    }

    public static Optional<String> getJupyterPath() {
        return jupyterPath;
    }

    public static Optional<String> getRPath() {
        return rPath;
    }

    public static void closePySpark() {
        GorSparkUtilities.shutdownPy4jServer();
        if (rBackend != null) {
            rBackend.close();
        }
        jupyterProcess.ifPresent(Process::destroy);
        if (es != null) {
            es.shutdown();
        }
    }

    public static void shutdownPy4jServer() {
        if (py4jServer != null) {
            py4jServer.shutdown();
        }
    }

    public static Py4JServer initPy4jServer() {
        if (py4jServer == null) {
            py4jServer = new Py4JServer(spark.sparkContext().conf());
            py4jServer.start();
        }
        return py4jServer;
    }

    static synchronized void setJupyterPath(String jp) {
        jupyterPath = Optional.of(jp);
        spark.createDataset(Collections.singletonList(jp), Encoders.STRING()).createOrReplaceTempView("jupyterpath");
    }

    public static void initPySpark(Optional<String> standaloneRoot) {
        String pyspark;
        int rbackendPort = -1;
        String rbackendSecret = null;
        String sparkr = System.getenv("SPARKR_INIT");
        if (sparkr == null) {
            sparkr = System.getProperty("SPARKR_INIT");
        }
        if (rBackend == null && sparkr != null && sparkr.length() > 0) {
            rBackend = new RBackend();
            Tuple2 tuple = rBackend.init();
            rbackendPort = (Integer)tuple._1;
            rbackendSecret = ((RAuthHelper)tuple._2).secret();
            rPath = Optional.of(rbackendPort + ";" + rbackendSecret);
            System.err.println(rPath);
            new Thread(() -> rBackend.run()).start();
        }
        if ((pyspark = System.getenv("PYSPARK_PIN_THREAD")) == null) {
            pyspark = System.getProperty("PYSPARK_PIN_THREAD");
        }
        if (py4jServer == null && pyspark != null && pyspark.length() > 0) {
            String baseurl;
            GorSparkUtilities.initPy4jServer();
            SparkSession spark = GorSparkUtilities.getSparkSession();
            ArrayList<String> plist = new ArrayList<String>(List.of("jupyter-lab", "--ip=0.0.0.0", "--NotebookApp.allow_origin='*'", "--port=8888", "--NotebookApp.port_retries=0"));
            String notebookdir = System.getenv("JUPYTER_NOTEBOOK_DIR");
            if (notebookdir == null) {
                notebookdir = System.getProperty("JUPYTER_NOTEBOOK_DIR");
            }
            if (notebookdir != null && !notebookdir.isEmpty()) {
                plist.add("--notebook-dir=" + notebookdir);
            }
            if ((baseurl = System.getenv("JUPYTER_BASE_URL")) == null) {
                baseurl = System.getProperty("JUPYTER_BASE_URL");
            }
            if (baseurl != null && !baseurl.isEmpty()) {
                plist.add("--NotebookApp.base_url=/" + baseurl);
                plist.add("--LabApp.base_url=/" + baseurl);
            }
            String pyServerPort = Integer.toString(GorSparkUtilities.getPyServerPort());
            String pyServerSecret = GorSparkUtilities.getPyServerSecret();
            System.err.println(pyServerPort + ";" + pyServerSecret);
            ProcessBuilder pb = new ProcessBuilder(plist);
            standaloneRoot.ifPresent(sroot -> pb.directory(Paths.get(sroot, new String[0]).toFile()));
            Map<String, String> env = pb.environment();
            env.put("PYSPARK_GATEWAY_PORT", pyServerPort);
            env.put("PYSPARK_GATEWAY_SECRET", pyServerSecret);
            env.put("PYSPARK_PIN_THREAD", "true");
            if (rbackendPort > 0) {
                env.put("SPARKR_WORKER_PORT", String.valueOf(rbackendPort));
                env.put("SPARKR_WORKER_SECRET", rbackendSecret);
                env.put("EXISTING_SPARKR_BACKEND_PORT", String.valueOf(rbackendPort));
                env.put("SPARKR_BACKEND_AUTH_SECRET", rbackendSecret);
            }
            try {
                Process p = pb.start();
                jupyterProcess = Optional.of(p);
                es = Executors.newFixedThreadPool(2);
                Future<String> resin = es.submit(() -> {
                    try (InputStream is = p.getInputStream();){
                        InputStreamReader isr = new InputStreamReader(is);
                        BufferedReader br = new BufferedReader(isr);
                        br.lines().peek(System.err::println).map(String::trim).filter(s -> s.startsWith("http://") && s.contains("?token=")).forEach(GorSparkUtilities::setJupyterPath);
                    }
                    return null;
                });
                Future<String> future = es.submit(() -> {
                    try (InputStream is = p.getErrorStream();){
                        InputStreamReader isr = new InputStreamReader(is);
                        BufferedReader br = new BufferedReader(isr);
                        br.lines().peek(System.err::println).map(String::trim).filter(s -> s.startsWith("http://") && s.contains("?token=")).forEach(GorSparkUtilities::setJupyterPath);
                    }
                    return null;
                });
            }
            catch (IOException ie) {
                log.info(ie.getMessage());
                jupyterProcess = Optional.empty();
            }
        }
    }

    private static String constructRedisUri(String sparkRedisHost) {
        String sparkRedisPort = System.getProperty("spark.redis.port");
        String sparkRedisDb = System.getProperty("spark.redis.db");
        String ret = sparkRedisHost + ":" + (sparkRedisPort != null && sparkRedisPort.length() > 0 ? sparkRedisPort : "6379");
        return sparkRedisDb != null && sparkRedisDb.length() > 0 ? ret + "/" + sparkRedisDb : ret;
    }

    public static String getSparkGorRedisUri() {
        String sparkRedisHost = System.getProperty("spark.redis.host");
        return sparkRedisHost != null && sparkRedisHost.length() > 0 ? GorSparkUtilities.constructRedisUri(sparkRedisHost) : "";
    }

    public static GorMonitor getSparkGorMonitor(String jobId, String redisUri, String key) {
        if (SparkGorMonitor.localProgressMonitor != null) {
            return SparkGorMonitor.localProgressMonitor;
        }
        List srvList = ServiceLoader.load(SparkMonitorFactory.class).stream().collect(Collectors.toList());
        if (srvList.size() > 0) {
            SparkMonitorFactory sparkMonitorFactory = (SparkMonitorFactory)((ServiceLoader.Provider)srvList.get(0)).get();
            if (srvList.size() > 1 && sparkMonitorFactory instanceof SparkGorMonitorFactory && redisUri != null && redisUri.length() > 0) {
                sparkMonitorFactory = (SparkMonitorFactory)((ServiceLoader.Provider)srvList.get(1)).get();
            }
            return sparkMonitorFactory.createSparkGorMonitor(jobId, redisUri, key);
        }
        return null;
    }

    private static SparkSession newSparkSession(int workers) {
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.sql.execution.arrow.pyspark.enabled", "true");
        sparkConf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false");
        sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true");
        sparkConf.set("spark.kubernetes.appKillPodDeletionGracePeriod", "20");
        sparkConf.set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false");
        sparkConf.set("spark.hadoop.fs.s3a.path.style.access", "true");
        sparkConf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        sparkConf.set("spark.hadoop.fs.s3a.change.detection.mode", "warn");
        sparkConf.set("spark.hadoop.com.amazonaws.services.s3.enableV4", "true");
        sparkConf.set("spark.hadoop.fs.s3a.committer.name", "partitioned");
        sparkConf.set("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "replace");
        sparkConf.set("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore");
        SparkSession.Builder ssb = SparkSession.builder();
        if (!sparkConf.contains("spark.master")) {
            ssb = workers > 0 ? ssb.master("local[" + workers + "]") : ssb.master("local[*]");
        }
        SparkSession spark = ssb.config(sparkConf).getOrCreate();
        spark.udf().register("chartodoublearray", (UDF1)new CharToDoubleArray(), (DataType)DataTypes.createArrayType((DataType)DataTypes.DoubleType));
        spark.udf().register("chartodoublearrayparallel", (UDF1)new CharToDoubleArrayParallel(), (DataType)DataTypes.createArrayType((DataType)DataTypes.DoubleType));
        spark.udf().register("todoublearray", (UDF1)new CommaToDoubleArray(), (DataType)DataTypes.createArrayType((DataType)DataTypes.DoubleType));
        spark.udf().register("todoublematrix", (UDF1)new CommaToDoubleMatrix(), SQLDataTypes.MatrixType());
        spark.udf().register("tointarray", (UDF1)new CommaToIntArray(), (DataType)DataTypes.createArrayType((DataType)DataTypes.IntegerType));
        return spark;
    }

    public static SparkSession getSparkSession() {
        return GorSparkUtilities.getSparkSession(0);
    }

    public static SparkSession getSparkSession(int workers) {
        if (spark == null) {
            if (!SparkSession.getDefaultSession().isEmpty()) {
                log.info("SparkSession from default");
                spark = (SparkSession)SparkSession.getDefaultSession().get();
            } else {
                log.info("Starting a new SparkSession");
                spark = GorSparkUtilities.newSparkSession(workers);
            }
            Optional<String> standaloneRoot = GorStandalone.isStandalone() ? Optional.of(GorStandalone.getStandaloneRoot()) : Optional.empty();
            GorSparkUtilities.initPySpark(standaloneRoot);
        }
        return spark;
    }

    public static List<Row> stream2SparkRowList(Stream<org.gorpipe.gor.model.Row> str, StructType schema) {
        return str.map(p -> new SparkGorRow((org.gorpipe.gor.model.Row)p, schema)).collect(Collectors.toList());
    }

    static {
        jupyterPath = Optional.empty();
        rPath = Optional.empty();
    }
}

