/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.offset;

import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.consumer.offset.ConsumerPartitionAssignmentState;
import pl.allegro.tech.hermes.consumers.consumer.offset.MessageState;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartition;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;
import pl.allegro.tech.hermes.metrics.HermesCounter;
import pl.allegro.tech.hermes.metrics.HermesTimer;
import pl.allegro.tech.hermes.metrics.HermesTimerContext;

public class OffsetCommitter {
    private static final Logger logger = LoggerFactory.getLogger(OffsetCommitter.class);
    private final ConsumerPartitionAssignmentState partitionAssignmentState;
    private final HermesCounter obsoleteCounter;
    private final HermesCounter committedCounter;
    private final HermesTimer timer;
    private final Set<SubscriptionPartitionOffset> inflightOffsets = new HashSet<SubscriptionPartitionOffset>();
    private final Map<SubscriptionPartition, Long> maxCommittedOffsets = new HashMap<SubscriptionPartition, Long>();

    public OffsetCommitter(ConsumerPartitionAssignmentState partitionAssignmentState, MetricsFacade metrics) {
        this.partitionAssignmentState = partitionAssignmentState;
        this.obsoleteCounter = metrics.offsetCommits().obsoleteCounter();
        this.committedCounter = metrics.offsetCommits().committedCounter();
        this.timer = metrics.offsetCommits().duration();
    }

    public Set<SubscriptionPartitionOffset> calculateOffsetsToBeCommitted(Map<SubscriptionPartitionOffset, MessageState> offsets) {
        Object object;
        block13: {
            HermesTimerContext ignored = this.timer.time();
            try {
                ArrayList<SubscriptionPartitionOffset> processedOffsets = new ArrayList<SubscriptionPartitionOffset>();
                for (Map.Entry<SubscriptionPartitionOffset, MessageState> entry : offsets.entrySet()) {
                    if (entry.getValue() != MessageState.PROCESSED) continue;
                    processedOffsets.add((SubscriptionPartitionOffset)entry.getKey());
                }
                ArrayList<SubscriptionPartitionOffset> allOffsets = new ArrayList<SubscriptionPartitionOffset>();
                for (Map.Entry entry : offsets.entrySet()) {
                    allOffsets.add((SubscriptionPartitionOffset)entry.getKey());
                }
                ReducingConsumer processedOffsetsReducer = this.prepareProcessedOffsets(processedOffsets);
                Map<SubscriptionPartition, Long> map = processedOffsetsReducer.reduced;
                this.updateMaxProcessedOffsets(map);
                ReducingConsumer inflightOffsetReducer = this.prepareInflightOffsets(processedOffsetsReducer.all, allOffsets);
                Map<SubscriptionPartition, Long> minInflightOffsets = inflightOffsetReducer.reduced;
                int scheduledToCommitCount = 0;
                int obsoleteCount = 0;
                HashSet<SubscriptionPartition> processedOffsetToBeRemoved = new HashSet<SubscriptionPartition>();
                HashSet<SubscriptionPartitionOffset> offsetsToCommit = new HashSet<SubscriptionPartitionOffset>();
                for (SubscriptionPartition partition : Sets.union(minInflightOffsets.keySet(), this.maxCommittedOffsets.keySet())) {
                    if (this.partitionAssignmentState.isAssignedPartitionAtCurrentTerm(partition)) {
                        long maxCommitted;
                        long minInflight = minInflightOffsets.getOrDefault(partition, Long.MAX_VALUE);
                        long offsetToBeCommitted = Math.min(minInflight, maxCommitted = this.maxCommittedOffsets.getOrDefault(partition, Long.MAX_VALUE).longValue());
                        if (offsetToBeCommitted >= 0L && offsetToBeCommitted < Long.MAX_VALUE) {
                            ++scheduledToCommitCount;
                            offsetsToCommit.add(new SubscriptionPartitionOffset(partition, offsetToBeCommitted));
                            if (maxCommitted != offsetToBeCommitted) continue;
                            processedOffsetToBeRemoved.add(partition);
                            continue;
                        }
                        logger.warn("Skipping offset out of bounds for subscription {}: partition={}, offset={}", new Object[]{partition.getSubscriptionName(), partition.getPartition(), offsetToBeCommitted});
                        continue;
                    }
                    ++obsoleteCount;
                }
                processedOffsetToBeRemoved.forEach(this.maxCommittedOffsets::remove);
                this.obsoleteCounter.increment((long)obsoleteCount);
                this.committedCounter.increment((long)scheduledToCommitCount);
                this.cleanupStoredOffsetsWithObsoleteTerms();
                object = offsetsToCommit;
                if (ignored == null) break block13;
            }
            catch (Throwable throwable) {
                try {
                    if (ignored != null) {
                        try {
                            ignored.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                catch (Exception exception) {
                    logger.error("Failed to run offset committer: {}", (Object)exception.getMessage(), (Object)exception);
                    return Set.of();
                }
            }
            ignored.close();
        }
        return object;
    }

    private ReducingConsumer prepareProcessedOffsets(List<SubscriptionPartitionOffset> processedOffsets) {
        ReducingConsumer processedOffsetsReducer = new ReducingConsumer(Math::max, c -> c + 1L);
        this.drain(processedOffsets, processedOffsetsReducer);
        processedOffsetsReducer.resetModifierFunction();
        return processedOffsetsReducer;
    }

    private void updateMaxProcessedOffsets(Map<SubscriptionPartition, Long> maxDrainedCommittedOffsets) {
        maxDrainedCommittedOffsets.forEach((partition, drainedOffset) -> this.maxCommittedOffsets.compute((SubscriptionPartition)partition, (p, storedOffset) -> storedOffset == null || storedOffset < drainedOffset ? drainedOffset : storedOffset));
    }

    private ReducingConsumer prepareInflightOffsets(Set<SubscriptionPartitionOffset> processedOffsets, List<SubscriptionPartitionOffset> inflightOffsetsQueue) {
        ReducingConsumer inflightOffsetReducer = new ReducingConsumer(Math::min);
        this.drain(inflightOffsetsQueue, o -> this.reduceIfNotDelivered((SubscriptionPartitionOffset)o, inflightOffsetReducer, processedOffsets));
        this.inflightOffsets.forEach(o -> this.reduceIfNotDelivered((SubscriptionPartitionOffset)o, inflightOffsetReducer, processedOffsets));
        this.inflightOffsets.clear();
        this.inflightOffsets.addAll(inflightOffsetReducer.all);
        return inflightOffsetReducer;
    }

    private void reduceIfNotDelivered(SubscriptionPartitionOffset offset, ReducingConsumer inflightOffsetReducer, Set<SubscriptionPartitionOffset> committedOffsets) {
        if (!committedOffsets.contains(offset)) {
            inflightOffsetReducer.accept(offset);
        }
    }

    private void cleanupStoredOffsetsWithObsoleteTerms() {
        this.inflightOffsets.removeIf(o -> !this.partitionAssignmentState.isAssignedPartitionAtCurrentTerm(o.getSubscriptionPartition()));
        this.maxCommittedOffsets.entrySet().removeIf(entry -> !this.partitionAssignmentState.isAssignedPartitionAtCurrentTerm((SubscriptionPartition)entry.getKey()));
    }

    private void drain(List<SubscriptionPartitionOffset> subscriptionPartitionOffsets, Consumer<SubscriptionPartitionOffset> consumer) {
        int size = subscriptionPartitionOffsets.size();
        for (int i = 0; i < size; ++i) {
            SubscriptionPartitionOffset element = subscriptionPartitionOffsets.get(i);
            if (element == null) {
                logger.warn("Unexpected null value while draining queue [idx={}, size={}]", (Object)i, (Object)size);
                break;
            }
            consumer.accept(element);
        }
    }

    private static final class ReducingConsumer
    implements Consumer<SubscriptionPartitionOffset> {
        private final BiFunction<Long, Long, Long> reductor;
        private Function<Long, Long> modifier;
        private final Map<SubscriptionPartition, Long> reduced = new HashMap<SubscriptionPartition, Long>();
        private final Set<SubscriptionPartitionOffset> all = new HashSet<SubscriptionPartitionOffset>();

        private ReducingConsumer(BiFunction<Long, Long, Long> reductor, Function<Long, Long> offsetModifier) {
            this.reductor = reductor;
            this.modifier = offsetModifier;
        }

        private ReducingConsumer(BiFunction<Long, Long, Long> reductor) {
            this(reductor, Function.identity());
        }

        private void resetModifierFunction() {
            this.modifier = Function.identity();
        }

        @Override
        public void accept(SubscriptionPartitionOffset p) {
            this.all.add(p);
            this.reduced.compute(p.getSubscriptionPartition(), (k, v) -> {
                long offset = this.modifier.apply(p.getOffset());
                return v == null ? offset : this.reductor.apply((Long)v, offset);
            });
        }
    }
}

