/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.channel.aws;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.channel.aws.KinesisShardIterator;
import de.otto.synapse.consumer.MessageConsumer;
import de.otto.synapse.endpoint.InterceptorChain;
import de.otto.synapse.logging.LogHelper;
import de.otto.synapse.message.Message;
import de.otto.synapse.message.aws.KinesisMessage;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

@ThreadSafe
public class KinesisShard {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisShard.class);
    private final String shardId;
    private final String channelName;
    private final KinesisClient kinesisClient;
    private final InterceptorChain interceptorChain;
    private final AtomicBoolean stopSignal = new AtomicBoolean(false);
    private final Clock clock;

    public KinesisShard(String shardId, String channelName, KinesisClient kinesisClient, InterceptorChain interceptorChain, Clock clock) {
        this.shardId = shardId;
        this.channelName = channelName;
        this.kinesisClient = kinesisClient;
        this.interceptorChain = interceptorChain;
        this.clock = clock;
    }

    public String getShardId() {
        return this.shardId;
    }

    public ShardPosition consumeShard(ShardPosition startPosition, Instant until, MessageConsumer<String> consumer, Consumer<Duration> callback) {
        try {
            MDC.put((String)"channelName", (String)this.channelName);
            MDC.put((String)"shardId", (String)this.shardId);
            LogHelper.info((Logger)LOG, (Map)ImmutableMap.of((Object)"position", (Object)startPosition), (String)"Reading from stream", null);
            ShardPosition shardPosition = startPosition;
            KinesisShardIterator kinesisShardIterator = this.retrieveIterator(shardPosition);
            boolean stopRetrieval = false;
            long t0 = System.currentTimeMillis();
            do {
                if (kinesisShardIterator.isPoison()) {
                    LOG.warn("Received Poison-Pill - This should only happen during tests!");
                    break;
                }
                GetRecordsResponse recordsResponse = kinesisShardIterator.next();
                Duration durationBehind = Duration.ofMillis(recordsResponse.millisBehindLatest());
                long t1 = System.currentTimeMillis();
                if (!this.isEmptyStream(recordsResponse)) {
                    for (Record record : recordsResponse.records()) {
                        Message message = this.interceptorChain.intercept(KinesisMessage.kinesisMessage(this.shardId, record));
                        if (message == null) continue;
                        this.consumeMessageSafely(consumer, record, (Message<String>)message);
                        shardPosition = ShardPosition.fromPosition((String)this.shardId, (String)record.sequenceNumber());
                        if (stopRetrieval) continue;
                        stopRetrieval = !until.isAfter(message.getHeader().getArrivalTimestamp());
                    }
                } else {
                    stopRetrieval = !until.isAfter(Instant.now(this.clock));
                }
                callback.accept(durationBehind);
                long t2 = System.currentTimeMillis();
                this.logInfo(this.channelName, recordsResponse, durationBehind, t2 - t1);
            } while (!(stopRetrieval = stopRetrieval || this.stopSignal.get() || this.waitABit()));
            long t3 = System.currentTimeMillis();
            LogHelper.info((Logger)LOG, (Map)ImmutableMap.of((Object)"position", (Object)shardPosition.position(), (Object)"runtime", (Object)(t3 - t0)), (String)"Done consuming from shard.", null);
            ShardPosition shardPosition2 = shardPosition;
            return shardPosition2;
        }
        catch (Exception e) {
            LogHelper.error((Logger)LOG, (Map)ImmutableMap.of((Object)"channelName", (Object)this.channelName, (Object)"shardId", (Object)this.shardId), (String)"kinesis consumer died unexpectedly", (Exception)e);
            throw e;
        }
        finally {
            MDC.remove((String)"channelName");
            MDC.remove((String)"shardId");
        }
    }

    @VisibleForTesting
    protected KinesisShardIterator retrieveIterator(ShardPosition shardPosition) {
        GetShardIteratorResponse shardIteratorResponse;
        try {
            shardIteratorResponse = this.kinesisClient.getShardIterator(this.buildIteratorShardRequest(shardPosition));
        }
        catch (InvalidArgumentException e) {
            LOG.error(String.format("invalidShardSequenceNumber in Snapshot %s/%s - reading from HORIZON", this.channelName, this.shardId));
            shardIteratorResponse = this.kinesisClient.getShardIterator(this.buildIteratorShardRequest(ShardPosition.fromHorizon((String)this.shardId)));
        }
        return new KinesisShardIterator(this.kinesisClient, shardIteratorResponse.shardIterator());
    }

    private GetShardIteratorRequest buildIteratorShardRequest(ShardPosition shardPosition) {
        GetShardIteratorRequest.Builder shardRequestBuilder = GetShardIteratorRequest.builder().shardId(this.shardId).streamName(this.channelName);
        if (shardPosition == null || shardPosition.startFrom() == StartFrom.HORIZON) {
            shardRequestBuilder.shardIteratorType(ShardIteratorType.TRIM_HORIZON);
        } else if (shardPosition.startFrom() == StartFrom.TIMESTAMP) {
            shardRequestBuilder.shardIteratorType(ShardIteratorType.AT_TIMESTAMP);
            shardRequestBuilder.timestamp(shardPosition.timestamp());
        } else {
            shardRequestBuilder.shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
            shardRequestBuilder.startingSequenceNumber(shardPosition.position());
        }
        return (GetShardIteratorRequest)shardRequestBuilder.build();
    }

    private void consumeMessageSafely(MessageConsumer<String> consumer, Record record, Message<String> kinesisMessage) {
        try {
            consumer.accept(kinesisMessage);
        }
        catch (Exception e) {
            LOG.error("consumer failed while processing {}", (Object)record, (Object)e);
        }
    }

    private void logInfo(String channelName, GetRecordsResponse recordsResponse, Duration durationBehind, long runtime) {
        boolean isBehind;
        int recordCount = recordsResponse.records().size();
        boolean bl = isBehind = durationBehind.getSeconds() > 0L;
        if (recordCount > 0 || isBehind) {
            String durationString = String.format("%s days %s hrs %s min %s sec", durationBehind.toDays(), durationBehind.toHours() % 24L, durationBehind.toMinutes() % 60L, durationBehind.getSeconds() % 60L);
            LogHelper.info((Logger)LOG, (Map)ImmutableMap.of((Object)"recordCount", (Object)recordCount, (Object)"durationBehind", (Object)durationBehind, (Object)"runtime", (Object)runtime), (String)"Reading from stream", null);
        }
    }

    private boolean waitABit() {
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException e) {
            LOG.warn("Thread got interrupted");
            return true;
        }
        return false;
    }

    private boolean isEmptyStream(GetRecordsResponse recordsResponse) {
        return recordsResponse.records().isEmpty();
    }

    public void stop() {
        this.stopSignal.set(true);
    }

    public boolean isStopping() {
        return this.stopSignal.get();
    }
}

