/*
 * Decompiled with CFR 0.152.
 */
package org.projectnessie.gc.base;

import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.projectnessie.gc.base.ContentBloomFilter;
import org.projectnessie.gc.base.GCParams;
import org.projectnessie.gc.base.IdentifiedResult;
import org.projectnessie.gc.base.IdentifyContentsPerExecutor;
import org.projectnessie.model.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs the per-reference content identification steps as Spark jobs and combines the collected
 * per-reference results into a single result object.
 *
 * <p>Each public method parallelizes the given references, maps every reference through a
 * function obtained from {@link IdentifyContentsPerExecutor}, collects the outputs and merges
 * them.
 */
public class DistributedIdentifyContents {
    private static final Logger LOGGER = LoggerFactory.getLogger(DistributedIdentifyContents.class);
    private final SparkSession session;
    private final GCParams gcParams;

    /**
     * Creates a new instance.
     *
     * @param session Spark session whose context is used to run the jobs
     * @param gcParams parameters handed to the per-executor logic; also supplies the optional
     *     partition count and the bloom filter fpp threshold used when merging
     */
    public DistributedIdentifyContents(SparkSession session, GCParams gcParams) {
        this.session = session;
        this.gcParams = gcParams;
    }

    /**
     * Computes one bloom-filter map per reference and merges them into a single map.
     *
     * @param references references to process, one Spark element per reference
     * @param bloomFilterSize forwarded unchanged to
     *     {@code IdentifyContentsPerExecutor#computeLiveContentsFunc}
     * @param droppedRefTimeMap forwarded unchanged to
     *     {@code IdentifyContentsPerExecutor#computeLiveContentsFunc}
     *     (NOTE(review): presumably the drop time of dropped references; confirm against the
     *     executor implementation)
     * @return the merged map; filters that share a key are merged into one filter
     */
    public Map<String, ContentBloomFilter> getLiveContentsBloomFilters(List<Reference> references, long bloomFilterSize, Map<Reference, Instant> droppedRefTimeMap) {
        IdentifyContentsPerExecutor executor = new IdentifyContentsPerExecutor(this.gcParams);
        int partitions = DistributedIdentifyContents.getPartitionsCount(this.gcParams, references);
        List<Map<String, ContentBloomFilter>> bloomFilterMaps = new JavaSparkContext(this.session.sparkContext())
                .parallelize(references, partitions)
                .map(executor.computeLiveContentsFunc(bloomFilterSize, droppedRefTimeMap))
                .collect();
        return DistributedIdentifyContents.mergeLiveContentResults(bloomFilterMaps, this.gcParams.getBloomFilterFpp());
    }

    /**
     * Computes an {@link IdentifiedResult} per reference and folds all of them into one result.
     *
     * @param liveContentsBloomFilterMap map forwarded unchanged to
     *     {@code IdentifyContentsPerExecutor#computeExpiredContentsFunc}
     * @param references references to process, one Spark element per reference
     * @return a new result whose content values contain the content values of every
     *     per-reference result (added with {@code putAll}, so later entries replace earlier
     *     ones that use the same key)
     */
    public IdentifiedResult getIdentifiedResults(Map<String, ContentBloomFilter> liveContentsBloomFilterMap, List<Reference> references) {
        IdentifyContentsPerExecutor executor = new IdentifyContentsPerExecutor(this.gcParams);
        int partitions = DistributedIdentifyContents.getPartitionsCount(this.gcParams, references);
        List<IdentifiedResult> results = new JavaSparkContext(this.session.sparkContext())
                .parallelize(references, partitions)
                .map(executor.computeExpiredContentsFunc(liveContentsBloomFilterMap))
                .collect();
        IdentifiedResult identifiedResult = new IdentifiedResult();
        for (IdentifiedResult result : results) {
            identifiedResult.getContentValues().putAll(result.getContentValues());
        }
        return identifiedResult;
    }

    /**
     * Chooses the number of Spark partitions for a job over {@code references}.
     *
     * @return the configured partition count when one is set; otherwise one partition per
     *     reference, but never less than one
     */
    private static int getPartitionsCount(GCParams gcParams, List<Reference> references) {
        if (gcParams.getSparkPartitionsCount() != null) {
            return gcParams.getSparkPartitionsCount().intValue();
        }
        // An empty reference list would yield 0 here, and Spark rejects a non-positive slice
        // count; a single (empty) partition keeps the job valid and produces an empty result.
        return Math.max(1, references.size());
    }

    /**
     * Merges the per-reference bloom-filter maps into one map.
     *
     * <p>The first filter seen for a key is stored as-is; every further filter for that key is
     * merged into the stored one. Afterwards, each filter that reports {@code wasMerged()} and
     * whose expected fpp exceeds {@code bloomFilterFpp} is logged at info level.
     *
     * @param bloomFilterMaps maps to merge
     * @param bloomFilterFpp threshold above which a merged filter's expected fpp is logged
     * @return a new mutable map holding the merged filters
     */
    private static Map<String, ContentBloomFilter> mergeLiveContentResults(List<Map<String, ContentBloomFilter>> bloomFilterMaps, double bloomFilterFpp) {
        Map<String, ContentBloomFilter> output = new HashMap<>();
        for (Map<String, ContentBloomFilter> map : bloomFilterMaps) {
            map.forEach((contentId, filter) -> {
                if (output.containsKey(contentId)) {
                    output.get(contentId).merge(filter);
                } else {
                    output.put(contentId, filter);
                }
            });
        }
        output.forEach((contentId, filter) -> {
            if (!filter.wasMerged()) {
                return;
            }
            double fpp = filter.getExpectedFpp();
            if (fpp > bloomFilterFpp) {
                LOGGER.info("Fpp of ContentBloomFilter for '{}': {}", contentId, String.format("%.3f", fpp));
            }
        });
        return output;
    }
}

