/*
 * Decompiled with CFR 0.152.
 */
package org.projectnessie.gc.base;

import java.io.Serializable;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.projectnessie.api.params.FetchOption;
import org.projectnessie.client.StreamingUtil;
import org.projectnessie.client.api.GetCommitLogBuilder;
import org.projectnessie.client.api.GetEntriesBuilder;
import org.projectnessie.client.api.NessieApiV1;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.gc.base.ContentBloomFilter;
import org.projectnessie.gc.base.GCParams;
import org.projectnessie.gc.base.GCStateParamsPerTask;
import org.projectnessie.gc.base.GCUtil;
import org.projectnessie.gc.base.IdentifiedResultsRepo;
import org.projectnessie.gc.base.ImmutableGCStateParamsPerTask;
import org.projectnessie.gc.base.SerializableFunction1;
import org.projectnessie.model.CommitMeta;
import org.projectnessie.model.Content;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.IcebergView;
import org.projectnessie.model.LogResponse;
import org.projectnessie.model.Operation;
import org.projectnessie.model.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterator;
import scala.collection.JavaConverters;

/**
 * Per-executor logic that walks the commit log of Nessie references to identify contents.
 *
 * <p>Two task functions are produced:
 *
 * <ul>
 *   <li>{@link #computeLiveContentsFunc(long, Map)} builds, for one reference, a map from content
 *       id to a {@link ContentBloomFilter} holding the contents considered live.
 *   <li>{@link #getExpiredContentRowsFunc(Map, String, Timestamp)} walks references again and
 *       emits a {@link Row} for every Iceberg table/view content that the live bloom filters do
 *       not (possibly) contain.
 * </ul>
 *
 * <p>Each task opens its own {@link NessieApiV1} and closes it through a Spark task completion
 * listener.
 */
public class IdentifyContentsPerExecutor
implements Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(IdentifyContentsPerExecutor.class);

    /** Reference name passed together with an explicit hash when reading commits/entries. */
    private static final String DETACHED_REF_NAME = "DETACHED";

    private final GCParams gcParams;

    /**
     * @param gcParams GC configuration (client configs, cut-off timestamps, bloom filter fpp,
     *     commit protection duration) read by the task functions
     */
    public IdentifyContentsPerExecutor(GCParams gcParams) {
        this.gcParams = gcParams;
    }

    /**
     * Returns the task function that computes the live-content bloom filters of one serialized
     * reference.
     *
     * @param bloomFilterSize size handed to every {@link ContentBloomFilter} that gets created
     * @param droppedRefTimeMap drop time per serialized reference; a reference without an entry is
     *     treated as not dropped
     * @return function mapping a serialized reference to its bloom filters keyed by content id
     */
    protected Function<String, Map<String, ContentBloomFilter>> computeLiveContentsFunc(long bloomFilterSize, Map<String, Instant> droppedRefTimeMap) {
        return reference ->
            computeLiveContents(
                getCutoffTimeForRef(reference, droppedRefTimeMap),
                reference,
                droppedRefTimeMap.get(reference),
                bloomFilterSize);
    }

    /**
     * Returns the partition function that converts serialized references into rows describing
     * their expired contents.
     *
     * @param liveContentsBloomFilterMap live-content bloom filters keyed by content id
     * @param runId run identifier written into every row
     * @param startedAt run start time written into every row
     * @return function mapping an iterator of serialized references to an iterator of rows
     */
    protected SerializableFunction1<Iterator<String>, Iterator<Row>> getExpiredContentRowsFunc(Map<String, ContentBloomFilter> liveContentsBloomFilterMap, String runId, Timestamp startedAt) {
        return references -> getExpiredContentRows(references, liveContentsBloomFilterMap, runId, startedAt);
    }

    /**
     * Computes the live-content bloom filters for a single reference.
     *
     * @param cutOffTimestamp commits at or after this instant are live
     * @param reference serialized reference to walk
     * @param droppedRefTime drop time of the reference, or {@code null} if it was not dropped
     * @param bloomFilterSize size for newly created bloom filters
     * @return bloom filters keyed by content id; empty when the reference was dropped before the
     *     cut-off time
     */
    private Map<String, ContentBloomFilter> computeLiveContents(Instant cutOffTimestamp, String reference, Instant droppedRefTime, long bloomFilterSize) {
        // Decide this first: a reference dropped before the cut-off contributes nothing, so there
        // is no reason to open (and later close) a Nessie client for it.
        boolean droppedBeforeCutoff = droppedRefTime != null && droppedRefTime.compareTo(cutOffTimestamp) < 0;
        if (droppedBeforeCutoff) {
            return new HashMap<>();
        }

        NessieApiV1 api = GCUtil.getApi(gcParams.getNessieClientConfigs());
        TaskContext.get().addTaskCompletionListener(context -> {
            LOGGER.info("Closing the nessie api for compute live contents task");
            api.close();
        });

        Predicate<CommitMeta> liveCommitPredicate =
            commitMeta -> commitMeta.getCommitTime().compareTo(cutOffTimestamp) >= 0;
        GCStateParamsPerTask gcStateParamsPerTask =
            ImmutableGCStateParamsPerTask.builder()
                .api(api)
                .reference(GCUtil.deserializeReference(reference))
                .liveCommitPredicate(liveCommitPredicate)
                .bloomFilterSize(bloomFilterSize)
                .build();
        return walkLiveCommitsInReference(gcStateParamsPerTask);
    }

    /**
     * Streams the commit log starting at the reference's hash and feeds each entry to
     * {@link #handleLiveCommit}; {@code GCUtil.traverseLiveCommits} drives the iteration and is
     * given the flag that the handler sets once no further commits need to be looked at.
     *
     * @return bloom filters keyed by content id, filled by the commit handler
     * @throws RuntimeException wrapping {@link NessieNotFoundException}
     */
    private Map<String, ContentBloomFilter> walkLiveCommitsInReference(GCStateParamsPerTask gcStateParamsPerTask) {
        Map<String, ContentBloomFilter> bloomFilterMap = new HashMap<>();
        Set<ContentKey> liveContentKeys = new HashSet<>();
        try (Stream<LogResponse.LogEntry> commits =
            StreamingUtil.getCommitLogStream(
                gcStateParamsPerTask.getApi(),
                builder ->
                    builder
                        .hashOnRef(gcStateParamsPerTask.getReference().getHash())
                        .refName(DETACHED_REF_NAME)
                        .fetch(FetchOption.ALL),
                OptionalInt.empty())) {
            MutableBoolean foundAllLiveCommitHeadsBeforeCutoffTime = new MutableBoolean(false);
            Consumer<LogResponse.LogEntry> commitHandler =
                logEntry ->
                    handleLiveCommit(
                        gcStateParamsPerTask,
                        logEntry,
                        bloomFilterMap,
                        foundAllLiveCommitHeadsBeforeCutoffTime,
                        liveContentKeys);
            GCUtil.traverseLiveCommits(foundAllLiveCommitHeadsBeforeCutoffTime, commits, commitHandler);
        } catch (NessieNotFoundException e) {
            throw new RuntimeException(e);
        }
        return bloomFilterMap;
    }

    /**
     * Processes one commit log entry of the live-content walk.
     *
     * <p>For a commit accepted by the live-commit predicate, the content of every {@code Put}
     * operation is added to the bloom filter of its content id. On reaching an expired commit
     * while {@code liveContentKeys} is empty, the keys present at that commit are fetched; from
     * then on a {@code Put} in an expired commit is only added when its key is still in that set,
     * and the key is removed once handled. When the set runs empty the flag is raised.
     *
     * @param gcStateParamsPerTask API, live-commit predicate and bloom filter size for this task
     * @param logEntry commit to process; ignored when it carries no operations
     * @param bloomFilterMap output map keyed by content id
     * @param foundAllLiveCommitHeadsBeforeCutoffTime set to true when no key remains to be found
     * @param liveContentKeys keys still awaiting their {@code Put} among expired commits
     * @throws RuntimeException wrapping {@link NessieNotFoundException} from the entries lookup
     */
    private void handleLiveCommit(GCStateParamsPerTask gcStateParamsPerTask, LogResponse.LogEntry logEntry, Map<String, ContentBloomFilter> bloomFilterMap, MutableBoolean foundAllLiveCommitHeadsBeforeCutoffTime, Set<ContentKey> liveContentKeys) {
        if (logEntry.getOperations() == null) {
            return;
        }
        boolean isExpired = !gcStateParamsPerTask.getLiveCommitPredicate().test(logEntry.getCommitMeta());
        if (isExpired && liveContentKeys.isEmpty()) {
            try {
                gcStateParamsPerTask
                    .getApi()
                    .getEntries()
                    .refName(DETACHED_REF_NAME)
                    .hashOnRef(logEntry.getCommitMeta().getHash())
                    .get()
                    .getEntries()
                    .forEach(entry -> liveContentKeys.add(entry.getName()));
            } catch (NessieNotFoundException e) {
                throw new RuntimeException(e);
            }
            if (liveContentKeys.isEmpty()) {
                // No key exists at this expired commit, so nothing older can be live. Without
                // this stop, every older commit would re-fetch its entries and keys that only
                // exist further back in history would wrongly be recorded as live.
                foundAllLiveCommitHeadsBeforeCutoffTime.setTrue();
                return;
            }
        }
        logEntry.getOperations().stream()
            .filter(Operation.Put.class::isInstance)
            .forEach(operation -> {
                boolean addContent;
                // Set.remove reports whether the key was present, i.e. still awaited.
                if (liveContentKeys.remove(operation.getKey())) {
                    addContent = true;
                    if (liveContentKeys.isEmpty()) {
                        foundAllLiveCommitHeadsBeforeCutoffTime.setTrue();
                    }
                } else {
                    addContent = !isExpired;
                }
                if (addContent) {
                    Content content = ((Operation.Put) operation).getContent();
                    bloomFilterMap
                        .computeIfAbsent(
                            content.getId(),
                            k -> new ContentBloomFilter(gcStateParamsPerTask.getBloomFilterSize(), gcParams.getBloomFilterFpp()))
                        .put(content);
                }
            });
    }

    /**
     * Resolves the cut-off time for a serialized reference: the dead-reference cut-off when the
     * reference is listed in {@code droppedRefTimeMap} and such a cut-off is configured, otherwise
     * the per-reference cut-off looked up by reference name, falling back to the default cut-off.
     */
    private Instant getCutoffTimeForRef(String reference, Map<String, Instant> droppedRefTimeMap) {
        Instant deadReferenceCutOff = gcParams.getDeadReferenceCutOffTimeStamp();
        if (deadReferenceCutOff != null && droppedRefTimeMap.containsKey(reference)) {
            return deadReferenceCutOff;
        }
        if (gcParams.getCutOffTimestampPerRef() == null) {
            return gcParams.getDefaultCutOffTimestamp();
        }
        return gcParams
            .getCutOffTimestampPerRef()
            .getOrDefault(GCUtil.deserializeReference(reference).getName(), gcParams.getDefaultCutOffTimestamp());
    }

    /**
     * Opens a Nessie API for the task and lazily flat-maps every serialized reference to the rows
     * of its expired contents.
     */
    private Iterator<Row> getExpiredContentRows(Iterator<String> references, Map<String, ContentBloomFilter> liveContentsBloomFilterMap, String runId, Timestamp startedAt) {
        NessieApiV1 api = GCUtil.getApi(gcParams.getNessieClientConfigs());
        TaskContext.get().addTaskCompletionListener(context -> {
            LOGGER.info("Closing the nessie api for compute expired contents task");
            api.close();
        });
        return references.flatMap(reference -> JavaConverters.asScalaIterator(this.walkAllCommitsInReference(api, GCUtil.deserializeReference(reference), liveContentsBloomFilterMap, runId, startedAt)).toTraversable());
    }

    /**
     * Walks the commit log starting at the reference's hash and lazily yields one row per expired
     * content.
     *
     * <p>Only commits older than "now minus the commit protection duration" are considered. Of
     * their {@code Put} contents, only Iceberg tables and views are kept, and only those for which
     * no bloom filter exists for the content id or the bloom filter reports the content as absent.
     *
     * @throws RuntimeException wrapping {@link NessieNotFoundException}
     */
    private java.util.Iterator<Row> walkAllCommitsInReference(NessieApiV1 api, Reference reference, Map<String, ContentBloomFilter> liveContentsBloomFilterMap, String runId, Timestamp startedAt) {
        Instant commitProtectionTime = Instant.now().minus(gcParams.getCommitProtectionDuration());
        Predicate<LogResponse.LogEntry> unprotectedCommitsPredicate =
            logEntry -> logEntry.getCommitMeta().getCommitTime().compareTo(commitProtectionTime) < 0;
        Predicate<Content> expiredContentPredicate = content -> {
            ContentBloomFilter liveFilter = liveContentsBloomFilterMap.get(content.getId());
            return liveFilter == null || !liveFilter.mightContain(content);
        };
        try {
            return StreamingUtil.getCommitLogStream(
                    api,
                    builder -> builder.hashOnRef(reference.getHash()).refName(DETACHED_REF_NAME).fetch(FetchOption.ALL),
                    OptionalInt.empty())
                .filter(unprotectedCommitsPredicate)
                .map(LogResponse.LogEntry::getOperations)
                // A log entry may carry no operations list (handleLiveCommit guards the same case).
                .filter(operations -> operations != null)
                .flatMap(operations -> operations.stream().filter(Operation.Put.class::isInstance))
                .map(Operation.Put.class::cast)
                .map(Operation.Put::getContent)
                .filter(content -> content.getType() == Content.Type.ICEBERG_TABLE || content.getType() == Content.Type.ICEBERG_VIEW)
                .filter(expiredContentPredicate)
                // Stream.iterator() is lazy, so rows are still built one at a time on demand.
                .map(content -> fillRow(reference, content, runId, startedAt))
                .iterator();
        } catch (NessieNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    /** Builds the result row for one content via {@code IdentifiedResultsRepo.createContentRow}. */
    private static Row fillRow(Reference reference, Content content, String runId, Timestamp startedAt) {
        return IdentifiedResultsRepo.createContentRow(content, runId, startedAt, getSnapshotId(content), reference);
    }

    /**
     * Returns the version id for an Iceberg view; every other content is cast to
     * {@link IcebergTable} and its snapshot id is returned.
     */
    private static long getSnapshotId(Content content) {
        switch (content.getType()) {
            case ICEBERG_VIEW:
                return ((IcebergView) content).getVersionId();
            default:
                return ((IcebergTable) content).getSnapshotId();
        }
    }
}

