/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.channel.aws;

import com.google.common.annotations.VisibleForTesting;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.aws.KinesisShardIterator;
import de.otto.synapse.consumer.MessageConsumer;
import de.otto.synapse.message.Message;
import de.otto.synapse.message.aws.KinesisMessage;
import java.time.Duration;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

/**
 * Reads the records of a single Kinesis shard and hands them to a {@link MessageConsumer}.
 *
 * <p>The shard is polled in a loop: every response is turned into messages, each message is
 * passed to the consumer, and a caller-supplied stop condition decides when polling ends.
 */
public class KinesisShard {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisShard.class);

    /** Pause between two consecutive polls of the shard, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 500L;

    /** Sequence-number marker that is mapped to {@link ShardIteratorType#TRIM_HORIZON}. */
    private static final String HORIZON_SEQUENCE_NUMBER = "0";

    private final String shardId;
    private final String streamName;
    private final KinesisClient kinesisClient;

    /**
     * Creates a reader for one shard of a stream.
     *
     * @param shardId       id of the shard to read from
     * @param streamName    name of the stream the shard belongs to
     * @param kinesisClient client used to request shard iterators (and handed to the iterator)
     */
    public KinesisShard(String shardId, String streamName, KinesisClient kinesisClient) {
        this.shardId = shardId;
        this.streamName = streamName;
        this.kinesisClient = kinesisClient;
    }

    /**
     * @return the id of the shard this instance reads from
     */
    public String getShardId() {
        return this.shardId;
    }

    /**
     * Polls the shard until the stop condition holds or the polling thread is interrupted.
     *
     * <p>For every record of a non-empty response a message is built and passed to
     * {@code consumer}; exceptions thrown by the consumer are logged and do not abort the loop.
     * The stop condition is tested per message until it first returns {@code true}; the remaining
     * records of that response are still passed to the consumer. For an empty response the stop
     * condition is tested against a message built from the last record seen so far.
     *
     * @param startPosition position whose entry for this shard is used as starting sequence number
     * @param stopCondition predicate deciding when to stop polling
     * @param consumer      receiver of every message read from the shard
     * @return a position holding this shard's id and the sequence number of the last consumed
     *         record (or the starting sequence number if no record was consumed)
     */
    public ChannelPosition consumeShard(ChannelPosition startPosition, Predicate<Message<?>> stopCondition, MessageConsumer<String> consumer) {
        try {
            String lastSequenceNumber = startPosition.positionOf(this.shardId);
            LOG.info("Reading from stream {}, shard {} with starting sequence number {}", this.streamName, this.shardId, lastSequenceNumber);

            KinesisShardIterator kinesisShardIterator = this.retrieveIterator(lastSequenceNumber);
            boolean stopRetrieval = false;
            Record lastRecord = null;
            do {
                GetRecordsResponse recordsResponse = kinesisShardIterator.next();
                Duration durationBehind = Duration.ofMillis(recordsResponse.millisBehindLatest());
                if (this.isEmptyStream(recordsResponse)) {
                    // An empty response still needs a message so that the stop condition can be tested.
                    // NOTE(review): lastRecord is null until the first record has been read; this assumes
                    // KinesisMessage.kinesisMessage tolerates a null record - confirm.
                    Message<String> kinesisMessage = KinesisMessage.kinesisMessage(this.shardId, durationBehind, lastRecord);
                    stopRetrieval = stopCondition.test(kinesisMessage);
                } else {
                    for (Record record : recordsResponse.records()) {
                        Message<String> kinesisMessage = KinesisMessage.kinesisMessage(this.shardId, durationBehind, record);
                        this.consumeMessageSafely(consumer, record, kinesisMessage);
                        lastRecord = record;
                        lastSequenceNumber = record.sequenceNumber();
                        // Once the stop condition has fired it is not asked again, but the rest of
                        // the current response is still delivered to the consumer.
                        if (!stopRetrieval) {
                            stopRetrieval = stopCondition.test(kinesisMessage);
                        }
                    }
                }
                this.logInfo(this.streamName, recordsResponse, durationBehind);
                if (!stopRetrieval) {
                    // An interrupt during the pause also ends the polling loop.
                    stopRetrieval = this.waitABit();
                }
            } while (!stopRetrieval);

            LOG.info("Done consuming from shard '{}' of stream '{}'.", this.shardId, this.streamName);
            return ChannelPosition.of(this.shardId, lastSequenceNumber);
        }
        catch (Exception e) {
            // Log with shard/stream context, then let the caller see the original exception.
            LOG.error("kinesis consumer died unexpectedly. shard '{}', stream '{}'", this.shardId, this.streamName, e);
            throw e;
        }
    }

    /**
     * Requests a shard iterator positioned according to {@code sequenceNumber}.
     *
     * <p>If the client rejects the request with an {@link InvalidArgumentException}, the error is
     * logged and a second request is issued that starts at the trim horizon.
     *
     * @param sequenceNumber sequence number to continue after; {@code null} or {@code "0"}
     *                       starts at the trim horizon
     * @return an iterator wrapping the shard iterator returned by the client
     */
    @VisibleForTesting
    protected KinesisShardIterator retrieveIterator(String sequenceNumber) {
        GetShardIteratorResponse shardIteratorResponse;
        try {
            shardIteratorResponse = this.kinesisClient.getShardIterator(this.buildIteratorShardRequest(sequenceNumber));
        }
        catch (InvalidArgumentException e) {
            LOG.error("invalidShardSequenceNumber in Snapshot {}/{} - reading from HORIZON", this.streamName, this.shardId);
            shardIteratorResponse = this.kinesisClient.getShardIterator(this.buildIteratorShardRequest(HORIZON_SEQUENCE_NUMBER));
        }
        return new KinesisShardIterator(this.kinesisClient, shardIteratorResponse.shardIterator());
    }

    /**
     * Builds the iterator request for this shard: {@code TRIM_HORIZON} when the sequence number
     * is {@code null} or {@code "0"}, otherwise {@code AFTER_SEQUENCE_NUMBER} with that number.
     */
    private GetShardIteratorRequest buildIteratorShardRequest(String sequenceNumber) {
        GetShardIteratorRequest.Builder shardRequestBuilder = GetShardIteratorRequest.builder().shardId(this.shardId).streamName(this.streamName);
        if (sequenceNumber == null || HORIZON_SEQUENCE_NUMBER.equals(sequenceNumber)) {
            shardRequestBuilder.shardIteratorType(ShardIteratorType.TRIM_HORIZON);
        } else {
            shardRequestBuilder.shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
            shardRequestBuilder.startingSequenceNumber(sequenceNumber);
        }
        return (GetShardIteratorRequest)shardRequestBuilder.build();
    }

    /**
     * Passes the message to the consumer; any exception the consumer throws is logged together
     * with the offending record and then dropped so that polling can continue.
     */
    private void consumeMessageSafely(MessageConsumer<String> consumer, Record record, Message<String> kinesisMessage) {
        try {
            consumer.accept(kinesisMessage);
        }
        catch (Exception e) {
            // Deliberate best effort: the trailing exception argument is logged by SLF4J as the throwable.
            LOG.error("consumer failed while processing {}", record, e);
        }
    }

    /**
     * Logs how many records the response contained and how far behind the latest record it is,
     * formatted as days / hours / minutes / seconds.
     */
    private void logInfo(String streamName, GetRecordsResponse recordsResponse, Duration durationBehind) {
        String durationString = String.format("%s days %s hrs %s min %s sec", durationBehind.toDays(), durationBehind.toHours() % 24L, durationBehind.toMinutes() % 60L, durationBehind.getSeconds() % 60L);
        LOG.info("Got {} records from stream '{}' and shard '{}'; behind latest: {}", recordsResponse.records().size(), streamName, this.shardId, durationString);
    }

    /**
     * Sleeps for {@link #POLL_INTERVAL_MILLIS} milliseconds.
     *
     * @return {@code true} if the sleep was interrupted (the interrupt status is restored so
     *         callers can observe it), {@code false} otherwise
     */
    private boolean waitABit() {
        try {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
        catch (InterruptedException e) {
            LOG.warn("Thread got interrupted");
            // Thread.sleep clears the flag when it throws; set it again instead of swallowing it.
            Thread.currentThread().interrupt();
            return true;
        }
        return false;
    }

    /**
     * @return {@code true} if the response carries no records
     */
    private boolean isEmptyStream(GetRecordsResponse recordsResponse) {
        return recordsResponse.records().isEmpty();
    }
}

