/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.aws.s3;

import com.fasterxml.jackson.databind.ObjectMapper;
import de.otto.synapse.aws.s3.SnapshotChannelPosition;
import de.otto.synapse.aws.s3.SnapshotConsumerService;
import de.otto.synapse.aws.s3.SnapshotFileTimestampParser;
import de.otto.synapse.aws.s3.SnapshotReadService;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.consumer.EventSourceNotification;
import de.otto.synapse.eventsource.AbstractEventSource;
import de.otto.synapse.message.Message;
import java.io.File;
import java.util.Optional;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

public class SnapshotEventSource
extends AbstractEventSource {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotEventSource.class);
    private final SnapshotReadService snapshotReadService;
    private final String streamName;
    private final SnapshotConsumerService snapshotConsumerService;

    public SnapshotEventSource(String name, String streamName, SnapshotReadService snapshotReadService, SnapshotConsumerService snapshotConsumerService, ApplicationEventPublisher eventPublisher, ObjectMapper objectMapper) {
        super(name, eventPublisher, objectMapper);
        this.streamName = streamName;
        this.snapshotReadService = snapshotReadService;
        this.snapshotConsumerService = snapshotConsumerService;
    }

    public String getStreamName() {
        return this.streamName;
    }

    public SnapshotChannelPosition consumeAll() {
        return this.consumeAll(ChannelPosition.fromHorizon(), (T event) -> false);
    }

    public SnapshotChannelPosition consumeAll(ChannelPosition startFrom) {
        return this.consumeAll(ChannelPosition.fromHorizon());
    }

    public SnapshotChannelPosition consumeAll(ChannelPosition startFrom, Predicate<Message<?>> stopCondition) {
        SnapshotChannelPosition snapshotStreamPosition;
        try {
            this.publishEvent(startFrom, EventSourceNotification.Status.STARTED, "Loading snapshot from S3.");
            Optional<File> snapshotFile = this.snapshotReadService.retrieveLatestSnapshot(this.streamName);
            if (snapshotFile.isPresent()) {
                ChannelPosition channelPosition = this.snapshotConsumerService.consumeSnapshot(snapshotFile.get(), this.streamName, stopCondition, this.dispatchingMessageConsumer());
                snapshotStreamPosition = SnapshotChannelPosition.of(channelPosition, SnapshotFileTimestampParser.getSnapshotTimestamp(snapshotFile.get().getName()));
            } else {
                snapshotStreamPosition = SnapshotChannelPosition.of();
            }
        }
        catch (RuntimeException e) {
            this.publishEvent(SnapshotChannelPosition.of(), EventSourceNotification.Status.FAILED, "Failed to load snapshot from S3: " + e.getMessage());
            throw e;
        }
        finally {
            LOG.info("Finished reading snapshot into Memory");
            this.snapshotReadService.deleteOlderSnapshots(this.streamName);
        }
        this.publishEvent(snapshotStreamPosition, EventSourceNotification.Status.FINISHED, "Finished to load snapshot from S3.");
        return snapshotStreamPosition;
    }
}

