/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.aws.s3;

import com.fasterxml.jackson.databind.ObjectMapper;
import de.otto.synapse.aws.s3.SnapshotConsumerService;
import de.otto.synapse.aws.s3.SnapshotFileTimestampParser;
import de.otto.synapse.aws.s3.SnapshotReadService;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.consumer.MessageConsumer;
import de.otto.synapse.consumer.MessageDispatcher;
import de.otto.synapse.eventsource.EventSource;
import de.otto.synapse.eventsource.EventSourceNotification;
import de.otto.synapse.eventsource.aws.SnapshotEventSourceNotification;
import de.otto.synapse.message.Message;
import java.io.File;
import java.time.Instant;
import java.util.Optional;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

public class SnapshotEventSource
implements EventSource {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotEventSource.class);
    private final String name;
    private final String channelName;
    private final SnapshotReadService snapshotReadService;
    private final SnapshotConsumerService snapshotConsumerService;
    private final ApplicationEventPublisher eventPublisher;
    private final MessageDispatcher messageDispatcher;

    public SnapshotEventSource(String name, String channelName, SnapshotReadService snapshotReadService, SnapshotConsumerService snapshotConsumerService, ApplicationEventPublisher eventPublisher, ObjectMapper objectMapper) {
        this.name = name;
        this.channelName = channelName;
        this.snapshotReadService = snapshotReadService;
        this.snapshotConsumerService = snapshotConsumerService;
        this.eventPublisher = eventPublisher;
        this.messageDispatcher = new MessageDispatcher(objectMapper);
    }

    public String getName() {
        return this.name;
    }

    public void register(MessageConsumer<?> messageConsumer) {
        this.messageDispatcher.add(messageConsumer);
    }

    @Nonnull
    public MessageDispatcher getMessageDispatcher() {
        return this.messageDispatcher;
    }

    public String getChannelName() {
        return this.channelName;
    }

    public ChannelPosition consume(ChannelPosition startFrom, Predicate<Message<?>> stopCondition) {
        ChannelPosition snapshotStreamPosition;
        Instant snapshotTimestamp = null;
        try {
            this.publishEvent(startFrom, EventSourceNotification.Status.STARTED, "Loading snapshot from S3.", null);
            Optional<File> snapshotFile = this.snapshotReadService.retrieveLatestSnapshot(this.getChannelName());
            if (snapshotFile.isPresent()) {
                snapshotTimestamp = SnapshotFileTimestampParser.getSnapshotTimestamp(snapshotFile.get().getName());
                snapshotStreamPosition = this.snapshotConsumerService.consumeSnapshot(snapshotFile.get(), stopCondition, (MessageConsumer<String>)this.getMessageDispatcher());
            } else {
                snapshotStreamPosition = ChannelPosition.fromHorizon();
            }
        }
        catch (RuntimeException e) {
            this.publishEvent(ChannelPosition.fromHorizon(), EventSourceNotification.Status.FAILED, "Failed to load snapshot from S3: " + e.getMessage(), null);
            throw e;
        }
        finally {
            LOG.info("Finished reading snapshot into Memory");
            this.snapshotReadService.deleteOlderSnapshots(this.getChannelName());
        }
        this.publishEvent(snapshotStreamPosition, EventSourceNotification.Status.FINISHED, "Finished to load snapshot from S3.", snapshotTimestamp);
        return snapshotStreamPosition;
    }

    public void stop() {
    }

    public boolean isStopping() {
        return false;
    }

    private void publishEvent(ChannelPosition channelPosition, EventSourceNotification.Status status, String message, Instant snapshotTimestamp) {
        if (this.eventPublisher != null) {
            EventSourceNotification notification = SnapshotEventSourceNotification.builder().withSnapshotTimestamp(snapshotTimestamp).withEventSourceName(this.name).withChannelName(this.getChannelName()).withChannelPosition(channelPosition).withStatus(status).withMessage(message).build();
            try {
                this.eventPublisher.publishEvent((Object)notification);
            }
            catch (Exception e) {
                LOG.error("error publishing event source notification: {}", (Object)notification, (Object)e);
            }
        }
    }
}

