/*
 * Decompiled with CFR 0.152.
 */
package ai.databand.spark;

import ai.databand.DbndWrapper;
import ai.databand.spark.SparkIOSource;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.spark.Dependency;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.Stage;
import org.apache.spark.sql.execution.datasources.FilePartition;
import org.apache.spark.sql.execution.datasources.FileScanRDD;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD;

public class ActiveJobTracker {
    public static void track(ActiveJob job) {
        Stage stage = job.finalStage();
        List<RDD<?>> all = ActiveJobTracker.allRdds(stage.rdd());
        ArrayList<SparkIOSource> sources = new ArrayList<SparkIOSource>(1);
        for (RDD<?> rdd : all) {
            if (rdd instanceof FileScanRDD) {
                FileScanRDD fs = (FileScanRDD)rdd;
                for (FilePartition filePart : fs.filePartitions()) {
                    for (PartitionedFile file : filePart.files()) {
                        HashMap<String, Object> properties = new HashMap<String, Object>(1);
                        properties.put("start", file.start());
                        properties.put("length", file.length());
                        properties.put("index", filePart.index());
                        sources.add(new SparkIOSource(file.filePath(), "file_scan_rdd", properties));
                    }
                }
                continue;
            }
            if (!(rdd instanceof JDBCRDD)) continue;
            JDBCRDD db = (JDBCRDD)rdd;
            Optional<SparkIOSource> dbSource = ActiveJobTracker.extractDataFromJdbcRdd(db);
            dbSource.ifPresent(sources::add);
        }
        Map<String, Object> metrics = sources.stream().collect(Collectors.toMap(SparkIOSource::metricKey, io -> io));
        DbndWrapper.instance().logMetrics(metrics, "spark");
    }

    public static Optional<SparkIOSource> extractDataFromJdbcRdd(JDBCRDD rdd) {
        try {
            Field optionsField = rdd.getClass().getDeclaredField("options");
            optionsField.setAccessible(true);
            Field columnListField = rdd.getClass().getDeclaredField("columnList");
            columnListField.setAccessible(true);
            JDBCOptions options = (JDBCOptions)optionsField.get(rdd);
            String columnsList = (String)columnListField.get(rdd);
            HashMap<String, Object> properties = new HashMap<String, Object>(2);
            HashMap<String, String> jdbcOptions = new HashMap<String, String>(2);
            jdbcOptions.put("url", options.url());
            jdbcOptions.put("table_or_query", options.tableOrQuery());
            properties.put("options", jdbcOptions);
            properties.put("columnsList", columnsList);
            return Optional.of(new SparkIOSource(options.url(), "jdbc_rdd", properties));
        }
        catch (IllegalAccessException | NoSuchFieldException e) {
            return Optional.empty();
        }
    }

    public static List<RDD<?>> allRdds(RDD<?> rdd) {
        ArrayList result = new ArrayList(1);
        ArrayDeque stack = new ArrayDeque(Collections.singletonList(rdd));
        while (!stack.isEmpty()) {
            RDD polled = (RDD)stack.poll();
            result.add(polled);
            for (Dependency next : polled.dependencies()) {
                stack.add(next.rdd());
            }
        }
        return result;
    }
}

