package io.annot8.components.kafka.sources;

import io.annot8.api.capabilities.Capabilities;
import io.annot8.api.components.annotations.ComponentDescription;
import io.annot8.api.components.annotations.ComponentName;
import io.annot8.api.components.annotations.SettingsClass;
import io.annot8.api.components.responses.SourceResponse;
import io.annot8.api.context.Context;
import io.annot8.api.data.Item;
import io.annot8.api.data.ItemFactory;
import io.annot8.api.settings.Description;
import io.annot8.common.components.AbstractSource;
import io.annot8.common.components.AbstractSourceDescriptor;
import io.annot8.common.components.capabilities.SimpleCapabilities;
import io.annot8.common.data.content.InputStreamContent;
import io.annot8.common.data.content.Text;
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.record.TimestampType;

@ComponentName("Apache Kafka")
@ComponentDescription("Read data from a Apache Kafka topic")
@SettingsClass(Settings.class)
/* loaded from: input_file:io/annot8/components/kafka/sources/KafkaSource.class */
public class KafkaSource extends AbstractSourceDescriptor<Source, Settings> {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.annot8.components.kafka.sources.KafkaSource$1, reason: invalid class name */
    /* loaded from: input_file:io/annot8/components/kafka/sources/KafkaSource$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$common$record$TimestampType = new int[TimestampType.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$common$record$TimestampType[TimestampType.CREATE_TIME.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$record$TimestampType[TimestampType.LOG_APPEND_TIME.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$record$TimestampType[TimestampType.NO_TIMESTAMP_TYPE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:io/annot8/components/kafka/sources/KafkaSource$Settings.class */
    public static class Settings implements io.annot8.api.settings.Settings {
        private List<String> topics = List.of();
        private List<String> servers = List.of("localhost:9092");
        private String keyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
        private String valueDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
        private String groupId = "annot8";
        private Map<String, Object> overrideProperties = new HashMap();

        public boolean validate() {
            if (this.keyDeserializer == null || this.valueDeserializer == null) {
                return false;
            }
            try {
                Class.forName(this.keyDeserializer);
                Class.forName(this.valueDeserializer);
                return (this.topics == null || this.topics.isEmpty() || this.servers == null || this.servers.isEmpty() || this.groupId == null || this.overrideProperties == null) ? false : true;
            } catch (ClassNotFoundException e) {
                return false;
            }
        }

        @Description("The Apache Kafka topics to subscribe to")
        public List<String> getTopics() {
            return this.topics;
        }

        public void setTopics(List<String> list) {
            this.topics = list;
        }

        @Description("Apache Kafka servers to connect to")
        public List<String> getServers() {
            return this.servers;
        }

        public void setServers(List<String> list) {
            this.servers = list;
        }

        @Description(value = "The deserializer to use for deserializing record keys", defaultValue = "org.apache.kafka.common.serialization.StringDeserializer")
        public String getKeyDeserializer() {
            return this.keyDeserializer;
        }

        public void setKeyDeserializer(String str) {
            this.keyDeserializer = str;
        }

        @Description(value = "The deserializer to use for deserializing record values", defaultValue = "org.apache.kafka.common.serialization.StringDeserializer")
        public String getValueDeserializer() {
            return this.valueDeserializer;
        }

        public void setValueDeserializer(String str) {
            this.valueDeserializer = str;
        }

        @Description(value = "Set the group ID for the consumer", defaultValue = "annot8")
        public String getGroupId() {
            return this.groupId;
        }

        public void setGroupId(String str) {
            this.groupId = str;
        }

        @Description("Override properties on the Kafka connection - these take precedence over other values set")
        public Map<String, Object> getOverrideProperties() {
            return this.overrideProperties;
        }

        public void setOverrideProperties(Map<String, Object> map) {
            this.overrideProperties = map;
        }
    }

    /* loaded from: input_file:io/annot8/components/kafka/sources/KafkaSource$Source.class */
    public static class Source extends AbstractSource {
        private final Consumer<Object, Object> consumer;
        private static final String CONTENT_DESCRIPTION = "Value of Apache Kafka record";

        public Source(Settings settings) {
            HashMap hashMap = new HashMap();
            hashMap.put("bootstrap.servers", String.join(",", settings.getServers()));
            hashMap.put("group.id", settings.getGroupId());
            hashMap.put("enable.auto.commit", "true");
            hashMap.put("auto.commit.interval.ms", "1000");
            hashMap.put("key.deserializer", settings.getKeyDeserializer());
            hashMap.put("value.deserializer", settings.getValueDeserializer());
            hashMap.putAll(settings.getOverrideProperties());
            this.consumer = new KafkaConsumer(hashMap);
            this.consumer.subscribe(settings.getTopics());
        }

        protected Source(Consumer<Object, Object> consumer, List<String> list) {
            this.consumer = consumer;
            this.consumer.subscribe(list);
        }

        public SourceResponse read(ItemFactory itemFactory) {
            ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(100L));
            if (poll.isEmpty()) {
                return SourceResponse.empty();
            }
            poll.forEach(consumerRecord -> {
                createItemFromRecord(itemFactory, consumerRecord);
            });
            return SourceResponse.ok();
        }

        public static Item createItemFromRecord(ItemFactory itemFactory, ConsumerRecord<Object, Object> consumerRecord) {
            Item create = itemFactory.create();
            consumerRecord.headers().iterator().forEachRemaining(header -> {
                create.getProperties().set("header/" + header.key(), header.value());
            });
            if (consumerRecord.key() != null) {
                create.getProperties().set("key", consumerRecord.key());
            }
            consumerRecord.leaderEpoch().ifPresent(num -> {
                create.getProperties().set("leaderEpoch", num);
            });
            create.getProperties().set("offset", Long.valueOf(consumerRecord.offset()));
            create.getProperties().set("partition", Integer.valueOf(consumerRecord.partition()));
            addTimestampToItem(create, consumerRecord.timestampType(), consumerRecord.timestamp());
            create.getProperties().set("topic", consumerRecord.topic());
            Object value = consumerRecord.value();
            if (value instanceof String) {
                create.createContent(Text.class).withData((String) value).withDescription(CONTENT_DESCRIPTION).save();
            } else if (value instanceof byte[]) {
                create.createContent(InputStreamContent.class).withData(new ByteArrayInputStream((byte[]) value)).withDescription(CONTENT_DESCRIPTION).save();
            } else if (value instanceof ByteBuffer) {
                create.createContent(InputStreamContent.class).withData(new ByteArrayInputStream(((ByteBuffer) value).array())).withDescription(CONTENT_DESCRIPTION).save();
            }
            return create;
        }

        public static void addTimestampToItem(Item item, TimestampType timestampType, long j) {
            switch (AnonymousClass1.$SwitchMap$org$apache$kafka$common$record$TimestampType[timestampType.ordinal()]) {
                case 1:
                    item.getProperties().set("createdTimestamp", LocalDateTime.ofInstant(Instant.ofEpochMilli(j), ZoneId.systemDefault()));
                    return;
                case 2:
                    item.getProperties().set("loggedTimestamp", LocalDateTime.ofInstant(Instant.ofEpochMilli(j), ZoneId.systemDefault()));
                    return;
                case 3:
                default:
                    return;
            }
        }
    }

    public Capabilities capabilities() {
        return new SimpleCapabilities.Builder().withCreatesContent(Text.class).withCreatesContent(InputStreamContent.class).build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Source createComponent(Context context, Settings settings) {
        return new Source(settings);
    }
}
