package com.ibm.cloudant.kafka.connect;

import com.cloudant.client.api.Database;
import com.cloudant.client.api.model.ChangesResult;
import com.google.gson.JsonObject;
import com.ibm.cloudant.kafka.common.CloudantConst;
import com.ibm.cloudant.kafka.common.InterfaceConst;
import com.ibm.cloudant.kafka.common.MessageKey;
import com.ibm.cloudant.kafka.common.utils.JavaCloudantUtil;
import com.ibm.cloudant.kafka.common.utils.ResourceBundleUtil;
import com.ibm.cloudant.kafka.schema.DocumentAsSchemaStruct;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/ibm/cloudant/kafka/connect/CloudantSourceTask.class */
public class CloudantSourceTask extends SourceTask {
    private AtomicBoolean stop;
    private AtomicBoolean _running;
    private String url = null;
    private List<String> topics = null;
    private boolean generateStructSchema = false;
    private boolean flattenStructSchema = false;
    private boolean omitDesignDocs = false;
    private static Logger LOG = Logger.getLogger(CloudantSourceTask.class);
    private static long FEED_SLEEP_MILLISEC = 5000;
    private static long SHUTDOWN_DELAY_MILLISEC = 10;
    private static String DEFAULT_CLOUDANT_LAST_SEQ = CloudantSinkConnectorConfig.CLOUDANT_LAST_SEQ_NUM_DEFAULT;
    private static String latestSequenceNumber = null;
    private static int batch_size = 0;
    private static Database cloudantDb = null;

    public List<SourceRecord> poll() throws InterruptedException {
        Schema schema;
        Struct jsonObject;
        while (!this.stop.get()) {
            this._running.set(true);
            ArrayList arrayList = new ArrayList();
            LOG.debug("Process lastSeq: " + latestSequenceNumber);
            ChangesResult changes = cloudantDb.changes().includeDocs(true).since(latestSequenceNumber).limit(batch_size).getChanges();
            if (cloudantDb.changes() != null) {
                if (changes.getResults().size() == 0) {
                    LOG.debug("Get continuous feed for lastSeq: " + latestSequenceNumber);
                    changes = cloudantDb.changes().includeDocs(true).since(latestSequenceNumber).limit(batch_size).heartBeat(FEED_SLEEP_MILLISEC).getChanges();
                }
                LOG.debug("Got " + changes.getResults().size() + " changes");
                latestSequenceNumber = changes.getLastSeq();
                ListIterator<ChangesResult.Row> listIterator = changes.getResults().listIterator();
                while (listIterator.hasNext()) {
                    ChangesResult.Row next = listIterator.next();
                    JsonObject doc = next.getDoc();
                    if (this.generateStructSchema) {
                        Struct convert = DocumentAsSchemaStruct.convert(doc, this.flattenStructSchema);
                        schema = convert.schema();
                        jsonObject = convert;
                    } else {
                        schema = Schema.STRING_SCHEMA;
                        jsonObject = doc.toString();
                    }
                    String id = next.getId();
                    if (!this.omitDesignDocs || !id.startsWith(CloudantConst.DESIGN_PRIFIX)) {
                        Iterator<String> it = this.topics.iterator();
                        while (it.hasNext()) {
                            arrayList.add(new SourceRecord(offsetKey(this.url), offsetValue(latestSequenceNumber), it.next(), Schema.STRING_SCHEMA, id, schema, jsonObject));
                        }
                    }
                }
                LOG.info("Return " + (arrayList.size() / this.topics.size()) + " records with last offset " + latestSequenceNumber);
                this._running.set(false);
                return arrayList;
            }
        }
        return null;
    }

    public void start(Map<String, String> map) {
        Map offset;
        try {
            CloudantSourceTaskConfig cloudantSourceTaskConfig = new CloudantSourceTaskConfig(map);
            this.url = cloudantSourceTaskConfig.getString(InterfaceConst.URL);
            String string = cloudantSourceTaskConfig.getString(InterfaceConst.USER_NAME);
            String value = cloudantSourceTaskConfig.getPassword(InterfaceConst.PASSWORD).value();
            this.topics = cloudantSourceTaskConfig.getList(InterfaceConst.TOPIC);
            this.omitDesignDocs = cloudantSourceTaskConfig.getBoolean(InterfaceConst.OMIT_DESIGN_DOCS).booleanValue();
            this.generateStructSchema = cloudantSourceTaskConfig.getBoolean(InterfaceConst.USE_VALUE_SCHEMA_STRUCT).booleanValue();
            this.flattenStructSchema = cloudantSourceTaskConfig.getBoolean(InterfaceConst.FLATTEN_VALUE_SCHEMA_STRUCT).booleanValue();
            latestSequenceNumber = cloudantSourceTaskConfig.getString(InterfaceConst.LAST_CHANGE_SEQ);
            batch_size = cloudantSourceTaskConfig.getInt(InterfaceConst.BATCH_SIZE) == null ? InterfaceConst.DEFAULT_BATCH_SIZE : cloudantSourceTaskConfig.getInt(InterfaceConst.BATCH_SIZE).intValue();
            this._running = new AtomicBoolean(false);
            this.stop = new AtomicBoolean(false);
            if (latestSequenceNumber == null) {
                latestSequenceNumber = DEFAULT_CLOUDANT_LAST_SEQ;
                OffsetStorageReader offsetStorageReader = this.context.offsetStorageReader();
                if (offsetStorageReader != null && (offset = offsetStorageReader.offset(Collections.singletonMap(InterfaceConst.URL, this.url))) != null) {
                    latestSequenceNumber = (String) offset.get(InterfaceConst.LAST_CHANGE_SEQ);
                    LOG.info("Start with current offset (last sequence): " + latestSequenceNumber);
                }
            }
            cloudantDb = JavaCloudantUtil.getDBInst(this.url, string, value, false);
        } catch (ConfigException e) {
            throw new ConnectException(ResourceBundleUtil.get(MessageKey.CONFIGURATION_EXCEPTION), e);
        }
    }

    public void stop() {
        if (this.stop != null) {
            this.stop.set(true);
        }
        if (cloudantDb.changes() != null) {
            cloudantDb.changes().stop();
            if (this._running == null) {
                return;
            }
            while (this._running.get()) {
                try {
                    Thread.sleep(SHUTDOWN_DELAY_MILLISEC);
                } catch (InterruptedException e) {
                    LOG.error(e);
                }
            }
        }
    }

    private Map<String, String> offsetKey(String str) {
        return Collections.singletonMap(InterfaceConst.URL, str);
    }

    private Map<String, String> offsetValue(String str) {
        return Collections.singletonMap(InterfaceConst.LAST_CHANGE_SEQ, str);
    }

    public String version() {
        return new CloudantSourceConnector().version();
    }
}
