/*
 * Decompiled with CFR 0.152.
 */
package org.projectnessie.gc.base;

import java.sql.Timestamp;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.StructType;
import org.projectnessie.gc.base.ContentBloomFilter;
import org.projectnessie.gc.base.GCParams;
import org.projectnessie.gc.base.IdentifiedResultsRepo;
import org.projectnessie.gc.base.IdentifyContentsPerExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Drives the Spark side of content identification: builds per-content bloom filters of live
 * contents across a list of references, and writes the rows produced for expired contents to the
 * output table.
 *
 * <p>The per-reference work itself is delegated to {@link IdentifyContentsPerExecutor}; this class
 * only distributes the references, gathers the results and merges them.
 */
public class DistributedIdentifyContents {
    private static final Logger LOGGER = LoggerFactory.getLogger(DistributedIdentifyContents.class);
    private final SparkSession session;
    private final GCParams gcParams;

    /**
     * @param session Spark session used to distribute the work
     * @param gcParams GC configuration (partition count, bloom filter fpp, output table location)
     */
    public DistributedIdentifyContents(SparkSession session, GCParams gcParams) {
        this.session = session;
        this.gcParams = gcParams;
    }

    /**
     * Computes the live-contents bloom filters for the given references and merges the
     * per-reference results into a single map.
     *
     * @param references references to process; distributed over the partitions returned by
     *     {@link #getPartitionsCount(GCParams, List)}
     * @param bloomFilterSize passed through to the per-executor function
     * @param droppedRefTimeMap passed through to the per-executor function
     * @return merged map of bloom filters; filters that share a key are merged into one
     */
    public Map<String, ContentBloomFilter> getLiveContentsBloomFilters(List<String> references, long bloomFilterSize, Map<String, Instant> droppedRefTimeMap) {
        IdentifyContentsPerExecutor executor = new IdentifyContentsPerExecutor(this.gcParams);
        JavaSparkContext sparkContext = new JavaSparkContext(this.session.sparkContext());
        int partitions = DistributedIdentifyContents.getPartitionsCount(this.gcParams, references);
        List<Map<String, ContentBloomFilter>> bloomFilterMaps = sparkContext
            .parallelize(references, partitions)
            .map(executor.computeLiveContentsFunc(bloomFilterSize, droppedRefTimeMap))
            .collect();
        return DistributedIdentifyContents.mergeLiveContentResults(bloomFilterMaps, this.gcParams.getBloomFilterFpp());
    }

    /**
     * Produces the expired-content rows for the given references and writes them to the output
     * table configured in {@link GCParams}.
     *
     * @param liveContentsBloomFilterMap bloom filters handed to the per-executor function
     * @param references references to process
     * @return the randomly generated run id that was passed to the per-executor function for
     *     this invocation
     */
    public String identifyExpiredContents(Map<String, ContentBloomFilter> liveContentsBloomFilterMap, List<String> references) {
        String runId = UUID.randomUUID().toString();
        Timestamp startedAt = Timestamp.from(Instant.now());
        IdentifiedResultsRepo identifiedResultsRepo = new IdentifiedResultsRepo(this.session, this.gcParams.getNessieCatalogName(), this.gcParams.getOutputBranchName(), this.gcParams.getOutputTableIdentifier());
        IdentifyContentsPerExecutor executor = new IdentifyContentsPerExecutor(this.gcParams);
        Dataset<Row> rowDataset = this.session
            .createDataset(references, Encoders.STRING())
            .mapPartitions(
                executor.getExpiredContentRowsFunc(liveContentsBloomFilterMap, runId, startedAt),
                RowEncoder.apply(identifiedResultsRepo.getSchema()));
        identifiedResultsRepo.writeToOutputTable(rowDataset);
        return runId;
    }

    /**
     * Returns the configured Spark partition count, or one partition per reference when none is
     * configured.
     *
     * <p>The fallback is clamped to at least 1: Spark rejects a non-positive number of slices, so
     * an empty reference list would otherwise fail instead of producing an empty result.
     */
    private static int getPartitionsCount(GCParams gcParams, List<String> references) {
        return gcParams.getSparkPartitionsCount() == null
            ? Math.max(1, references.size())
            : gcParams.getSparkPartitionsCount().intValue();
    }

    /**
     * Folds the per-reference maps into one map. The first filter seen for a key is kept and every
     * later filter for the same key is merged into it (the kept filter is mutated in place).
     *
     * <p>Afterwards, every merged filter whose expected fpp exceeds {@code bloomFilterFpp} is
     * logged; nothing is rejected or rebuilt.
     */
    private static Map<String, ContentBloomFilter> mergeLiveContentResults(List<Map<String, ContentBloomFilter>> bloomFilterMaps, double bloomFilterFpp) {
        Map<String, ContentBloomFilter> output = new HashMap<>();
        for (Map<String, ContentBloomFilter> perReference : bloomFilterMaps) {
            for (Map.Entry<String, ContentBloomFilter> entry : perReference.entrySet()) {
                String contentId = entry.getKey();
                if (output.containsKey(contentId)) {
                    output.get(contentId).merge(entry.getValue());
                } else {
                    output.put(contentId, entry.getValue());
                }
            }
        }
        // Only merged filters are checked against the fpp threshold.
        for (Map.Entry<String, ContentBloomFilter> entry : output.entrySet()) {
            ContentBloomFilter filter = entry.getValue();
            if (!filter.wasMerged()) {
                continue;
            }
            double fpp = filter.getExpectedFpp();
            if (fpp > bloomFilterFpp) {
                LOGGER.info("Fpp of ContentBloomFilter for '{}': {}", entry.getKey(), String.format("%.3f", fpp));
            }
        }
        return output;
    }
}

