/*
 * Decompiled with CFR 0.152.
 */
package org.projectnessie.versioned.mongodb;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertManyOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.bson.BsonBinary;
import org.bson.BsonReader;
import org.bson.BsonWriter;
import org.bson.Document;
import org.bson.codecs.Codec;
import org.bson.codecs.DecoderContext;
import org.bson.codecs.EncoderContext;
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.projectnessie.versioned.impl.EntityStoreHelper;
import org.projectnessie.versioned.impl.condition.ConditionExpression;
import org.projectnessie.versioned.impl.condition.UpdateExpression;
import org.projectnessie.versioned.mongodb.MongoSerDe;
import org.projectnessie.versioned.mongodb.MongoStoreConfig;
import org.projectnessie.versioned.store.Id;
import org.projectnessie.versioned.store.LoadOp;
import org.projectnessie.versioned.store.LoadStep;
import org.projectnessie.versioned.store.NotFoundException;
import org.projectnessie.versioned.store.SaveOp;
import org.projectnessie.versioned.store.Store;
import org.projectnessie.versioned.store.StoreOperationException;
import org.projectnessie.versioned.store.ValueType;
import org.projectnessie.versioned.tiered.BaseValue;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link Store} implementation that keeps every {@link ValueType} in its own MongoDB collection.
 *
 * <p>The store talks to MongoDB through the reactive-streams driver and blocks on each operation,
 * waiting at most the timeout taken from the supplied {@link MongoStoreConfig}. Entities are looked
 * up by the {@value #ID_FIELD} document field. Conditional writes, {@link #delete} and
 * {@link #update} are not implemented.
 *
 * <p>{@link #start()} must be called before any data operation; until then no collection is
 * registered and every lookup fails with "Unsupported Entity type".
 */
public class MongoDBStore implements Store {
    /** Maximum number of IDs placed in a single {@code $in} query by {@link #load(LoadStep)}. */
    @VisibleForTesting
    static final int LOAD_SIZE = 1000;

    /** Name of the document field that holds the entity {@link Id}. */
    private static final String ID_FIELD = "id";

    /** Shared codec translating {@link Id} to and from BSON binary; it holds no state. */
    static final IdCodec ID_CODEC_INSTANCE = new IdCodec();

    /** Shared, stateless, encode-only codec used when bulk-inserting {@link SaveOp}s. */
    private static final SaveOpCodec SAVE_OP_CODEC = new SaveOpCodec();

    private final MongoStoreConfig config;
    private final MongoClientSettings mongoClientSettings;
    private final Duration timeout;
    private MongoClient mongoClient;
    private MongoDatabase mongoDatabase;
    // Empty until start() replaces it with the immutable per-type mapping.
    private Map<ValueType<?>, MongoCollection<Document>> collections;

    /**
     * Prepares the client settings (connection string, {@link Id} codec, majority write concern)
     * without opening a connection.
     *
     * @param config connection string, database name, table prefix, timeout and initialization flag
     */
    public MongoDBStore(MongoStoreConfig config) {
        this.config = config;
        this.timeout = Duration.ofMillis(config.getTimeoutMs());
        this.collections = new HashMap<>();

        CodecProvider idCodecProvider = new CodecProvider() {
            @Override
            @SuppressWarnings("unchecked")
            public <T> Codec<T> get(Class<T> clazz, CodecRegistry registry) {
                // Safe cast: the Id codec is only handed out when T is Id. Returning null lets the
                // default registry (listed after this provider) resolve every other class.
                return clazz == Id.class ? (Codec<T>) ID_CODEC_INSTANCE : null;
            }
        };

        this.mongoClientSettings = MongoClientSettings.builder()
            .applyConnectionString(new ConnectionString(config.getConnectionString()))
            .codecRegistry(CodecRegistries.fromProviders(idCodecProvider, MongoClientSettings.getDefaultCodecRegistry()))
            .writeConcern(WriteConcern.MAJORITY)
            .build();
    }

    /**
     * Creates the MongoDB client, resolves one collection per {@link ValueType} (named via
     * {@code getTableName} with the configured prefix) and, when the configuration asks for
     * database initialization, stores the minimum entities through {@link #putIfAbsent(SaveOp)}.
     */
    @Override
    public void start() {
        this.mongoClient = MongoClients.create(this.mongoClientSettings);
        this.mongoDatabase = this.mongoClient.getDatabase(this.config.getDatabaseName());

        ImmutableMap.Builder<ValueType<?>, MongoCollection<Document>> byType = ImmutableMap.builder();
        for (ValueType<?> type : ValueType.values()) {
            String collectionName = type.getTableName(this.config.getTablePrefix());
            byType.put(type, this.mongoDatabase.getCollection(collectionName));
        }
        this.collections = byType.build();

        if (this.config.initializeDatabase()) {
            EntityStoreHelper.storeMinimumEntities(this::putIfAbsent);
        }
    }

    /** Closes the MongoDB client if {@link #start()} created one. */
    @Override
    public void close() {
        if (this.mongoClient != null) {
            this.mongoClient.close();
        }
    }

    /**
     * Runs every step of the chain in order. For each step the operations are grouped by value
     * type, the matching collection is queried for the requested IDs in batches of
     * {@link #LOAD_SIZE}, and each returned document is handed to the receiver of its operation.
     *
     * @param loadstep first step of the chain; following steps are obtained via {@code getNext()}
     * @throws NotFoundException if a step requested an ID for which no document came back; later
     *     steps are not executed
     * @throws StoreOperationException if MongoDB returns a document that has no pending operation
     *     (for example a second document carrying the same ID)
     */
    @Override
    public void load(LoadStep loadstep) throws NotFoundException {
        for (LoadStep step = loadstep; step != null; step = step.getNext().orElse(null)) {
            // Operations still waiting for their document; entries are removed as documents arrive.
            Map<Id, LoadOp<?>> pending = step.getOps().collect(Collectors.toMap(LoadOp::getId, Function.identity()));

            Flux.fromStream(step.getOps())
                .groupBy(LoadOp::getValueType)
                .flatMap(group -> {
                    ValueType<?> type = group.key();
                    MongoCollection<Document> collection = getCollection(type);
                    return group.map(LoadOp::getId)
                        .buffer(LOAD_SIZE)
                        .map(ids -> new CollectionLoadIds(type, collection, ids));
                })
                .flatMap(batch -> batch.collection.find(Filters.in(ID_FIELD, batch.ids)))
                .handle((document, sink) -> {
                    Id id = MongoSerDe.deserializeId(document, ID_FIELD);
                    LoadOp<?> loadOp = pending.remove(id);
                    if (loadOp == null) {
                        // Fail with a descriptive error instead of a bare NullPointerException.
                        sink.error(new StoreOperationException(String.format("Retrieved unexpected object with ID: %s", id)));
                        return;
                    }
                    deliver(document, loadOp);
                })
                .blockLast(this.timeout);

            if (!pending.isEmpty()) {
                String missedIds = pending.values().stream()
                    .map(op -> op.getId().toString())
                    .collect(Collectors.joining(", "));
                throw new NotFoundException(String.format("Requested object IDs missing: %s", missedIds));
            }
        }
    }

    /**
     * Inserts the entity only when no document with its ID exists yet ({@code $setOnInsert} upsert).
     *
     * @param saveOp entity to store
     * @return {@code true} if a new document was inserted, {@code false} if one already existed
     *     or the driver produced no result
     */
    @Override
    public <C extends BaseValue<C>> boolean putIfAbsent(SaveOp<C> saveOp) {
        UpdateResult result = upsert(saveOp, "$setOnInsert");
        return result != null && result.getUpsertedId() != null;
    }

    /**
     * Stores the entity, overwriting the fields of an existing document with the same ID or
     * inserting a new document ({@code $set} upsert).
     *
     * @param saveOp entity to store
     * @param conditionUnAliased must be empty; conditional puts are not supported
     * @throws UnsupportedOperationException if a condition is supplied
     * @throws StoreOperationException if the driver reports that no document was matched or inserted
     */
    @Override
    public <C extends BaseValue<C>> void put(SaveOp<C> saveOp, Optional<ConditionExpression> conditionUnAliased) {
        if (conditionUnAliased.isPresent()) {
            throw new UnsupportedOperationException("ConditionExpressions are not supported with MongoDB yet.");
        }
        UpdateResult result = upsert(saveOp, "$set");
        // An upsert took effect when it either matched an existing document or inserted a new one.
        // The modified count is deliberately not consulted: it is 0 when the stored fields already
        // equal the new ones and non-zero for a perfectly successful overwrite.
        if (result == null || (result.getMatchedCount() == 0L && result.getUpsertedId() == null)) {
            throw new StoreOperationException(String.format("Update of %s %s did not succeed", saveOp.getType().name(), saveOp.getId()));
        }
    }

    /**
     * Not implemented for MongoDB.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <C extends BaseValue<C>> boolean delete(ValueType<C> type, Id id, Optional<ConditionExpression> condition) {
        throw new UnsupportedOperationException();
    }

    /**
     * Bulk-inserts the given entities, issuing one unordered {@code insertMany} per value type.
     *
     * @param ops entities to insert; an empty list results in no database call
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void save(List<SaveOp<?>> ops) {
        Map<ValueType<?>, List<SaveOp<?>>> perType = ops.stream().collect(Collectors.groupingBy(SaveOp::getType));
        Flux.fromIterable(perType.entrySet())
            .flatMap(entry -> {
                // Raw on purpose: the write collection is declared for Document, but its codec
                // registry encodes SaveOp instances, so the list is passed through untyped.
                MongoCollection rawCollection = writeCollection(entry.getKey());
                return (Publisher<?>) rawCollection.insertMany(entry.getValue(), new InsertManyOptions().ordered(false));
            })
            .blockLast(this.timeout);
    }

    /**
     * Loads one entity and feeds it to {@code consumer}.
     *
     * @param valueType type of the entity, selecting the collection
     * @param id ID of the entity
     * @param consumer receiver of the entity's attributes
     * @throws NotFoundException if no document with that ID was returned
     */
    @Override
    public <C extends BaseValue<C>> void loadSingle(ValueType<C> valueType, Id id, C consumer) {
        MongoCollection<Document> collection = getCollection(valueType);
        Document found = Mono.from(collection.find(Filters.eq(ID_FIELD, id))).block(this.timeout);
        if (found == null) {
            throw new NotFoundException(String.format("Unable to load item with ID: %s", id));
        }
        MongoSerDe.produceToConsumer(found, valueType, consumer);
    }

    /**
     * Not implemented for MongoDB.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <C extends BaseValue<C>> boolean update(ValueType<C> type, Id id, UpdateExpression update, Optional<ConditionExpression> condition, Optional<BaseValue<C>> consumer) throws NotFoundException {
        throw new UnsupportedOperationException();
    }

    /**
     * Streams every document of the type's collection. Each element is an acceptor that, when
     * given a consumer, feeds that document to it. The configured timeout is not applied here.
     *
     * @param type value type whose collection is read
     * @return stream of acceptors, one per stored document
     */
    @Override
    public <C extends BaseValue<C>> Stream<Store.Acceptor<C>> getValues(ValueType<C> type) {
        Stream<Document> documents = Flux.from(getCollection(type).find()).toStream();
        return documents.map(document -> consumer -> MongoSerDe.produceToConsumer(document, type, consumer));
    }

    /**
     * Test hook that empties the registered collections by deleting every document whose
     * {@code _id} differs from the string {@code "s"}.
     */
    @VisibleForTesting
    void resetCollections() {
        // NOTE(review): presumably "s" never occurs as an _id, making this a delete-all - confirm.
        Flux.fromIterable(this.collections.values())
            .flatMap(collection -> collection.deleteMany(Filters.ne("_id", "s")))
            .blockLast(this.timeout);
    }

    /** Runs an upsert on the entity's collection, applying the entity's fields with {@code operator}. */
    private <C extends BaseValue<C>> UpdateResult upsert(SaveOp<C> saveOp, String operator) {
        MongoCollection<Document> collection = getCollection(saveOp.getType());
        return Mono.from(collection.updateOne(
            Filters.eq(ID_FIELD, saveOp.getId()),
            MongoSerDe.bsonForValueType(saveOp, operator),
            new UpdateOptions().upsert(true))).block(this.timeout);
    }

    /** Feeds a loaded document to the receiver of its load operation and marks the operation done. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static void deliver(Document document, LoadOp loadOp) {
        // Raw on purpose: the operation's value type and receiver share one type argument, which
        // cannot be expressed through a wildcard-typed LoadOp.
        MongoSerDe.produceToConsumer(document, loadOp.getValueType(), loadOp.getReceiver());
        loadOp.done();
    }

    /** Returns the type's collection with a codec registry that can encode {@link SaveOp}s. */
    private MongoCollection<Document> writeCollection(ValueType<?> type) {
        return getCollection(type, SAVE_OP_CODEC);
    }

    /**
     * Returns the type's collection with a registry that answers {@link Id} lookups with
     * {@link #ID_CODEC_INSTANCE} and every other lookup with {@code codec}.
     */
    private MongoCollection<Document> getCollection(ValueType<?> valueType, final Codec<?> codec) {
        return getCollection(valueType).withCodecRegistry(new CodecRegistry() {
            @Override
            public <T> Codec<T> get(Class<T> clazz, CodecRegistry codecRegistry) {
                return get(clazz);
            }

            @Override
            @SuppressWarnings("unchecked")
            public <T> Codec<T> get(Class<T> clazz) {
                // Unchecked by design: apart from Id, the single supplied codec serves every class.
                return (Codec<T>) (clazz == Id.class ? ID_CODEC_INSTANCE : codec);
            }
        });
    }

    /**
     * Looks up the collection registered for the value type.
     *
     * @throws NullPointerException with an "Unsupported Entity type" message if none is registered
     */
    private MongoCollection<Document> getCollection(ValueType<?> valueType) {
        return Preconditions.checkNotNull(this.collections.get(valueType), "Unsupported Entity type: %s", valueType.name());
    }

    /** Codec storing an {@link Id} as BSON binary data built from its bytes. */
    private static class IdCodec
    implements Codec<Id> {
        private IdCodec() {
        }

        @Override
        public Id decode(BsonReader bsonReader, DecoderContext decoderContext) {
            return Id.of(bsonReader.readBinaryData().getData());
        }

        @Override
        public void encode(BsonWriter bsonWriter, Id id, EncoderContext encoderContext) {
            bsonWriter.writeBinaryData(new BsonBinary(id.toBytes()));
        }

        @Override
        public Class<Id> getEncoderClass() {
            return Id.class;
        }
    }

    /** Encode-only codec that writes a {@link SaveOp} via {@code MongoSerDe.serializeEntity}. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static class SaveOpCodec
    implements Codec<SaveOp> {
        private SaveOpCodec() {
        }

        @Override
        public SaveOp decode(BsonReader bsonReader, DecoderContext decoderContext) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void encode(BsonWriter bsonWriter, SaveOp saveOp, EncoderContext encoderContext) {
            MongoSerDe.serializeEntity(bsonWriter, saveOp);
        }

        @Override
        public Class<SaveOp> getEncoderClass() {
            return SaveOp.class;
        }
    }

    /** One batch of IDs to fetch from a single collection during {@link #load(LoadStep)}. */
    private static class CollectionLoadIds {
        final ValueType<?> type;
        final MongoCollection<Document> collection;
        final List<Id> ids;

        CollectionLoadIds(ValueType<?> type, MongoCollection<Document> collection, List<Id> ops) {
            this.type = type;
            this.collection = collection;
            this.ids = ops;
        }
    }
}

