/*
 * Decompiled with CFR 0.152.
 */
package org.zalando.nakadi.repository.kafka;

import com.google.common.collect.Lists;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.zalando.nakadi.domain.ConsumedEvent;
import org.zalando.nakadi.domain.Cursor;
import org.zalando.nakadi.repository.EventConsumer;
import org.zalando.nakadi.repository.kafka.KafkaCursor;

public class NakadiKafkaConsumer
implements EventConsumer {
    private final Consumer<String, String> kafkaConsumer;
    private Queue<ConsumedEvent> eventQueue = Lists.newLinkedList();
    private final long pollTimeout;

    public NakadiKafkaConsumer(Consumer<String, String> kafkaConsumer, String topic, List<KafkaCursor> kafkaCursors, long pollTimeout) {
        this.kafkaConsumer = kafkaConsumer;
        this.pollTimeout = pollTimeout;
        List<TopicPartition> topicPartitions = kafkaCursors.stream().map(cursor -> new TopicPartition(topic, cursor.getPartition())).collect(Collectors.toList());
        kafkaConsumer.assign(topicPartitions);
        topicPartitions.forEach(topicPartition -> kafkaConsumer.seek(topicPartition, kafkaCursors.stream().filter(cursor -> cursor.getPartition() == topicPartition.partition()).findFirst().get().getOffset()));
    }

    @Override
    public Optional<ConsumedEvent> readEvent() {
        if (this.eventQueue.isEmpty()) {
            this.pollFromKafka();
        }
        return Optional.ofNullable(this.eventQueue.poll());
    }

    @Override
    public void close() {
        this.kafkaConsumer.close();
    }

    private void pollFromKafka() {
        ConsumerRecords records = this.kafkaConsumer.poll(this.pollTimeout);
        this.eventQueue = StreamSupport.stream(records.spliterator(), false).map(record -> {
            Cursor cursor = KafkaCursor.kafkaCursor(record.partition(), record.offset()).asNakadiCursor();
            return new ConsumedEvent((String)record.value(), record.topic(), cursor.getPartition(), cursor.getOffset());
        }).collect(Collectors.toCollection(Lists::newLinkedList));
    }
}

