/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.offset;

import com.codahale.metrics.Timer;
import com.google.common.collect.Sets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.consumer.offset.FailedToCommitOffsets;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetQueue;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetsToCommit;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartition;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageCommitter;

public class OffsetCommitter
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(OffsetCommitter.class);
    private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
    private final int offsetCommitPeriodSeconds;
    private final OffsetQueue offsetQueue;
    private final List<MessageCommitter> messageCommitters;
    private final HermesMetrics metrics;
    private final Set<SubscriptionPartitionOffset> inflightOffsets = new HashSet<SubscriptionPartitionOffset>();
    private final Set<SubscriptionPartitionOffset> failedToCommitOffsets = new HashSet<SubscriptionPartitionOffset>();
    private final MpscArrayQueue<SubscriptionName> subscriptionsToCleanup = new MpscArrayQueue(1000);

    public OffsetCommitter(OffsetQueue offsetQueue, List<MessageCommitter> messageCommitters, int offsetCommitPeriodSeconds, HermesMetrics metrics) {
        this.offsetQueue = offsetQueue;
        this.messageCommitters = messageCommitters;
        this.offsetCommitPeriodSeconds = offsetCommitPeriodSeconds;
        this.metrics = metrics;
    }

    @Override
    public void run() {
        try (Timer.Context c = this.metrics.timer("offset-committer.duration").time();){
            ReducingConsumer committedOffsetsReducer = this.processCommittedOffsets();
            Map maxCommittedOffsets = committedOffsetsReducer.reduced;
            ReducingConsumer inflightOffsetReducer = this.processInflightOffsets(committedOffsetsReducer.all);
            Map minInflightOffsets = inflightOffsetReducer.reduced;
            int scheduledToCommit = 0;
            OffsetsToCommit offsetsToCommit = new OffsetsToCommit();
            for (SubscriptionPartition partition : Sets.union(minInflightOffsets.keySet(), maxCommittedOffsets.keySet())) {
                long offset = Math.min(minInflightOffsets.getOrDefault(partition, Long.MAX_VALUE), maxCommittedOffsets.getOrDefault(partition, Long.MAX_VALUE));
                if (offset < 0L || offset >= Long.MAX_VALUE) continue;
                ++scheduledToCommit;
                offsetsToCommit.add(new SubscriptionPartitionOffset(partition, offset));
            }
            this.commit(offsetsToCommit);
            this.metrics.counter("offset-committer.committed").inc((long)(scheduledToCommit - this.failedToCommitOffsets.size()));
            this.metrics.counter("offset-committer.failed").inc((long)this.failedToCommitOffsets.size());
            this.cleanupUnusedSubscriptions();
        }
        catch (Exception exception) {
            logger.error("Failed to run offset committer: {}", (Object)exception.getMessage(), (Object)exception);
        }
    }

    private ReducingConsumer processCommittedOffsets() {
        ReducingConsumer committedOffsetsReducer = new ReducingConsumer(Math::max, c -> c + 1L);
        this.offsetQueue.drainCommittedOffsets(committedOffsetsReducer);
        committedOffsetsReducer.resetModifierFunction();
        this.failedToCommitOffsets.forEach(committedOffsetsReducer::accept);
        this.failedToCommitOffsets.clear();
        return committedOffsetsReducer;
    }

    private ReducingConsumer processInflightOffsets(Set<SubscriptionPartitionOffset> committedOffsets) {
        ReducingConsumer inflightOffsetReducer = new ReducingConsumer(Math::min);
        this.offsetQueue.drainInflightOffsets((MessagePassingQueue.Consumer<SubscriptionPartitionOffset>)((MessagePassingQueue.Consumer)o -> this.reduceIfNotCommitted((SubscriptionPartitionOffset)o, inflightOffsetReducer, committedOffsets)));
        this.inflightOffsets.forEach(o -> this.reduceIfNotCommitted((SubscriptionPartitionOffset)o, inflightOffsetReducer, committedOffsets));
        this.inflightOffsets.clear();
        this.inflightOffsets.addAll(inflightOffsetReducer.all);
        return inflightOffsetReducer;
    }

    private void reduceIfNotCommitted(SubscriptionPartitionOffset offset, ReducingConsumer inflightOffsetReducer, Set<SubscriptionPartitionOffset> committedOffsets) {
        if (!committedOffsets.contains(offset)) {
            inflightOffsetReducer.accept(offset);
        }
    }

    private void commit(OffsetsToCommit offsetsToCommit) {
        for (MessageCommitter committer : this.messageCommitters) {
            FailedToCommitOffsets failedOffsets = committer.commitOffsets(offsetsToCommit);
            if (!failedOffsets.hasFailed()) continue;
            this.failedToCommitOffsets.addAll(failedOffsets.failedOffsets());
        }
    }

    public void removeUncommittedOffsets(SubscriptionName subscriptionName) {
        this.subscriptionsToCleanup.offer((Object)subscriptionName);
    }

    private void cleanupUnusedSubscriptions() {
        HashSet subscriptionNames = new HashSet();
        this.subscriptionsToCleanup.drain(subscriptionNames::add);
        Iterator<SubscriptionPartitionOffset> iterator = this.inflightOffsets.iterator();
        while (iterator.hasNext()) {
            if (!subscriptionNames.contains(iterator.next().getSubscriptionName())) continue;
            iterator.remove();
        }
    }

    public void start() {
        this.scheduledExecutor.scheduleWithFixedDelay(this, this.offsetCommitPeriodSeconds, this.offsetCommitPeriodSeconds, TimeUnit.SECONDS);
    }

    public void shutdown() {
        this.scheduledExecutor.shutdown();
    }

    private static final class ReducingConsumer
    implements MessagePassingQueue.Consumer<SubscriptionPartitionOffset> {
        private final BiFunction<Long, Long, Long> reductor;
        private Function<Long, Long> modifier;
        private final Map<SubscriptionPartition, Long> reduced = new HashMap<SubscriptionPartition, Long>();
        private final Set<SubscriptionPartitionOffset> all = new HashSet<SubscriptionPartitionOffset>();

        private ReducingConsumer(BiFunction<Long, Long, Long> reductor, Function<Long, Long> offsetModifier) {
            this.reductor = reductor;
            this.modifier = offsetModifier;
        }

        private ReducingConsumer(BiFunction<Long, Long, Long> reductor) {
            this(reductor, Function.identity());
        }

        private void resetModifierFunction() {
            this.modifier = Function.identity();
        }

        public void accept(SubscriptionPartitionOffset p) {
            this.all.add(p);
            this.reduced.compute(p.getSubscriptionPartition(), (k, v) -> {
                long offset = this.modifier.apply(p.getOffset());
                return v == null ? Long.valueOf(offset) : this.reductor.apply((Long)v, offset);
            });
        }
    }
}

