/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.endpoint.receiver.aws;

import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.endpoint.receiver.aws.KinesisShardIterator;
import de.otto.synapse.endpoint.receiver.aws.KinesisShardResponse;
import java.time.Clock;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.awssdk.services.kinesis.KinesisClient;

/**
 * Reads the responses of a single Kinesis shard and hands them to a consumer.
 *
 * <p>Reading happens asynchronously on the {@link ExecutorService} passed to the constructor.
 * The stop signal is kept in an {@link AtomicBoolean}, so {@link #stop()} may be called from a
 * different thread than the one that is currently reading. Once set, the stop signal is never
 * reset by this class.
 */
@ThreadSafe
public class KinesisShardReader {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisShardReader.class);

    /** Pause between two consecutive reads from the shard, in milliseconds. */
    private static final long WAIT_BETWEEN_READS_MILLIS = 1000L;

    private static final String MDC_CHANNEL_NAME = "channelName";
    private static final String MDC_SHARD_NAME = "shardName";

    private final String shardName;
    private final String channelName;
    private final KinesisClient kinesisClient;
    private final ExecutorService executorService;
    private final Clock clock;
    private final AtomicBoolean stopSignal = new AtomicBoolean(false);

    /**
     * Creates a reader for one shard of a channel.
     *
     * @param channelName     name of the channel the shard belongs to
     * @param shardName       name of the shard to read from
     * @param kinesisClient   client handed to the {@link KinesisShardIterator}
     * @param executorService executor on which {@link #consumeUntil} runs its read loop
     * @param clock           clock used to decide whether the {@code until} instant has been reached
     */
    public KinesisShardReader(String channelName, String shardName, KinesisClient kinesisClient, ExecutorService executorService, Clock clock) {
        this.shardName = shardName;
        this.channelName = channelName;
        this.kinesisClient = kinesisClient;
        this.executorService = executorService;
        this.clock = clock;
    }

    /**
     * @return the name of the channel this reader belongs to
     */
    public String getChannelName() {
        return this.channelName;
    }

    /**
     * @return the name of the shard this reader consumes
     */
    public String getShardName() {
        return this.shardName;
    }

    /**
     * Asynchronously reads from the shard, starting at {@code startFrom}, and passes every
     * response to {@code responseConsumer}.
     *
     * <p>The loop fetches at least one response (unless the iterator reports a poison pill) and
     * then repeats, pausing between reads, until one of the following holds:
     * <ul>
     *   <li>{@code until} is no longer after the current instant of the configured clock,</li>
     *   <li>{@link #stop()} has been called,</li>
     *   <li>the reading thread is interrupted while pausing,</li>
     *   <li>the iterator reports a poison pill.</li>
     * </ul>
     *
     * <p>If a {@link RuntimeException} occurs while reading or consuming, it is logged, the
     * stop signal is set, and the exception is rethrown so that the returned future completes
     * exceptionally.
     *
     * @param startFrom        position at which reading starts
     * @param until            instant after which no further reads are issued
     * @param responseConsumer callback receiving every response read from the shard
     * @return a future holding the shard position reported by the iterator when reading ended
     */
    public CompletableFuture<ShardPosition> consumeUntil(ShardPosition startFrom, Instant until, Consumer<KinesisShardResponse> responseConsumer) {
        return CompletableFuture.supplyAsync(() -> {
            MDC.put(MDC_CHANNEL_NAME, this.channelName);
            MDC.put(MDC_SHARD_NAME, this.shardName);
            LOG.info("Reading from channel={}, shard={}, position={}", this.channelName, this.shardName, startFrom);
            try {
                KinesisShardIterator kinesisShardIterator = new KinesisShardIterator(this.kinesisClient, this.channelName, startFrom);
                do {
                    if (kinesisShardIterator.isPoison()) {
                        LOG.warn("Received Poison-Pill - This should only happen during tests!");
                        break;
                    }
                    KinesisShardResponse response = kinesisShardIterator.next();
                    responseConsumer.accept(response);
                } while (!this.shouldStopRetrieval(until));
                return kinesisShardIterator.getShardPosition();
            }
            catch (RuntimeException e) {
                // One placeholder per value; the trailing throwable makes SLF4J log the stack trace.
                LOG.error("Failed to consume from Kinesis channel {}, shard {}: {}", this.channelName, this.shardName, e.getMessage(), e);
                this.stop();
                throw e;
            }
            finally {
                MDC.remove(MDC_CHANNEL_NAME);
                MDC.remove(MDC_SHARD_NAME);
            }
        }, this.executorService);
    }

    /**
     * Decides whether the read loop has to end. The checks short-circuit in this order, so the
     * pause between reads only happens when neither the deadline nor the stop signal applies.
     *
     * @param until instant after which no further reads are issued
     * @return {@code true} if reading must not continue
     */
    private boolean shouldStopRetrieval(Instant until) {
        return !until.isAfter(Instant.now(this.clock)) || this.isStopping() || this.waitABit();
    }

    /**
     * Pauses the current thread before the next read.
     *
     * @return {@code true} if the pause was interrupted and reading should end, {@code false} otherwise
     */
    private boolean waitABit() {
        try {
            Thread.sleep(WAIT_BETWEEN_READS_MILLIS);
        }
        catch (InterruptedException e) {
            LOG.warn("Thread got interrupted");
            // Restore the interrupt flag so the owner of the thread can still observe it.
            Thread.currentThread().interrupt();
            return true;
        }
        return false;
    }

    /**
     * Sets the stop signal. A running read loop checks it after each consumed response.
     */
    public void stop() {
        LOG.info("Shard {} received stop signal.", this.shardName);
        this.stopSignal.set(true);
    }

    /**
     * @return {@code true} once {@link #stop()} has been called
     */
    public boolean isStopping() {
        return this.stopSignal.get();
    }
}

