/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.channel.aws;

import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ChannelResponse;
import de.otto.synapse.channel.Status;
import de.otto.synapse.channel.aws.KinesisShardIterator;
import de.otto.synapse.consumer.MessageConsumer;
import de.otto.synapse.message.Message;
import de.otto.synapse.message.aws.KinesisMessage;
import java.time.Duration;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

public class KinesisShard {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisShard.class);
    private final String shardId;
    private final String streamName;
    private final KinesisClient kinesisClient;

    public KinesisShard(String shardId, String streamName, KinesisClient kinesisClient) {
        this.shardId = shardId;
        this.streamName = streamName;
        this.kinesisClient = kinesisClient;
    }

    public String getShardId() {
        return this.shardId;
    }

    public ChannelResponse consumeShard(ChannelPosition channelPosition, Predicate<Message<?>> stopCondition, MessageConsumer<String> consumer) {
        boolean stopRetrieval;
        String lastSequenceNumber;
        block5: {
            lastSequenceNumber = channelPosition.positionOf(this.shardId);
            stopRetrieval = false;
            try {
                LOG.info("Reading from stream {}, shard {} with starting sequence number {}", new Object[]{this.streamName, this.shardId, lastSequenceNumber});
                GetRecordsResponse recordsResponse = this.retrieveIterator(lastSequenceNumber).next();
                Duration durationBehind = Duration.ofMillis(recordsResponse.millisBehindLatest());
                if (this.isEmptyStream(recordsResponse)) break block5;
                for (Record record : recordsResponse.records()) {
                    try {
                        Message<String> kinesisMessage = KinesisMessage.kinesisMessage(this.shardId, durationBehind, record);
                        consumer.accept(kinesisMessage);
                        stopRetrieval = stopRetrieval || stopCondition.test(kinesisMessage);
                    }
                    catch (Exception e) {
                        LOG.error("consumer failed while processing {}: {}", (Object)record, (Object)e);
                    }
                    lastSequenceNumber = record.sequenceNumber();
                }
                this.logInfo(this.streamName, recordsResponse, durationBehind);
            }
            catch (Exception e) {
                LOG.error("Kinesis consumer died unexpectedly.", (Throwable)e);
                throw e;
            }
        }
        return ChannelResponse.of((Status)(stopRetrieval ? Status.STOPPED : Status.OK), (ChannelPosition)ChannelPosition.of((String)this.shardId, (String)lastSequenceNumber));
    }

    public KinesisShardIterator retrieveIterator(String sequenceNumber) {
        GetShardIteratorResponse shardIteratorResponse;
        try {
            shardIteratorResponse = this.kinesisClient.getShardIterator(this.buildIteratorShardRequest(sequenceNumber));
        }
        catch (InvalidArgumentException e) {
            LOG.error(String.format("invalidShardSequenceNumber in Snapshot %s/%s - reading from HORIZON", this.streamName, this.shardId));
            shardIteratorResponse = this.kinesisClient.getShardIterator(this.buildIteratorShardRequest("0"));
        }
        return new KinesisShardIterator(this.kinesisClient, shardIteratorResponse.shardIterator());
    }

    private GetShardIteratorRequest buildIteratorShardRequest(String sequenceNumber) {
        GetShardIteratorRequest.Builder shardRequestBuilder = GetShardIteratorRequest.builder().shardId(this.shardId).streamName(this.streamName);
        if (sequenceNumber == null || sequenceNumber.equals("0")) {
            shardRequestBuilder.shardIteratorType(ShardIteratorType.TRIM_HORIZON);
        } else {
            shardRequestBuilder.shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
            shardRequestBuilder.startingSequenceNumber(sequenceNumber);
        }
        return (GetShardIteratorRequest)shardRequestBuilder.build();
    }

    private void logInfo(String streamName, GetRecordsResponse recordsResponse, Duration durationBehind) {
        String durationString = String.format("%s days %s hrs %s min %s sec", durationBehind.toDays(), durationBehind.toHours() % 24L, durationBehind.toMinutes() % 60L, durationBehind.getSeconds() % 60L);
        LOG.info("Consumed {} records from stream '{}' shard '{}'; behind latest: {}", new Object[]{recordsResponse.records().size(), streamName, this.shardId, durationString});
    }

    private boolean isEmptyStream(GetRecordsResponse recordsResponse) {
        return recordsResponse.records().isEmpty();
    }
}

