/*
 * Decompiled with CFR 0.152.
 */
package ch.squaredesk.nova.comm.kafka;

import ch.squaredesk.nova.comm.kafka.IncomingMessageMetaData;
import ch.squaredesk.nova.comm.kafka.RetrieveInfo;
import ch.squaredesk.nova.comm.retrieving.IncomingMessage;
import ch.squaredesk.nova.metrics.Metrics;
import ch.squaredesk.nova.tuples.Pair;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka-backed message receiver that delivers String payloads.
 *
 * <p>All subscribers share one stream of consumer records. The stream is produced by a generator
 * that runs on {@link Schedulers#io()}, owns a single {@link KafkaConsumer} and, before every poll,
 * re-aligns the consumer's topic subscription with the set of topics that currently have at least
 * one subscriber (tracked in {@code topicToSubscriptionCount}).
 *
 * <p>The stream completes, and the consumer is closed, when {@link #shutdown()} has been called,
 * when the polling thread is interrupted, or when polling fails.
 */
public class MessageReceiver
        extends ch.squaredesk.nova.comm.retrieving.MessageReceiver<String, String, IncomingMessageMetaData> {
    private final Logger logger = LoggerFactory.getLogger(MessageReceiver.class);
    private final Flowable<IncomingMessage<String, IncomingMessageMetaData>> allMessagesStream;
    private final Scheduler scheduler = Schedulers.io();
    // Must stay a ConcurrentHashMap: the subscription bookkeeping relies on its atomic compute methods.
    private final Map<String, AtomicInteger> topicToSubscriptionCount = new ConcurrentHashMap<>();
    // Set by shutdown(); checked by the generator so that it stops polling and releases the consumer.
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

    /**
     * Creates the receiver and assembles (but does not yet subscribe to) the shared message stream.
     *
     * @param identifier         passed through to the superclass
     * @param consumerProperties configuration used to create the {@link KafkaConsumer}
     * @param pollTimeout        timeout of a single poll; also the sleep interval while no topic is subscribed
     * @param pollTimeUnit       unit of {@code pollTimeout}
     * @param metrics            passed through to the superclass
     */
    protected MessageReceiver(String identifier, Properties consumerProperties, long pollTimeout, TimeUnit pollTimeUnit, Metrics metrics) {
        super(identifier, metrics);
        Flowable<ConsumerRecords<String, String>> consumerRecordsStream =
            Flowable.<ConsumerRecords<String, String>, Pair<KafkaConsumer<String, String>, Set<String>>>generate(
                () -> {
                    this.logger.info("Opening connection to Kafka broker");
                    Set<String> subscribedTopics = new HashSet<>();
                    return new Pair<>(new KafkaConsumer<String, String>(consumerProperties), subscribedTopics);
                },
                (consumerTopicsPair, emitter) -> {
                    ConsumerRecords<String, String> consumerRecords = null;
                    if (this.awaitSubscription(consumerTopicsPair, pollTimeout, pollTimeUnit)) {
                        consumerRecords = this.poll(consumerTopicsPair._1, pollTimeUnit.toMillis(pollTimeout));
                    }
                    if (consumerRecords == null) {
                        // shutdown, interruption or a polling failure: end the stream so the consumer gets closed
                        emitter.onComplete();
                    } else {
                        this.logger.debug("Read consumer records, size = {}", consumerRecords.count());
                        emitter.onNext(consumerRecords);
                    }
                },
                consumerTopicsPair -> {
                    this.logger.info("Shutting down connection to Kafka broker");
                    try {
                        consumerTopicsPair._1.close();
                    } catch (Exception e) {
                        // log the exception itself; its cause may well be null
                        this.logger.info("An error occurred trying to close KafkaConsumer", e);
                    }
                });
        this.allMessagesStream = consumerRecordsStream
            .subscribeOn(this.scheduler)
            .flatMap(MessageReceiver::observableFor)
            .map(MessageReceiver::toIncomingMessage)
            .share();
    }

    /**
     * Waits until at least one topic is subscribed, keeping the consumer's subscription in sync
     * with the requested topics while waiting.
     *
     * @return {@code true} once the consumer is subscribed to at least one topic; {@code false} if
     *         shutdown was requested or the waiting thread was interrupted
     */
    private boolean awaitSubscription(Pair<KafkaConsumer<String, String>, Set<String>> consumerTopicsPair, long pollTimeout, TimeUnit pollTimeUnit) {
        while (!this.shutdownRequested.get()) {
            if (this.syncSubscriptions(consumerTopicsPair)) {
                return true;
            }
            this.logger.trace("No topic subscribed yet, sleeping {} {}", pollTimeout, pollTimeUnit);
            try {
                Thread.sleep(pollTimeUnit.toMillis(pollTimeout));
            } catch (InterruptedException e) {
                // Restore the flag and stop waiting; swallowing the interrupt here would keep this
                // thread looping (and the consumer open) for as long as no topic is subscribed.
                Thread.currentThread().interrupt();
                this.logger.debug("Interrupted while waiting for a topic subscription");
                return false;
            }
        }
        return false;
    }

    /**
     * Re-subscribes the consumer if the requested topics differ from the ones it was last subscribed to.
     *
     * @return {@code true} if at least one topic is subscribed afterwards
     */
    private boolean syncSubscriptions(Pair<KafkaConsumer<String, String>, Set<String>> consumerTopicsPair) {
        // Work on a snapshot: the key set is a live view that other threads modify concurrently.
        Set<String> requestedTopics = new HashSet<>(this.topicToSubscriptionCount.keySet());
        Set<String> currentTopics = consumerTopicsPair._2;
        if (!currentTopics.equals(requestedTopics)) {
            this.logger.debug("Changing topic subscriptions to {}", requestedTopics);
            currentTopics.clear();
            currentTopics.addAll(requestedTopics);
            consumerTopicsPair._1.subscribe(requestedTopics);
        }
        return !currentTopics.isEmpty();
    }

    /**
     * Polls the consumer until it hands back a result or shutdown is requested.
     *
     * @return the polled records (possibly empty), or {@code null} if shutdown was requested or polling failed
     */
    private ConsumerRecords<String, String> poll(KafkaConsumer<String, String> consumer, long timeoutMillis) {
        ConsumerRecords<String, String> consumerRecords = null;
        while (consumerRecords == null && !this.shutdownRequested.get()) {
            try {
                consumerRecords = consumer.poll(timeoutMillis);
            } catch (Exception ex) {
                this.logger.warn("Polling the Kafka consumer failed, terminating the message stream", ex);
                return null;
            }
        }
        if (consumerRecords != null && consumerRecords.isEmpty()) {
            // Empty results are still handed back: they produce no messages downstream, and returning
            // gives the generator a chance to pick up subscription changes before the next poll.
            this.logger.trace("Ignoring empty consumer records");
        }
        return consumerRecords;
    }

    /** Converts a single Kafka record into the transport-independent message representation. */
    private static IncomingMessage<String, IncomingMessageMetaData> toIncomingMessage(ConsumerRecord<String, String> record) {
        RetrieveInfo kafkaSpecificInfo = new RetrieveInfo();
        IncomingMessageMetaData metaData = new IncomingMessageMetaData(record.topic(), kafkaSpecificInfo);
        return new IncomingMessage<String, IncomingMessageMetaData>(record.value(), metaData);
    }

    /** Emits every record of the passed batch and then completes. */
    private static Flowable<ConsumerRecord<String, String>> observableFor(ConsumerRecords<String, String> records) {
        return Flowable.fromIterable(records);
    }

    /**
     * Returns the messages received on the passed topic.
     *
     * <p>Subscribing registers interest in the topic and terminating or cancelling the returned
     * stream withdraws it again; both updates are executed asynchronously on the IO scheduler.
     *
     * @param destination the topic to receive messages from
     * @return the shared message stream, filtered by {@code destination}
     * @throws NullPointerException if {@code destination} is {@code null}
     */
    public Flowable<IncomingMessage<String, IncomingMessageMetaData>> messages(String destination) {
        Objects.requireNonNull(destination, "destination must not be null");
        return this.allMessagesStream
            .filter(incomingMessage -> destination.equals(incomingMessage.metaData.destination))
            .doOnSubscribe(s -> this.scheduler.scheduleDirect(() -> this.registerSubscription(destination)))
            .doFinally(() -> this.scheduler.scheduleDirect(() -> this.unregisterSubscription(destination)));
    }

    /** Increments the subscription count of the topic, creating the counter if needed. */
    private void registerSubscription(String destination) {
        int[] count = new int[1];
        // Create-or-increment happens atomically per key, so it cannot interleave with the
        // decrement-and-remove performed when the last subscription goes away.
        this.topicToSubscriptionCount.compute(destination, (topic, counter) -> {
            AtomicInteger subsCounter = counter == null ? new AtomicInteger(0) : counter;
            count[0] = subsCounter.incrementAndGet();
            return subsCounter;
        });
        this.logger.info("Subscribing to topic {}, current subscription count is  {}", destination, count[0]);
        this.metricsCollector.subscriptionCreated(destination);
    }

    /** Decrements the subscription count of the topic and drops the counter when it reaches zero. */
    private void unregisterSubscription(String destination) {
        this.metricsCollector.subscriptionDestroyed(destination);
        boolean[] counterFound = new boolean[1];
        int[] count = new int[1];
        this.topicToSubscriptionCount.computeIfPresent(destination, (topic, counter) -> {
            counterFound[0] = true;
            count[0] = counter.decrementAndGet();
            // returning null removes the mapping atomically with the decrement
            return count[0] > 0 ? counter : null;
        });
        if (!counterFound[0]) {
            if (this.shutdownRequested.get()) {
                // shutdown() clears all counters, so a missing counter is expected here
                this.logger.debug("Unsubscribed from topic {} after shutdown", destination);
            } else {
                this.logger.error("WTF! Unsubscribing topic {} but the counter is gone?!?!?", destination);
            }
        } else if (count[0] <= 0) {
            this.logger.info("Unsubscribed last subscription to topic {}", destination);
        } else {
            this.logger.info("Unsubscribed from topic {}, current subscription count is  {}", destination, count[0]);
        }
    }

    /**
     * Requests termination of the shared message stream and forgets all topic subscriptions.
     *
     * <p>The polling generator notices the request before its next poll, completes the stream and
     * closes the Kafka consumer.
     */
    public void shutdown() {
        this.logger.info("Shutting down, currently subscribed to {}", this.topicToSubscriptionCount.keySet());
        this.shutdownRequested.set(true);
        this.topicToSubscriptionCount.clear();
    }
}

