/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.mongodb.core.messaging;

import com.mongodb.MongoNamespace;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.springframework.data.mongodb.core.ChangeStreamEvent;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.PrefixingDelegatingAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.CursorReadingTask;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
import org.springframework.util.ErrorHandler;
import org.springframework.util.StringUtils;

class ChangeStreamTask
extends CursorReadingTask<ChangeStreamDocument<Document>, Object> {
    private final Set<String> denylist = new HashSet<String>(Arrays.asList("operationType", "fullDocument", "documentKey", "updateDescription", "ns"));
    private final QueryMapper queryMapper;
    private final MongoConverter mongoConverter;

    ChangeStreamTask(MongoTemplate template, ChangeStreamRequest<?> request, Class<?> targetType, ErrorHandler errorHandler) {
        super(template, request, targetType, errorHandler);
        this.queryMapper = new QueryMapper(template.getConverter());
        this.mongoConverter = template.getConverter();
    }

    @Override
    protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate template, SubscriptionRequest.RequestOptions options, Class<?> targetType) {
        ChangeStreamIterable iterable;
        MongoDatabase db;
        List<Object> filter = Collections.emptyList();
        BsonDocument resumeToken = new BsonDocument();
        Collation collation = null;
        FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP;
        FullDocumentBeforeChange fullDocumentBeforeChange = FullDocumentBeforeChange.DEFAULT;
        BsonTimestamp startAt = null;
        boolean resumeAfter = true;
        if (options instanceof ChangeStreamRequest.ChangeStreamRequestOptions) {
            Object val;
            ChangeStreamOptions changeStreamOptions = ((ChangeStreamRequest.ChangeStreamRequestOptions)options).getChangeStreamOptions();
            filter = this.prepareFilter(template, changeStreamOptions);
            if (changeStreamOptions.getFilter().isPresent() && (val = changeStreamOptions.getFilter().get()) instanceof Aggregation) {
                collation = ((Aggregation)val).getOptions().getCollation().map(org.springframework.data.mongodb.core.query.Collation::toMongoCollation).orElse(null);
            }
            if (changeStreamOptions.getResumeToken().isPresent()) {
                resumeToken = changeStreamOptions.getResumeToken().get().asDocument();
                resumeAfter = changeStreamOptions.isResumeAfter();
            }
            fullDocument = changeStreamOptions.getFullDocumentLookup().orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP);
            fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup().orElse(FullDocumentBeforeChange.DEFAULT);
            startAt = changeStreamOptions.getResumeBsonTimestamp().orElse(null);
        }
        MongoDatabase mongoDatabase = db = StringUtils.hasText(options.getDatabaseName()) ? template.getMongoDatabaseFactory().getMongoDatabase(options.getDatabaseName()) : template.getDb();
        if (StringUtils.hasText(options.getCollectionName())) {
            iterable = filter.isEmpty() ? db.getCollection(options.getCollectionName()).watch(Document.class) : db.getCollection(options.getCollectionName()).watch(filter, Document.class);
        } else {
            ChangeStreamIterable changeStreamIterable = iterable = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class);
        }
        if (!options.maxAwaitTime().isZero()) {
            iterable = iterable.maxAwaitTime(options.maxAwaitTime().toMillis(), TimeUnit.MILLISECONDS);
        }
        if (!resumeToken.isEmpty()) {
            iterable = resumeAfter ? iterable.resumeAfter(resumeToken) : iterable.startAfter(resumeToken);
        }
        if (startAt != null) {
            iterable = iterable.startAtOperationTime(startAt);
        }
        if (collation != null) {
            iterable = iterable.collation(collation);
        }
        iterable = iterable.fullDocument(fullDocument);
        iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange);
        return iterable.iterator();
    }

    List<Document> prepareFilter(MongoTemplate template, ChangeStreamOptions options) {
        if (!options.getFilter().isPresent()) {
            return Collections.emptyList();
        }
        Object filter = options.getFilter().orElse(null);
        if (filter instanceof Aggregation) {
            Aggregation agg = filter;
            AggregationOperationContext context = agg instanceof TypedAggregation ? new TypeBasedAggregationOperationContext(((TypedAggregation)agg).getInputType(), template.getConverter().getMappingContext(), this.queryMapper) : Aggregation.DEFAULT_CONTEXT;
            return agg.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument", this.denylist));
        }
        if (filter instanceof List) {
            return filter;
        }
        throw new IllegalArgumentException("ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents");
    }

    @Override
    protected Message<ChangeStreamDocument<Document>, Object> createMessage(ChangeStreamDocument<Document> source, Class<Object> targetType, SubscriptionRequest.RequestOptions options) {
        MongoNamespace namespace = source.getNamespace() != null ? source.getNamespace() : this.createNamespaceFromOptions(options);
        return new ChangeStreamEventMessage<Object>(new ChangeStreamEvent<Object>(source, targetType, this.mongoConverter), Message.MessageProperties.builder().databaseName(namespace.getDatabaseName()).collectionName(namespace.getCollectionName()).build());
    }

    MongoNamespace createNamespaceFromOptions(SubscriptionRequest.RequestOptions options) {
        String collectionName = StringUtils.hasText(options.getCollectionName()) ? options.getCollectionName() : "unknown";
        String databaseName = StringUtils.hasText(options.getDatabaseName()) ? options.getDatabaseName() : "unknown";
        return new MongoNamespace(databaseName, collectionName);
    }

    static class ChangeStreamEventMessage<T>
    implements Message<ChangeStreamDocument<Document>, T> {
        private final ChangeStreamEvent<T> delegate;
        private final Message.MessageProperties messageProperties;

        ChangeStreamEventMessage(ChangeStreamEvent<T> delegate, Message.MessageProperties messageProperties) {
            this.delegate = delegate;
            this.messageProperties = messageProperties;
        }

        @Override
        @Nullable
        public ChangeStreamDocument<Document> getRaw() {
            return this.delegate.getRaw();
        }

        @Override
        @Nullable
        public T getBody() {
            return this.delegate.getBody();
        }

        @Override
        @Nullable
        public T getBodyBeforeChange() {
            return this.delegate.getBodyBeforeChange();
        }

        @Override
        public Message.MessageProperties getProperties() {
            return this.messageProperties;
        }

        @Nullable
        BsonValue getResumeToken() {
            return this.delegate.getResumeToken();
        }

        @Nullable
        Instant getTimestamp() {
            return this.delegate.getTimestamp();
        }

        ChangeStreamEvent<T> getChangeStreamEvent() {
            return this.delegate;
        }
    }
}

