/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.offset;

import com.codahale.metrics.Timer;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.jctools.queues.MessagePassingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.consumer.offset.ConsumerPartitionAssignmentState;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetQueue;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetsToCommit;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartition;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageCommitter;

public class OffsetCommitter
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(OffsetCommitter.class);
    private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("offset-committer-%d").build());
    private final int offsetCommitPeriodSeconds;
    private final OffsetQueue offsetQueue;
    private final ConsumerPartitionAssignmentState partitionAssignmentState;
    private final MessageCommitter messageCommitter;
    private final HermesMetrics metrics;
    private final Set<SubscriptionPartitionOffset> inflightOffsets = new HashSet<SubscriptionPartitionOffset>();
    private final Map<SubscriptionPartition, Long> maxCommittedOffsets = new HashMap<SubscriptionPartition, Long>();

    public OffsetCommitter(OffsetQueue offsetQueue, ConsumerPartitionAssignmentState partitionAssignmentState, MessageCommitter messageCommitter, int offsetCommitPeriodSeconds, HermesMetrics metrics) {
        this.offsetQueue = offsetQueue;
        this.partitionAssignmentState = partitionAssignmentState;
        this.messageCommitter = messageCommitter;
        this.offsetCommitPeriodSeconds = offsetCommitPeriodSeconds;
        this.metrics = metrics;
    }

    @Override
    public void run() {
        try (Timer.Context c = this.metrics.timer("offset-committer.duration").time();){
            ReducingConsumer committedOffsetsReducer = this.processCommittedOffsets();
            Map maxDrainedCommittedOffsets = committedOffsetsReducer.reduced;
            this.updateMaxCommittedOffsets(maxDrainedCommittedOffsets);
            ReducingConsumer inflightOffsetReducer = this.processInflightOffsets(committedOffsetsReducer.all);
            Map minInflightOffsets = inflightOffsetReducer.reduced;
            int scheduledToCommitCount = 0;
            int obsoleteCount = 0;
            HashSet<SubscriptionPartition> committedOffsetToBeRemoved = new HashSet<SubscriptionPartition>();
            OffsetsToCommit offsetsToCommit = new OffsetsToCommit();
            for (SubscriptionPartition partition : Sets.union(minInflightOffsets.keySet(), this.maxCommittedOffsets.keySet())) {
                if (this.partitionAssignmentState.isAssignedPartitionAtCurrentTerm(partition)) {
                    long maxCommitted;
                    long minInflight = minInflightOffsets.getOrDefault(partition, Long.MAX_VALUE);
                    long offsetToBeCommitted = Math.min(minInflight, maxCommitted = this.maxCommittedOffsets.getOrDefault(partition, Long.MAX_VALUE).longValue());
                    if (offsetToBeCommitted >= 0L && offsetToBeCommitted < Long.MAX_VALUE) {
                        ++scheduledToCommitCount;
                        offsetsToCommit.add(new SubscriptionPartitionOffset(partition, offsetToBeCommitted));
                        if (maxCommitted != offsetToBeCommitted) continue;
                        committedOffsetToBeRemoved.add(partition);
                        continue;
                    }
                    logger.warn("Skipping offset out of bounds for subscription {}: partition={}, offset={}", new Object[]{partition.getSubscriptionName(), partition.getPartition(), offsetToBeCommitted});
                    continue;
                }
                ++obsoleteCount;
            }
            committedOffsetToBeRemoved.forEach(this.maxCommittedOffsets::remove);
            this.messageCommitter.commitOffsets(offsetsToCommit);
            this.metrics.counter("offset-committer.obsolete").inc((long)obsoleteCount);
            this.metrics.counter("offset-committer.committed").inc((long)scheduledToCommitCount);
            this.cleanupStoredOffsetsWithObsoleteTerms();
        }
        catch (Exception exception) {
            logger.error("Failed to run offset committer: {}", (Object)exception.getMessage(), (Object)exception);
        }
    }

    private ReducingConsumer processCommittedOffsets() {
        ReducingConsumer committedOffsetsReducer = new ReducingConsumer(Math::max, c -> c + 1L);
        this.offsetQueue.drainCommittedOffsets(committedOffsetsReducer);
        committedOffsetsReducer.resetModifierFunction();
        return committedOffsetsReducer;
    }

    private void updateMaxCommittedOffsets(Map<SubscriptionPartition, Long> maxDrainedCommittedOffsets) {
        maxDrainedCommittedOffsets.forEach((partition, drainedOffset) -> this.maxCommittedOffsets.compute((SubscriptionPartition)partition, (p, storedOffset) -> storedOffset == null || storedOffset < drainedOffset ? drainedOffset : storedOffset));
    }

    private ReducingConsumer processInflightOffsets(Set<SubscriptionPartitionOffset> committedOffsets) {
        ReducingConsumer inflightOffsetReducer = new ReducingConsumer(Math::min);
        this.offsetQueue.drainInflightOffsets((MessagePassingQueue.Consumer<SubscriptionPartitionOffset>)((MessagePassingQueue.Consumer)o -> this.reduceIfNotCommitted((SubscriptionPartitionOffset)o, inflightOffsetReducer, committedOffsets)));
        this.inflightOffsets.forEach(o -> this.reduceIfNotCommitted((SubscriptionPartitionOffset)o, inflightOffsetReducer, committedOffsets));
        this.inflightOffsets.clear();
        this.inflightOffsets.addAll(inflightOffsetReducer.all);
        return inflightOffsetReducer;
    }

    private void reduceIfNotCommitted(SubscriptionPartitionOffset offset, ReducingConsumer inflightOffsetReducer, Set<SubscriptionPartitionOffset> committedOffsets) {
        if (!committedOffsets.contains(offset)) {
            inflightOffsetReducer.accept(offset);
        }
    }

    private void cleanupStoredOffsetsWithObsoleteTerms() {
        this.inflightOffsets.removeIf(o -> !this.partitionAssignmentState.isAssignedPartitionAtCurrentTerm(o.getSubscriptionPartition()));
        this.maxCommittedOffsets.entrySet().removeIf(entry -> !this.partitionAssignmentState.isAssignedPartitionAtCurrentTerm((SubscriptionPartition)entry.getKey()));
    }

    public void start() {
        this.scheduledExecutor.scheduleWithFixedDelay(this, this.offsetCommitPeriodSeconds, this.offsetCommitPeriodSeconds, TimeUnit.SECONDS);
    }

    public void shutdown() {
        this.scheduledExecutor.shutdown();
    }

    private static final class ReducingConsumer
    implements MessagePassingQueue.Consumer<SubscriptionPartitionOffset> {
        private final BiFunction<Long, Long, Long> reductor;
        private Function<Long, Long> modifier;
        private final Map<SubscriptionPartition, Long> reduced = new HashMap<SubscriptionPartition, Long>();
        private final Set<SubscriptionPartitionOffset> all = new HashSet<SubscriptionPartitionOffset>();

        private ReducingConsumer(BiFunction<Long, Long, Long> reductor, Function<Long, Long> offsetModifier) {
            this.reductor = reductor;
            this.modifier = offsetModifier;
        }

        private ReducingConsumer(BiFunction<Long, Long, Long> reductor) {
            this(reductor, Function.identity());
        }

        private void resetModifierFunction() {
            this.modifier = Function.identity();
        }

        public void accept(SubscriptionPartitionOffset p) {
            this.all.add(p);
            this.reduced.compute(p.getSubscriptionPartition(), (k, v) -> {
                long offset = this.modifier.apply(p.getOffset());
                return v == null ? Long.valueOf(offset) : this.reductor.apply((Long)v, offset);
            });
        }
    }
}

