/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.eventsource.aws;

import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint;
import de.otto.synapse.eventsource.AbstractEventSource;
import de.otto.synapse.eventsource.EventSourceNotification;
import de.otto.synapse.message.Message;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import org.springframework.context.ApplicationEventPublisher;

/**
 * Event source that delegates message consumption to a
 * {@link MessageLogReceiverEndpoint} and reports the outcome of every
 * {@link #consume(ChannelPosition, Predicate)} call through
 * {@code publishEvent(...)}.
 *
 * <p>{@code publishEvent} is not declared in this class; it is presumably
 * inherited from {@link AbstractEventSource} — confirm against the superclass.
 */
public class KinesisEventSource extends AbstractEventSource {

    /** Notification text sent right before consumption is delegated to the message log. */
    private static final String STARTED_TEXT = "Consuming messages from Kinesis.";

    /** Notification text sent after the message log returned without throwing. */
    private static final String FINISHED_TEXT = "Stopped consuming messages from Kinesis.";

    /** Prefix of the notification text sent on failure; the exception message is appended. */
    private static final String FAILED_TEXT_PREFIX = "Error consuming messages from Kinesis: ";

    /** Receiver endpoint that performs the actual consumption and is stopped together with this source. */
    private final MessageLogReceiverEndpoint messageLog;

    /**
     * Creates the event source. All three arguments are handed to the superclass
     * constructor unchanged; the endpoint is additionally kept locally.
     *
     * @param name           passed through to the superclass constructor
     * @param messageLog     endpoint that {@link #consume(ChannelPosition, Predicate)} and
     *                       {@link #stop()} delegate to
     * @param eventPublisher passed through to the superclass constructor
     */
    public KinesisEventSource(final String name,
                              final MessageLogReceiverEndpoint messageLog,
                              final ApplicationEventPublisher eventPublisher) {
        super(name, messageLog, eventPublisher);
        this.messageLog = messageLog;
    }

    /**
     * Runs the superclass stop logic first and then stops the wrapped message log.
     */
    public void stop() {
        super.stop();
        messageLog.stop();
    }

    /**
     * Consumes messages by delegating to the wrapped message log and publishes
     * notifications around the call: {@code STARTED} (with {@code startFrom}) before
     * delegating, {@code FINISHED} (with the position returned by the message log)
     * after a normal return, or {@code FAILED} (with {@code startFrom}) when a
     * {@link RuntimeException} is thrown.
     *
     * @param startFrom     position handed to the message log as the starting point
     * @param stopCondition predicate handed to the message log unchanged
     * @return the position returned by the message log
     * @throws RuntimeException any runtime exception raised while consuming or while
     *                          publishing the {@code FINISHED} notification; it is
     *                          rethrown unchanged after the {@code FAILED} notification
     */
    @Nonnull
    public ChannelPosition consume(final ChannelPosition startFrom,
                                   final Predicate<Message<?>> stopCondition) {
        publishEvent(startFrom, EventSourceNotification.Status.STARTED, STARTED_TEXT);

        final ChannelPosition reachedPosition;
        try {
            reachedPosition = messageLog.consume(startFrom, stopCondition);
            // Kept inside the try block on purpose: a failure while publishing
            // FINISHED is reported as FAILED as well.
            publishEvent(reachedPosition, EventSourceNotification.Status.FINISHED, FINISHED_TEXT);
        } catch (final RuntimeException failure) {
            publishEvent(startFrom, EventSourceNotification.Status.FAILED, FAILED_TEXT_PREFIX + failure.getMessage());
            throw failure;
        }
        return reachedPosition;
    }

    /**
     * Package-private accessor for the wrapped endpoint.
     *
     * @return the endpoint supplied to the constructor
     */
    MessageLogReceiverEndpoint getMessageLogReceiverEndpoint() {
        return messageLog;
    }
}

