/*
 * Decompiled with CFR 0.152.
 */
package gorsat.commands;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.spark.api.python.Py4JServer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.gorpipe.exceptions.GorResourceException;
import org.gorpipe.exceptions.GorSystemException;
import org.gorpipe.spark.GorSparkUtilities;

/**
 * Runs an external Python script against a Spark {@link Dataset} and hands the
 * (possibly replaced) dataset back to the caller.
 *
 * <p>{@link #pyspark} registers this instance in {@link #datasetMap} under a
 * signature, launches the script with the Py4J gateway port and secret in its
 * environment, and blocks until {@link #markDone()} is invoked or the process
 * exits. {@link #close()} releases a thread blocked in {@link #markDone()},
 * waits for the process to end and reports a non-zero exit code.
 *
 * <p>NOTE(review): the script is presumably expected to look this instance up
 * through {@code datasetMap}, call {@link #setDataset} and then
 * {@link #markDone()} over Py4J - confirm against the Python side.
 * Entries are never removed from {@code datasetMap} by this class.
 */
public class PysparkAnalysis
implements AutoCloseable {
    /** Upper bound, in seconds, on waiting for the output pump tasks once the process has exited. */
    private static final long OUTPUT_DRAIN_SECONDS = 5L;

    /** Registry of analyses keyed by the signature passed to {@link #pyspark}. */
    public static Map<String, PysparkAnalysis> datasetMap = new HashMap<>();
    ExecutorService es;
    Process pythonProcess;
    Dataset<? extends Row> ds;
    List<String> cmds = new ArrayList<>();
    /** Latch that the next waiter blocks on; replaced by {@link #markDone()}, hence volatile. */
    volatile CountDownLatch latch = new CountDownLatch(1);
    /** Captured standard output of the Python process. */
    ByteArrayOutputStream baOutput = new ByteArrayOutputStream();
    /** Captured standard error of the Python process. */
    ByteArrayOutputStream baError = new ByteArrayOutputStream();

    /**
     * Returns the dataset currently held by this analysis.
     *
     * @return the dataset last stored by {@link #pyspark} or {@link #setDataset}
     */
    @SuppressWarnings("unchecked")
    public Dataset<Row> getDataset() {
        // The field is declared with a wildcard so pyspark() can accept any Row subtype;
        // callers only read rows from the result, so the narrowing cast is safe here.
        return (Dataset<Row>) this.ds;
    }

    /**
     * Replaces the dataset held by this analysis.
     *
     * @param ds the dataset that {@link #getDataset()} should return from now on
     */
    public void setDataset(Dataset<Row> ds) {
        this.ds = ds;
    }

    /**
     * Signals that the result is ready and then blocks until {@link #close()}
     * (or a later {@code markDone()} call) releases the caller.
     *
     * @throws InterruptedException if the calling thread is interrupted while blocked
     */
    public void markDone() throws InterruptedException {
        CountDownLatch completed = this.latch;
        CountDownLatch release = new CountDownLatch(1);
        // Publish the release latch BEFORE waking the waiter: a thread that wakes up and
        // immediately calls close() must count down the latch this thread is about to
        // block on, not the one that has already been opened.
        this.latch = release;
        completed.countDown();
        release.await();
    }

    /**
     * Blocks until the current latch is counted down.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitFor() throws InterruptedException {
        this.latch.await();
    }

    /**
     * Launches the Python command and waits until it reports completion via
     * {@link #markDone()} or terminates.
     *
     * <p>The executed command line is {@code <python> <script> <signature> <args...>},
     * where {@code <python>} is the {@code PYSPARK_PYTHON} environment variable or
     * {@code python3}, and {@code <script>} and {@code <args...>} are the
     * whitespace-separated tokens of {@code cmd}.
     *
     * @param signature key under which this instance is stored in {@link #datasetMap};
     *                  also passed to the script as its first argument
     * @param ds        the input dataset exposed through {@link #getDataset()}
     * @param cmd       script path followed by its arguments, separated by whitespace
     * @return the dataset held by this analysis once the script is done
     * @throws IOException            if the process cannot be started
     * @throws InterruptedException   if the calling thread is interrupted while waiting
     * @throws GorResourceException   if the process terminated with a non-zero exit code
     *                                before this method returned
     */
    public Dataset<Row> pyspark(String signature, Dataset<? extends Row> ds, String cmd) throws IOException, InterruptedException {
        datasetMap.put(signature, this);
        this.ds = ds;
        // Return value is not needed here; the call is kept from the original code,
        // presumably to make sure the session exists before the script connects.
        GorSparkUtilities.getSparkSession();
        Py4JServer py4JServer = GorSparkUtilities.initPy4jServer();

        // Split on runs of whitespace so repeated spaces do not become empty arguments.
        String[] cmdsplit = cmd.trim().split("\\s+");
        String pysparkPython = System.getenv("PYSPARK_PYTHON");
        this.cmds.add(pysparkPython != null ? pysparkPython : "python3");
        this.cmds.add(cmdsplit[0]);
        this.cmds.add(signature);
        for (int i = 1; i < cmdsplit.length; ++i) {
            this.cmds.add(cmdsplit[i]);
        }

        ProcessBuilder pb = new ProcessBuilder(this.cmds);
        Map<String, String> env = pb.environment();
        env.put("PYSPARK_GATEWAY_PORT", Integer.toString(py4JServer.getListeningPort()));
        env.put("PYSPARK_GATEWAY_SECRET", py4JServer.secret());
        env.put("PYSPARK_PIN_THREAD", "true");

        // Capture the latch before the process exists, so a markDone() that arrives
        // early cannot swap it out from under this thread.
        CountDownLatch completion = this.latch;
        Process process = pb.start();
        this.pythonProcess = process;
        this.es = Executors.newFixedThreadPool(2);
        this.es.submit(() -> process.getInputStream().transferTo(this.baOutput));
        this.es.submit(() -> process.getErrorStream().transferTo(this.baError));
        // A script that dies without calling markDone() must not leave the caller blocked.
        process.onExit().thenRun(completion::countDown);

        completion.await();
        if (!process.isAlive()) {
            int exitCode = process.exitValue();
            if (exitCode != 0) {
                throw this.exitFailure(exitCode);
            }
        }
        return this.getDataset();
    }

    /**
     * Returns the launched command line as a single space-separated string.
     *
     * @return the command tokens joined by a single space
     */
    public String cmdString() {
        return String.join(" ", this.cmds);
    }

    /**
     * Releases a thread blocked in {@link #markDone()}, waits for the Python
     * process to exit and shuts down the output pump executor.
     *
     * @throws GorResourceException if the process exits with a non-zero code
     * @throws GorSystemException   if the calling thread is interrupted while waiting;
     *                              the interrupt flag is restored before throwing
     */
    @Override
    public void close() {
        try {
            if (this.pythonProcess != null) {
                this.latch.countDown();
                int exitCode = this.pythonProcess.waitFor();
                if (exitCode != 0) {
                    throw this.exitFailure(exitCode);
                }
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new GorSystemException((Throwable)e);
        }
        finally {
            // Always release the pump threads, including on the failure paths above.
            if (this.es != null) {
                this.es.shutdown();
            }
        }
    }

    /** Builds the exception describing a non-zero exit, including the captured process output. */
    private GorResourceException exitFailure(int exitCode) {
        this.drainOutput();
        return new GorResourceException("Non zero exit code " + exitCode + "\n" + this.baOutput + "\n" + this.baError, this.cmdString());
    }

    /** Gives the pump tasks a bounded amount of time to finish copying the process output. */
    private void drainOutput() {
        if (this.es == null) {
            return;
        }
        this.es.shutdown();
        try {
            this.es.awaitTermination(OUTPUT_DRAIN_SECONDS, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            // Report with whatever output was captured so far; keep the interrupt visible.
            Thread.currentThread().interrupt();
        }
    }
}

