/*
 * Decompiled with CFR 0.152.
 */
package org.miaixz.bus.extra.mq.provider.kafka;

import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.miaixz.bus.core.xyz.IoKit;
import org.miaixz.bus.core.xyz.ListKit;
import org.miaixz.bus.extra.mq.Consumer;
import org.miaixz.bus.extra.mq.Message;
import org.miaixz.bus.extra.mq.MessageHandler;

/**
 * {@link Consumer} implementation that delegates to an Apache Kafka client consumer
 * with {@code String} keys and {@code byte[]} values.
 *
 * <p>Topics are chosen through {@link #setTopics(String...)} or
 * {@link #setTopicPattern(Pattern)}; records are then pulled and dispatched by
 * {@link #subscribe(MessageHandler)}.
 */
public class KafkaConsumer implements Consumer {

    /** Maximum time a single {@code poll} call in {@link #subscribe(MessageHandler)} may wait. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(3000L);

    /** Underlying Kafka client consumer that every operation is delegated to. */
    private final org.apache.kafka.clients.consumer.Consumer<String, byte[]> consumer;

    /**
     * Creates a wrapper around a new Kafka client consumer built from the given configuration.
     *
     * @param properties Kafka consumer configuration, passed unchanged to the Kafka client constructor
     */
    public KafkaConsumer(Properties properties) {
        // Diamond instead of a raw type: the type arguments are inferred from the field declaration.
        this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(properties);
    }

    /**
     * Creates a wrapper around an existing Kafka client consumer.
     *
     * @param consumer the Kafka client consumer to delegate to
     */
    public KafkaConsumer(org.apache.kafka.clients.consumer.Consumer<String, byte[]> consumer) {
        this.consumer = consumer;
    }

    /**
     * Subscribes the underlying consumer to the given topics.
     *
     * @param topics names of the topics to subscribe to
     * @return this instance, to allow call chaining
     */
    public KafkaConsumer setTopics(String... topics) {
        this.consumer.subscribe(ListKit.of(topics));
        return this;
    }

    /**
     * Subscribes the underlying consumer to every topic matching the given pattern.
     *
     * @param topicPattern pattern selecting the topics to subscribe to
     * @return this instance, to allow call chaining
     */
    public KafkaConsumer setTopicPattern(Pattern topicPattern) {
        this.consumer.subscribe(topicPattern);
        return this;
    }

    /**
     * Polls the underlying consumer once, with a timeout of 3000 ms, and passes every
     * returned record to the handler wrapped as a {@link Message}.
     *
     * <p>This method performs a single poll only; it does not loop.
     *
     * @param messageHandler handler invoked once per polled record, in iteration order
     */
    @Override
    public void subscribe(MessageHandler messageHandler) {
        for (ConsumerRecord<String, byte[]> polled : this.consumer.poll(POLL_TIMEOUT)) {
            messageHandler.handle(new ConsumerRecordMessage(polled));
        }
    }

    /**
     * Closes the underlying Kafka consumer through {@code IoKit.nullSafeClose}.
     *
     * @throws IOException declared by the overridden method signature
     */
    @Override
    public void close() throws IOException {
        IoKit.nullSafeClose(this.consumer);
    }

    /**
     * {@link Message} view over a single Kafka {@link ConsumerRecord}.
     */
    private static class ConsumerRecordMessage implements Message {

        /** The wrapped Kafka record. */
        private final ConsumerRecord<String, byte[]> record;

        /**
         * @param record the Kafka record to expose as a message
         */
        private ConsumerRecordMessage(ConsumerRecord<String, byte[]> record) {
            this.record = record;
        }

        /**
         * @return the topic reported by the wrapped record
         */
        @Override
        public String topic() {
            return this.record.topic();
        }

        /**
         * @return the value of the wrapped record; the array is returned as-is, not copied
         */
        @Override
        public byte[] content() {
            return this.record.value();
        }
    }
}

