/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.endpoint.receiver.aws;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.endpoint.receiver.aws.KinesisMessageLogIterator;
import de.otto.synapse.endpoint.receiver.aws.KinesisMessageLogResponse;
import de.otto.synapse.endpoint.receiver.aws.KinesisShardIterator;
import de.otto.synapse.endpoint.receiver.aws.KinesisShardReader;
import de.otto.synapse.endpoint.receiver.aws.KinesisShardResponse;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.Shard;

public class KinesisMessageLogReader {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisMessageLogReader.class);
    private final String channelName;
    private final KinesisClient kinesisClient;
    private final Clock clock;
    private List<KinesisShardReader> kinesisShardReaders;
    private ExecutorService executorService;

    public KinesisMessageLogReader(String channelName, KinesisClient kinesisClient, Clock clock) {
        this.channelName = channelName;
        this.kinesisClient = kinesisClient;
        this.clock = clock;
    }

    public String getChannelName() {
        return this.channelName;
    }

    public List<String> getOpenShards() {
        if (Objects.isNull(this.executorService)) {
            this.initExecutorService();
        }
        return this.kinesisShardReaders.stream().map(KinesisShardReader::getShardName).collect(Collectors.toList());
    }

    public KinesisMessageLogIterator getMessageLogIterator(ChannelPosition channelPosition) {
        if (Objects.isNull(this.executorService)) {
            this.initExecutorService();
        }
        try {
            List futureShardPositions = this.kinesisShardReaders.stream().map(shardReader -> CompletableFuture.supplyAsync(() -> new KinesisShardIterator(this.kinesisClient, this.channelName, channelPosition.shard(shardReader.getShardName())), this.executorService)).collect(Collectors.toList());
            return new KinesisMessageLogIterator(futureShardPositions.stream().map(CompletableFuture::join).collect(Collectors.toList()));
        }
        catch (RuntimeException e) {
            this.shutdownExecutor();
            throw e;
        }
    }

    public CompletableFuture<KinesisMessageLogResponse> read(KinesisMessageLogIterator iterator) {
        if (Objects.isNull(this.executorService)) {
            this.initExecutorService();
        }
        try {
            List futureShardPositions = this.kinesisShardReaders.stream().map(shardReader -> CompletableFuture.supplyAsync(() -> {
                KinesisShardIterator shardIterator = iterator.getShardIterator(shardReader.getShardName());
                return shardIterator.next();
            }, this.executorService)).collect(Collectors.toList());
            return CompletableFuture.supplyAsync(() -> new KinesisMessageLogResponse(this.channelName, (ImmutableList<KinesisShardResponse>)((ImmutableList)futureShardPositions.stream().map(CompletableFuture::join).collect(ImmutableList.toImmutableList()))), this.executorService);
        }
        catch (RuntimeException e) {
            this.shutdownExecutor();
            throw e;
        }
    }

    public CompletableFuture<ChannelPosition> consumeUntil(ChannelPosition startFrom, Instant until, Consumer<KinesisShardResponse> consumer) {
        if (Objects.isNull(this.executorService)) {
            this.initExecutorService();
        }
        try {
            List futureShardPositions = this.kinesisShardReaders.stream().map(shard -> shard.consumeUntil(startFrom.shard(shard.getShardName()), until, consumer)).collect(Collectors.toList());
            return CompletableFuture.supplyAsync(() -> ChannelPosition.channelPosition((Iterable)futureShardPositions.stream().map(CompletableFuture::join).collect(Collectors.toList()))).exceptionally(throwable -> {
                this.shutdownExecutor();
                throw new RuntimeException(throwable.getMessage(), (Throwable)throwable);
            });
        }
        catch (RuntimeException e) {
            this.shutdownExecutor();
            throw e;
        }
    }

    private void initExecutorService() {
        Set<String> openShards = this.retrieveAllOpenShards();
        this.executorService = openShards.isEmpty() ? Executors.newSingleThreadExecutor() : Executors.newFixedThreadPool(openShards.size() + 1, new ThreadFactoryBuilder().setNameFormat("kinesis-message-log-%d").build());
        this.kinesisShardReaders = openShards.stream().map(shardName -> new KinesisShardReader(this.channelName, (String)shardName, this.kinesisClient, this.executorService, this.clock)).collect(Collectors.toList());
    }

    private Set<String> retrieveAllOpenShards() {
        return (Set)this.retrieveAllShards().stream().filter(this::isShardOpen).map(Shard::shardId).collect(ImmutableSet.toImmutableSet());
    }

    private List<Shard> retrieveAllShards() {
        ArrayList<Shard> shardList = new ArrayList<Shard>();
        boolean fetchMore = true;
        while (fetchMore) {
            fetchMore = this.retrieveAndAppendNextBatchOfShards(shardList);
        }
        return shardList;
    }

    private boolean retrieveAndAppendNextBatchOfShards(List<Shard> shardList) {
        DescribeStreamRequest describeStreamRequest = (DescribeStreamRequest)DescribeStreamRequest.builder().streamName(this.getChannelName()).exclusiveStartShardId(this.getLastSeenShardId(shardList)).limit(Integer.valueOf(10)).build();
        DescribeStreamResponse describeStreamResult = this.kinesisClient.describeStream(describeStreamRequest);
        shardList.addAll(describeStreamResult.streamDescription().shards());
        return describeStreamResult.streamDescription().hasMoreShards();
    }

    private String getLastSeenShardId(List<Shard> shardList) {
        if (!shardList.isEmpty()) {
            return shardList.get(shardList.size() - 1).shardId();
        }
        return null;
    }

    private boolean isShardOpen(Shard shard) {
        if (shard.sequenceNumberRange().endingSequenceNumber() == null) {
            return true;
        }
        LOG.warn("Shard with id {} is closed. Cannot retrieve data.", (Object)shard.shardId());
        return false;
    }

    private void shutdownExecutor() {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
            try {
                boolean allThreadsSafelyTerminated = this.executorService.awaitTermination(30L, TimeUnit.SECONDS);
                if (!allThreadsSafelyTerminated) {
                    LOG.error("Kinesis Thread for stream {} is still running", (Object)this.getChannelName());
                }
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            this.executorService = null;
        }
    }

    public void stop() {
        LOG.info("Channel {} received stop signal.", (Object)this.getChannelName());
        this.kinesisShardReaders.forEach(KinesisShardReader::stop);
    }

    @VisibleForTesting
    List<KinesisShardReader> getCurrentKinesisShards() {
        if (Objects.isNull(this.executorService)) {
            this.initExecutorService();
        }
        return this.kinesisShardReaders;
    }
}

