/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.eventsource.aws;

import com.fasterxml.jackson.databind.ObjectMapper;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ChannelResponse;
import de.otto.synapse.channel.Status;
import de.otto.synapse.channel.aws.MessageLog;
import de.otto.synapse.consumer.EventSourceNotification;
import de.otto.synapse.consumer.MessageConsumer;
import de.otto.synapse.eventsource.AbstractEventSource;
import de.otto.synapse.message.Message;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

public class KinesisEventSource
extends AbstractEventSource {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisEventSource.class);
    private final MessageLog messageLog;

    public KinesisEventSource(String name, MessageLog messageLog, ApplicationEventPublisher eventPublisher, ObjectMapper objectMapper) {
        super(name, eventPublisher, objectMapper);
        this.messageLog = messageLog;
    }

    public String getStreamName() {
        return this.messageLog.getStreamName();
    }

    public void stop() {
        super.stop();
    }

    public ChannelPosition consumeAll(ChannelPosition startFrom, Predicate<Message<?>> stopCondition) {
        ChannelPosition currentPosition = startFrom;
        this.publishEvent(startFrom, EventSourceNotification.Status.STARTED, "Consuming messages from Kinesis.");
        try {
            boolean consumeMore;
            do {
                ChannelResponse channelResponse = this.messageLog.consumeStream(currentPosition, stopCondition, (MessageConsumer<String>)this.dispatchingMessageConsumer());
                currentPosition = channelResponse.getChannelPosition();
                boolean bl = consumeMore = channelResponse.getStatus() != Status.STOPPED && !this.isStopping();
                if (!consumeMore) continue;
                consumeMore = this.waitABit();
            } while (consumeMore);
            this.publishEvent(currentPosition, EventSourceNotification.Status.FINISHED, "Stopped consuming messages from Kinesis.");
            return currentPosition;
        }
        catch (RuntimeException e) {
            this.publishEvent(currentPosition, EventSourceNotification.Status.FAILED, "Error consuming messages from Kinesis: " + e.getMessage());
            throw e;
        }
    }

    private boolean waitABit() {
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException e) {
            LOG.warn("Thread got interrupted");
            return false;
        }
        return true;
    }
}

