/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.endpoint.receiver.aws;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import de.otto.synapse.channel.ChannelDurationBehind;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.consumer.MessageDispatcher;
import de.otto.synapse.endpoint.InterceptorChain;
import de.otto.synapse.endpoint.receiver.AbstractMessageLogReceiverEndpoint;
import de.otto.synapse.endpoint.receiver.aws.KinesisMessageLogReader;
import de.otto.synapse.endpoint.receiver.aws.KinesisShardReader;
import de.otto.synapse.endpoint.receiver.aws.KinesisShardResponse;
import de.otto.synapse.info.MessageReceiverNotification;
import de.otto.synapse.info.MessageReceiverStatus;
import de.otto.synapse.logging.LogHelper;
import de.otto.synapse.message.Message;
import java.time.Clock;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import software.amazon.awssdk.services.kinesis.KinesisClient;

/**
 * Message-log receiver endpoint that reads a Kinesis stream through a
 * {@link KinesisMessageLogReader}.
 *
 * <p>Every {@link KinesisShardResponse} delivered by the reader is passed through the endpoint's
 * interceptor chain and then handed to its message dispatcher. Progress is reported through
 * {@code publishEvent(...)} status events and {@link MessageReceiverNotification}s.
 */
public class KinesisMessageLogReceiverEndpoint
extends AbstractMessageLogReceiverEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisMessageLogReceiverEndpoint.class);
    private final KinesisMessageLogReader kinesisMessageLogReader;
    private final ApplicationEventPublisher eventPublisher;

    /**
     * Creates an endpoint that uses {@link Clock#systemDefaultZone()} as its clock.
     *
     * @param channelName    name of the channel; passed to the superclass and to the reader
     * @param kinesisClient  client handed to the {@link KinesisMessageLogReader}
     * @param objectMapper   mapper passed to the superclass
     * @param eventPublisher publisher used for receiver notifications
     */
    public KinesisMessageLogReceiverEndpoint(String channelName, KinesisClient kinesisClient, ObjectMapper objectMapper, ApplicationEventPublisher eventPublisher) {
        this(channelName, kinesisClient, objectMapper, eventPublisher, Clock.systemDefaultZone());
    }

    /**
     * Creates an endpoint with an explicit clock.
     *
     * @param channelName    name of the channel; passed to the superclass and to the reader
     * @param kinesisClient  client handed to the {@link KinesisMessageLogReader}
     * @param objectMapper   mapper passed to the superclass
     * @param eventPublisher publisher used for receiver notifications; the per-shard
     *                       notifications are skipped when this is {@code null}
     *                       (NOTE(review): whether the superclass tolerates {@code null} is not
     *                       visible here - confirm)
     * @param clock          clock handed to the {@link KinesisMessageLogReader}
     */
    public KinesisMessageLogReceiverEndpoint(String channelName, KinesisClient kinesisClient, ObjectMapper objectMapper, ApplicationEventPublisher eventPublisher, Clock clock) {
        super(channelName, objectMapper, eventPublisher);
        this.eventPublisher = eventPublisher;
        this.kinesisMessageLogReader = new KinesisMessageLogReader(channelName, kinesisClient, clock);
    }

    /**
     * Starts consuming the channel and returns a future for the resulting channel position.
     *
     * <p>Publishes {@code STARTING}, fetches the open shards, publishes {@code STARTED} and then
     * delegates to {@link KinesisMessageLogReader#consumeUntil}. When the reader's future completes
     * normally, the runtime is logged and {@code FINISHED} is published. On any failure the error
     * is logged, {@code FAILED} is published and {@link #stop()} is called.
     *
     * @param startFrom position handed to the reader as the starting point
     * @param until     instant handed to the reader (presumably the point at which consumption
     *                  ends; the exact semantics are defined by the reader)
     * @return future of the channel position returned by the reader; completed exceptionally with
     *         a {@link RuntimeException} if the reader's future fails
     * @throws RuntimeException if setting up the consumption fails synchronously; the original
     *                          exception is kept as the cause
     */
    @Nonnull
    public CompletableFuture<ChannelPosition> consumeUntil(@Nonnull ChannelPosition startFrom, @Nonnull Instant until) {
        try {
            this.publishEvent(MessageReceiverStatus.STARTING, "Consuming messages from Kinesis.", null);
            final long startedAt = System.currentTimeMillis();
            final List<String> shards = this.kinesisMessageLogReader.getOpenShards();
            this.publishEvent(MessageReceiverStatus.STARTED, "Received shards from Kinesis.", null);
            final KinesisShardResponseConsumer consumer = new KinesisShardResponseConsumer(shards, this.getInterceptorChain(), this.getMessageDispatcher(), this.eventPublisher);
            return this.kinesisMessageLogReader.consumeUntil(startFrom, until, consumer)
                    .exceptionally(throwable -> {
                        // Rethrowing keeps the returned future failed, so thenApply below is skipped.
                        throw this.handleConsumeFailure(throwable);
                    })
                    .thenApply(channelPosition -> {
                        final long finishedAt = System.currentTimeMillis();
                        LogHelper.info(LOG, ImmutableMap.<String, Object>of("runtime", finishedAt - startedAt), "Consume events from Kinesis", null);
                        this.publishEvent(MessageReceiverStatus.FINISHED, "Finished consuming messages from Kinesis", null);
                        return channelPosition;
                    });
        }
        catch (Exception e) {
            throw this.handleConsumeFailure(e);
        }
    }

    /**
     * Logs the failure, publishes {@code FAILED}, stops the reader and wraps the cause.
     *
     * @param cause the failure that interrupted consumption
     * @return a {@link RuntimeException} carrying the cause's message and the cause itself, for the
     *         caller to throw
     */
    private RuntimeException handleConsumeFailure(Throwable cause) {
        LOG.error("Failed to consume from Kinesis stream {}: {}", this.getChannelName(), cause.getMessage());
        this.publishEvent(MessageReceiverStatus.FAILED, "Failed to consume messages from Kinesis: " + cause.getMessage(), null);
        this.stop();
        return new RuntimeException(cause.getMessage(), cause);
    }

    /**
     * Logs the stop signal and forwards it to the {@link KinesisMessageLogReader}.
     */
    public void stop() {
        LOG.info("Channel {} received stop signal.", this.getChannelName());
        this.kinesisMessageLogReader.stop();
    }

    /**
     * Exposes the reader's current shard readers for tests.
     *
     * @return the list returned by {@link KinesisMessageLogReader#getCurrentKinesisShards()}
     */
    @VisibleForTesting
    List<KinesisShardReader> getCurrentKinesisShards() {
        return this.kinesisMessageLogReader.getCurrentKinesisShards();
    }

    /**
     * Consumer of shard responses: intercepts and dispatches the contained messages, tracks the
     * per-shard duration-behind and publishes a {@code RUNNING} notification per response.
     */
    private static class KinesisShardResponseConsumer
    implements Consumer<KinesisShardResponse> {
        /** Latest known duration-behind; replaced atomically on every response. */
        private final AtomicReference<ChannelDurationBehind> channelDurationBehind = new AtomicReference<>();
        private final InterceptorChain interceptorChain;
        private final MessageDispatcher messageDispatcher;
        private final ApplicationEventPublisher eventPublisher;

        /**
         * @param shardNames        shard names used to seed the duration-behind as "unknown"
         * @param interceptorChain  chain applied to every message before dispatching
         * @param messageDispatcher receiver of the intercepted messages
         * @param eventPublisher    publisher for notifications; may be {@code null}, in which case
         *                          no notification is published
         */
        private KinesisShardResponseConsumer(List<String> shardNames, InterceptorChain interceptorChain, MessageDispatcher messageDispatcher, ApplicationEventPublisher eventPublisher) {
            this.interceptorChain = interceptorChain;
            this.messageDispatcher = messageDispatcher;
            this.eventPublisher = eventPublisher;
            this.channelDurationBehind.set(ChannelDurationBehind.unknown(shardNames));
        }

        /**
         * Processes one shard response.
         *
         * <p>Each message is run through the interceptor chain; a {@code null} result drops the
         * message, otherwise the intercepted message is dispatched. An exception while processing
         * a message is logged and does not prevent the remaining messages from being processed.
         * Afterwards the duration-behind of the response's shard is updated and, if a publisher is
         * present, a {@code RUNNING} notification is published.
         *
         * @param response the shard response to process
         */
        @Override
        public void accept(KinesisShardResponse response) {
            response.getMessages().forEach(message -> {
                try {
                    // NOTE(review): raw type kept from the decompiled source; narrow it to the
                    // project's parameterized Message type if applicable.
                    final Message interceptedMessage = this.interceptorChain.intercept(message);
                    if (interceptedMessage != null) {
                        // Dispatch the interceptor's result, not the original message, so that
                        // changes made by interceptors actually reach the consumers.
                        this.messageDispatcher.accept(interceptedMessage);
                    }
                }
                catch (Exception e) {
                    LOG.error("Error processing message: " + e.getMessage(), e);
                }
            });
            this.channelDurationBehind.updateAndGet(behind -> ChannelDurationBehind.copyOf(behind).with(response.getShardName(), response.getDurationBehind()).build());
            if (this.eventPublisher != null) {
                this.eventPublisher.publishEvent(MessageReceiverNotification.builder()
                        .withChannelName(response.getChannelName())
                        .withChannelDurationBehind(this.channelDurationBehind.get())
                        .withStatus(MessageReceiverStatus.RUNNING)
                        .withMessage("Reading from kinesis shard.")
                        .build());
            }
        }
    }
}

