/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.impl.KafkaRecordStreamSubscription;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer;
import io.vertx.core.Context;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class KafkaRecordBatchStream<K, V>
extends AbstractMulti<ConsumerRecords<K, V>> {
    private final ReactiveKafkaConsumer<K, V> client;
    private final KafkaConnectorIncomingConfiguration config;
    private final Context context;
    private final Set<KafkaRecordStreamSubscription<K, V, ConsumerRecords<K, V>>> subscriptions = Collections.newSetFromMap(new ConcurrentHashMap());

    public KafkaRecordBatchStream(ReactiveKafkaConsumer<K, V> client, KafkaConnectorIncomingConfiguration config, Context context) {
        this.config = config;
        this.client = client;
        this.context = context;
    }

    @Override
    public void subscribe(MultiSubscriber<? super ConsumerRecords<K, V>> subscriber) {
        KafkaRecordStreamSubscription<K, V, ? super ConsumerRecords<K, V>> subscription = new KafkaRecordStreamSubscription<K, V, ConsumerRecords<K, V>>(this.client, this.config, subscriber, this.context, 1, (cr, q) -> q.offer(cr));
        this.subscriptions.add(subscription);
        subscriber.onSubscribe(subscription);
    }

    void removeFromQueueRecordsFromTopicPartitions(Collection<TopicPartition> revokedPartitions) {
        if (revokedPartitions.isEmpty()) {
            return;
        }
        this.subscriptions.forEach(s -> this.removeFromQueue((KafkaRecordStreamSubscription<K, V, ConsumerRecords<K, V>>)s, revokedPartitions));
    }

    private void removeFromQueue(KafkaRecordStreamSubscription<K, V, ConsumerRecords<K, V>> subscription, Collection<TopicPartition> revokedPartitions) {
        subscription.rewriteQueue(cr -> {
            HashMap records = new HashMap();
            cr.partitions().stream().filter(t -> !revokedPartitions.contains(t)).forEach(t -> records.put((TopicPartition)t, cr.records((TopicPartition)t)));
            if (!records.isEmpty()) {
                return new ConsumerRecords(records);
            }
            return null;
        });
    }
}

