/*
 * Decompiled with CFR 0.152.
 */
package org.jberet.support.io;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.batch.api.BatchProperty;
import javax.batch.api.chunk.ItemReader;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.inject.Named;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.jberet.support._private.SupportMessages;
import org.jberet.support.io.KafkaItemReaderWriterBase;

@Named
@Dependent
public class KafkaItemReader
extends KafkaItemReaderWriterBase
implements ItemReader {
    @Inject
    @BatchProperty
    protected List<String> topicPartitions;
    @Inject
    @BatchProperty
    protected long pollTimeout;
    protected KafkaConsumer consumer;
    protected Iterator<ConsumerRecord> recordsBuffer;
    protected HashMap<String, Long> topicPartitionOffset = new HashMap();

    public void open(Serializable checkpoint) throws Exception {
        this.consumer = new KafkaConsumer(this.createConfigProperties());
        this.consumer.assign(this.createTopicPartitions());
        if (checkpoint != null) {
            HashMap chkp = (HashMap)checkpoint;
            for (Map.Entry e : chkp.entrySet()) {
                int partition;
                String topic;
                String key = (String)e.getKey();
                int colonPos = key.lastIndexOf(58);
                if (colonPos > 0) {
                    topic = key.substring(0, colonPos);
                    partition = Integer.parseInt(key.substring(colonPos + 1));
                } else if (colonPos < 0) {
                    topic = key;
                    partition = 0;
                } else {
                    throw SupportMessages.MESSAGES.invalidCheckpoint(checkpoint);
                }
                long newStartPosition = (Long)chkp.get(key) + 1L;
                this.consumer.seek(new TopicPartition(topic, partition), newStartPosition);
            }
        }
    }

    public Serializable checkpointInfo() {
        return this.topicPartitionOffset;
    }

    public Object readItem() throws Exception {
        if (this.recordsBuffer == null || !this.recordsBuffer.hasNext()) {
            ConsumerRecords records = this.consumer.poll(this.pollTimeout);
            if (records == null || records.isEmpty()) {
                return null;
            }
            this.recordsBuffer = records.iterator();
        }
        if (this.recordsBuffer.hasNext()) {
            ConsumerRecord rec = this.recordsBuffer.next();
            if (rec == null) {
                return null;
            }
            Object val = rec.value();
            this.topicPartitionOffset.put(rec.topic() + ':' + rec.partition(), rec.offset());
            return val;
        }
        return null;
    }

    public void close() {
        if (this.consumer != null) {
            this.consumer.close();
            this.consumer = null;
        }
    }

    protected List<TopicPartition> createTopicPartitions() {
        ArrayList<TopicPartition> tps = new ArrayList<TopicPartition>();
        if (this.topicPartitions != null) {
            for (String e : this.topicPartitions) {
                int colonPos = e.lastIndexOf(58);
                if (colonPos > 0) {
                    tps.add(new TopicPartition(e.substring(0, colonPos), Integer.parseInt(e.substring(colonPos + 1))));
                    continue;
                }
                if (colonPos < 0) {
                    tps.add(new TopicPartition(e, 0));
                    continue;
                }
                throw SupportMessages.MESSAGES.invalidReaderWriterProperty(null, this.topicPartitions.toString(), "topicPartitions");
            }
        } else {
            throw SupportMessages.MESSAGES.invalidReaderWriterProperty(null, null, "topicPartitions");
        }
        return tps;
    }
}

