/*
 * Decompiled with CFR 0.152.
 */
package org.wikidata.query.rdf.tool.change;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AtomicLongMap;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikidata.query.rdf.tool.Utils;
import org.wikidata.query.rdf.tool.change.Change;
import org.wikidata.query.rdf.tool.change.JsonDeserializer;
import org.wikidata.query.rdf.tool.change.KafkaOffsetsRepository;
import org.wikidata.query.rdf.tool.change.events.ChangeEvent;
import org.wikidata.query.rdf.tool.change.events.PageDeleteEvent;
import org.wikidata.query.rdf.tool.change.events.RevisionCreateEvent;
import org.wikidata.query.rdf.tool.exception.RetryableException;
import org.wikidata.query.rdf.tool.wikibase.WikibaseRepository;

/**
 * {@link Change.Source} that reads change events from Kafka topics and groups
 * them into {@link Batch}es.
 *
 * <p>Offsets are committed to Kafka and written to the
 * {@link KafkaOffsetsRepository} at the start of every {@link #nextBatch(Batch)}
 * call, i.e. before the following batch is fetched.
 *
 * <p>Behaviour can be tuned with these system properties (all prefixed with
 * the fully qualified name of this class):
 * <ul>
 *   <li>{@code .maxPoll} - value for the consumer's {@code max.poll.records},
 *       defaults to the batch size;</li>
 *   <li>{@code .maxFetch} - value for {@code max.partition.fetch.bytes},
 *       defaults to {@code batchSize * 1024};</li>
 *   <li>{@code .reportingTopic} - suffix of the topic(s) whose record
 *       timestamps drive the "left off" time, defaults to
 *       {@code mediawiki.revision-create}.</li>
 * </ul>
 */
public class KafkaPoller
implements Change.Source<KafkaPoller.Batch> {
    private static final Logger log = LoggerFactory.getLogger(KafkaPoller.class);
    private static final String MAX_POLL_PROPERTY = KafkaPoller.class.getName() + ".maxPoll";
    private static final String MAX_FETCH_PROPERTY = KafkaPoller.class.getName() + ".maxFetch";
    private static final String REPORTING_TOPIC_PROP = KafkaPoller.class.getName() + ".reportingTopic";
    /** Topics consumed when no cluster names are given, mapped to the event class used to deserialize them. */
    private static final Map<String, Class<? extends ChangeEvent>> defaultTopics = ImmutableMap.of(
            "mediawiki.revision-create", RevisionCreateEvent.class,
            "mediawiki.page-delete", PageDeleteEvent.class,
            "mediawiki.page-undelete", RevisionCreateEvent.class);
    private static final String DEFAULT_REPORTING_TOPIC = "mediawiki.revision-create";

    /** Topic name suffix; only records from matching topics advance the reported time. */
    private final String reportingTopic;
    /** Start time used for the first batch and for looking up initial offsets. */
    private final Instant firstStartTime;
    /** Number of distinct changes after which a batch is considered full. */
    private final int batchSize;
    private final Consumer<String, ChangeEvent> consumer;
    private final KafkaOffsetsRepository kafkaOffsetsRepository;
    /** Used to filter events by host and entity namespace. */
    private final WikibaseRepository.Uris uris;
    /** All partitions of all topics this poller reads, resolved once at construction. */
    @Nonnull
    private final ImmutableList<TopicPartition> topicPartitions;
    /** When true, offsets from {@link #kafkaOffsetsRepository} are not consulted on start. */
    private final boolean ignoreStoredOffsets;
    private final Counter changesCounter;
    private final Timer pollingTimer;

    /**
     * Creates a poller on top of an existing consumer.
     *
     * <p>The partitions of {@code topics} are resolved immediately through
     * {@code consumer.partitionsFor}.
     *
     * @param consumer Kafka consumer to poll
     * @param uris used to filter events by host and entity namespace
     * @param firstStartTime start time of the first batch
     * @param batchSize number of distinct changes that makes a batch full
     * @param topics names of the topics to read
     * @param kafkaOffsetsRepository storage for consumer offsets
     * @param ignoreStoredOffsets if true, stored offsets are ignored on start
     * @param metricRegistry registry providing the {@code kafka-changes-counter}
     *        counter and the {@code kafka-changes-timer} timer
     */
    public KafkaPoller(Consumer<String, ChangeEvent> consumer, WikibaseRepository.Uris uris, Instant firstStartTime, int batchSize, Collection<String> topics, KafkaOffsetsRepository kafkaOffsetsRepository, boolean ignoreStoredOffsets, MetricRegistry metricRegistry) {
        this.consumer = consumer;
        this.uris = uris;
        this.firstStartTime = firstStartTime;
        this.batchSize = batchSize;
        this.changesCounter = metricRegistry.counter("kafka-changes-counter");
        this.pollingTimer = metricRegistry.timer("kafka-changes-timer");
        this.topicPartitions = topicsToPartitions(topics, consumer);
        this.kafkaOffsetsRepository = kafkaOffsetsRepository;
        this.ignoreStoredOffsets = ignoreStoredOffsets;
        this.reportingTopic = System.getProperty(REPORTING_TOPIC_PROP, DEFAULT_REPORTING_TOPIC);
    }

    /**
     * Builds the topic-to-event-class map, prefixing every default topic with
     * each cluster name (e.g. {@code cluster.mediawiki.revision-create}).
     *
     * @param clusterNames cluster prefixes; {@code null} or empty means the
     *        default topic names are used unchanged
     * @return immutable map from topic name to event class
     */
    private static Map<String, Class<? extends ChangeEvent>> clusterNamesAwareTopics(Collection<String> clusterNames) {
        if (clusterNames == null || clusterNames.isEmpty()) {
            return defaultTopics;
        }
        ImmutableMap.Builder<String, Class<? extends ChangeEvent>> prefixed = ImmutableMap.builder();
        for (Map.Entry<String, Class<? extends ChangeEvent>> entry : defaultTopics.entrySet()) {
            for (String cluster : clusterNames) {
                prefixed.put(cluster + "." + entry.getKey(), entry.getValue());
            }
        }
        return prefixed.build();
    }

    /**
     * Creates the Kafka consumer with auto-commit disabled; offsets are
     * committed explicitly in {@link #nextBatch(Batch)}.
     *
     * @param servers value for {@code bootstrap.servers}
     * @param consumerId value for {@code group.id}
     * @param topicToClass event class to deserialize for each topic
     * @param batchSize default for the poll and fetch size limits
     * @return a new consumer
     */
    private static KafkaConsumer<String, ChangeEvent> buildKafkaConsumer(String servers, String consumerId, Map<String, Class<? extends ChangeEvent>> topicToClass, int batchSize) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", consumerId);
        // 10 minutes between polls before the consumer is considered failed.
        props.put("max.poll.interval.ms", "600000");
        props.put("enable.auto.commit", "false");
        props.put("max.poll.records", System.getProperty(MAX_POLL_PROPERTY, String.valueOf(batchSize)));
        props.put("max.partition.fetch.bytes", System.getProperty(MAX_FETCH_PROPERTY, String.valueOf(batchSize * 1024)));
        log.info("Creating consumer {}", consumerId);
        // NOTE(review): assumes JsonDeserializer is generic over the event type - confirm.
        return new KafkaConsumer<>(props, new StringDeserializer(), new JsonDeserializer<>(topicToClass));
    }

    /**
     * Builds a poller together with its own Kafka consumer.
     *
     * @param kafkaServers Kafka bootstrap servers
     * @param consumerId consumer group id, must not be {@code null}
     * @param clusterNames optional topic prefixes, see
     *        {@link #clusterNamesAwareTopics(Collection)}
     * @param uris used to filter events by host and entity namespace
     * @param batchSize number of distinct changes that makes a batch full
     * @param startTime start time of the first batch
     * @param ignoreStoredOffsets if true, stored offsets are ignored on start
     * @param kafkaOffsetsRepository storage for consumer offsets
     * @param metricRegistry registry for the poller's metrics
     * @return the new poller
     * @throws IllegalArgumentException if {@code consumerId} is {@code null}
     */
    @Nonnull
    public static KafkaPoller buildKafkaPoller(String kafkaServers, String consumerId, Collection<String> clusterNames, WikibaseRepository.Uris uris, int batchSize, Instant startTime, boolean ignoreStoredOffsets, KafkaOffsetsRepository kafkaOffsetsRepository, MetricRegistry metricRegistry) {
        if (consumerId == null) {
            throw new IllegalArgumentException("Consumer ID (--consumer) must be set");
        }
        Map<String, Class<? extends ChangeEvent>> topicsToClass = clusterNamesAwareTopics(clusterNames);
        ImmutableSet<String> topics = ImmutableSet.copyOf(topicsToClass.keySet());
        KafkaConsumer<String, ChangeEvent> kafkaConsumer = buildKafkaConsumer(kafkaServers, consumerId, topicsToClass, batchSize);
        return new KafkaPoller(kafkaConsumer, uris, startTime, batchSize, topics, kafkaOffsetsRepository, ignoreStoredOffsets, metricRegistry);
    }

    /**
     * Assigns the partitions to the consumer, positions each of them and
     * fetches the first batch.
     *
     * <p>Partitions for which no offset could be determined start at the end.
     *
     * @return the first batch
     * @throws RetryableException if polling Kafka is interrupted
     */
    @Override
    public Batch firstBatch() throws RetryableException {
        Map<TopicPartition, OffsetAndTimestamp> kafkaOffsets = fetchOffsets();
        consumer.assign(kafkaOffsets.keySet());
        log.info("Subscribed to {} topics", kafkaOffsets.size());
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : kafkaOffsets.entrySet()) {
            TopicPartition partition = entry.getKey();
            OffsetAndTimestamp offset = entry.getValue();
            if (offset == null) {
                log.info("No offset for {}, starting at the end", partition);
                consumer.seekToEnd(Collections.singletonList(partition));
            } else {
                consumer.seek(partition, offset.offset());
                log.info("Set topic {} to {}", partition, offset);
            }
        }
        return fetch(firstStartTime);
    }

    /**
     * Determines the starting offset of every known partition.
     *
     * <p>Stored offsets are used where available (unless they are ignored);
     * the remaining partitions are resolved by asking Kafka for the offset at
     * {@link #firstStartTime}. Stored offsets of partitions that are not in
     * {@link #topicPartitions} are dropped.
     *
     * @return mutable map from partition to offset; a value may be
     *         {@code null}, which {@link #firstBatch()} treats as "no offset"
     */
    private Map<TopicPartition, OffsetAndTimestamp> fetchOffsets() {
        Map<TopicPartition, OffsetAndTimestamp> storedOffsets;
        if (ignoreStoredOffsets) {
            storedOffsets = ImmutableMap.of();
        } else {
            storedOffsets = kafkaOffsetsRepository.load(firstStartTime);
        }

        long startMillis = firstStartTime.toEpochMilli();
        // Explicit maps: the result must be mutable and must accept null values.
        Map<TopicPartition, OffsetAndTimestamp> results = new LinkedHashMap<>();
        Map<TopicPartition, Long> missing = new LinkedHashMap<>();
        for (TopicPartition partition : topicPartitions) {
            if (storedOffsets.containsKey(partition)) {
                results.put(partition, storedOffsets.get(partition));
            } else {
                missing.put(partition, startMillis);
            }
        }

        if (!missing.isEmpty()) {
            results.putAll(consumer.offsetsForTimes(missing));
        }
        return results;
    }

    /**
     * Returns the consumer's current position for every known partition.
     *
     * @return immutable map from partition to the consumer position
     */
    public ImmutableMap<TopicPartition, Long> currentOffsets() {
        return topicPartitions.stream()
                .collect(ImmutableMap.toImmutableMap(Function.identity(), partition -> consumer.position(partition)));
    }

    /**
     * Commits and stores the current offsets, then fetches the next batch
     * starting from where {@code lastBatch} left off.
     *
     * @param lastBatch the previously returned batch
     * @return the next batch
     * @throws RetryableException if polling Kafka is interrupted
     */
    @Override
    public Batch nextBatch(Batch lastBatch) throws RetryableException {
        consumer.commitSync();
        kafkaOffsetsRepository.store(currentOffsets());
        return fetch(lastBatch.leftOffDate());
    }

    /**
     * Polls Kafka until a batch is complete and returns it.
     *
     * <p>Polling stops when a poll returns no records, when at least
     * {@link #batchSize} distinct changes were collected, or when changes
     * were already collected and the latest poll added nothing relevant.
     *
     * @param lastNextStartTime time the previous batch left off at; reused
     *        when no record from a reporting topic was seen
     * @return the collected batch, possibly without changes
     * @throws RetryableException if the poll is interrupted or woken up
     */
    private Batch fetch(Instant lastNextStartTime) throws RetryableException {
        // Insertion-ordered so that changes keep the order they were seen in.
        Map<String, Change> changesByTitle = new LinkedHashMap<>();
        Instant nextInstant = Instant.EPOCH;
        AtomicLongMap<String> topicCounts = AtomicLongMap.create();

        while (true) {
            ConsumerRecords<String, ChangeEvent> records;
            try (Timer.Context ignored = pollingTimer.time()) {
                records = consumer.poll(1000L);
            } catch (InterruptException | WakeupException e) {
                throw new RetryableException("Error fetching recent changes", e);
            }
            int count = records.count();
            log.debug("Fetched {} records from Kafka", count);
            changesCounter.inc(count);
            if (count == 0) {
                // Nothing more available right now: return what we have.
                break;
            }

            boolean foundSomething = false;
            for (ConsumerRecord<String, ChangeEvent> record : records) {
                ChangeEvent event = record.value();
                String topic = record.topic();
                log.trace("Got event t:{} o:{}", topic, record.offset());
                if (!isRelevant(event)) {
                    continue;
                }
                foundSomething = true;
                topicCounts.getAndIncrement(topic);
                // Only reporting topics move the time forward.
                if (topic.endsWith(reportingTopic)) {
                    nextInstant = Utils.max(nextInstant, Instant.ofEpochMilli(record.timestamp()));
                }
                Change change = new Change(event.title(), event.revision(), event.timestamp(), record.offset());
                Change dupe = changesByTitle.put(change.entityId(), change);
                // NOTE(review): -1 looks like a "no revision" sentinel - confirm against Change.
                // Put the previous change back when the new one has a real
                // revision and the previous one is newer or has revision -1.
                boolean keepPrevious = dupe != null
                        && change.revision() > -1L
                        && (dupe.revision() > change.revision() || dupe.revision() == -1L);
                if (keepPrevious) {
                    // Remove first so the re-inserted entry moves to the end of the order.
                    changesByTitle.remove(change.entityId());
                    changesByTitle.put(change.entityId(), dupe);
                }
            }

            log.debug("{} records left after filtering", changesByTitle.size());
            if (changesByTitle.size() >= batchSize) {
                break;
            }
            if (!changesByTitle.isEmpty() && !foundSomething) {
                log.info("Did not find anything useful in this batch, returning existing data");
                break;
            }
        }

        // EPOCH means no reporting-topic record was seen, so time does not advance.
        if (nextInstant.equals(Instant.EPOCH)) {
            nextInstant = lastNextStartTime;
        }
        ImmutableList<Change> changes = ImmutableList.copyOf(changesByTitle.values());
        log.info("Found {} changes", changes.size());
        if (log.isDebugEnabled()) {
            topicCounts.asMap().forEach((k, v) -> log.debug("Topic {}: {} records", k, v));
        }
        long advanced = ChronoUnit.MILLIS.between(lastNextStartTime, nextInstant);
        return new Batch(changes, advanced, nextInstant.minusSeconds(1L).toString(), nextInstant);
    }

    /**
     * Tells whether an event should become a change: it must be for our host,
     * in an entity namespace, and not flagged redundant by the event itself.
     */
    private boolean isRelevant(ChangeEvent event) {
        return event.domain().equals(uris.getHost())
                && uris.isEntityNamespace(event.namespace())
                && !event.isRedundant();
    }

    /**
     * Resolves every partition of the given topics through the consumer.
     *
     * @param topics topic names
     * @param consumer consumer used to look up partition metadata
     * @return immutable list of all partitions, in topic iteration order
     */
    private static ImmutableList<TopicPartition> topicsToPartitions(Collection<String> topics, Consumer<String, ChangeEvent> consumer) {
        return topics.stream()
                .flatMap(topic -> consumer.partitionsFor(topic).stream())
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(ImmutableList.toImmutableList());
    }

    /** Closes the underlying Kafka consumer. */
    @Override
    public void close() {
        consumer.close();
    }

    /**
     * Batch of changes read from Kafka; progress is measured in milliseconds.
     */
    public static final class Batch
    extends Change.Batch.AbstractDefaultImplementation {
        /** Time the next fetch should continue from. */
        private final Instant leftOffDate;

        /**
         * @param changes changes in this batch
         * @param advanced milliseconds between the previous and the new left-off time
         * @param leftOff textual form of the left-off position, passed to the superclass
         * @param nextStartTime time returned by {@link #leftOffDate()}
         */
        public Batch(ImmutableList<Change> changes, long advanced, String leftOff, Instant nextStartTime) {
            super(changes, advanced, leftOff);
            this.leftOffDate = nextStartTime;
        }

        @Override
        public String advancedUnits() {
            return "milliseconds";
        }

        @Override
        public Instant leftOffDate() {
            return leftOffDate;
        }
    }
}

