/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.endpoint.receiver.aws;

import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.endpoint.receiver.aws.KinesisShardResponse;
import de.otto.synapse.logging.LogHelper;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.listener.RetryListenerSupport;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

/**
 * Iterates over the records of a single Kinesis shard.
 *
 * <p>On construction a shard iterator id is requested from Kinesis for the given
 * {@link ShardPosition}. Every call to {@link #next()} fetches one batch of records,
 * replaces the iterator id with the one returned by Kinesis and, if the batch was not
 * empty, moves the shard position to the sequence number of the last record received.
 *
 * <p>Fetching is wrapped in a {@link RetryTemplate} that retries on
 * {@link KinesisException} and {@link SdkClientException} with exponential back-off.
 * {@link #stop()} may be used to prevent further retries.
 */
public class KinesisShardIterator {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisShardIterator.class);

    /** Iterator id that {@link #isPoison()} compares against. */
    public static final String POISON_SHARD_ITER = "__synapse__poison__iter";

    /** Record limit per fetch used by the three-argument constructor. */
    public static final Integer FETCH_RECORDS_LIMIT = 10000;

    /** Max attempts handed to the {@link SimpleRetryPolicy}. */
    private static final int RETRY_MAX_ATTEMPTS = 16;
    /** Initial back-off interval in milliseconds. */
    private static final int RETRY_BACK_OFF_POLICY_INITIAL_INTERVAL = 1000;
    /** Upper bound of the back-off interval in milliseconds. */
    private static final int RETRY_BACK_OFF_POLICY_MAX_INTERVAL = 64000;
    /** Factor by which the back-off interval grows between attempts. */
    private static final double RETRY_BACK_OFF_POLICY_MULTIPLIER = 2.0;

    private final KinesisClient kinesisClient;
    private final String channelName;
    /** Current shard iterator id; replaced after every successful fetch. */
    private String id;
    /** Position of the last record received, or the start position if nothing was received yet. */
    private ShardPosition shardPosition;
    private final int fetchRecordLimit;
    private final RetryTemplate retryTemplate;
    /** Set by {@link #stop()}; checked at the start of every fetch attempt. */
    private final AtomicBoolean stopSignal = new AtomicBoolean(false);

    /**
     * Creates an iterator that fetches at most {@link #FETCH_RECORDS_LIMIT} records per call.
     *
     * @param kinesisClient client used to talk to Kinesis
     * @param channelName   name of the Kinesis stream
     * @param shardPosition shard and position to start reading from
     */
    public KinesisShardIterator(@Nonnull KinesisClient kinesisClient, @Nonnull String channelName, @Nonnull ShardPosition shardPosition) {
        this(kinesisClient, channelName, shardPosition, FETCH_RECORDS_LIMIT);
    }

    /**
     * Creates an iterator and immediately requests the initial shard iterator id from Kinesis.
     *
     * @param kinesisClient    client used to talk to Kinesis
     * @param channelName      name of the Kinesis stream
     * @param shardPosition    shard and position to start reading from
     * @param fetchRecordLimit maximum number of records requested per fetch
     */
    public KinesisShardIterator(@Nonnull KinesisClient kinesisClient, @Nonnull String channelName, @Nonnull ShardPosition shardPosition, int fetchRecordLimit) {
        this.kinesisClient = kinesisClient;
        this.fetchRecordLimit = fetchRecordLimit;
        this.retryTemplate = createRetryTemplate();
        // channelName must be assigned before buildIteratorShardRequest(), which reads it.
        this.channelName = channelName;
        this.shardPosition = shardPosition;
        this.id = kinesisClient.getShardIterator(buildIteratorShardRequest(shardPosition)).shardIterator();
    }

    /**
     * @return the current shard iterator id
     */
    public String getId() {
        return this.id;
    }

    /**
     * @return the current shard position
     */
    @Nonnull
    public ShardPosition getShardPosition() {
        return this.shardPosition;
    }

    /**
     * @return the maximum number of records requested per fetch
     */
    public int getFetchRecordLimit() {
        return this.fetchRecordLimit;
    }

    /**
     * @return {@code true} if the current iterator id equals {@link #POISON_SHARD_ITER}
     */
    boolean isPoison() {
        // NOTE(review): id is overwritten with response.nextShardIterator() in tryNext();
        // if that value can be null this call throws a NullPointerException - confirm
        // how callers are expected to detect the end of a shard.
        return this.id.equals(POISON_SHARD_ITER);
    }

    /**
     * Raises the stop signal. The next fetch attempt marks its retry context as
     * exhausted, so a failing fetch is not retried any further.
     */
    public void stop() {
        this.stopSignal.set(true);
    }

    /**
     * Fetches the next batch of records, retrying according to the configured retry template.
     *
     * @return the response for this shard, including the time in milliseconds the fetch
     *         (with all retries) took
     * @throws RuntimeException wrapping any {@link Throwable} raised while fetching
     */
    public KinesisShardResponse next() {
        try {
            final Stopwatch stopwatch = Stopwatch.createStarted();
            final GetRecordsResponse recordsResponse = this.retryTemplate.execute((RetryCallback<GetRecordsResponse, Throwable>) context -> {
                if (this.stopSignal.get()) {
                    // Still perform this attempt, but tell the framework not to retry it.
                    context.setExhaustedOnly();
                }
                return tryNext();
            });
            return new KinesisShardResponse(this.channelName, this.shardPosition, recordsResponse, stopwatch.elapsed(TimeUnit.MILLISECONDS));
        }
        catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

    /**
     * Builds the request for the initial shard iterator, translating the start mode of
     * the shard position into the matching Kinesis iterator type.
     *
     * @param shardPosition shard and position to start reading from
     * @return the request to send to Kinesis
     */
    private GetShardIteratorRequest buildIteratorShardRequest(ShardPosition shardPosition) {
        final GetShardIteratorRequest.Builder shardRequestBuilder = GetShardIteratorRequest.builder()
                .shardId(shardPosition.shardName())
                .streamName(this.channelName);
        // Start modes without a case below leave the iterator type unset.
        switch (shardPosition.startFrom()) {
            case HORIZON: {
                shardRequestBuilder.shardIteratorType(ShardIteratorType.TRIM_HORIZON);
                break;
            }
            case POSITION: {
                shardRequestBuilder.shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
                shardRequestBuilder.startingSequenceNumber(shardPosition.position());
                break;
            }
            case AT_POSITION: {
                shardRequestBuilder.shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
                shardRequestBuilder.startingSequenceNumber(shardPosition.position());
                break;
            }
            case TIMESTAMP: {
                shardRequestBuilder.shardIteratorType(ShardIteratorType.AT_TIMESTAMP).timestamp(shardPosition.timestamp());
                break;
            }
        }
        return shardRequestBuilder.build();
    }

    /**
     * Performs a single fetch without retry handling and advances the iterator state.
     *
     * @return the raw Kinesis response
     */
    private GetRecordsResponse tryNext() {
        final GetRecordsRequest request = GetRecordsRequest.builder()
                .shardIterator(this.id)
                .limit(this.fetchRecordLimit)
                .build();
        final GetRecordsResponse response = this.kinesisClient.getRecords(request);
        this.id = response.nextShardIterator();
        final int recordCount = response.records().size();
        LOG.debug("next() with id {} returned {} records", this.id, recordCount);
        if (recordCount > 0) {
            // Remember the sequence number of the last record of this batch.
            final Record lastRecord = response.records().get(recordCount - 1);
            this.shardPosition = ShardPosition.fromPosition(this.shardPosition.shardName(), lastRecord.sequenceNumber());
        }
        return response;
    }

    /**
     * Creates the retry template used by {@link #next()}: a simple retry policy for
     * {@link KinesisException} and {@link SdkClientException}, combined with an
     * exponential back-off and a listener that logs every failed attempt.
     *
     * @return the configured retry template
     */
    private RetryTemplate createRetryTemplate() {
        final SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(
                RETRY_MAX_ATTEMPTS,
                ImmutableMap.<Class<? extends Throwable>, Boolean>of(
                        KinesisException.class, true,
                        SdkClientException.class, true));

        final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(RETRY_BACK_OFF_POLICY_INITIAL_INTERVAL);
        backOffPolicy.setMaxInterval(RETRY_BACK_OFF_POLICY_MAX_INTERVAL);
        backOffPolicy.setMultiplier(RETRY_BACK_OFF_POLICY_MULTIPLIER);

        final RetryTemplate template = new RetryTemplate();
        template.registerListener(new LogRetryListener());
        template.setRetryPolicy(retryPolicy);
        template.setBackOffPolicy(backOffPolicy);
        return template;
    }

    /**
     * Retry listener that logs a warning for every failed fetch attempt.
     */
    class LogRetryListener
    extends RetryListenerSupport {
        LogRetryListener() {
        }

        /**
         * Logs the retry count and the error message of the failed attempt.
         *
         * @param context  retry context of the current execution
         * @param callback the callback that failed
         * @param t        the error raised by the callback
         */
        @Override
        public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable t) {
            // ImmutableMap rejects null values, and Throwable.getMessage() may return null.
            // Fall back to the exception class name so that logging never fails itself.
            final String errorMessage = t.getMessage() != null ? t.getMessage() : t.getClass().getName();
            LogHelper.warn(LOG, ImmutableMap.<String, Object>of("retryCount", context.getRetryCount(), "errorMessage", errorMessage), "fail to iterate on shard", null);
        }
    }
}

