/*
 * Decompiled with CFR 0.152.
 */
package ai.databand.spark;

import ai.databand.DbndWrapper;
import ai.databand.schema.DatasetOperationStatuses;
import ai.databand.schema.DatasetOperationTypes;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import org.apache.spark.sql.execution.FileSourceScanExec;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.WholeStageCodegenExec;
import org.apache.spark.sql.execution.command.DataWritingCommandExec;
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.util.QueryExecutionListener;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class DbndSparkQueryExecutionListener
implements QueryExecutionListener {
    private final DbndWrapper dbnd;

    public DbndSparkQueryExecutionListener(DbndWrapper dbnd) {
        this.dbnd = dbnd;
    }

    public DbndSparkQueryExecutionListener() {
        this.dbnd = DbndWrapper.instance();
    }

    public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
        DataWritingCommandExec writePlan;
        if (qe.executedPlan() instanceof DataWritingCommandExec && (writePlan = (DataWritingCommandExec)qe.executedPlan()).cmd() instanceof InsertIntoHadoopFsRelationCommand) {
            InsertIntoHadoopFsRelationCommand cmd = (InsertIntoHadoopFsRelationCommand)writePlan.cmd();
            String path = this.exctractPath(cmd.outputPath().toString());
            ArrayList<Long> dimensions = new ArrayList<Long>(2);
            long rows = ((SQLMetric)cmd.metrics().get((Object)"numOutputRows").get()).value();
            dimensions.add(rows);
            List columns = (List)JavaConverters.seqAsJavaListConverter((Seq)cmd.outputColumnNames()).asJava();
            String schema = String.join((CharSequence)",", columns);
            dimensions.add(Long.valueOf(columns.size()));
            this.dbnd.logDatasetOperation(path, DatasetOperationTypes.WRITE, DatasetOperationStatuses.OK, "", dimensions, schema);
        }
        if (qe.executedPlan() instanceof WholeStageCodegenExec) {
            List<SparkPlan> allChildren = this.getAllChildren(qe.executedPlan());
            for (SparkPlan next : allChildren) {
                if (!(next instanceof FileSourceScanExec)) continue;
                FileSourceScanExec fileSourceScan = (FileSourceScanExec)next;
                String path = this.exctractPath((String)fileSourceScan.metadata().get((Object)"Location").get());
                String schema = (String)fileSourceScan.metadata().get((Object)"ReadSchema").get();
                ArrayList<Long> dimensions = new ArrayList<Long>(2);
                long rows = ((SQLMetric)fileSourceScan.metrics().get((Object)"numOutputRows").get()).value();
                dimensions.add(rows);
                long columns = fileSourceScan.schema().size();
                dimensions.add(columns);
                this.dbnd.logDatasetOperation(path, DatasetOperationTypes.READ, DatasetOperationStatuses.OK, "", dimensions, schema);
            }
        }
    }

    protected String exctractPath(String path) {
        if (path.contains("InMemoryFileIndex")) {
            path = path.replace("InMemoryFileIndex[", "");
            path = path.substring(0, path.length() - 1);
        }
        return path;
    }

    protected List<SparkPlan> getAllChildren(SparkPlan root) {
        ArrayList<SparkPlan> result = new ArrayList<SparkPlan>();
        LinkedList<SparkPlan> deque = new LinkedList<SparkPlan>();
        deque.add(root);
        while (!deque.isEmpty()) {
            SparkPlan next = (SparkPlan)deque.pop();
            result.add(next);
            List children = (List)JavaConverters.seqAsJavaListConverter((Seq)next.children()).asJava();
            deque.addAll(children);
        }
        return result;
    }

    public void onFailure(String funcName, QueryExecution qe, Exception exception) {
    }
}

