/*
 * Decompiled with CFR 0.152.
 */
package jp.ad.sinet.stream.plugins.s3;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Predicate;
import java.util.logging.Logger;
import jp.ad.sinet.stream.api.Consistency;
import jp.ad.sinet.stream.api.InvalidConfigurationException;
import jp.ad.sinet.stream.plugins.s3.S3Cli;
import jp.ad.sinet.stream.plugins.s3.S3Message;
import jp.ad.sinet.stream.plugins.s3.S3Parameters;
import jp.ad.sinet.stream.plugins.s3.Util;
import jp.ad.sinet.stream.spi.PluginMessageReader;
import jp.ad.sinet.stream.spi.PluginMessageWrapper;
import jp.ad.sinet.stream.spi.ReaderParameters;
import lombok.Generated;

/**
 * {@link PluginMessageReader} implementation that reads messages from objects stored in an
 * S3 bucket.
 *
 * <p>On construction the reader validates its configuration, opens an {@link S3Cli} and
 * fetches the first page of the object listing. Each call to {@link #read()} then consumes one
 * key from the current page, fetching further pages through the continuation token as needed.
 *
 * <p>Only {@link Consistency#AT_MOST_ONCE} is accepted, and the generic {@code brokers} setting
 * is rejected.
 */
public class S3MessageReader
implements PluginMessageReader {
    @Generated
    private static final Logger log = Logger.getLogger(S3MessageReader.class.getName());
    /** Unmodifiable view of the topics supplied by the reader parameters. */
    private final List<String> topics;
    /** Always {@link Consistency#AT_MOST_ONCE}; any other value is rejected by the constructor. */
    private final Consistency consistency;
    /** Client id from the parameters, or a random UUID string when none was given. */
    private final String clientId;
    /** Receive timeout taken from the parameters; {@link #read()} does not consult it. */
    private final Duration receiveTimeout;
    /** Open S3 client; set to {@code null} by {@link #close()}. */
    S3Cli s3cli;
    S3Parameters s3parameters;
    /** Filter passed to every listing call, built from prefix, topics, name and suffix. */
    Predicate<String> pathFilter;
    /** Most recently fetched listing page; keys are removed from it as they are read. */
    S3Cli.ListObjRes lastListObjRes;

    /** Creates the S3 client from {@link #s3parameters}. */
    void connect() {
        this.s3cli = new S3Cli(this.s3parameters);
    }

    /**
     * Validates the configuration, connects to S3 and loads the first listing page.
     *
     * @param parameters reader parameters supplying topics, timeout, consistency, client id
     *                   and the raw configuration map
     * @throws InvalidConfigurationException if {@code brokers} is configured or the consistency
     *                                       is not {@link Consistency#AT_MOST_ONCE}
     */
    S3MessageReader(ReaderParameters parameters) {
        log.fine(() -> "S3MessageReader: ctor=" + parameters);
        this.topics = Collections.unmodifiableList(parameters.getTopics());
        this.receiveTimeout = parameters.getReceiveTimeout();
        if (parameters.getConfig().get("brokers") != null) {
            throw new InvalidConfigurationException("brokers: cannot be specified for s3; use s3.endpoint_url");
        }
        this.consistency = parameters.getConsistency();
        if (this.consistency != Consistency.AT_MOST_ONCE) {
            throw new InvalidConfigurationException("consistency must be AT_MOST_ONCE for s3");
        }
        this.s3parameters = S3Parameters.create(parameters.getConfig());
        log.fine(() -> "S3MessageReader: s3parameters=" + this.s3parameters);
        this.pathFilter = Util.makePathFilter(this.s3parameters.getPrefix(), this.topics, this.s3parameters.getName(), this.s3parameters.getSuffix());
        String clientId = parameters.getClientId();
        this.clientId = clientId != null && !clientId.isEmpty() ? clientId : UUID.randomUUID().toString();
        this.connect();
        try {
            this.lastListObjRes = this.s3cli.listObj(this.s3parameters.getBucket(), this.s3parameters.getPrefix(), this.pathFilter, null);
        } catch (RuntimeException e) {
            // The caller never receives this instance when the constructor throws, so nobody
            // else could release the client that connect() just created.
            this.releaseClientAfterFailure(e);
            throw e;
        }
    }

    /** Closes the client after a failed construction, keeping {@code cause} as the primary error. */
    private void releaseClientAfterFailure(RuntimeException cause) {
        S3Cli client = this.s3cli;
        this.s3cli = null;
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (RuntimeException closeFailure) {
            cause.addSuppressed(closeFailure);
        }
    }

    /**
     * Reads the next object whose key passed the path filter.
     *
     * <p>Pages that contain no keys are skipped by following the continuation token. This is
     * done iteratively so that a long run of empty pages cannot exhaust the call stack.
     *
     * @return the next message, or {@code null} when the current page is empty and there is no
     *         continuation token
     */
    public PluginMessageWrapper read() {
        log.fine(() -> "read: lastListObjRes=" + this.lastListObjRes);
        while (this.lastListObjRes.keyList.isEmpty()) {
            if (this.lastListObjRes.continuationToken == null) {
                return null;
            }
            this.lastListObjRes = this.s3cli.listObj(this.s3parameters.getBucket(), this.s3parameters.getPrefix(), this.pathFilter, this.lastListObjRes.continuationToken);
            log.fine(() -> "read: lastListObjRes=" + this.lastListObjRes);
        }
        // The key is dropped from the page before the fetch, so a failed fetch is not retried
        // by a later read() call.
        String key = this.lastListObjRes.keyList.remove(0);
        S3Cli.GetObjRes r = this.s3cli.getObj(this.s3parameters.getBucket(), key);
        String topic = Util.extractTopicFromPath(key);
        return new S3Message(topic, r.data, r.timestamp_us, r.res);
    }

    /** Closes the S3 client if it is still open; calling this again is a no-op. */
    public void close() {
        if (this.s3cli != null) {
            this.s3cli.close();
            this.s3cli = null;
        }
    }

    /**
     * Returns the reader configuration.
     *
     * @return always {@code null} in this implementation
     */
    public Map<String, Object> getConfig() {
        return null;
    }

    /** @return the unmodifiable list of topics this reader was created with */
    @Generated
    public List<String> getTopics() {
        return this.topics;
    }

    /** @return the consistency level, which is always {@link Consistency#AT_MOST_ONCE} */
    @Generated
    public Consistency getConsistency() {
        return this.consistency;
    }

    /** @return the configured client id, or the random UUID generated at construction */
    @Generated
    public String getClientId() {
        return this.clientId;
    }

    /** @return the receive timeout taken from the reader parameters */
    @Generated
    public Duration getReceiveTimeout() {
        return this.receiveTimeout;
    }
}

