/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.channel.aws;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.aws.KinesisShard;
import de.otto.synapse.consumer.MessageConsumer;
import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint;
import de.otto.synapse.logging.LogHelper;
import de.otto.synapse.message.Message;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.Shard;

public class KinesisMessageLogReceiverEndpoint
extends MessageLogReceiverEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisMessageLogReceiverEndpoint.class);
    private final KinesisClient kinesisClient;
    private List<KinesisShard> kinesisShards;
    private ExecutorService executorService;

    public KinesisMessageLogReceiverEndpoint(KinesisClient kinesisClient, ObjectMapper objectMapper, String channelName) {
        super(channelName, objectMapper);
        this.kinesisClient = kinesisClient;
        this.initExecutorService();
    }

    @Nonnull
    public ChannelPosition consume(@Nonnull ChannelPosition startFrom, @Nonnull Predicate<Message<?>> stopCondition) {
        try {
            long t1 = System.currentTimeMillis();
            if (Objects.isNull(this.executorService)) {
                this.initExecutorService();
            }
            List futureShardPositions = this.kinesisShards.stream().map(shard -> CompletableFuture.supplyAsync(() -> this.consumeShard((KinesisShard)shard, startFrom, stopCondition), this.executorService)).collect(Collectors.toList());
            List shardPositions = futureShardPositions.stream().map(CompletableFuture::join).collect(Collectors.toList());
            long t2 = System.currentTimeMillis();
            LogHelper.info((Logger)LOG, (Map)ImmutableMap.of((Object)"runtime", (Object)(t2 - t1)), (String)"Consume events from Kinesis", null);
            return ChannelPosition.merge(shardPositions);
        }
        catch (RuntimeException e) {
            LOG.error("Failed to consume from Kinesis stream {}: {}", (Object)this.getChannelName(), (Object)e.getMessage());
            this.stop();
            this.executorService.shutdownNow();
            try {
                boolean allThreadsSafelyTerminated = this.executorService.awaitTermination(30L, TimeUnit.SECONDS);
                if (!allThreadsSafelyTerminated) {
                    LOG.error("Kinesis Thread for stream {} is still running", (Object)this.getChannelName());
                }
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            this.executorService = null;
            throw e;
        }
    }

    private void initExecutorService() {
        this.kinesisShards = this.retrieveAllOpenShards();
        this.executorService = this.kinesisShards.isEmpty() ? Executors.newSingleThreadExecutor() : Executors.newFixedThreadPool(this.kinesisShards.size(), new ThreadFactoryBuilder().setNameFormat("kinesis-message-log-%d").build());
    }

    private ChannelPosition consumeShard(KinesisShard shard, ChannelPosition startFrom, Predicate<Message<?>> stopCondition) {
        try {
            return shard.consumeShard(startFrom, stopCondition, (MessageConsumer<String>)this.getMessageDispatcher());
        }
        catch (RuntimeException e) {
            LOG.error("Failed to consume from Kinesis shard {}: {}", new Object[]{this.getChannelName(), shard.getShardId(), e.getMessage()});
            this.stop();
            throw e;
        }
    }

    public void stop() {
        this.kinesisShards.forEach(KinesisShard::stop);
    }

    @VisibleForTesting
    List<KinesisShard> getCurrentKinesisShards() {
        return this.kinesisShards;
    }

    private List<KinesisShard> retrieveAllOpenShards() {
        return (List)this.retrieveAllShards().stream().filter(this::isShardOpen).map(shard -> new KinesisShard(shard.shardId(), this.getChannelName(), this.kinesisClient, this.getInterceptorChain())).collect(ImmutableList.toImmutableList());
    }

    private List<Shard> retrieveAllShards() {
        ArrayList<Shard> shardList = new ArrayList<Shard>();
        boolean fetchMore = true;
        while (fetchMore) {
            fetchMore = this.retrieveAndAppendNextBatchOfShards(shardList);
        }
        return shardList;
    }

    private boolean retrieveAndAppendNextBatchOfShards(List<Shard> shardList) {
        DescribeStreamRequest describeStreamRequest = (DescribeStreamRequest)DescribeStreamRequest.builder().streamName(this.getChannelName()).exclusiveStartShardId(this.getLastSeenShardId(shardList)).limit(Integer.valueOf(10)).build();
        DescribeStreamResponse describeStreamResult = this.kinesisClient.describeStream(describeStreamRequest);
        shardList.addAll(describeStreamResult.streamDescription().shards());
        return describeStreamResult.streamDescription().hasMoreShards();
    }

    private String getLastSeenShardId(List<Shard> shardList) {
        if (!shardList.isEmpty()) {
            return shardList.get(shardList.size() - 1).shardId();
        }
        return null;
    }

    private boolean isShardOpen(Shard shard) {
        if (shard.sequenceNumberRange().endingSequenceNumber() == null) {
            return true;
        }
        LOG.warn("Shard with id {} is closed. Cannot retrieve data.", (Object)shard.shardId());
        return false;
    }
}

