/*
 * Decompiled with CFR 0.152.
 */
package org.gorpipe.spark;

import io.projectglow.GlowBase;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.SparkConf;
import org.apache.spark.api.python.Py4JServer;
import org.apache.spark.ml.linalg.SQLDataTypes;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.gorpipe.spark.SparkGorRow;
import org.gorpipe.spark.udfs.CharToDoubleArray;
import org.gorpipe.spark.udfs.CommaToDoubleArray;
import org.gorpipe.spark.udfs.CommaToDoubleMatrix;
import org.gorpipe.spark.udfs.CommaToIntArray;
import org.gorpipe.util.standalone.GorStandalone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GorSparkUtilities {
    private static final Logger log = LoggerFactory.getLogger(GorSparkUtilities.class);
    private static SparkSession spark;
    private static Py4JServer py4jServer;
    private static Optional<Process> jupyterProcess;
    private static Optional<String> jupyterPath;
    private static ExecutorService es;

    private GorSparkUtilities() {
    }

    public static Py4JServer getPyServer() {
        return py4jServer;
    }

    public static int getPyServerPort() {
        return py4jServer != null ? py4jServer.getListeningPort() : 0;
    }

    public static String getPyServerSecret() {
        return py4jServer != null ? py4jServer.secret() : "";
    }

    public static Optional<String> getJupyterPath() {
        return jupyterPath;
    }

    public static void closePySpark() {
        if (py4jServer != null) {
            py4jServer.shutdown();
        }
        jupyterProcess.ifPresent(Process::destroy);
        if (es != null) {
            es.shutdown();
        }
    }

    public static void initPySpark(Optional<String> standaloneRoot) {
        String pyspark = System.getenv("PYSPARK_PIN_THREAD");
        if (py4jServer == null && pyspark != null && pyspark.length() > 0) {
            py4jServer = new Py4JServer(spark.sparkContext().conf());
            py4jServer.start();
            GorSparkUtilities.getSparkSession();
            ProcessBuilder pb = new ProcessBuilder("/usr/local/bin/jupyter", "notebook", "--NotebookApp.allow_origin='https://colab.research.google.com'", "--port=8888", "--NotebookApp.port_retries=0");
            standaloneRoot.ifPresent(sroot -> pb.directory(Paths.get(sroot, new String[0]).toFile()));
            Map<String, String> env = pb.environment();
            env.put("PYSPARK_GATEWAY_PORT", Integer.toString(GorSparkUtilities.getPyServerPort()));
            env.put("PYSPARK_GATEWAY_SECRET", GorSparkUtilities.getPyServerSecret());
            env.put("PYSPARK_PIN_THREAD", "true");
            try {
                Process p = pb.start();
                jupyterProcess = Optional.of(p);
                es = Executors.newFixedThreadPool(2);
                Future<String> resin = es.submit(() -> {
                    try (InputStream is = p.getInputStream();){
                        InputStreamReader isr = new InputStreamReader(is);
                        BufferedReader br = new BufferedReader(isr);
                        jupyterPath = br.lines().map(String::trim).filter(s -> s.startsWith("http://localhost:8888/?token=")).findFirst();
                    }
                    return null;
                });
                Future<String> future = es.submit(() -> {
                    try (InputStream is = p.getErrorStream();){
                        InputStreamReader isr = new InputStreamReader(is);
                        BufferedReader br = new BufferedReader(isr);
                        jupyterPath = br.lines().peek(System.err::println).map(String::trim).filter(s -> s.startsWith("http://localhost:8888/?token=")).findFirst();
                    }
                    return null;
                });
            }
            catch (IOException ie) {
                log.info(ie.getMessage());
                jupyterProcess = Optional.empty();
            }
        }
    }

    private static String constructRedisUri(String sparkRedisHost) {
        String sparkRedisPort = System.getProperty("spark.redis.port");
        String sparkRedisDb = System.getProperty("spark.redis.db");
        String ret = sparkRedisHost + ":" + (sparkRedisPort != null && sparkRedisPort.length() > 0 ? sparkRedisPort : "6379");
        return sparkRedisDb != null && sparkRedisDb.length() > 0 ? ret + "/" + sparkRedisDb : ret;
    }

    public static String getSparkGorRedisUri() {
        String sparkRedisHost = System.getProperty("spark.redis.host");
        return sparkRedisHost != null && sparkRedisHost.length() > 0 ? GorSparkUtilities.constructRedisUri(sparkRedisHost) : "";
    }

    private static SparkSession newSparkSession() {
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true");
        sparkConf.set("spark.kubernetes.appKillPodDeletionGracePeriod", "20");
        SparkSession.Builder ssb = SparkSession.builder();
        if (!sparkConf.contains("spark.master")) {
            ssb = ssb.master("local[*]");
        }
        SparkSession spark = ssb.config(sparkConf).getOrCreate();
        spark.udf().register("chartodoublearray", (UDF1)new CharToDoubleArray(), (DataType)DataTypes.createArrayType((DataType)DataTypes.DoubleType));
        spark.udf().register("todoublearray", (UDF1)new CommaToDoubleArray(), (DataType)DataTypes.createArrayType((DataType)DataTypes.DoubleType));
        spark.udf().register("todoublematrix", (UDF1)new CommaToDoubleMatrix(), SQLDataTypes.MatrixType());
        spark.udf().register("tointarray", (UDF1)new CommaToIntArray(), (DataType)DataTypes.createArrayType((DataType)DataTypes.IntegerType));
        GlowBase gb = new GlowBase();
        gb.register(spark);
        return spark;
    }

    public static SparkSession getSparkSession() {
        if (spark == null) {
            if (!SparkSession.getDefaultSession().isEmpty()) {
                log.info("SparkSession from default");
                spark = (SparkSession)SparkSession.getDefaultSession().get();
            } else {
                log.info("Starting a new SparkSession");
                spark = GorSparkUtilities.newSparkSession();
            }
            Optional<String> standaloneRoot = GorStandalone.isStandalone() ? Optional.of(GorStandalone.getStandaloneRoot()) : Optional.empty();
            GorSparkUtilities.initPySpark(standaloneRoot);
        }
        return spark;
    }

    public static List<Row> stream2SparkRowList(Stream<org.gorpipe.gor.model.Row> str, StructType schema) {
        return str.map(p -> new SparkGorRow((org.gorpipe.gor.model.Row)p, schema)).collect(Collectors.toList());
    }

    static {
        jupyterPath = Optional.empty();
    }
}

