/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.messagestore.aws;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.compaction.aws.SnapshotFileHelper;
import de.otto.synapse.compaction.aws.SnapshotReadService;
import de.otto.synapse.info.SnapshotReaderNotification;
import de.otto.synapse.info.SnapshotReaderStatus;
import de.otto.synapse.message.Header;
import de.otto.synapse.message.Message;
import de.otto.synapse.messagestore.MessageStore;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.stream.Stream;
import java.util.zip.ZipInputStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

@NotThreadSafe
public class SnapshotMessageStore
implements MessageStore {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotMessageStore.class);
    private MessageIterator messageIterator;
    private ChannelPosition channelPosition;
    private ZipInputStream zipInputStream;
    private Instant snapshotTimestamp;
    private final String channelName;
    private final ApplicationEventPublisher eventPublisher;

    public SnapshotMessageStore(@Nonnull String channelName, @Nonnull SnapshotReadService snapshotReadService) {
        this(channelName, snapshotReadService, null);
    }

    public SnapshotMessageStore(@Nonnull String channelName, @Nonnull SnapshotReadService snapshotReadService, @Nullable ApplicationEventPublisher eventPublisher) {
        this.channelName = channelName;
        this.eventPublisher = eventPublisher;
        this.publishEvent(SnapshotReaderStatus.STARTING, "Retrieve snapshot file from S3.", null);
        try {
            Optional<File> latestSnapshot = snapshotReadService.retrieveLatestSnapshot(channelName);
            if (latestSnapshot.isPresent()) {
                File snapshot = latestSnapshot.get();
                this.snapshotTimestamp = SnapshotFileHelper.getSnapshotTimestamp(snapshot.getName());
                this.publishEvent(SnapshotReaderStatus.STARTED, "Retrieve snapshot file from S3.", this.snapshotTimestamp);
                this.zipInputStream = new ZipInputStream(new BufferedInputStream(new FileInputStream(snapshot)));
                this.zipInputStream.getNextEntry();
                JsonFactory jsonFactory = new JsonFactory();
                JsonParser jsonParser = jsonFactory.createParser((InputStream)this.zipInputStream);
                while (!jsonParser.isClosed() && this.messageIterator == null) {
                    JsonToken currentToken = jsonParser.nextToken();
                    if (currentToken != JsonToken.FIELD_NAME) continue;
                    switch (jsonParser.getValueAsString()) {
                        case "startSequenceNumbers": {
                            this.channelPosition = this.processSequenceNumbers(jsonParser);
                            break;
                        }
                        case "data": {
                            this.messageIterator = new MessageIterator(jsonParser);
                            break;
                        }
                    }
                }
            } else {
                LOG.info("No Snapshot available. Returning emptyMessageStore MessageStore.");
            }
        }
        catch (Exception e) {
            try {
                this.zipInputStream.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
            this.publishEvent(SnapshotReaderStatus.FAILED, "Failed to load snapshot from S3: " + e.getMessage(), this.snapshotTimestamp);
            throw new RuntimeException(e);
        }
    }

    public void close() throws IOException {
        LOG.info("Closing SnapshotMessageStore");
        this.publishEvent(SnapshotReaderStatus.FINISHED, "Finished to load snapshot from S3.", this.snapshotTimestamp);
        if (this.zipInputStream != null) {
            this.zipInputStream.close();
        }
    }

    public ChannelPosition getLatestChannelPosition() {
        return this.channelPosition != null ? this.channelPosition : ChannelPosition.fromHorizon();
    }

    public Stream<Message<String>> stream() {
        return this.messageIterator != null ? Streams.stream((Iterator)this.messageIterator) : Stream.empty();
    }

    private ChannelPosition processSequenceNumbers(JsonParser parser) throws IOException {
        ImmutableMap.Builder shardPositions = ImmutableMap.builder();
        String shardName = null;
        String sequenceNumber = null;
        while (parser.nextToken() != JsonToken.END_ARRAY) {
            JsonToken currentToken = parser.currentToken();
            block0 : switch (currentToken) {
                case FIELD_NAME: {
                    switch (parser.getValueAsString()) {
                        case "shard": {
                            parser.nextToken();
                            shardName = parser.getValueAsString();
                            break block0;
                        }
                        case "sequenceNumber": {
                            parser.nextToken();
                            sequenceNumber = parser.getValueAsString();
                            break block0;
                        }
                    }
                    break;
                }
                case END_OBJECT: {
                    if (shardName != null) {
                        ShardPosition shardPosition = sequenceNumber != null && !sequenceNumber.equals("") ? ShardPosition.fromPosition(shardName, (String)sequenceNumber) : ShardPosition.fromHorizon(shardName);
                        shardPositions.put(shardName, (Object)shardPosition);
                    }
                    shardName = null;
                    sequenceNumber = null;
                    break;
                }
            }
        }
        return ChannelPosition.channelPosition((Iterable)shardPositions.build().values());
    }

    private void publishEvent(SnapshotReaderStatus status, String message, Instant snapshotTimestamp) {
        if (this.eventPublisher != null) {
            SnapshotReaderNotification notification = SnapshotReaderNotification.builder().withSnapshotTimestamp(snapshotTimestamp).withChannelName(this.channelName).withStatus(status).withMessage(message).build();
            try {
                this.eventPublisher.publishEvent((Object)notification);
            }
            catch (Exception e) {
                LOG.error("error publishing event source notification: {}", (Object)notification, (Object)e);
            }
        }
    }

    private static class MessageIterator
    implements Iterator<Message<String>> {
        private Message<String> nextMessage = null;
        private JsonParser jsonParser;

        private MessageIterator(JsonParser jsonParser) {
            this.jsonParser = jsonParser;
        }

        @Override
        public boolean hasNext() {
            try {
                if (this.nextMessage == null) {
                    Instant arrivalTimestamp = Instant.EPOCH;
                    while (!this.jsonParser.isClosed() && this.jsonParser.nextToken() != JsonToken.END_ARRAY) {
                        JsonToken currentToken = this.jsonParser.currentToken();
                        if (currentToken != JsonToken.FIELD_NAME) continue;
                        String key = this.jsonParser.getValueAsString();
                        this.nextMessage = Message.message((String)key, (Header)Header.responseHeader(null, (Instant)arrivalTimestamp), (Object)this.jsonParser.nextTextValue());
                        break;
                    }
                }
                return this.nextMessage != null;
            }
            catch (IOException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }

        @Override
        public Message<String> next() {
            if (this.hasNext()) {
                Message<String> nextMessage = this.nextMessage;
                this.nextMessage = null;
                return nextMessage;
            }
            throw new NoSuchElementException("No more messages available");
        }
    }
}

