/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.eventsource.aws;

import com.fasterxml.jackson.databind.ObjectMapper;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.aws.MessageLog;
import de.otto.synapse.consumer.EventSourceNotification;
import de.otto.synapse.consumer.MessageConsumer;
import de.otto.synapse.eventsource.AbstractEventSource;
import de.otto.synapse.message.Message;
import java.util.function.Predicate;
import org.springframework.context.ApplicationEventPublisher;

public class KinesisEventSource
extends AbstractEventSource {
    private final MessageLog messageLog;

    public KinesisEventSource(String name, MessageLog messageLog, ApplicationEventPublisher eventPublisher, ObjectMapper objectMapper) {
        super(name, eventPublisher, objectMapper);
        this.messageLog = messageLog;
    }

    public String getStreamName() {
        return this.messageLog.getStreamName();
    }

    public void stop() {
        super.stop();
        this.messageLog.stop();
    }

    public ChannelPosition consumeAll(ChannelPosition startFrom, Predicate<Message<?>> stopCondition) {
        this.publishEvent(startFrom, EventSourceNotification.Status.STARTED, "Consuming messages from Kinesis.");
        try {
            ChannelPosition currentPosition = this.messageLog.consumeStream(startFrom, stopCondition, (MessageConsumer<String>)this.dispatchingMessageConsumer());
            this.publishEvent(currentPosition, EventSourceNotification.Status.FINISHED, "Stopped consuming messages from Kinesis.");
            return currentPosition;
        }
        catch (RuntimeException e) {
            this.publishEvent(startFrom, EventSourceNotification.Status.FAILED, "Error consuming messages from Kinesis: " + e.getMessage());
            throw e;
        }
    }
}

