/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.messagestore;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Streams;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.message.TextMessage;
import de.otto.synapse.messagestore.ChannelPositions;
import de.otto.synapse.messagestore.Index;
import de.otto.synapse.messagestore.Indexer;
import de.otto.synapse.messagestore.Indexers;
import de.otto.synapse.messagestore.MessageStore;
import de.otto.synapse.messagestore.MessageStoreEntry;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.dizitart.no2.Cursor;
import org.dizitart.no2.Document;
import org.dizitart.no2.IndexOptions;
import org.dizitart.no2.IndexType;
import org.dizitart.no2.Nitrite;
import org.dizitart.no2.NitriteCollection;
import org.dizitart.no2.filters.Filters;

public class OffHeapIndexingMessageStore
implements MessageStore {
    private final Nitrite nitrite;
    private final NitriteCollection messages;
    private final ChannelPositions channelPositions = new ChannelPositions();
    private final Indexer indexer;

    public OffHeapIndexingMessageStore(String name) {
        this(name, Indexers.noOpIndexer());
    }

    public OffHeapIndexingMessageStore(String name, Indexer indexer) {
        this.nitrite = Nitrite.builder().openOrCreate();
        this.messages = this.nitrite.getCollection(name + "-messages");
        indexer.getIndexes().forEach(index -> this.messages.createIndex("_idx_" + index.getName(), IndexOptions.indexOptions((IndexType)IndexType.NonUnique)));
        this.indexer = indexer;
    }

    @Override
    public Set<String> getChannelNames() {
        return this.channelPositions.getChannelNames();
    }

    @Override
    public ImmutableSet<Index> getIndexes() {
        return this.indexer.getIndexes();
    }

    @Override
    public ChannelPosition getLatestChannelPosition(String channelName) {
        return this.channelPositions.getLatestChannelPosition(channelName);
    }

    @Override
    public Stream<MessageStoreEntry> stream() {
        return this.toEntryStream(this.messages.find());
    }

    @Override
    public Stream<MessageStoreEntry> stream(Index index, String value) {
        return this.toEntryStream(this.messages.find(Filters.eq((String)("_idx_" + index.getName()), (Object)value)));
    }

    @Override
    public void add(@Nonnull MessageStoreEntry entry) {
        MessageStoreEntry indexedEntry = this.indexer.index(entry);
        Document document = Document.createDocument((String)"channelName", (Object)indexedEntry.getChannelName());
        indexedEntry.getFilterValues().forEach((index, value) -> document.put("_idx_" + index.getName(), value));
        document.put("message", (Object)entry.getTextMessage());
        this.messages.insert(document, new Document[0]);
        this.channelPositions.updateFrom(entry);
    }

    @Override
    public long size() {
        return this.messages.size();
    }

    @Override
    public void close() {
        this.nitrite.close();
    }

    @Nonnull
    private Stream<MessageStoreEntry> toEntryStream(Cursor cursor) {
        return Streams.stream((Iterable)cursor).map(document -> {
            ImmutableMap.Builder filterValues = ImmutableMap.builder();
            this.indexer.getIndexes().forEach(index -> {
                String value = (String)document.get("_idx_" + index.getName(), String.class);
                if (value != null) {
                    filterValues.put(index, (Object)value);
                }
            });
            return MessageStoreEntry.of((String)document.get("channelName", String.class), (ImmutableMap<Index, String>)filterValues.build(), (TextMessage)document.get("message", TextMessage.class));
        });
    }
}

