/*
 * Decompiled with CFR 0.152.
 */
package ai.databand.spark;

import ai.databand.DbndWrapper;
import ai.databand.parameters.DatasetOperationPreview;
import ai.databand.schema.DatasetOperationStatus;
import ai.databand.schema.DatasetOperationType;
import ai.databand.schema.Pair;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.spark.sql.execution.CollectLimitExec;
import org.apache.spark.sql.execution.FileSourceScanExec;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.WholeStageCodegenExec;
import org.apache.spark.sql.execution.command.DataWritingCommandExec;
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.util.QueryExecutionListener;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class DbndSparkQueryExecutionListener
implements QueryExecutionListener {
    private final DbndWrapper dbnd;
    private final DatasetOperationPreview operationPreview;

    public DbndSparkQueryExecutionListener(DbndWrapper dbnd) {
        this.dbnd = dbnd;
        this.operationPreview = new DatasetOperationPreview();
    }

    public DbndSparkQueryExecutionListener() {
        this(DbndWrapper.instance());
    }

    public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
        DataWritingCommandExec writePlan;
        if (qe.executedPlan() instanceof DataWritingCommandExec && (writePlan = (DataWritingCommandExec)qe.executedPlan()).cmd() instanceof InsertIntoHadoopFsRelationCommand) {
            InsertIntoHadoopFsRelationCommand cmd = (InsertIntoHadoopFsRelationCommand)writePlan.cmd();
            String path = this.exctractPath(cmd.outputPath().toString());
            long rows = ((SQLMetric)cmd.metrics().get((Object)"numOutputRows").get()).value();
            Pair<String, List<Long>> schema = this.operationPreview.extractSchema(cmd.query().schema(), rows);
            this.dbnd.logDatasetOperation(path, DatasetOperationType.WRITE, DatasetOperationStatus.OK, "", schema.right(), schema.left());
        }
        if (qe.executedPlan() instanceof WholeStageCodegenExec || qe.executedPlan() instanceof CollectLimitExec) {
            List<SparkPlan> allChildren = this.getAllChildren(qe.executedPlan());
            for (SparkPlan next : allChildren) {
                if (!(next instanceof FileSourceScanExec)) continue;
                FileSourceScanExec fileSourceScan = (FileSourceScanExec)next;
                String path = this.exctractPath((String)fileSourceScan.metadata().get((Object)"Location").get());
                long rows = ((SQLMetric)fileSourceScan.metrics().get((Object)"numOutputRows").get()).value();
                Pair<String, List<Long>> schema = this.operationPreview.extractSchema(fileSourceScan.schema(), rows);
                this.dbnd.logDatasetOperation(path, DatasetOperationType.READ, DatasetOperationStatus.OK, "", schema.right(), schema.left());
            }
        }
    }

    protected String exctractPath(String path) {
        if (path.contains("InMemoryFileIndex")) {
            path = path.replace("InMemoryFileIndex[", "");
            path = path.substring(0, path.length() - 1);
        }
        return path;
    }

    protected List<SparkPlan> getAllChildren(SparkPlan root) {
        ArrayList<SparkPlan> result = new ArrayList<SparkPlan>();
        LinkedList<SparkPlan> deque = new LinkedList<SparkPlan>();
        deque.add(root);
        while (!deque.isEmpty()) {
            SparkPlan next = (SparkPlan)deque.pop();
            result.add(next);
            List children = (List)JavaConverters.seqAsJavaListConverter((Seq)next.children()).asJava();
            deque.addAll(children);
        }
        return result;
    }

    public void onFailure(String funcName, QueryExecution qe, Exception exception) {
    }
}

