/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.channel.aws;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ChannelResponse;
import de.otto.synapse.channel.aws.KinesisShard;
import de.otto.synapse.channel.aws.MessageLog;
import de.otto.synapse.consumer.MessageConsumer;
import de.otto.synapse.message.Message;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.Shard;

public class KinesisMessageLog
implements MessageLog {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisMessageLog.class);
    private final String streamName;
    private final KinesisClient kinesisClient;
    private final ExecutorService executorService;
    private List<KinesisShard> kinesisShards;

    public KinesisMessageLog(KinesisClient kinesisClient, String streamName) {
        this(kinesisClient, streamName, Executors.newFixedThreadPool(1));
    }

    public KinesisMessageLog(KinesisClient kinesisClient, String streamName, ExecutorService executorService) {
        this.streamName = streamName;
        this.kinesisClient = kinesisClient;
        this.executorService = executorService;
    }

    @Override
    public String getStreamName() {
        return this.streamName;
    }

    @Override
    public ChannelResponse consumeStream(ChannelPosition startFrom, Predicate<Message<?>> stopCondition, MessageConsumer<String> consumer) {
        List<KinesisShard> kinesisShards = this.retrieveAllOpenShards();
        try {
            List futureShardPositions = kinesisShards.stream().map(shard -> CompletableFuture.supplyAsync(() -> shard.consumeShard(startFrom, stopCondition, consumer), this.executorService)).collect(Collectors.toList());
            return ChannelResponse.of(futureShardPositions.stream().map(CompletableFuture::join).collect(Collectors.toList()));
        }
        catch (RuntimeException e) {
            LOG.error("Failed to consume from Kinesis stream {}: {}", (Object)this.streamName, (Object)e.getMessage());
            this.executorService.shutdownNow();
            try {
                boolean allThreadsSafelyTerminated = this.executorService.awaitTermination(30L, TimeUnit.SECONDS);
                if (!allThreadsSafelyTerminated) {
                    LOG.error("Kinesis Thread for stream {} is still running", (Object)this.streamName);
                }
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            throw e;
        }
    }

    @VisibleForTesting
    List<KinesisShard> retrieveAllOpenShards() {
        if (this.kinesisShards == null) {
            this.kinesisShards = (List)this.retrieveAllShards().stream().filter(this::isShardOpen).map(shard -> new KinesisShard(shard.shardId(), this.streamName, this.kinesisClient)).collect(ImmutableList.toImmutableList());
        }
        return this.kinesisShards;
    }

    private List<Shard> retrieveAllShards() {
        ArrayList<Shard> shardList = new ArrayList<Shard>();
        boolean fetchMore = true;
        while (fetchMore) {
            fetchMore = this.retrieveAndAppendNextBatchOfShards(shardList);
        }
        return shardList;
    }

    private boolean retrieveAndAppendNextBatchOfShards(List<Shard> shardList) {
        DescribeStreamRequest describeStreamRequest = (DescribeStreamRequest)DescribeStreamRequest.builder().streamName(this.streamName).exclusiveStartShardId(this.getLastSeenShardId(shardList)).limit(Integer.valueOf(10)).build();
        DescribeStreamResponse describeStreamResult = this.kinesisClient.describeStream(describeStreamRequest);
        shardList.addAll(describeStreamResult.streamDescription().shards());
        return describeStreamResult.streamDescription().hasMoreShards();
    }

    private String getLastSeenShardId(List<Shard> shardList) {
        if (!shardList.isEmpty()) {
            return shardList.get(shardList.size() - 1).shardId();
        }
        return null;
    }

    private boolean isShardOpen(Shard shard) {
        if (shard.sequenceNumberRange().endingSequenceNumber() == null) {
            return true;
        }
        LOG.warn("Shard with id {} is closed. Cannot retrieve data.", (Object)shard.shardId());
        return false;
    }
}

