/*
 * Decompiled with CFR 0.152.
 */
package ai.databand.spark;

import ai.databand.DbndWrapper;
import ai.databand.parameters.DatasetOperationPreview;
import ai.databand.schema.DatasetOperationStatus;
import ai.databand.schema.DatasetOperationType;
import ai.databand.schema.Pair;
import java.net.URI;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.spark.sql.execution.CollectLimitExec;
import org.apache.spark.sql.execution.FileSourceScanExec;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.WholeStageCodegenExec;
import org.apache.spark.sql.execution.command.DataWritingCommandExec;
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.hive.execution.HiveTableScanExec;
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable;
import org.apache.spark.sql.util.QueryExecutionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class DbndSparkQueryExecutionListener
implements QueryExecutionListener {
    private static final Logger LOG = LoggerFactory.getLogger(DbndSparkQueryExecutionListener.class);
    private final DbndWrapper dbnd;
    private final DatasetOperationPreview operationPreview;
    private final boolean isHiveEnabled;

    public DbndSparkQueryExecutionListener(DbndWrapper dbnd) {
        this.dbnd = dbnd;
        this.operationPreview = new DatasetOperationPreview();
        try {
            Class.forName("org.apache.spark.sql.hive.execution.HiveTableScanExec", false, this.getClass().getClassLoader());
            Class.forName("org.apache.spark.sql.hive.execution.InsertIntoHiveTable", false, this.getClass().getClassLoader());
        }
        catch (ClassNotFoundException e) {
            this.isHiveEnabled = false;
            return;
        }
        this.isHiveEnabled = true;
    }

    public DbndSparkQueryExecutionListener() {
        this(DbndWrapper.instance());
    }

    public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
        if (qe.executedPlan() instanceof DataWritingCommandExec) {
            Pair<String, List<Long>> schema;
            long rows;
            String path;
            InsertIntoHadoopFsRelationCommand cmd;
            DataWritingCommandExec writePlan = (DataWritingCommandExec)qe.executedPlan();
            if (writePlan.cmd() instanceof InsertIntoHadoopFsRelationCommand) {
                cmd = (InsertIntoHadoopFsRelationCommand)writePlan.cmd();
                path = this.exctractPath(cmd.outputPath().toString());
                rows = ((SQLMetric)cmd.metrics().get((Object)"numOutputRows").get()).value();
                schema = this.operationPreview.extractSchema(cmd.query().schema(), rows);
                this.log(path, DatasetOperationType.WRITE, schema);
            }
            if (this.isHiveEnabled && writePlan.cmd() instanceof InsertIntoHiveTable) {
                try {
                    cmd = (InsertIntoHiveTable)writePlan.cmd();
                    path = cmd.table().identifier().table();
                    rows = ((SQLMetric)cmd.metrics().get((Object)"numOutputRows").get()).value();
                    schema = this.operationPreview.extractSchema(cmd.query().schema(), rows);
                    this.log(path, DatasetOperationType.WRITE, schema);
                }
                catch (Exception e) {
                    LOG.error("Unable to extract dataset information from InsertIntoHiveTable", (Throwable)e);
                }
            }
        }
        if (qe.executedPlan() instanceof WholeStageCodegenExec || qe.executedPlan() instanceof CollectLimitExec) {
            List<SparkPlan> allChildren = this.getAllChildren(qe.executedPlan());
            for (SparkPlan next : allChildren) {
                Pair<String, List<Long>> schema;
                String path;
                if (next instanceof FileSourceScanExec) {
                    FileSourceScanExec fileSourceScan = (FileSourceScanExec)next;
                    path = this.exctractPath((String)fileSourceScan.metadata().get((Object)"Location").get());
                    long rows = ((SQLMetric)fileSourceScan.metrics().get((Object)"numOutputRows").get()).value();
                    schema = this.operationPreview.extractSchema(fileSourceScan.schema(), rows);
                    this.log(path, DatasetOperationType.READ, schema);
                }
                if (!this.isHiveEnabled || !(next instanceof HiveTableScanExec)) continue;
                try {
                    HiveTableScanExec hiveTableScan = (HiveTableScanExec)next;
                    path = ((URI)hiveTableScan.relation().tableMeta().storage().locationUri().get()).toString();
                    long rows = ((SQLMetric)hiveTableScan.metrics().get((Object)"numOutputRows").get()).value();
                    schema = this.operationPreview.extractSchema(hiveTableScan.schema(), rows);
                    this.log(path, DatasetOperationType.READ, schema);
                }
                catch (Exception e) {
                    LOG.error("Unable to extract dataset information from HiveTableScanExec", (Throwable)e);
                }
            }
        }
    }

    protected void log(String path, DatasetOperationType operationType, Pair<String, List<Long>> schema) {
        this.dbnd.logDatasetOperation(path, operationType, DatasetOperationStatus.OK, "", schema.right(), schema.left(), (Boolean)true);
    }

    protected String exctractPath(String path) {
        if (path.contains("InMemoryFileIndex")) {
            path = path.replace("InMemoryFileIndex[", "");
            path = path.substring(0, path.length() - 1);
        }
        return path;
    }

    protected List<SparkPlan> getAllChildren(SparkPlan root) {
        ArrayList<SparkPlan> result = new ArrayList<SparkPlan>();
        LinkedList<SparkPlan> deque = new LinkedList<SparkPlan>();
        deque.add(root);
        while (!deque.isEmpty()) {
            SparkPlan next = (SparkPlan)deque.pop();
            result.add(next);
            List children = (List)JavaConverters.seqAsJavaListConverter((Seq)next.children()).asJava();
            deque.addAll(children);
        }
        return result;
    }

    public void onFailure(String funcName, QueryExecution qe, Exception exception) {
    }
}

