package org.sourcelab.kafka.webview.ui.manager.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sourcelab.kafka.webview.ui.manager.kafka.config.ClientConfig;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.KafkaResult;
import org.sourcelab.kafka.webview.ui.manager.socket.StartingPosition;

/* loaded from: input_file:BOOT-INF/classes/org/sourcelab/kafka/webview/ui/manager/kafka/SocketKafkaConsumer.class */
public class SocketKafkaConsumer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) SocketKafkaConsumer.class);
    private static final long POLL_TIMEOUT_MS = 3000;
    private static final long DWELL_TIME_MS = 300;
    private static final int maxQueueCapacity = 25;
    private final KafkaConsumer kafkaConsumer;
    private final ClientConfig clientConfig;
    private volatile boolean requestStop = false;
    private final BlockingQueue<KafkaResult> outputQueue = new LinkedBlockingQueue(25);

    public SocketKafkaConsumer(KafkaConsumer kafkaConsumer, ClientConfig clientConfig) {
        this.kafkaConsumer = kafkaConsumer;
        this.clientConfig = clientConfig;
    }

    public Optional<KafkaResult> nextResult() {
        return Optional.ofNullable(this.outputQueue.poll());
    }

    public void requestStop() {
        this.requestStop = true;
    }

    @Override // java.lang.Runnable
    public void run() {
        Thread.currentThread().setName("WebSocket Consumer: " + this.clientConfig.getConsumerId());
        logger.info("Starting socket consumer for {}", this.clientConfig.getConsumerId());
        initializeStartingPosition(this.clientConfig.getStartingPosition());
        do {
            ConsumerRecords<ConsumerRecord> poll = this.kafkaConsumer.poll(POLL_TIMEOUT_MS);
            if (poll.isEmpty()) {
                sleep(POLL_TIMEOUT_MS);
            } else {
                for (ConsumerRecord consumerRecord : poll) {
                    try {
                        this.outputQueue.put(new KafkaResult(consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp(), consumerRecord.key(), consumerRecord.value()));
                    } catch (InterruptedException e) {
                        requestStop();
                    }
                }
                sleep(DWELL_TIME_MS);
            }
        } while (!this.requestStop);
        this.kafkaConsumer.close();
        logger.info("Shutdown consumer {}", this.clientConfig.getConsumerId());
    }

    private void initializeStartingPosition(StartingPosition startingPosition) {
        if (startingPosition.isStartFromHead()) {
            seekToHead();
            return;
        }
        if (startingPosition.isStartFromTimestamp()) {
            seekToTimestamp(startingPosition.getTimestamp());
            return;
        }
        if (!startingPosition.isStartFromOffsets()) {
            seekToTail();
            return;
        }
        HashMap hashMap = new HashMap();
        for (Map.Entry<Integer, Long> entry : startingPosition.getOffsetsMap().entrySet()) {
            hashMap.put(new TopicPartition(this.clientConfig.getTopicConfig().getTopicName(), entry.getKey().intValue()), entry.getValue());
        }
        seek(hashMap);
    }

    private void sleep(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
            requestStop();
        }
    }

    private void seekToHead() {
        seek(this.kafkaConsumer.beginningOffsets(getAllPartitions()));
    }

    private void seekToTail() {
        seek(this.kafkaConsumer.endOffsets(getAllPartitions()));
    }

    private void seekToTimestamp(long j) {
        HashMap hashMap = new HashMap();
        Iterator<TopicPartition> it = getAllPartitions().iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), Long.valueOf(j));
        }
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.kafkaConsumer.offsetsForTimes(hashMap);
        Map<TopicPartition, Long> hashMap2 = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetsForTimes.entrySet()) {
            hashMap2.put(entry.getKey(), Long.valueOf(entry.getValue().offset()));
        }
        seek(hashMap2);
    }

    private void seek(Map<TopicPartition, Long> map) {
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            this.kafkaConsumer.seek(entry.getKey(), entry.getValue().longValue());
        }
    }

    private List<TopicPartition> getAllPartitions() {
        List<PartitionInfo> partitionsFor = this.kafkaConsumer.partitionsFor(this.clientConfig.getTopicConfig().getTopicName());
        ArrayList arrayList = new ArrayList();
        for (PartitionInfo partitionInfo : partitionsFor) {
            if (!this.clientConfig.isPartitionFiltered(partitionInfo.partition())) {
                arrayList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            }
        }
        return arrayList;
    }
}
