/*
 * Decompiled with CFR 0.152.
 */
package org.gorpipe.spark;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.gson.reflect.TypeToken;
import gorsat.Utilities.StringUtilities;
import gorsat.process.SparkPipeInstance;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.apis.CustomObjectsApi;
import io.kubernetes.client.openapi.models.V1DeleteOptions;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.Watch;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Type;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import okhttp3.Call;
import org.apache.spark.sql.SparkSession;
import org.gorpipe.exceptions.GorSystemException;
import org.gorpipe.gor.driver.DataSource;
import org.gorpipe.gor.model.DriverBackedFileReader;
import org.gorpipe.gor.monitor.GorMonitor;
import org.gorpipe.gor.util.Util;
import org.gorpipe.spark.GorSparkSession;
import org.gorpipe.spark.SparkOperatorSpecs;
import org.gorpipe.spark.redis.RedisBatchConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkOperatorRunner {
    private static final Logger log = LoggerFactory.getLogger(SparkOperatorRunner.class);
    public static final String SPARKAPPLICATION_COMPLETED_STATE = "COMPLETED";
    public static final String SPARKAPPLICATION_FAILED_STATE = "FAILED";
    public static final String SPARKAPPLICATION_RUNNING_STATE = "RUNNING";
    private static final String GOR_PROJECT_MOUNT_NAME = "gorproject";
    private static final String BASE_NFS_MOUNT_POINT = "/mnt/csa/";
    ApiClient client = Config.defaultClient();
    CustomObjectsApi apiInstance;
    CoreV1Api core;
    ObjectMapper objectMapper;
    String jobName;
    String namespace;
    boolean hostMount = false;
    SparkSession sparkSession;
    String securityContext;
    private static final boolean debug = false;

    public SparkOperatorRunner(GorSparkSession gorSparkSession) throws IOException {
        Configuration.setDefaultApiClient((ApiClient)this.client);
        this.apiInstance = new CustomObjectsApi();
        this.core = new CoreV1Api(this.client);
        this.objectMapper = new ObjectMapper((JsonFactory)new YAMLFactory());
        this.sparkSession = gorSparkSession.getSparkSession();
        this.securityContext = gorSparkSession.getProjectContext().getFileReader().getSecurityContext();
    }

    Map<String, Object> loadBody(String query, String project, String result_dir, Map<String, Object> parameters) throws IOException {
        Map body = (Map)this.objectMapper.readValue(query, Map.class);
        Map metadata = (Map)body.get("metadata");
        this.jobName = metadata.get("name").toString();
        if (body.containsKey("yaml")) {
            this.objectMapper = new ObjectMapper((JsonFactory)new YAMLFactory());
            String yamlPathString = body.get("yaml").toString();
            Path queryRoot = Paths.get(project, new String[0]);
            Path yamlPath = Paths.get(yamlPathString, new String[0]);
            yamlPath = queryRoot.resolve(yamlPath);
            DriverBackedFileReader driverBackedGorServerFileReader = new DriverBackedFileReader(this.securityContext, project, null);
            DataSource yamlDataSource = driverBackedGorServerFileReader.resolveUrl(yamlPath.toString());
            String yamlContent = driverBackedGorServerFileReader.readFile(yamlDataSource.getSourceReference().getUrl()).collect(Collectors.joining("\n"));
            Map yaml = (Map)this.objectMapper.readValue(yamlContent, Map.class);
            Map md = (Map)yaml.get("metadata");
            String name = (String)md.get("name");
            String string = this.namespace = md.containsKey("namespace") ? md.get("namespace").toString() : "gorkube";
            if (name.equals("${name.val}")) {
                md.put("name", this.jobName);
            }
            Map specMap = (Map)body.get("spec");
            List arguments = (List)specMap.get("arguments");
            Map spec = (Map)yaml.get("spec");
            Object args = spec.get("arguments");
            if (args.toString().equals("${arguments.val}")) {
                spec.put("arguments", arguments);
            }
            if (name.equals("${name.val}")) {
                md.put("name", this.jobName);
            }
            yamlContent = this.objectMapper.writeValueAsString((Object)yaml).substring(4);
            for (Map.Entry<String, Object> entry : parameters.entrySet()) {
                yamlContent = yamlContent.replace("${" + entry.getKey() + ".val}", entry.getValue().toString());
            }
            body = (Map)this.objectMapper.readValue(yamlContent, Map.class);
        } else {
            String string = this.namespace = metadata.containsKey("namespace") ? metadata.get("namespace").toString() : "gorkube";
        }
        if (body.containsKey("spec")) {
            List arguments;
            int i;
            Map specMap = (Map)body.get("spec");
            if (specMap.containsKey("arguments") && (i = (arguments = (List)specMap.get("arguments")).indexOf("#{result_dir}")) != -1) {
                arguments.set(i, result_dir);
            }
            if (specMap.containsKey("executor")) {
                Object instances;
                Map executor = (Map)specMap.get("executor");
                Object cores = executor.get("cores");
                if (cores instanceof String) {
                    executor.put("cores", Integer.parseInt(cores.toString()));
                }
                if ((instances = executor.get("instances")) instanceof String) {
                    executor.put("instances", Integer.parseInt(instances.toString()));
                }
            }
        }
        return body;
    }

    public String getSparkApplicationState(String name) throws ApiException {
        block3: {
            try {
                Object obj = this.apiInstance.getNamespacedCustomObject("sparkoperator.k8s.io", "v1beta2", this.namespace, "sparkapplications", name);
                Map map = (Map)obj;
                Map statusMap = (Map)map.get("status");
                if (statusMap != null) {
                    Map appMap = (Map)statusMap.get("applicationState");
                    return (String)appMap.get("state");
                }
            }
            catch (ApiException e) {
                if (e.getMessage().contains("Not Found")) break block3;
                throw e;
            }
        }
        return "";
    }

    public void deleteSparkApplication(String name) throws ApiException {
        V1DeleteOptions body = new V1DeleteOptions();
        this.apiInstance.deleteNamespacedCustomObject("sparkoperator.k8s.io", "v1beta2", this.namespace, "sparkapplications", name, null, null, null, null, body);
    }

    public boolean waitForSparkApplicationToComplete(GorMonitor mon, String name) throws ApiException, InterruptedException {
        String state = this.getSparkApplicationState(name);
        while (!state.equals(SPARKAPPLICATION_COMPLETED_STATE)) {
            if (state.equals(SPARKAPPLICATION_FAILED_STATE)) {
                throw new GorSystemException(state, null);
            }
            if (mon != null && mon.isCancelled()) {
                this.deleteSparkApplication(name);
                return false;
            }
            Thread.sleep(1000L);
            state = this.getSparkApplicationState(name);
        }
        return true;
    }

    void waitSparkApplicationState(GorMonitor mon, String name, String state) throws ApiException {
        Call call = this.core.listNamespacedPodCall(this.namespace, null, null, null, null, null, null, null, null, Integer.valueOf(120), null, null);
        try (Watch watchSparkApplication = Watch.createWatch((ApiClient)this.client, (Call)call, (Type)new TypeToken<Watch.Response<V1Pod>>(){}.getType());){
            for (Watch.Response item : watchSparkApplication) {
                if (item.type != null && item.type.equals("MODIFIED") && item.object != null && ((V1Pod)item.object).getStatus() != null) {
                    String phase = ((V1Pod)item.object).getStatus().getPhase();
                    if (state.equals(phase)) {
                        break;
                    }
                    if ("Failed".equals(phase) || "Error".equals(phase)) {
                        throw new GorSystemException(((V1Pod)item.object).toString(), null);
                    }
                }
                if (mon == null || !mon.isCancelled()) continue;
                this.deleteSparkApplication(name);
            }
        }
        catch (Exception e) {
            log.error(e.getMessage(), (Throwable)e);
        }
    }

    public void createSparkApplicationFromJson(String json) throws ApiException, JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        this.createSparkApplication(mapper, json);
    }

    public void createSparkApplicationFromYaml(String yaml) throws ApiException, JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper((JsonFactory)new YAMLFactory());
        this.createSparkApplication(mapper, yaml);
    }

    public void createSparkApplication(ObjectMapper mapper, String contents) throws ApiException, JsonProcessingException {
        Object jsonObject = mapper.readValue(contents, Object.class);
        this.apiInstance.createNamespacedCustomObject("sparkoperator.k8s.io", "v1beta2", this.namespace, "sparkapplications", jsonObject, "true", null, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static String getSparkOperatorYaml(String projectDir) throws IOException {
        String json = null;
        try {
            Path so_json;
            Path p = Paths.get(projectDir, new String[0]);
            if (Files.exists(p, new LinkOption[0]) && Files.exists(so_json = p.resolve("config/sparkoperator.yaml"), new LinkOption[0])) {
                json = new String(Files.readAllBytes(so_json));
            }
        }
        finally {
            if (json == null) {
                json = Util.readAndCloseStream((InputStream)SparkPipeInstance.class.getResourceAsStream("sparkoperator.yaml"));
            }
        }
        return json;
    }

    public void runQueryHandler(String appName, String uristr, String requestId, Path projectDir, GorMonitor gm, String[] commands, String[] fingerprints, String[] jobIds, String[] cacheFiles, String[] resources) throws IOException, ApiException, InterruptedException {
        String[] args = new String[]{uristr, requestId, projectDir.toString(), String.join((CharSequence)";;", commands), String.join((CharSequence)";", fingerprints), String.join((CharSequence)";", cacheFiles), String.join((CharSequence)";", jobIds)};
        this.runSparkOperator(gm, appName, projectDir, args, resources);
    }

    public void runSparkOperator(GorMonitor gm, String sparkApplicationName, Path projectDir, String[] args, String[] resources) throws IOException, ApiException, InterruptedException {
        String[] vollist;
        SparkOperatorSpecs sparkOperatorSpecs = new SparkOperatorSpecs();
        ArrayList<Map<String, Path>> listMounts = new ArrayList<Map<String, Path>>();
        listMounts.add(Map.of("name", GOR_PROJECT_MOUNT_NAME, "mountPath", projectDir));
        sparkOperatorSpecs.addConfig("spec.executor.volumeMounts", listMounts);
        sparkOperatorSpecs.addConfig("spec.driver.volumeMounts", listMounts);
        Path projectBasePath = Paths.get(BASE_NFS_MOUNT_POINT, new String[0]);
        Path projectRealPath = projectDir.toRealPath(new LinkOption[0]).toAbsolutePath();
        Path projectSubPath = projectBasePath.relativize(projectRealPath);
        if (this.hostMount) {
            vollist = new ArrayList();
            vollist.add(Map.of("name", GOR_PROJECT_MOUNT_NAME, "hostPath", Map.of("path", projectDir, "type", "Directory")));
            sparkOperatorSpecs.addConfig("spec.volumes", vollist);
            String projectRealPathStr = projectRealPath.toString();
            sparkOperatorSpecs.addDriverHostPath(GOR_PROJECT_MOUNT_NAME, projectRealPathStr, projectRealPathStr, null, false);
            sparkOperatorSpecs.addExecutorHostPath(GOR_PROJECT_MOUNT_NAME, projectRealPathStr, projectRealPathStr, null, false);
        } else {
            vollist = new ArrayList();
            vollist.add(Map.of("name", GOR_PROJECT_MOUNT_NAME, "persistentVolumeClaim", Map.of("claimName", "pvc-gor-nfs-v2")));
            sparkOperatorSpecs.addConfig("spec.volumes", vollist);
            sparkOperatorSpecs.addDriverVolumeClaim(GOR_PROJECT_MOUNT_NAME, "pvc-gor-nfs-v2", projectRealPath.toString(), projectSubPath.toString(), false);
            sparkOperatorSpecs.addExecutorVolumeClaim(GOR_PROJECT_MOUNT_NAME, "pvc-gor-nfs-v2", projectRealPath.toString(), projectSubPath.toString(), false);
        }
        sparkOperatorSpecs.addConfig("spec.arguments", Arrays.asList(args));
        sparkOperatorSpecs.addConfig("metadata.name", sparkApplicationName);
        for (String config : resources) {
            String[] confSplit = config.split("=");
            try {
                Integer ii = Integer.parseInt(confSplit[1]);
                sparkOperatorSpecs.addConfig(confSplit[0], ii);
            }
            catch (NumberFormatException ne) {
                sparkOperatorSpecs.addConfig(confSplit[0], confSplit[1]);
            }
        }
        String yaml = SparkOperatorRunner.getSparkOperatorYaml(projectDir.toString());
        this.runYaml(yaml, projectDir.toString(), sparkOperatorSpecs);
        this.waitForSparkApplicationToComplete(gm, sparkApplicationName);
    }

    public Path run(String uristr, String requestId, String projectDir, GorMonitor gm, String[] commands, String[] resourceSplit, String cachefile) throws IOException, ApiException, InterruptedException {
        Path cachefilepath;
        Object lastCommand = resourceSplit[0];
        int i = ((String)lastCommand).indexOf(" -j ");
        String jobid = null;
        if (i > 0) {
            int k = ((String)lastCommand).indexOf(32, i + 4);
            jobid = ((String)lastCommand).substring(i + 4, k).trim();
            lastCommand = ((String)lastCommand).substring(0, i) + ((String)lastCommand).substring(k);
        }
        Object queries = commands.length > 1 ? String.join((CharSequence)";", Arrays.copyOfRange(commands, 0, commands.length - 1)) + ";" + (String)lastCommand : lastCommand;
        String fingerprint = StringUtilities.createMD5((String)queries);
        Path projectPath = Paths.get(projectDir, new String[0]);
        if (cachefile == null) {
            Path cachePath = projectPath.resolve("result_cache");
            String cachefiles = fingerprint + ".parquet";
            cachefilepath = cachePath.resolve(cachefiles);
            cachefile = cachefilepath.toAbsolutePath().normalize().toString();
        } else {
            cachefilepath = Paths.get(cachefile, new String[0]);
        }
        if (jobid == null) {
            jobid = fingerprint;
        }
        String[] args = new String[]{uristr, requestId, projectDir, queries, fingerprint, cachefile, jobid};
        if (!Files.exists(cachefilepath, new LinkOption[0])) {
            String sparkApplicationName = "gorquery-" + jobid;
            String[] resources = resourceSplit[1].split(" ");
            this.runSparkOperator(gm, sparkApplicationName, projectPath, args, resources);
        }
        return cachefilepath;
    }

    private void runLocal(SparkSession sparkSession, String[] args) {
        String redisUrl = args[0];
        String requestId = args[1];
        String projectDir = args[2];
        String queries = args[3];
        String fingerprints = args[4];
        String cachefiles = args[5];
        String jobids = args[6];
        try (RedisBatchConsumer redisBatchConsumer = new RedisBatchConsumer(sparkSession, redisUrl);){
            String[] arr = new String[]{queries, fingerprints, projectDir, requestId, jobids, cachefiles};
            List<String[]> lstr = Collections.singletonList(arr);
            Map<String, Future<List<String>>> futMap = redisBatchConsumer.runJobBatch(lstr);
            for (Future<List<String>> f : futMap.values()) {
                f.get();
            }
        }
        catch (InterruptedException | ExecutionException e) {
            throw new GorSystemException((Throwable)e);
        }
    }

    public void runYaml(String yaml, String projectroot, SparkOperatorSpecs specs) throws IOException, ApiException {
        if (projectroot == null || projectroot.length() == 0) {
            projectroot = Paths.get(".", new String[0]).toAbsolutePath().normalize().toString();
        }
        Map<String, Object> body = this.loadBody(yaml, projectroot, "", new HashMap<String, Object>());
        specs.apply(body);
        this.apiInstance.createNamespacedCustomObject("sparkoperator.k8s.io", "v1beta2", this.namespace, "sparkapplications", body, "true", null, null);
    }
}

