/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.channel.aws;

import com.google.common.annotations.VisibleForTesting;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.channel.aws.KinesisShardIterator;
import de.otto.synapse.consumer.MessageConsumer;
import de.otto.synapse.message.Header;
import de.otto.synapse.message.Message;
import de.otto.synapse.message.aws.KinesisMessage;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

/**
 * Polls a single Kinesis shard and forwards every record to a {@link MessageConsumer}.
 *
 * <p>Consumption runs in the calling thread (see {@link #consumeShard}) until the supplied
 * stop condition matches, {@link #stop()} has been called, or the thread is interrupted
 * while pausing between two polls.
 */
@ThreadSafe
public class KinesisShard {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisShard.class);

    /** Pause between two consecutive polls of the shard iterator, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 500L;

    private final String shardId;
    private final String channelName;
    private final KinesisClient kinesisClient;
    /** Set by {@link #stop()}; checked after every poll of the shard. */
    private final AtomicBoolean stopSignal = new AtomicBoolean(false);

    /**
     * @param shardId       id of the shard to read from
     * @param channelName   name of the stream the shard belongs to
     * @param kinesisClient client used to obtain shard iterators and records
     */
    public KinesisShard(String shardId, String channelName, KinesisClient kinesisClient) {
        this.shardId = shardId;
        this.channelName = channelName;
        this.kinesisClient = kinesisClient;
    }

    /**
     * @return the id of the shard this instance reads from
     */
    public String getShardId() {
        return shardId;
    }

    /**
     * Consumes the shard, starting at the position {@code startPosition} holds for this shard.
     *
     * <p>Every record is converted into a message and passed to {@code consumer}; consumer
     * failures are logged and do not abort consumption. {@code stopCondition} is tested against
     * each consumed message until it matches for the first time; the remaining records of that
     * batch are still consumed. When a poll returns no records, the condition is tested against a
     * message built from the last consumed record, or against a placeholder message without
     * payload if nothing has been consumed yet.
     *
     * @param startPosition channel position to start from
     * @param stopCondition predicate that ends consumption once it returns {@code true}
     * @param consumer      receiver of the messages read from the shard
     * @return a channel position containing the position of the last consumed record, or the
     *         start position of this shard if no record was consumed
     */
    public ChannelPosition consumeShard(ChannelPosition startPosition, Predicate<Message<?>> stopCondition, MessageConsumer<String> consumer) {
        try {
            ShardPosition shardPosition = startPosition.shard(shardId);
            LOG.info("Reading from stream {}, shard {} with starting sequence number {}", channelName, shardId, shardPosition);
            KinesisShardIterator kinesisShardIterator = retrieveIterator(shardPosition);
            boolean stopRetrieval = false;
            Record lastRecord = null;
            do {
                GetRecordsResponse recordsResponse = kinesisShardIterator.next();
                Duration durationBehind = Duration.ofMillis(recordsResponse.millisBehindLatest());
                if (isEmptyStream(recordsResponse)) {
                    // Nothing new: give the stop condition a chance to fire anyway, using the
                    // last record seen (with the current lag) or a payload-less placeholder.
                    Message<String> kinesisMessage = lastRecord != null
                            ? KinesisMessage.kinesisMessage(shardId, durationBehind, lastRecord)
                            : dirtyHackToStopThreadMessage(durationBehind);
                    stopRetrieval = stopCondition.test(kinesisMessage);
                } else {
                    for (Record record : recordsResponse.records()) {
                        Message<String> kinesisMessage = KinesisMessage.kinesisMessage(shardId, durationBehind, record);
                        consumeMessageSafely(consumer, record, kinesisMessage);
                        lastRecord = record;
                        shardPosition = ShardPosition.fromPosition(shardId, record.sequenceNumber());
                        // Once the condition matched, the rest of the batch is consumed untested.
                        if (!stopRetrieval) {
                            stopRetrieval = stopCondition.test(kinesisMessage);
                        }
                    }
                }
                logInfo(channelName, recordsResponse, durationBehind);
                // Short-circuit order matters: only sleep when no stop reason is present yet.
                stopRetrieval = stopRetrieval || stopSignal.get() || waitABit();
            } while (!stopRetrieval);
            LOG.info("Done consuming from shard '{}' of stream '{}'.", shardId, channelName);
            return ChannelPosition.channelPosition(new ShardPosition[]{shardPosition});
        }
        catch (Exception e) {
            LOG.error(String.format("kinesis consumer died unexpectedly. shard '%s', stream '%s'", shardId, channelName), e);
            throw e;
        }
    }

    /**
     * Builds a message with key {@code "no_key"} and no payload that is only used to evaluate the
     * stop condition when the shard has not delivered any record yet.
     */
    private Message<String> dirtyHackToStopThreadMessage(Duration durationBehind) {
        Header header = Header.responseHeader(ShardPosition.fromHorizon(shardId), Instant.now(), durationBehind);
        return Message.message("no_key", header, null);
    }

    /**
     * Requests a shard iterator for the given position.
     *
     * <p>If Kinesis rejects the request with an {@link InvalidArgumentException}, the error is
     * logged and an iterator starting at the horizon of the shard is requested instead.
     *
     * @param shardPosition position to start reading from; may be {@code null}
     * @return an iterator over the records of this shard
     */
    @VisibleForTesting
    protected KinesisShardIterator retrieveIterator(ShardPosition shardPosition) {
        GetShardIteratorResponse shardIteratorResponse;
        try {
            shardIteratorResponse = kinesisClient.getShardIterator(buildIteratorShardRequest(shardPosition));
        }
        catch (InvalidArgumentException e) {
            LOG.error(String.format("invalidShardSequenceNumber in Snapshot %s/%s - reading from HORIZON", channelName, shardId), e);
            shardIteratorResponse = kinesisClient.getShardIterator(buildIteratorShardRequest(ShardPosition.fromHorizon(shardId)));
        }
        return new KinesisShardIterator(kinesisClient, shardIteratorResponse.shardIterator());
    }

    /**
     * Creates the iterator request: {@code TRIM_HORIZON} for a {@code null} or horizon position,
     * otherwise {@code AFTER_SEQUENCE_NUMBER} with the sequence number of the position.
     */
    private GetShardIteratorRequest buildIteratorShardRequest(ShardPosition shardPosition) {
        GetShardIteratorRequest.Builder shardRequestBuilder = GetShardIteratorRequest.builder()
                .shardId(shardId)
                .streamName(channelName);
        if (shardPosition == null || shardPosition.startFrom() == StartFrom.HORIZON) {
            shardRequestBuilder.shardIteratorType(ShardIteratorType.TRIM_HORIZON);
        } else {
            shardRequestBuilder.shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
            shardRequestBuilder.startingSequenceNumber(shardPosition.position());
        }
        return (GetShardIteratorRequest) shardRequestBuilder.build();
    }

    /**
     * Passes the message to the consumer; any exception thrown by the consumer is logged together
     * with the offending record and then dropped, so that consumption continues.
     */
    private void consumeMessageSafely(MessageConsumer<String> consumer, Record record, Message<String> kinesisMessage) {
        try {
            consumer.accept(kinesisMessage);
        }
        catch (Exception e) {
            // The trailing throwable is not bound to a placeholder, so SLF4J logs its stack trace.
            LOG.error("consumer failed while processing {}", record, e);
        }
    }

    /**
     * Logs the size of the response and the lag behind the latest record, but only if records
     * were received or the lag is at least one second.
     */
    private void logInfo(String channelName, GetRecordsResponse recordsResponse, Duration durationBehind) {
        int recordCount = recordsResponse.records().size();
        boolean isBehind = durationBehind.getSeconds() > 0L;
        if (recordCount > 0 || isBehind) {
            String durationString = String.format("%s days %s hrs %s min %s sec",
                    durationBehind.toDays(),
                    durationBehind.toHours() % 24L,
                    durationBehind.toMinutes() % 60L,
                    durationBehind.getSeconds() % 60L);
            LOG.info("Got {} records from stream '{}' and shard '{}'; behind latest: {}", recordCount, channelName, shardId, durationString);
        }
    }

    /**
     * Sleeps for the poll interval.
     *
     * @return {@code true} if the thread was interrupted while sleeping, {@code false} otherwise
     */
    private boolean waitABit() {
        try {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
        catch (InterruptedException e) {
            LOG.warn("Thread got interrupted");
            // Restore the interrupt flag so that callers up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            return true;
        }
        return false;
    }

    /**
     * @return {@code true} if the response contains no records
     */
    private boolean isEmptyStream(GetRecordsResponse recordsResponse) {
        return recordsResponse.records().isEmpty();
    }

    /**
     * Signals a running {@link #consumeShard} loop to finish after its current poll.
     */
    public void stop() {
        stopSignal.set(true);
    }

    /**
     * @return {@code true} once {@link #stop()} has been called
     */
    public boolean isStopping() {
        return stopSignal.get();
    }
}

