/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.eventsource;

import com.fasterxml.jackson.databind.ObjectMapper;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.InMemoryChannel;
import de.otto.synapse.consumer.EventSourceNotification;
import de.otto.synapse.eventsource.AbstractEventSource;
import de.otto.synapse.message.Header;
import de.otto.synapse.message.Message;
import java.time.Instant;
import java.util.function.Predicate;
import org.springframework.context.ApplicationEventPublisher;

/**
 * Event source that reads its messages from an {@link InMemoryChannel}.
 *
 * <p>Each received message is re-wrapped with a freshly created response header
 * and handed to the dispatching message consumer inherited from
 * {@link AbstractEventSource}.
 */
public class InMemoryEventSource
extends AbstractEventSource {
    /** Name of the stream reported by {@link #getStreamName()}. */
    private final String streamName;
    /** Channel the messages are read from. */
    private final InMemoryChannel inMemoryChannel;

    /**
     * Creates an event source on top of the given in-memory channel.
     *
     * @param name            name passed on to {@link AbstractEventSource}
     * @param streamName      value returned by {@link #getStreamName()}
     * @param inMemoryChannel channel that {@link #consumeAll} receives messages from
     * @param eventPublisher  publisher passed on to {@link AbstractEventSource}
     * @param objectMapper    mapper passed on to {@link AbstractEventSource}
     */
    public InMemoryEventSource(String name, String streamName, InMemoryChannel inMemoryChannel, ApplicationEventPublisher eventPublisher, ObjectMapper objectMapper) {
        super(name, eventPublisher, objectMapper);
        this.inMemoryChannel = inMemoryChannel;
        this.streamName = streamName;
    }

    /**
     * @return the stream name supplied at construction time
     */
    @Override
    public String getStreamName() {
        return this.streamName;
    }

    /**
     * Receives messages from the in-memory channel and dispatches them until either
     * the channel yields {@code null} or {@code stopCondition} accepts a message.
     *
     * <p>A STARTED notification carrying {@code startFrom} is published before the
     * first receive. A FINISHED notification is published only when the loop ends
     * because of {@code stopCondition}; when the channel yields {@code null} the
     * method returns immediately without publishing FINISHED.
     *
     * @param startFrom     position forwarded with the STARTED notification; it is
     *                      not otherwise used by this method
     * @param stopCondition tested against each message as received from the channel
     *                      (not the re-wrapped copy), after that message was dispatched
     * @return always {@code null}
     */
    @Override
    public ChannelPosition consumeAll(ChannelPosition startFrom, Predicate<Message<?>> stopCondition) {
        this.publishEvent(startFrom, EventSourceNotification.Status.STARTED);
        while (true) {
            final Message<String> incoming = this.inMemoryChannel.receive();
            if (incoming == null) {
                // Nothing received: leave right away, no FINISHED notification is sent.
                return null;
            }
            this.dispatchingMessageConsumer().accept(InMemoryEventSource.withFreshResponseHeader(incoming));
            // The predicate sees the message exactly as it came from the channel.
            if (stopCondition.test(incoming)) {
                break;
            }
        }
        this.publishEvent(null, EventSourceNotification.Status.FINISHED);
        return null;
    }

    /**
     * Copies key and payload of {@code source} into a new message whose header is a
     * response header built from a {@code null} first argument and the current instant.
     */
    private static Message<String> withFreshResponseHeader(Message<String> source) {
        return Message.message(source.getKey(), Header.responseHeader(null, Instant.now()), source.getPayload());
    }
}

