/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.eventsource.aws;

import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.consumer.MessageConsumer;
import de.otto.synapse.consumer.MessageDispatcher;
import de.otto.synapse.eventsource.EventSource;
import de.otto.synapse.message.Message;
import java.util.Objects;
import java.util.function.Predicate;
import javax.annotation.Nonnull;

public class CompactedKinesisEventSource
implements EventSource {
    private final EventSource snapshotEventSource;
    private final EventSource kinesisEventSource;
    private final String channelName;

    public CompactedKinesisEventSource(EventSource snapshotEventSource, EventSource kinesisEventSource) {
        Objects.requireNonNull(snapshotEventSource, "snapshot event source must not be null");
        Objects.requireNonNull(kinesisEventSource, "kinesis event source must not be null");
        if (!snapshotEventSource.getChannelName().equals(kinesisEventSource.getChannelName())) {
            throw new IllegalArgumentException(String.format("given event sources must have same stream name, but was: '%s' and '%s'", snapshotEventSource.getChannelName(), kinesisEventSource.getChannelName()));
        }
        this.snapshotEventSource = snapshotEventSource;
        this.kinesisEventSource = kinesisEventSource;
        this.channelName = kinesisEventSource.getChannelName();
    }

    public void register(MessageConsumer<?> messageConsumer) {
        this.snapshotEventSource.register(messageConsumer);
        this.kinesisEventSource.register(messageConsumer);
    }

    public String getName() {
        return this.kinesisEventSource.getName();
    }

    @Nonnull
    public MessageDispatcher getMessageDispatcher() {
        return this.snapshotEventSource.getMessageDispatcher();
    }

    public String getChannelName() {
        return this.channelName;
    }

    public ChannelPosition consume(ChannelPosition startFrom, Predicate<Message<?>> stopCondition) {
        Predicate<Message> neverStop = e -> false;
        ChannelPosition channelPosition = this.snapshotEventSource.consume(neverStop);
        return this.kinesisEventSource.consume(channelPosition, stopCondition);
    }

    public void stop() {
        this.kinesisEventSource.stop();
        this.snapshotEventSource.stop();
    }

    public boolean isStopping() {
        return this.kinesisEventSource.isStopping() || this.snapshotEventSource.isStopping();
    }
}

