package org.springframework.data.aerospike.core;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.BatchRecord;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Record;
import com.aerospike.client.Value;
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.policy.BatchDeletePolicy;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.InfoPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.IndexCollectionType;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.query.KeyRecord;
import com.aerospike.client.reactor.IAerospikeReactorClient;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.Generated;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.aerospike.convert.AerospikeWriteData;
import org.springframework.data.aerospike.convert.MappingAerospikeConverter;
import org.springframework.data.aerospike.core.BaseAerospikeTemplate;
import org.springframework.data.aerospike.core.model.GroupedEntities;
import org.springframework.data.aerospike.core.model.GroupedKeys;
import org.springframework.data.aerospike.index.IndexesCacheRefresher;
import org.springframework.data.aerospike.mapping.AerospikeMappingContext;
import org.springframework.data.aerospike.mapping.AerospikePersistentEntity;
import org.springframework.data.aerospike.query.QualifierUtils;
import org.springframework.data.aerospike.query.ReactorQueryEngine;
import org.springframework.data.aerospike.query.cache.ReactorIndexRefresher;
import org.springframework.data.aerospike.query.qualifier.Qualifier;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.data.aerospike.server.version.ServerVersionSupport;
import org.springframework.data.aerospike.util.InfoCommandUtils;
import org.springframework.data.aerospike.util.Utils;
import org.springframework.data.domain.Sort;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.class */
public class ReactiveAerospikeTemplate extends BaseAerospikeTemplate implements ReactiveAerospikeOperations, IndexesCacheRefresher {

    /** Class logger; the {@code @Generated} marker suggests it was emitted by Lombok — TODO confirm. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ReactiveAerospikeTemplate.class);
    /**
     * Matches a whole line that starts with {@code FAIL:} followed by an optionally negative integer,
     * which is captured as group 1. Not referenced in this part of the file; presumably used to parse
     * index-related info responses — verify against the usage site.
     */
    private static final Pattern INDEX_EXISTS_REGEX_PATTERN = Pattern.compile("^FAIL:(-?\\d+).*$");
    /** Reactive Aerospike client used by the operations in this class. */
    private final IAerospikeReactorClient reactorClient;
    /** Query engine supplied at construction time. */
    private final ReactorQueryEngine reactorQueryEngine;
    /** Index refresher that {@link #refreshIndexesCache()} delegates to. */
    private final ReactorIndexRefresher reactorIndexRefresher;

    /**
     * Creates a reactive template backed by the given reactor client.
     *
     * @param iAerospikeReactorClient      reactive client used for all operations; must not be {@code null}
     * @param str                          first argument of the base template constructor
     *                                     (presumably the namespace — confirm against the base class)
     * @param mappingAerospikeConverter    converter handed to the base template
     * @param aerospikeMappingContext      mapping context handed to the base template
     * @param aerospikeExceptionTranslator exception translator handed to the base template
     * @param reactorQueryEngine           query engine stored on this instance
     * @param reactorIndexRefresher        refresher used by {@link #refreshIndexesCache()}
     * @param serverVersionSupport         server version information handed to the base template
     * @throws NullPointerException if {@code iAerospikeReactorClient} is {@code null}
     */
    public ReactiveAerospikeTemplate(IAerospikeReactorClient iAerospikeReactorClient, String str, MappingAerospikeConverter mappingAerospikeConverter, AerospikeMappingContext aerospikeMappingContext, AerospikeExceptionTranslator aerospikeExceptionTranslator, ReactorQueryEngine reactorQueryEngine, ReactorIndexRefresher reactorIndexRefresher, ServerVersionSupport serverVersionSupport) {
        // The client is dereferenced while the super(...) arguments are evaluated, so a null check placed
        // after super(...) can never fire. Checking inline keeps the exception type a null client already
        // produced (NullPointerException) but attaches a meaningful message.
        super(str, mappingAerospikeConverter, aerospikeMappingContext, aerospikeExceptionTranslator,
            Objects.requireNonNull(iAerospikeReactorClient, "Aerospike reactor client must not be null!")
                .getAerospikeClient().copyWritePolicyDefault(),
            serverVersionSupport);
        this.reactorClient = iAerospikeReactorClient;
        this.reactorQueryEngine = reactorQueryEngine;
        this.reactorIndexRefresher = reactorIndexRefresher;
    }

    /** Refreshes the indexes cache by delegating to the configured {@link ReactorIndexRefresher}. */
    @Override
    public void refreshIndexesCache() {
        final ReactorIndexRefresher refresher = this.reactorIndexRefresher;
        refresher.refreshIndexes();
    }

    /**
     * Saves a document into the set resolved from the document itself.
     *
     * @param t document to save; must not be {@code null}
     * @return the result of {@link #save(Object, String)} for the resolved set name
     */
    @Override
    public <T> Mono<T> save(T t) {
        Assert.notNull(t, "Document for saving must not be null!");
        final String resolvedSetName = getSetName(t);
        return save(t, resolvedSetName);
    }

    /**
     * Saves a document into the given set.
     *
     * <p>Entities with a version property are written through the CAS-aware path; all other entities are
     * written with an {@code UPDATE} record-exists action that ignores the generation. In both cases the
     * operation list is built from the document's bins with a leading {@code Operation.delete()}.
     *
     * @param t   document to save; must not be {@code null}
     * @param str set name
     * @return a {@link Mono} produced by the matching persist helper
     */
    @Override
    public <T> Mono<T> save(T t, String str) {
        Assert.notNull(t, "Document for saving must not be null!");
        final AerospikeWriteData data = writeData(t, str);
        final AerospikePersistentEntity<?> entity =
            (AerospikePersistentEntity<?>) this.mappingContext.getRequiredPersistentEntity(t.getClass());

        if (entity.hasVersionProperty()) {
            final WritePolicy casPolicy = expectGenerationCasAwarePolicy(data);
            final Operation[] casOperations = CoreUtils.operations(data.getBinsAsArray(),
                (Function<Bin, Operation>) Operation::put, Operation.array(Operation.delete()));
            return doPersistWithVersionAndHandleCasError(t, data, casPolicy, casOperations,
                BaseAerospikeTemplate.OperationType.SAVE_OPERATION);
        }

        final WritePolicy plainPolicy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE);
        final Operation[] plainOperations = CoreUtils.operations(data.getBinsAsArray(),
            (Function<Bin, Operation>) Operation::put, Operation.array(Operation.delete()));
        return doPersistAndHandleError(t, data, plainPolicy, plainOperations);
    }

    /**
     * Saves all documents into the set resolved from the first document.
     *
     * @param iterable documents to save; validated by {@code validateForBatchWrite}
     * @return the result of {@link #saveAll(Iterable, String)}
     */
    @Override
    public <T> Flux<T> saveAll(Iterable<T> iterable) {
        validateForBatchWrite(iterable, "Documents for saving");
        final T firstDocument = iterable.iterator().next();
        return saveAll(iterable, getSetName(firstDocument));
    }

    /**
     * Saves all documents into the given set using buffered batch writes.
     *
     * @param iterable documents to save; validated by {@code validateForBatchWrite}
     * @param str      set name; must not be {@code null}
     * @return a {@link Flux} produced by the buffered batch write
     */
    @Override
    public <T> Flux<T> saveAll(Iterable<T> iterable, String str) {
        Assert.notNull(str, "Set name must not be null!");
        validateForBatchWrite(iterable, "Documents for saving");
        final BaseAerospikeTemplate.OperationType saveOperation = BaseAerospikeTemplate.OperationType.SAVE_OPERATION;
        return applyBufferedBatchWrite(iterable, str, saveOperation);
    }

    /**
     * Splits the documents into chunks according to the configured batch write size and concatenates
     * the per-chunk batch write publishers in iteration order.
     *
     * <p>{@code batchWriteAllDocuments} reads the chunk while it is being called, which is why the
     * buffer can be cleared and reused right after each call.
     *
     * @param iterable      documents to write
     * @param str           set name
     * @param operationType kind of batch write to perform
     * @return concatenation of all chunk publishers (empty when there are no documents)
     */
    private <T> Flux<T> applyBufferedBatchWrite(Iterable<? extends T> iterable, String str, BaseAerospikeTemplate.OperationType operationType) {
        final int configuredBatchSize = this.converter.getAerospikeDataSettings().getBatchWriteSize();
        final List<T> chunk = new ArrayList<>();
        Flux<T> combined = Flux.empty();

        for (T document : iterable) {
            final boolean chunkIsFull = batchWriteSizeMatch(configuredBatchSize, chunk.size());
            if (chunkIsFull) {
                combined = combined.concatWith(batchWriteAllDocuments(chunk, str, operationType));
                chunk.clear();
            }
            chunk.add(document);
        }

        if (chunk.isEmpty()) {
            return combined;
        }
        // Trailing, possibly partial, chunk.
        return combined.concatWith(batchWriteAllDocuments(chunk, str, operationType));
    }

    /**
     * Builds one batch write entry per document for the requested operation and submits them
     * as a single batch.
     *
     * @param list          documents of one chunk
     * @param str           set name
     * @param operationType kind of batch write to build entries for
     * @return a {@link Flux} produced by {@code batchWriteAndCheckForErrors}
     * @throws IllegalArgumentException if the operation type is not one of save/insert/update/delete
     */
    private <T> Flux<T> batchWriteAllDocuments(List<T> list, String str, BaseAerospikeTemplate.OperationType operationType) {
        // Choose the entry factory once; an unsupported type fails before any document is touched.
        final Function<T, BaseAerospikeTemplate.BatchWriteData<T>> toBatchWrite = switch (operationType) {
            case SAVE_OPERATION -> document -> getBatchWriteForSave(document, str);
            case INSERT_OPERATION -> document -> getBatchWriteForInsert(document, str);
            case UPDATE_OPERATION -> document -> getBatchWriteForUpdate(document, str);
            case DELETE_OPERATION -> document -> getBatchWriteForDelete(document, str);
            default -> throw new IllegalArgumentException("Unexpected operation name: " + operationType);
        };

        final List<BaseAerospikeTemplate.BatchWriteData<T>> batchWrites = new ArrayList<>();
        for (T document : list) {
            batchWrites.add(toBatchWrite.apply(document));
        }

        final List<BatchRecord> batchRecords = batchWrites.stream()
            .map(batchWrite -> batchWrite.batchRecord())
            .toList();

        final Policy defaultBatchPolicy = (Policy) this.reactorClient.getAerospikeClient().copyBatchPolicyDefault();
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, defaultBatchPolicy)
            .flatMapMany(policy ->
                batchWriteAndCheckForErrors((BatchPolicy) policy, batchRecords, batchWrites, operationType));
    }

    /**
     * Executes the batch, translates client errors, checks the per-record results and emits the
     * documents of the batch.
     *
     * @param batchPolicy   policy to run the batch with
     * @param list          batch records to submit
     * @param list2         batch write entries the records belong to
     * @param operationType kind of batch write, passed on to the result check
     * @return the documents of {@code list2}, emitted after the result check succeeded
     */
    private <T> Flux<T> batchWriteAndCheckForErrors(BatchPolicy batchPolicy, List<BatchRecord> list, List<BaseAerospikeTemplate.BatchWriteData<T>> list2, BaseAerospikeTemplate.OperationType operationType) {
        return this.reactorClient.operate(batchPolicy, list)
            .onErrorMap(this::translateError)
            .flatMap(ignoredStatus -> checkForErrorsAndUpdateVersion(list2, list, operationType))
            .flux()
            .flatMapIterable(checkedWrites -> {
                final List<T> documents = new ArrayList<>(checkedWrites.size());
                for (BaseAerospikeTemplate.BatchWriteData<T> checkedWrite : checkedWrites) {
                    documents.add(checkedWrite.document());
                }
                return documents;
            });
    }

    /**
     * Inspects the outcome of every batch write entry.
     *
     * <p>For versioned entries that succeeded the document version is updated from the returned record
     * (except for deletes). If any entry failed, the result is an error: an optimistic locking failure
     * when a versioned entry failed with an optimistic locking result code (the last such entry's user
     * key is reported), otherwise a {@link AerospikeException.BatchRecordArray} holding all batch records.
     *
     * @param list          batch write entries to inspect
     * @param list2         all batch records, attached to the generic batch error
     * @param operationType kind of batch write, used in messages and to skip version updates on delete
     * @return {@code list} when nothing failed, otherwise an error signal
     */
    private <T> Mono<List<BaseAerospikeTemplate.BatchWriteData<T>>> checkForErrorsAndUpdateVersion(List<BaseAerospikeTemplate.BatchWriteData<T>> list, List<BatchRecord> list2, BaseAerospikeTemplate.OperationType operationType) {
        boolean anyFailure = false;
        String versionMismatchId = null;

        for (BaseAerospikeTemplate.BatchWriteData<T> entry : list) {
            final BatchRecord batchRecord = entry.batchRecord();
            final boolean entryFailed = batchRecordFailed(batchRecord);
            anyFailure = anyFailure || entryFailed;

            if (!entry.hasVersionProperty()) {
                continue;
            }
            if (entryFailed) {
                if (hasOptimisticLockingError(batchRecord.resultCode)) {
                    versionMismatchId = batchRecord.key.userKey.toString();
                }
            } else if (operationType != BaseAerospikeTemplate.OperationType.DELETE_OPERATION) {
                updateVersion(entry.document(), batchRecord.record);
            }
        }

        if (!anyFailure) {
            return Mono.just(list);
        }
        if (versionMismatchId != null) {
            final String message = "Failed to %s the record with ID '%s' due to versions mismatch"
                .formatted(operationType, versionMismatchId);
            return Mono.error(getOptimisticLockingFailureException(message, null));
        }
        final BatchRecord[] allRecords = list2.toArray(BatchRecord[]::new);
        return Mono.error(new AerospikeException.BatchRecordArray(allRecords,
            new AerospikeException("Errors during batch " + operationType)));
    }

    /**
     * Inserts a document into the set resolved from the document itself.
     *
     * @param t document to insert
     * @return the result of {@link #insert(Object, String)} for the resolved set name
     */
    @Override
    public <T> Mono<T> insert(T t) {
        final String resolvedSetName = getSetName(t);
        return insert(t, resolvedSetName);
    }

    /**
     * Inserts a document into the given set using a {@code CREATE_ONLY} record-exists action.
     *
     * <p>For entities with a version property the operation list additionally ends with
     * {@code Operation.getHeader()} and the write goes through the version-aware persist helper.
     *
     * @param t   document to insert; must not be {@code null}
     * @param str set name; must not be {@code null}
     * @return a {@link Mono} produced by the matching persist helper
     */
    @Override
    public <T> Mono<T> insert(T t, String str) {
        Assert.notNull(t, "Document must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        final AerospikeWriteData data = writeData(t, str);
        final WritePolicy createOnlyPolicy = ignoreGenerationPolicy(data, RecordExistsAction.CREATE_ONLY);
        final AerospikePersistentEntity<?> entity =
            (AerospikePersistentEntity<?>) this.mappingContext.getRequiredPersistentEntity(t.getClass());

        if (!entity.hasVersionProperty()) {
            final Operation[] putOnly = CoreUtils.operations(data.getBinsAsArray(), Operation::put);
            return doPersistAndHandleError(t, data, createOnlyPolicy, putOnly);
        }

        final Operation[] putThenHeader = CoreUtils.operations(data.getBinsAsArray(), Operation::put, null,
            Operation.array(Operation.getHeader()));
        return doPersistWithVersionAndHandleError(t, data, createOnlyPolicy, putThenHeader);
    }

    /**
     * Inserts all documents into the set resolved from the first document.
     *
     * @param iterable documents to insert; validated by {@code validateForBatchWrite}
     * @return the result of {@link #insertAll(Iterable, String)}
     */
    @Override
    public <T> Flux<T> insertAll(Iterable<? extends T> iterable) {
        validateForBatchWrite(iterable, "Documents for insert");
        final T firstDocument = iterable.iterator().next();
        return insertAll(iterable, getSetName(firstDocument));
    }

    /**
     * Inserts all documents into the given set using buffered batch writes.
     *
     * @param iterable documents to insert; validated by {@code validateForBatchWrite}
     * @param str      set name; must not be {@code null}
     * @return a {@link Flux} produced by the buffered batch write
     */
    @Override
    public <T> Flux<T> insertAll(Iterable<? extends T> iterable, String str) {
        Assert.notNull(str, "Set name must not be null!");
        validateForBatchWrite(iterable, "Documents for insert");
        final BaseAerospikeTemplate.OperationType insertOperation = BaseAerospikeTemplate.OperationType.INSERT_OPERATION;
        return applyBufferedBatchWrite(iterable, str, insertOperation);
    }

    /**
     * Persists a document with a caller-supplied write policy into the set resolved from the document.
     *
     * @param t           document to persist; must not be {@code null}
     * @param writePolicy policy to write with; must not be {@code null}
     * @return the result of {@link #persist(Object, WritePolicy, String)}
     */
    @Override
    public <T> Mono<T> persist(T t, WritePolicy writePolicy) {
        Assert.notNull(t, "Document must not be null!");
        Assert.notNull(writePolicy, "Policy must not be null!");
        final String resolvedSetName = getSetName(t);
        return persist(t, writePolicy, resolvedSetName);
    }

    /**
     * Persists a document into the given set with a copy of the caller-supplied write policy.
     *
     * @param t           document to persist; must not be {@code null}
     * @param writePolicy policy to copy and write with; must not be {@code null}
     * @param str         set name; must not be {@code null}
     * @return a {@link Mono} produced by {@code doPersistAndHandleError}
     */
    @Override
    public <T> Mono<T> persist(T t, WritePolicy writePolicy, String str) {
        Assert.notNull(t, "Document must not be null!");
        Assert.notNull(writePolicy, "Policy must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        final AerospikeWriteData data = writeData(t, str);
        final Operation[] putOperations = CoreUtils.operations(data.getBinsAsArray(), Operation::put);
        // Work on a copy so the caller's policy object is not the one handed to the enrichment step.
        final Policy policyCopy = (Policy) new WritePolicy(writePolicy);
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, policyCopy)
            .flatMap(enriched -> doPersistAndHandleError(t, data, (WritePolicy) enriched, putOperations));
    }

    /**
     * Updates a document in the set resolved from the document itself.
     *
     * @param t document to update
     * @return the result of {@link #update(Object, String)} for the resolved set name
     */
    @Override
    public <T> Mono<T> update(T t) {
        // The document is passed through as-is: casting it to ReactiveAerospikeTemplate (as the previous
        // code did) can only fail for real documents.
        return update(t, getSetName(t));
    }

    /**
     * Updates a document in the given set using an {@code UPDATE_ONLY} record-exists action.
     *
     * <p>Entities with a version property are written with a generation-expecting policy through the
     * CAS-aware helper, and their operation list also ends with {@code Operation.getHeader()}. In both
     * cases the operation list starts with {@code Operation.delete()} followed by the bin puts.
     *
     * @param t   document to update; must not be {@code null}
     * @param str set name; must not be {@code null}
     * @return a {@link Mono} produced by the matching persist helper
     */
    @Override
    public <T> Mono<T> update(T t, String str) {
        Assert.notNull(t, "Document must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        final AerospikeWriteData data = writeData(t, str);
        final AerospikePersistentEntity<?> entity =
            (AerospikePersistentEntity<?>) this.mappingContext.getRequiredPersistentEntity(t.getClass());

        if (entity.hasVersionProperty()) {
            final WritePolicy casPolicy = expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
            final Operation[] casOperations = CoreUtils.operations(data.getBinsAsArray(), Operation::put,
                Operation.array(Operation.delete()), Operation.array(Operation.getHeader()));
            return doPersistWithVersionAndHandleCasError(t, data, casPolicy, casOperations,
                BaseAerospikeTemplate.OperationType.UPDATE_OPERATION);
        }

        final WritePolicy plainPolicy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
        final Operation[] plainOperations = CoreUtils.operations(data.getBinsAsArray(),
            (Function<Bin, Operation>) Operation::put, Operation.array(Operation.delete()));
        return doPersistAndHandleError(t, data, plainPolicy, plainOperations);
    }

    /**
     * Updates only the given fields of a document in the set resolved from the document itself.
     *
     * @param t          document to update
     * @param collection names of the fields to update
     * @return the result of {@link #update(Object, String, Collection)}
     */
    @Override
    public <T> Mono<T> update(T t, Collection<String> collection) {
        final String resolvedSetName = getSetName(t);
        return update(t, resolvedSetName, collection);
    }

    /**
     * Updates only the given fields of a document in the given set using an {@code UPDATE_ONLY}
     * record-exists action.
     *
     * <p>Unlike the full update, no {@code Operation.delete()} is prepended. Versioned entities go
     * through the CAS-aware helper with a trailing {@code Operation.getHeader()}.
     *
     * @param t          document to update; must not be {@code null}
     * @param str        set name; must not be {@code null}
     * @param collection names of the fields to update; must not be {@code null}
     * @return a {@link Mono} produced by the matching persist helper
     */
    @Override
    public <T> Mono<T> update(T t, String str, Collection<String> collection) {
        Assert.notNull(t, "Document must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        Assert.notNull(collection, "Fields must not be null!");
        final AerospikeWriteData data = writeDataWithSpecificFields(t, str, collection);
        final AerospikePersistentEntity<?> entity =
            (AerospikePersistentEntity<?>) this.mappingContext.getRequiredPersistentEntity(t.getClass());

        if (!entity.hasVersionProperty()) {
            final WritePolicy plainPolicy = ignoreGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
            final Operation[] putOnly = CoreUtils.operations(data.getBinsAsArray(), Operation::put);
            return doPersistAndHandleError(t, data, plainPolicy, putOnly);
        }

        final WritePolicy casPolicy = expectGenerationPolicy(data, RecordExistsAction.UPDATE_ONLY);
        final Operation[] putThenHeader = CoreUtils.operations(data.getBinsAsArray(), Operation::put, null,
            Operation.array(Operation.getHeader()));
        return doPersistWithVersionAndHandleCasError(t, data, casPolicy, putThenHeader,
            BaseAerospikeTemplate.OperationType.UPDATE_OPERATION);
    }

    /**
     * Updates all documents in the set resolved from the first document.
     *
     * @param iterable documents to update; validated by {@code validateForBatchWrite}
     * @return the result of {@link #updateAll(Iterable, String)}
     */
    @Override
    public <T> Flux<T> updateAll(Iterable<? extends T> iterable) {
        validateForBatchWrite(iterable, "Documents for update");
        final T firstDocument = iterable.iterator().next();
        return updateAll(iterable, getSetName(firstDocument));
    }

    /**
     * Updates all documents in the given set using buffered batch writes.
     *
     * @param iterable documents to update; validated by {@code validateForBatchWrite}
     * @param str      set name; must not be {@code null}
     * @return a {@link Flux} produced by the buffered batch write
     */
    @Override
    public <T> Flux<T> updateAll(Iterable<? extends T> iterable, String str) {
        Assert.notNull(str, "Set name must not be null!");
        validateForBatchWrite(iterable, "Documents for update");
        final BaseAerospikeTemplate.OperationType updateOperation = BaseAerospikeTemplate.OperationType.UPDATE_OPERATION;
        return applyBufferedBatchWrite(iterable, str, updateOperation);
    }

    /**
     * Deletes a document from the set resolved from the document itself.
     *
     * @param t document to delete
     * @return the result of {@link #delete(Object, String)} for the resolved set name
     */
    @Override
    public <T> Mono<Boolean> delete(T t) {
        // The document is passed through as-is: casting it to ReactiveAerospikeTemplate (as the previous
        // code did) throws ClassCastException for every real document.
        return delete(t, getSetName(t));
    }

    /**
     * Deletes a document from the given set.
     *
     * <p>Entities with a version property are deleted with a generation-expecting policy and errors are
     * translated as CAS errors; other entities are deleted ignoring the generation. The emitted value
     * is {@code true} when the client's delete publisher emits an element and {@code false} when it
     * completes empty.
     *
     * @param t   document to delete; must not be {@code null}
     * @param str set name; must not be {@code null}
     * @return a {@link Mono} emitting whether the delete publisher produced an element
     */
    @Override
    public <T> Mono<Boolean> delete(T t, String str) {
        Assert.notNull(t, "Document must not be null!");
        // Validate the set name itself (the previous code re-checked the document here).
        Assert.notNull(str, "Set name must not be null!");
        AerospikeWriteData writeData = writeData(t, str);
        AerospikePersistentEntity<?> entity =
            (AerospikePersistentEntity<?>) this.mappingContext.getRequiredPersistentEntity(t.getClass());

        if (entity.hasVersionProperty()) {
            return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, (Policy) expectGenerationPolicy(writeData))
                .flatMap(policy -> this.reactorClient.delete((WritePolicy) policy, writeData.getKey()))
                .hasElement()
                .onErrorMap(th -> translateCasThrowable(th, BaseAerospikeTemplate.OperationType.DELETE_OPERATION.toString()));
        }
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, (Policy) ignoreGenerationPolicy())
            .flatMap(policy -> this.reactorClient.delete((WritePolicy) policy, writeData.getKey()))
            .hasElement()
            .onErrorMap(this::translateError);
    }

    /**
     * Deletes every entity in the given set that matches the query.
     *
     * @param query query selecting the entities to delete; must not be {@code null}
     * @param cls   entity class; must not be {@code null}
     * @param str   set name to search and delete in; must not be {@code null}
     * @return a {@link Mono} completing when the matching entities were deleted, or immediately when
     *         nothing matched
     */
    @Override
    public <T> Mono<Void> delete(Query query, Class<T> cls, String str) {
        Assert.notNull(query, "Query must not be null!");
        Assert.notNull(cls, "Entity class must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        return find(query, cls, str)
            .filter(Objects::nonNull)
            .collect(Collectors.toUnmodifiableList())
            .flatMap(list -> {
                if (list.isEmpty()) {
                    return Mono.empty();
                }
                // Delete from the same set the entities were found in; the single-argument deleteAll
                // would re-derive the set name from the entity class instead.
                return deleteAll(list, str);
            });
    }

    /**
     * Deletes every entity matching the query from the set resolved from the entity class.
     *
     * @param query query selecting the entities to delete; must not be {@code null}
     * @param cls   entity class; must not be {@code null}
     * @return the result of {@link #delete(Query, Class, String)} for the resolved set name
     */
    @Override
    public <T> Mono<Void> delete(Query query, Class<T> cls) {
        // Message aligned with the three-argument overload; the previous text referred to an "exist" check.
        Assert.notNull(query, "Query must not be null!");
        Assert.notNull(cls, "Class must not be null!");
        return delete(query, cls, getSetName((Class) cls));
    }

    /**
     * Deletes the entities with the given ids, optionally narrowed by a query, from the set resolved
     * from the entity class.
     *
     * @param collection ids of the entities to delete
     * @param cls        entity class
     * @param query      optional additional query; may be {@code null}
     * @return the result of {@link #deleteByIdsUsingQuery(Collection, Class, String, Query)}
     */
    @Override
    public <T> Mono<Void> deleteByIdsUsingQuery(Collection<?> collection, Class<T> cls, @Nullable Query query) {
        final String resolvedSetName = getSetName((Class) cls);
        return deleteByIdsUsingQuery(collection, cls, resolvedSetName, query);
    }

    /**
     * Deletes the entities with the given ids, optionally narrowed by a query, from the given set.
     *
     * @param collection ids of the entities to delete
     * @param cls        entity class (used both as entity and as target class of the lookup)
     * @param str        set name to search and delete in
     * @param query      optional additional query; may be {@code null}
     * @return a {@link Mono} completing when the matching entities were deleted, or immediately when
     *         nothing matched
     */
    @Override
    public <T> Mono<Void> deleteByIdsUsingQuery(Collection<?> collection, Class<T> cls, String str, @Nullable Query query) {
        return findByIdsUsingQuery(collection, cls, cls, str, query)
            .filter(Objects::nonNull)
            .collect(Collectors.toUnmodifiableList())
            .flatMap(list -> {
                if (list.isEmpty()) {
                    return Mono.empty();
                }
                // Delete from the same set the entities were found in; the single-argument deleteAll
                // would re-derive the set name from the entity class instead.
                return deleteAll(list, str);
            });
    }

    /**
     * Deletes the record with the given id from the set resolved from the entity class.
     *
     * @param obj id of the record; must not be {@code null}
     * @param cls entity class; must not be {@code null}
     * @return the result of {@link #deleteById(Object, String)}
     */
    @Override
    public <T> Mono<Boolean> deleteById(Object obj, Class<T> cls) {
        Assert.notNull(obj, "Id must not be null!");
        Assert.notNull(cls, "Class must not be null!");
        final String resolvedSetName = getSetName((Class) cls);
        return deleteById(obj, resolvedSetName);
    }

    /**
     * Deletes the record with the given id from the given set, ignoring the record generation.
     *
     * @param obj id of the record; must not be {@code null}
     * @param str set name; must not be {@code null}
     * @return a {@link Mono} emitting {@code true} for each element emitted by the client's delete
     *         publisher; client errors are translated
     */
    @Override
    public Mono<Boolean> deleteById(Object obj, String str) {
        Assert.notNull(obj, "Id must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        final Policy basePolicy = (Policy) ignoreGenerationPolicy();
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, basePolicy)
            .flatMap(enriched -> this.reactorClient.delete((WritePolicy) enriched, getKey(obj, str)))
            .map(deletedKey -> Boolean.TRUE)
            .onErrorMap(this::translateError);
    }

    /**
     * Deletes all given documents from the set resolved from the first document.
     *
     * @param iterable documents to delete; validated by {@code validateForBatchWrite}
     * @return the result of {@link #deleteAll(Iterable, String)}
     */
    @Override
    public <T> Mono<Void> deleteAll(Iterable<T> iterable) {
        validateForBatchWrite(iterable, "Documents for deleting");
        final T firstDocument = iterable.iterator().next();
        return deleteAll(iterable, getSetName(firstDocument));
    }

    /**
     * Deletes all given documents from the given set using buffered batch writes.
     *
     * @param iterable documents to delete; validated by {@code validateForBatchWrite}
     * @param str      set name; must not be {@code null}
     * @return a {@link Mono} completing when the buffered batch delete has completed
     */
    @Override
    public <T> Mono<Void> deleteAll(Iterable<T> iterable, String str) {
        Assert.notNull(str, "Set name must not be null!");
        validateForBatchWrite(iterable, "Documents for deleting");
        final Flux<T> deleted = applyBufferedBatchWrite(iterable, str, BaseAerospikeTemplate.OperationType.DELETE_OPERATION);
        return deleted.then();
    }

    /**
     * Deletes the records with the given ids from the set resolved from the entity class.
     *
     * @param iterable ids to delete; validated by {@code validateForBatchWrite}
     * @param cls      entity class; must not be {@code null}
     * @return the result of {@link #deleteByIds(Iterable, String)}
     */
    @Override
    public <T> Mono<Void> deleteByIds(Iterable<?> iterable, Class<T> cls) {
        Assert.notNull(cls, "Class must not be null!");
        validateForBatchWrite(iterable, "IDs");
        final String resolvedSetName = getSetName((Class) cls);
        return deleteByIds(iterable, resolvedSetName);
    }

    /**
     * Deletes the records with the given ids from the given set, chunked by the configured batch
     * write size.
     *
     * <p>Each new chunk's delete publisher is placed in front of the ones built so far, so chunks are
     * subscribed in reverse order of creation.
     *
     * @param iterable ids to delete; validated by {@code validateForBatchWrite}
     * @param str      set name; must not be {@code null}
     * @return a {@link Mono} completing when all chunk deletes have completed
     */
    @Override
    public Mono<Void> deleteByIds(Iterable<?> iterable, String str) {
        Assert.notNull(str, "Set name must not be null!");
        validateForBatchWrite(iterable, "IDs");
        final int configuredBatchSize = this.converter.getAerospikeDataSettings().getBatchWriteSize();
        final List<Object> chunk = new ArrayList<>();
        Flux<Void> pipeline = Flux.empty();

        for (Object id : iterable) {
            final boolean chunkIsFull = batchWriteSizeMatch(configuredBatchSize, chunk.size());
            if (chunkIsFull) {
                pipeline = Flux.concat(deleteByIds((Collection<?>) chunk, str), pipeline);
                chunk.clear();
            }
            chunk.add(id);
        }

        if (!chunk.isEmpty()) {
            // Trailing, possibly partial, chunk.
            pipeline = Flux.concat(deleteByIds((Collection<?>) chunk, str), pipeline);
        }
        return pipeline.then();
    }

    /**
     * Converts one chunk of ids to keys of the given set and deletes them in a single batch.
     *
     * <p>The keys are computed while this method runs, so the caller may reuse the collection afterwards.
     *
     * @param collection ids of one chunk; validated by {@code validateForBatchWrite}
     * @param str        set name; must not be {@code null}
     * @return the result of {@code batchDeleteAndCheckForErrors} for the computed keys
     */
    private Mono<Void> deleteByIds(Collection<?> collection, String str) {
        Assert.notNull(str, "Set name must not be null!");
        validateForBatchWrite(collection, "IDs");
        final List<Key> keys = new ArrayList<>();
        for (Object id : collection) {
            keys.add(getKey(id, str));
        }
        return batchDeleteAndCheckForErrors(this.reactorClient, keys.toArray(new Key[0]));
    }

    /**
     * Deletes the given keys in one batch and fails if the result has no records or any record failed.
     *
     * @param iAerospikeReactorClient client to run the batch delete with
     * @param keyArr                  keys to delete
     * @return an empty {@link Mono} on success; otherwise an error signal carrying an
     *         {@link AerospikeException.BatchRecordArray} (client errors are translated)
     */
    private Mono<Void> batchDeleteAndCheckForErrors(IAerospikeReactorClient iAerospikeReactorClient, Key[] keyArr) {
        final Policy defaultBatchPolicy = (Policy) iAerospikeReactorClient.getAerospikeClient().copyBatchPolicyDefault();
        return TemplateUtils.enrichPolicyWithTransaction(iAerospikeReactorClient, defaultBatchPolicy)
            .flatMap(enriched -> iAerospikeReactorClient.delete((BatchPolicy) enriched, (BatchDeletePolicy) null, keyArr))
            .onErrorMap(this::translateError)
            .flatMap(batchResults -> {
                final BatchRecord[] records = batchResults.records;
                // A missing record array is treated as a failure, same as any failed record.
                boolean failed = records == null;
                if (!failed) {
                    for (BatchRecord batchRecord : records) {
                        if (batchRecordFailed(batchRecord)) {
                            failed = true;
                            break;
                        }
                    }
                }
                if (failed) {
                    return Mono.error(new AerospikeException.BatchRecordArray(records,
                        new AerospikeException("Errors during batch delete")));
                }
                return Mono.empty();
            });
    }

    /**
     * Deletes the records identified by the grouped keys.
     *
     * @param groupedKeys keys grouped by entity; validated by {@code validateGroupedKeys}
     * @return an empty {@link Mono} when there are no keys, otherwise the batch delete result
     */
    @Override
    public Mono<Void> deleteByIds(GroupedKeys groupedKeys) {
        validateGroupedKeys(groupedKeys);
        if (groupedKeys.getEntitiesKeys().isEmpty()) {
            return Mono.empty();
        }
        return deleteEntitiesByGroupedKeys(groupedKeys);
    }

    /**
     * Resolves the grouped keys to Aerospike keys and deletes them in one checked batch.
     *
     * <p>The previous implementation additionally assembled a second delete pipeline whose result was
     * discarded and never subscribed; it had no effect and has been removed.
     *
     * @param groupedKeys keys grouped by entity
     * @return the result of {@code batchDeleteAndCheckForErrors} for the resolved keys
     */
    private Mono<Void> deleteEntitiesByGroupedKeys(GroupedKeys groupedKeys) {
        EntitiesKeys entitiesKeys = EntitiesKeys.of(toEntitiesKeyMap(groupedKeys));
        return batchDeleteAndCheckForErrors(this.reactorClient, entitiesKeys.getKeys());
    }

    /**
     * Truncates the set resolved from the entity class without a time limit.
     *
     * @param cls entity class; must not be {@code null}
     * @return the result of {@link #deleteAll(String, Instant)} with a {@code null} instant
     */
    @Override
    public <T> Mono<Void> deleteAll(Class<T> cls) {
        Assert.notNull(cls, "Class must not be null!");
        final String resolvedSetName = getSetName((Class) cls);
        return deleteAll(resolvedSetName, (Instant) null);
    }

    /**
     * Truncates the set resolved from the entity class, passing the given instant on.
     *
     * @param cls     entity class; must not be {@code null}
     * @param instant instant handed to {@link #deleteAll(String, Instant)}
     * @return the result of {@link #deleteAll(String, Instant)}
     */
    @Override
    public <T> Mono<Void> deleteAll(Class<T> cls, Instant instant) {
        Assert.notNull(cls, "Class must not be null!");
        final String resolvedSetName = getSetName((Class) cls);
        return deleteAll(resolvedSetName, instant);
    }

    /**
     * Truncates the given set without a time limit.
     *
     * @param str set name; must not be {@code null}
     * @return the result of {@link #deleteAll(String, Instant)} with a {@code null} instant
     */
    @Override
    public Mono<Void> deleteAll(String str) {
        Assert.notNull(str, "Set name must not be null!");
        final Instant noTimeLimit = null;
        return deleteAll(str, noTimeLimit);
    }

    /**
     * Truncates the given set in this template's namespace.
     *
     * <p>The truncate call runs when the returned {@link Mono} is subscribed, so failures surface as an
     * error signal. They are translated with {@code translateError} in the reactive chain; a
     * {@code try/catch} around {@code Mono.fromRunnable} only covers assembly and never sees them.
     *
     * @param str     set name; must not be {@code null}
     * @param instant converted with {@code convertToCalendar} and passed to the client's truncate call
     * @return a {@link Mono} completing when the truncate call has returned
     */
    @Override
    public Mono<Void> deleteAll(String str, Instant instant) {
        Assert.notNull(str, "Set name must not be null!");
        Calendar beforeLastUpdate = convertToCalendar(instant);
        return Mono.<Void>fromRunnable(() ->
                this.reactorClient.getAerospikeClient().truncate((InfoPolicy) null, this.namespace, str, beforeLastUpdate))
            .onErrorMap(this::translateError);
    }

    /**
     * Adds the given values to the corresponding bins of a document in the set resolved from the
     * document itself.
     *
     * @param t   document whose record is modified
     * @param map bin name to increment value
     * @return the result of {@link #add(Object, String, Map)} for the resolved set name
     */
    @Override
    public <T> Mono<T> add(T t, Map<String, Long> map) {
        // The document is passed through as-is: casting it to ReactiveAerospikeTemplate (as the previous
        // code did) can only fail for real documents.
        return add(t, getSetName(t), map);
    }

    /**
     * Adds the given values to the corresponding bins of a document's record in the given set and
     * reads the record back.
     *
     * @param t   document whose record is modified; must not be {@code null}
     * @param str set name; must not be {@code null}
     * @param map bin name to increment value; must not be {@code null}
     * @return a {@link Mono} produced by {@code executeOperationsOnValue}
     */
    @Override
    public <T> Mono<T> add(T t, String str, Map<String, Long> map) {
        Assert.notNull(t, "Document must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        Assert.notNull(map, "Values must not be null!");
        final AerospikeWriteData data = writeData(t, str);

        // One ADD per entry, followed by a read of the whole record.
        final List<Operation> operationList = new ArrayList<>(map.size() + 1);
        for (Map.Entry<String, Long> increment : map.entrySet()) {
            operationList.add(new Operation(Operation.Type.ADD, increment.getKey(), Value.get(increment.getValue())));
        }
        operationList.add(Operation.get());
        final Operation[] operations = operationList.toArray(new Operation[0]);

        final Policy basePolicy = (Policy) WritePolicyBuilder.builder(this.writePolicyDefault)
            .expiration(data.getExpiration())
            .build();
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, basePolicy)
            .flatMap(enriched -> executeOperationsOnValue(t, data, (WritePolicy) enriched, operations));
    }

    /**
     * Adds a value to one bin of a document's record in the set resolved from the document itself.
     *
     * @param t   document whose record is modified
     * @param str bin name
     * @param j   value to add
     * @return the result of {@link #add(Object, String, String, long)}
     */
    @Override
    public <T> Mono<T> add(T t, String str, long j) {
        final String resolvedSetName = getSetName(t);
        return add(t, resolvedSetName, str, j);
    }

    /**
     * Adds a value to one bin of a document's record in the given set and reads that bin back.
     *
     * @param t    document whose record is modified; must not be {@code null}
     * @param str  set name; must not be {@code null}
     * @param str2 bin name; must not be {@code null}
     * @param j    value to add
     * @return a {@link Mono} produced by {@code executeOperationsOnValue}
     */
    @Override
    public <T> Mono<T> add(T t, String str, String str2, long j) {
        Assert.notNull(t, "Document must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        Assert.notNull(str2, "Bin name must not be null!");
        final AerospikeWriteData data = writeData(t, str);
        final WritePolicy basePolicy = WritePolicyBuilder.builder(this.writePolicyDefault)
            .expiration(data.getExpiration())
            .build();
        final Operation addToBin = Operation.add(new Bin(str2, j));
        final Operation readBin = Operation.get(str2);
        final Operation[] operations = new Operation[]{addToBin, readBin};
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, (Policy) basePolicy)
            .flatMap(enriched -> executeOperationsOnValue(t, data, (WritePolicy) enriched, operations));
    }

    /**
     * Appends the given values to the corresponding bins of a document's record in the set resolved
     * from the document itself.
     *
     * @param t   document whose record is modified
     * @param map bin name to value to append
     * @return the result of {@link #append(Object, String, Map)} for the resolved set name
     */
    @Override
    public <T> Mono<T> append(T t, Map<String, String> map) {
        // The document is passed through as-is: casting it to ReactiveAerospikeTemplate (as the previous
        // code did) can only fail for real documents.
        return append(t, getSetName(t), map);
    }

    /**
     * Appends the given values to the corresponding bins of a document's record in the given set and
     * reads the record back, using a copy of the client's default write policy.
     *
     * @param t   document whose record is modified; must not be {@code null}
     * @param str set name; must not be {@code null}
     * @param map bin name to value to append; must not be {@code null}
     * @return a {@link Mono} produced by {@code executeOperationsOnValue}
     */
    @Override
    public <T> Mono<T> append(T t, String str, Map<String, String> map) {
        Assert.notNull(t, "Document must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        Assert.notNull(map, "Values must not be null!");
        final AerospikeWriteData data = writeData(t, str);
        final Operation[] appendThenRead = CoreUtils.operations(map, Operation.Type.APPEND, Operation.get());
        final Policy basePolicy = (Policy) this.reactorClient.getAerospikeClient().copyWritePolicyDefault();
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, basePolicy)
            .flatMap(enriched -> executeOperationsOnValue(t, data, (WritePolicy) enriched, appendThenRead));
    }

    /**
     * Appends a value to one bin of a document's record in the set resolved from the document itself.
     *
     * @param t    document whose record is modified
     * @param str  bin name
     * @param str2 value to append
     * @return the result of {@link #append(Object, String, String, String)}
     */
    @Override
    public <T> Mono<T> append(T t, String str, String str2) {
        final String resolvedSetName = getSetName(t);
        return append(t, resolvedSetName, str, str2);
    }

    /**
     * Appends a value to one bin of a document's record in the given set and reads that bin back,
     * using a copy of the client's default write policy.
     *
     * @param t    document whose record is modified; must not be {@code null}
     * @param str  set name; must not be {@code null}
     * @param str2 bin name
     * @param str3 value to append
     * @return a {@link Mono} produced by {@code executeOperationsOnValue}
     */
    @Override
    public <T> Mono<T> append(T t, String str, String str2, String str3) {
        Assert.notNull(t, "Document must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        final AerospikeWriteData data = writeData(t, str);
        final Operation appendToBin = Operation.append(new Bin(str2, str3));
        final Operation readBin = Operation.get(str2);
        final Operation[] operations = new Operation[]{appendToBin, readBin};
        final Policy basePolicy = (Policy) this.reactorClient.getAerospikeClient().copyWritePolicyDefault();
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, basePolicy)
            .flatMap(enriched -> executeOperationsOnValue(t, data, (WritePolicy) enriched, operations));
    }

    /**
     * Prepends the given values to the corresponding bins of a document's record in the set resolved
     * from the document itself.
     *
     * @param t   document whose record is modified
     * @param map bin name to value to prepend
     * @return the result of {@link #prepend(Object, String, Map)} for the resolved set name
     */
    @Override
    public <T> Mono<T> prepend(T t, Map<String, String> map) {
        // The document is passed through as-is: casting it to ReactiveAerospikeTemplate (as the previous
        // code did) can only fail for real documents.
        return prepend(t, getSetName(t), map);
    }

    /**
     * Prepends the given values to the corresponding bins of a document's record in the given set and
     * reads the record back, using a copy of the client's default write policy.
     *
     * @param t   document whose record is modified; must not be {@code null}
     * @param str set name; must not be {@code null}
     * @param map bin name to value to prepend; must not be {@code null}
     * @return a {@link Mono} produced by {@code executeOperationsOnValue}
     */
    @Override
    public <T> Mono<T> prepend(T t, String str, Map<String, String> map) {
        Assert.notNull(t, "Document must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        Assert.notNull(map, "Values must not be null!");
        final AerospikeWriteData data = writeData(t, str);
        final Operation[] prependThenRead = CoreUtils.operations(map, Operation.Type.PREPEND, Operation.get());
        final Policy basePolicy = (Policy) this.reactorClient.getAerospikeClient().copyWritePolicyDefault();
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, basePolicy)
            .flatMap(enriched -> executeOperationsOnValue(t, data, (WritePolicy) enriched, prependThenRead));
    }

    /**
     * Prepends a value to one bin of a document's record in the set resolved from the document itself.
     *
     * @param t    document whose record is modified
     * @param str  bin name
     * @param str2 value to prepend
     * @return the result of {@link #prepend(Object, String, String, String)}
     */
    @Override
    public <T> Mono<T> prepend(T t, String str, String str2) {
        final String resolvedSetName = getSetName(t);
        return prepend(t, resolvedSetName, str, str2);
    }

    /**
     * Prepends a value to one bin of a document's record in the given set and reads that bin back,
     * using a copy of the client's default write policy.
     *
     * @param t    document whose record is modified; must not be {@code null}
     * @param str  set name; must not be {@code null}
     * @param str2 bin name
     * @param str3 value to prepend
     * @return a {@link Mono} produced by {@code executeOperationsOnValue}
     */
    @Override
    public <T> Mono<T> prepend(T t, String str, String str2, String str3) {
        Assert.notNull(t, "Document must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        final AerospikeWriteData data = writeData(t, str);
        final Operation prependToBin = Operation.prepend(new Bin(str2, str3));
        final Operation readBin = Operation.get(str2);
        final Operation[] operations = new Operation[]{prependToBin, readBin};
        final Policy basePolicy = (Policy) this.reactorClient.getAerospikeClient().copyWritePolicyDefault();
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, basePolicy)
            .flatMap(enriched -> executeOperationsOnValue(t, data, (WritePolicy) enriched, operations));
    }

    /**
     * Executes the operations against the key held by the write data and maps the resulting
     * record to an entity of the document's class.
     *
     * @return the mapped entity, or an empty Mono when the result carries no record;
     *         errors are passed through {@code translateError}
     */
    private <T> Mono<T> executeOperationsOnValue(T t, AerospikeWriteData aerospikeWriteData, WritePolicy writePolicy, Operation[] operationArr) {
        return this.reactorClient.operate(writePolicy, aerospikeWriteData.getKey(), operationArr)
            .filter(result -> result.record != null)
            .map(result -> mapToEntity(result.key, getEntityClass(t), result.record))
            .onErrorMap(this::translateError);
    }

    /**
     * Wraps the supplier in a Mono and routes any error it raises through {@code translateError}.
     *
     * @param supplier the value supplier, must not be null
     * @return a Mono emitting the supplied value
     */
    @Override
    public <T> Mono<T> execute(Supplier<T> supplier) {
        Assert.notNull(supplier, "Supplier must not be null!");
        Mono<T> supplied = Mono.fromSupplier(supplier);
        return supplied.onErrorMap(this::translateError);
    }

    /**
     * Finds an entity by id in the set associated with the given class.
     *
     * @param obj the id
     * @param cls the entity class, must not be null
     * @return the result of the set-name-aware {@code findById} overload
     */
    @Override
    public <T> Mono<T> findById(Object obj, Class<T> cls) {
        Assert.notNull(cls, "Class must not be null!");
        String setName = getSetName(cls);
        return findById(obj, cls, setName);
    }

    /**
     * Reads the record with the given id from the given set and maps it to {@code cls}.
     * Entities flagged as touch-on-read are fetched via {@code getAndTouch}; all others use a
     * plain get with the default read policy.
     *
     * @param obj the id
     * @param cls the class to map to (also used to look up the persistent entity)
     * @param str the set name
     * @return the mapped entity, or an empty Mono when no record is returned
     */
    @Override
    public <T> Mono<T> findById(Object obj, Class<T> cls, String str) {
        AerospikePersistentEntity entity = (AerospikePersistentEntity) this.mappingContext.getRequiredPersistentEntity(cls);
        Key key = getKey(obj, str);
        if (entity.isTouchOnRead()) {
            // NOTE(review): the check fails when the entity HAS an expiration property, while the
            // message says "without" - confirm which one is intended.
            Assert.state(!entity.hasExpirationProperty(), "Touch on read is not supported for entity without expiration property");
            return getAndTouch(key, entity.getExpiration(), null, null)
                .filter(touched -> touched.record != null)
                .map(touched -> mapToEntity(touched.key, cls, touched.record))
                // Result code 2 is treated as "nothing found" (presumably KEY_NOT_FOUND_ERROR - confirm).
                .onErrorResume(
                    error -> (error instanceof AerospikeException) && ((AerospikeException) error).getResultCode() == 2,
                    ignored -> Mono.empty())
                .onErrorMap(this::translateError);
        }
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, this.reactorClient.getAerospikeClient().copyReadPolicyDefault())
            .flatMap(readPolicy -> this.reactorClient.get(readPolicy, key))
            .filter(found -> found.record != null)
            .map(found -> mapToEntity(found.key, cls, found.record))
            .onErrorMap(this::translateError);
    }

    /**
     * Finds a record by id in the set associated with {@code cls} and maps it to {@code cls2}.
     *
     * @param obj  the id
     * @param cls  the entity class used to resolve the set name
     * @param cls2 the class to map the record to
     * @return the result of the set-name-aware projection overload
     */
    @Override
    public <T, S> Mono<S> findById(Object obj, Class<T> cls, Class<S> cls2) {
        String setName = getSetName(cls);
        return findById(obj, cls, cls2, setName);
    }

    /**
     * Reads only the bins derived from {@code cls2} for the record with the given id and maps
     * the result to {@code cls2}. Touch-on-read entities go through {@code getAndTouch}; all
     * others use a plain get with the default read policy.
     *
     * @param obj  the id
     * @param cls  the entity class used to look up the persistent entity
     * @param cls2 the class that determines the bins to read and the mapping target
     * @param str  the set name
     * @return the mapped object, or an empty Mono when no record is returned
     */
    @Override
    public <T, S> Mono<S> findById(Object obj, Class<T> cls, Class<S> cls2, String str) {
        AerospikePersistentEntity entity = (AerospikePersistentEntity) this.mappingContext.getRequiredPersistentEntity(cls);
        Key key = getKey(obj, str);
        String[] binNames = getBinNamesFromTargetClass(cls2);
        if (entity.isTouchOnRead()) {
            // NOTE(review): the check fails when the entity HAS an expiration property, while the
            // message says "without" - confirm which one is intended.
            Assert.state(!entity.hasExpirationProperty(), "Touch on read is not supported for entity without expiration property");
            return getAndTouch(key, entity.getExpiration(), binNames, null)
                .filter(touched -> touched.record != null)
                .map(touched -> mapToEntity(touched.key, cls2, touched.record))
                // Result code 2 is treated as "nothing found" (presumably KEY_NOT_FOUND_ERROR - confirm).
                .onErrorResume(
                    error -> (error instanceof AerospikeException) && ((AerospikeException) error).getResultCode() == 2,
                    ignored -> Mono.empty())
                .onErrorMap(this::translateError);
        }
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, this.reactorClient.getAerospikeClient().copyReadPolicyDefault())
            .flatMap(readPolicy -> this.reactorClient.get(readPolicy, key, binNames))
            .filter(found -> found.record != null)
            .map(found -> mapToEntity(found.key, cls2, found.record))
            .onErrorMap(this::translateError);
    }

    /**
     * Finds entities by ids in the set associated with the given class.
     *
     * @param iterable the ids
     * @param cls      the entity class, must not be null
     * @return the result of the set-name-aware {@code findByIds} overload
     */
    @Override
    public <T> Flux<T> findByIds(Iterable<?> iterable, Class<T> cls) {
        Assert.notNull(cls, "Class must not be null!");
        String setName = getSetName(cls);
        return findByIds(iterable, cls, setName);
    }

    /**
     * Finds records by ids in the set associated with {@code cls} and maps them to {@code cls2}.
     *
     * @param iterable the ids
     * @param cls      the entity class used to resolve the set name, must not be null
     * @param cls2     the class to map the records to
     * @return the result of the set-name-aware {@code findByIds} overload
     */
    @Override
    public <T, S> Flux<S> findByIds(Iterable<?> iterable, Class<T> cls, Class<S> cls2) {
        Assert.notNull(cls, "Class must not be null!");
        String setName = getSetName(cls);
        return findByIds(iterable, cls2, setName);
    }

    /**
     * Finds entities by ids in the given set, splitting the ids into chunks and concatenating
     * the per-chunk results in order.
     *
     * @param iterable the ids, must not be null
     * @param cls      the class to map the records to, must not be null
     * @param str      the set name, must not be null
     * @return the concatenation of the per-chunk lookups; empty when there are no ids
     */
    @Override
    public <T> Flux<T> findByIds(Iterable<?> iterable, Class<T> cls, String str) {
        Assert.notNull(iterable, "List of ids must not be null!");
        Assert.notNull(cls, "Class must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        // NOTE(review): a read path sized by the batch *write* setting - confirm this is intended.
        int chunkLimit = this.converter.getAerospikeDataSettings().getBatchWriteSize();
        List<Object> pendingIds = new ArrayList<>();
        Flux<T> combined = Flux.empty();
        for (Object id : iterable) {
            if (batchWriteSizeMatch(chunkLimit, pendingIds.size())) {
                // The chunk lookup builds its key array immediately, so clearing the buffer right after is safe.
                combined = Flux.concat(combined, findByIds((Collection<?>) pendingIds, cls, str));
                pendingIds.clear();
            }
            pendingIds.add(id);
        }
        if (pendingIds.isEmpty()) {
            return combined;
        }
        return Flux.concat(combined, findByIds((Collection<?>) pendingIds, cls, str));
    }

    /**
     * Performs a single batch get for the given ids and maps every returned record to {@code cls}.
     * The key array is built eagerly, before the returned Flux is subscribed to.
     *
     * @return the mapped entities; keys without a record are skipped
     */
    private <T> Flux<T> findByIds(Collection<?> collection, Class<T> cls, String str) {
        Key[] keys = Utils.iterableToList(collection).stream()
            .map(id -> getKey(id, str))
            .toArray(Key[]::new);
        Policy batchDefaults = this.reactorClient.getAerospikeClient().copyBatchPolicyDefault();
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, batchDefaults)
            .flatMap(enriched -> this.reactorClient.get((BatchPolicy) enriched, keys))
            .flatMap(keysRecords -> Mono.just(keysRecords.asMap()))
            .flatMapMany(recordsByKey -> {
                List<T> entities = new ArrayList<>();
                for (Map.Entry<Key, Record> entry : recordsByKey.entrySet()) {
                    if (entry.getValue() != null) {
                        entities.add(mapToEntity(entry.getKey(), cls, entry.getValue()));
                    }
                }
                return Flux.fromIterable(entities);
            });
    }

    /**
     * Finds entities for several classes at once.
     *
     * @param groupedKeys the keys grouped by entity class; validated before use
     * @return an empty {@code GroupedEntities} when there are no keys, otherwise the result of
     *         a batch lookup with the default batch policy
     */
    @Override
    public Mono<GroupedEntities> findByIds(GroupedKeys groupedKeys) {
        validateGroupedKeys(groupedKeys);
        if (groupedKeys.getEntitiesKeys().isEmpty()) {
            return Mono.just(GroupedEntities.builder().build());
        }
        BatchPolicy batchDefaults = this.reactorClient.getAerospikeClient().copyBatchPolicyDefault();
        return findGroupedEntitiesByGroupedKeys(batchDefaults, groupedKeys);
    }

    /**
     * Batch-reads all keys of the grouped request with the given policy and regroups the
     * returned records into {@code GroupedEntities}.
     *
     * @return the grouped entities; errors are passed through {@code translateError}
     */
    private Mono<GroupedEntities> findGroupedEntitiesByGroupedKeys(BatchPolicy batchPolicy, GroupedKeys groupedKeys) {
        EntitiesKeys entitiesKeys = EntitiesKeys.of(toEntitiesKeyMap(groupedKeys));
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, (Policy) batchPolicy)
            .flatMap(enriched -> this.reactorClient.get((BatchPolicy) enriched, entitiesKeys.getKeys()))
            .map(keysRecords -> toGroupedEntities(entitiesKeys, keysRecords.records))
            .onErrorMap(this::translateError);
    }

    /**
     * Finds a record by id with an optional query filter, using the set associated with {@code cls}.
     *
     * @return the result of the set-name-aware {@code findByIdUsingQuery} overload
     */
    @Override
    public <T, S> Mono<?> findByIdUsingQuery(Object obj, Class<T> cls, Class<S> cls2, Query query) {
        String setName = getSetName(cls);
        return findByIdUsingQuery(obj, cls, cls2, setName, query);
    }

    /**
     * Reads the record with the given id from the given set, optionally restricted by the
     * query's criteria as a filter expression, and maps it to {@code cls2} when that is
     * non-null and differs from {@code cls}, otherwise to {@code cls}.
     *
     * @param obj   the id
     * @param cls   the entity class used to look up the persistent entity
     * @param cls2  the class that determines the bins to read and, if distinct, the mapping target
     * @param str   the set name
     * @param query optional query whose criteria become a filter expression
     * @return the mapped object, or an empty Mono when no record is returned
     */
    @Override
    public <T, S> Mono<?> findByIdUsingQuery(Object obj, Class<T> cls, Class<S> cls2, String str, Query query) {
        AerospikePersistentEntity aerospikePersistentEntity = (AerospikePersistentEntity) this.mappingContext.getRequiredPersistentEntity(cls);
        Key key = getKey(obj, str);
        String[] binNamesFromTargetClass = getBinNamesFromTargetClass(cls2);
        // Compare against null (the decompiled "== 0" is not valid for a reference) and use a
        // wildcard type because the target may be either Class<T> or Class<S>.
        final Class<?> target = (cls2 == null || cls2 == cls) ? cls : cls2;
        if (aerospikePersistentEntity.isTouchOnRead()) {
            // NOTE(review): the check fails when the entity HAS an expiration property, while the
            // message says "without" - confirm which one is intended.
            Assert.state(!aerospikePersistentEntity.hasExpirationProperty(), "Touch on read is not supported for entity without expiration property");
            return getAndTouch(key, aerospikePersistentEntity.getExpiration(), binNamesFromTargetClass, query)
                .filter(keyRecord -> Objects.nonNull(keyRecord.record))
                .map(keyRecord -> mapToEntity(keyRecord.key, target, keyRecord.record))
                // Result code 2 is treated as "nothing found" (presumably KEY_NOT_FOUND_ERROR - confirm).
                .onErrorResume(
                    th -> (th instanceof AerospikeException) && ((AerospikeException) th).getResultCode() == 2,
                    th -> Mono.empty())
                .onErrorMap(this::translateError);
        }
        // A policy is only created when there are criteria to filter on; otherwise null is passed on.
        Policy policy = null;
        if (QualifierUtils.queryCriteriaIsNotNull(query)) {
            policy = this.reactorClient.getAerospikeClient().copyReadPolicyDefault();
            policy.filterExp = this.reactorQueryEngine.getFilterExpressionsBuilder().build(query.getCriteriaObject());
        }
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, policy)
            .flatMap(enriched -> this.reactorClient.get(enriched, key, binNamesFromTargetClass))
            .filter(keyRecord -> Objects.nonNull(keyRecord.record))
            .map(keyRecord -> mapToEntity(keyRecord.key, target, keyRecord.record))
            .onErrorMap(this::translateError);
    }

    /**
     * Finds records by ids with an optional query filter, using the set associated with {@code cls}.
     *
     * @return the result of the set-name-aware {@code findByIdsUsingQuery} overload
     */
    @Override
    public <T, S> Flux<?> findByIdsUsingQuery(Collection<?> collection, Class<T> cls, Class<S> cls2, @Nullable Query query) {
        String setName = getSetName(cls);
        return findByIdsUsingQuery(collection, cls, cls2, setName, query);
    }

    /**
     * Reads the records for the given ids one key at a time, optionally restricted by the
     * query's criteria as a filter expression, and maps each to {@code cls2} when that is
     * non-null and differs from {@code cls}, otherwise to {@code cls}.
     *
     * @param collection the ids, must not be null
     * @param cls        the entity class, must not be null
     * @param cls2       the class that determines the bins to read and, if distinct, the mapping target
     * @param str        the set name, must not be null
     * @param query      optional query; when present its post-processing is applied to the results
     * @return the mapped objects; empty when no ids are given
     */
    @Override
    public <T, S> Flux<?> findByIdsUsingQuery(Collection<?> collection, Class<T> cls, Class<S> cls2, String str, @Nullable Query query) {
        Assert.notNull(collection, "Ids must not be null!");
        Assert.notNull(cls, "Entity class must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        if (collection.isEmpty()) {
            return Flux.empty();
        }
        BatchPolicy batchPolicyFilterExp = getBatchPolicyFilterExp(query);
        // Compare against null (the decompiled "== 0" is not valid for a reference) and use a
        // wildcard type because the target may be either Class<T> or Class<S>.
        final Class<?> target = (cls2 == null || cls2 == cls) ? cls : cls2;
        Flux<?> results = Flux.fromIterable(collection)
            .map(id -> getKey(id, str))
            .flatMap(key -> getFromClient(batchPolicyFilterExp, key, cls2))
            .filter(keyRecord -> Objects.nonNull(keyRecord.record))
            .map(keyRecord -> mapToEntity(keyRecord.key, target, keyRecord.record));
        // The query is declared nullable, and post-processing dereferences it, so skip it when absent.
        return query == null ? results : applyPostProcessingOnResults(results, query);
    }

    /**
     * Reads the records for the given ids one key at a time, optionally restricted by the
     * query's criteria, and returns the raw results without mapping them to entities.
     *
     * @return the results that carry a record; empty when no ids are given
     */
    private Flux<?> findByIdsUsingQueryWithoutMapping(Collection<?> collection, String str, Query query) {
        Assert.notNull(collection, "Ids must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        if (collection.isEmpty()) {
            return Flux.empty();
        }
        BatchPolicy filterPolicy = getBatchPolicyFilterExp(query);
        return Flux.fromIterable(collection)
            .map(id -> getKey(id, str))
            .flatMap(key -> getFromClient(filterPolicy, key, null))
            .filter(found -> found.record != null);
    }

    /**
     * Runs the query against the set associated with the given class.
     *
     * @param query the query
     * @param cls   the entity class, must not be null
     * @return the result of the set-name-aware {@code find} overload
     */
    @Override
    public <T> Flux<T> find(Query query, Class<T> cls) {
        Assert.notNull(cls, "Class must not be null!");
        String setName = getSetName(cls);
        return find(query, cls, setName);
    }

    /**
     * Runs the query against the set associated with {@code cls} and maps results to {@code cls2}.
     *
     * @param query the query
     * @param cls   the entity class used to resolve the set name, must not be null
     * @param cls2  the class to map the results to
     * @return the result of the set-name-aware {@code find} overload
     */
    @Override
    public <T, S> Flux<S> find(Query query, Class<T> cls, Class<S> cls2) {
        Assert.notNull(cls, "Class must not be null!");
        String setName = getSetName(cls);
        return find(query, cls2, setName);
    }

    /**
     * Runs the query against the given set and applies the query's post-processing.
     *
     * @param query the query, must not be null
     * @param cls   the class to map the results to, must not be null
     * @param str   the set name, must not be null
     * @return the post-processed query results
     */
    @Override
    public <T> Flux<T> find(Query query, Class<T> cls, String str) {
        Assert.notNull(query, "Query must not be null!");
        Assert.notNull(cls, "Target class must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        Flux<T> processed = findWithPostProcessing(str, cls, query);
        return processed;
    }

    /**
     * Finds all entities in the set associated with the given class.
     *
     * @param cls the entity class, must not be null
     * @return the result of the set-name-aware {@code findAll} overload
     */
    @Override
    public <T> Flux<T> findAll(Class<T> cls) {
        Assert.notNull(cls, "Entity class must not be null!");
        String setName = getSetName(cls);
        return findAll(cls, setName);
    }

    /**
     * Finds all records in the set associated with {@code cls} and maps them to {@code cls2}.
     *
     * @param cls  the entity class used to resolve the set name, must not be null
     * @param cls2 the class to map the records to, must not be null
     * @return the result of the set-name-aware {@code findAll} overload
     */
    @Override
    public <T, S> Flux<S> findAll(Class<T> cls, Class<S> cls2) {
        Assert.notNull(cls, "Entity class must not be null!");
        Assert.notNull(cls2, "Target class must not be null!");
        String setName = getSetName(cls);
        return findAll(cls2, setName);
    }

    /**
     * Finds all records in the given set and maps them to the given class.
     *
     * @param cls the class to map the records to, must not be null
     * @param str the set name, must not be null
     * @return the result of {@code find(setName, targetClass)}
     */
    @Override
    public <T> Flux<T> findAll(Class<T> cls, String str) {
        Assert.notNull(cls, "Target class must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        Flux<T> everything = find(str, cls);
        return everything;
    }

    /**
     * Finds all entities in the set associated with the given class, with sort, offset and limit.
     *
     * @param sort the sort
     * @param j    the offset
     * @param j2   the limit
     * @param cls  the entity class, must not be null
     * @return the result of the set-name-aware overload
     */
    @Override
    public <T> Flux<T> findAll(Sort sort, long j, long j2, Class<T> cls) {
        Assert.notNull(cls, "Class must not be null!");
        String setName = getSetName(cls);
        return findAll(sort, j, j2, cls, setName);
    }

    /**
     * Finds all records in the set associated with {@code cls}, with sort, offset and limit,
     * mapping them to {@code cls2}.
     *
     * @param sort the sort
     * @param j    the offset
     * @param j2   the limit
     * @param cls  the entity class used to resolve the set name, must not be null
     * @param cls2 the class to map the records to, must not be null
     * @return the result of the set-name-aware overload
     */
    @Override
    public <T, S> Flux<S> findAll(Sort sort, long j, long j2, Class<T> cls, Class<S> cls2) {
        Assert.notNull(cls, "Class must not be null!");
        Assert.notNull(cls2, "Target class must not be null!");
        String setName = getSetName(cls);
        return findAll(sort, j, j2, cls2, setName);
    }

    /**
     * Finds all records in the given set and applies sort, offset and limit as post-processing.
     *
     * @param sort the sort
     * @param j    the offset
     * @param j2   the limit
     * @param cls  the class to map the records to, must not be null
     * @param str  the set name, must not be null
     * @return the post-processed results
     */
    @Override
    public <T> Flux<T> findAll(Sort sort, long j, long j2, Class<T> cls, String str) {
        Assert.notNull(cls, "Target class must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        Flux<T> processed = findWithPostProcessing(str, cls, sort, j, j2);
        return processed;
    }

    /**
     * Finds entities within a range in the set associated with the given class.
     *
     * @param j    the offset
     * @param j2   the limit
     * @param sort the sort
     * @param cls  the entity class, must not be null
     * @return the result of the set-name-aware {@code findInRange} overload
     */
    @Override
    public <T> Flux<T> findInRange(long j, long j2, Sort sort, Class<T> cls) {
        Assert.notNull(cls, "Class must not be null!");
        String setName = getSetName(cls);
        return findInRange(j, j2, sort, cls, setName);
    }

    /**
     * Finds records within a range in the set associated with {@code cls}, mapping them to {@code cls2}.
     *
     * @param j    the offset
     * @param j2   the limit
     * @param sort the sort
     * @param cls  the entity class used to resolve the set name, must not be null
     * @param cls2 the class to map the records to, must not be null
     * @return the result of the set-name-aware {@code findInRange} overload
     */
    @Override
    public <T, S> Flux<S> findInRange(long j, long j2, Sort sort, Class<T> cls, Class<S> cls2) {
        Assert.notNull(cls, "Class must not be null!");
        Assert.notNull(cls2, "Target class must not be null!");
        String setName = getSetName(cls);
        return findInRange(j, j2, sort, cls2, setName);
    }

    /**
     * Finds records within a range in the given set, applying sort, offset and limit as
     * post-processing.
     *
     * @param j    the offset
     * @param j2   the limit
     * @param sort the sort
     * @param cls  the class to map the records to, must not be null
     * @param str  the set name, must not be null
     * @return the post-processed results
     */
    @Override
    public <T> Flux<T> findInRange(long j, long j2, Sort sort, Class<T> cls, String str) {
        Assert.notNull(cls, "Target Class must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        Flux<T> processed = findWithPostProcessing(str, cls, sort, j, j2);
        return processed;
    }

    /**
     * Builds a batch policy whose filter expression is derived from the query's criteria.
     *
     * @return a copy of the default batch policy with the filter expression set, or
     *         {@code null} when the query has no criteria
     */
    private BatchPolicy getBatchPolicyFilterExp(Query query) {
        if (QualifierUtils.queryCriteriaIsNotNull(query)) {
            BatchPolicy filtered = this.reactorClient.getAerospikeClient().copyBatchPolicyDefault();
            filtered.filterExp = this.reactorQueryEngine.getFilterExpressionsBuilder().build(query.getCriteriaObject());
            return filtered;
        }
        return null;
    }

    /**
     * Reads a single key with the given policy. When a class is supplied only the bins
     * derived from it are requested; otherwise the whole record is read.
     *
     * @param batchPolicy the policy to use (may be {@code null})
     * @param key         the key to read
     * @param cls         the class determining the bins to read, or {@code null} for all bins
     * @return the read result
     */
    private Mono<KeyRecord> getFromClient(BatchPolicy batchPolicy, Key key, Class<?> cls) {
        if (cls != null) {
            String[] binNames = getBinNamesFromTargetClass(cls);
            return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, (Policy) batchPolicy)
                .flatMap(enriched -> this.reactorClient.get(enriched, key, binNames));
        }
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, (Policy) batchPolicy)
            .flatMap(enriched -> this.reactorClient.get(enriched, key));
    }

    /**
     * Checks whether a record with the given id exists in the set associated with the class.
     *
     * @param obj the id, must not be null
     * @param cls the entity class, must not be null
     * @return the result of the set-name-aware {@code exists} overload
     */
    @Override
    public <T> Mono<Boolean> exists(Object obj, Class<T> cls) {
        Assert.notNull(obj, "Id must not be null!");
        Assert.notNull(cls, "Class must not be null!");
        String setName = getSetName(cls);
        return exists(obj, setName);
    }

    /**
     * Checks whether a record with the given id exists in the given set, using the default
     * read policy.
     *
     * @param obj the id, must not be null
     * @param str the set name, must not be null
     * @return {@code true} when the client emits a non-null result, {@code false} when it
     *         emits nothing; errors are passed through {@code translateError}
     */
    @Override
    public Mono<Boolean> exists(Object obj, String str) {
        Assert.notNull(obj, "Id must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        Key key = getKey(obj, str);
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, this.reactorClient.getAerospikeClient().copyReadPolicyDefault())
            .flatMap(readPolicy -> this.reactorClient.exists(readPolicy, key))
            .map(Objects::nonNull)
            .defaultIfEmpty(false)
            .onErrorMap(this::translateError);
    }

    /**
     * Checks whether the query matches any record in the set associated with the class.
     *
     * @param query the query, must not be null
     * @param cls   the entity class, must not be null
     * @return the result of the set-name-aware {@code exists} overload
     */
    @Override
    public <T> Mono<Boolean> exists(Query query, Class<T> cls) {
        Assert.notNull(query, "Query passed in to exist can't be null");
        Assert.notNull(cls, "Class must not be null!");
        String setName = getSetName(cls);
        return exists(query, setName);
    }

    /**
     * Checks whether the query matches any record in the given set.
     *
     * @param query the query, must not be null
     * @param str   the set name, must not be null
     * @return {@code true} when the query yields at least one element
     */
    @Override
    public Mono<Boolean> exists(Query query, String str) {
        Assert.notNull(query, "Query passed in to exist can't be null");
        Assert.notNull(str, "Set name must not be null!");
        Flux<KeyRecord> matches = findKeyRecordsUsingQuery(str, query);
        return matches.hasElements();
    }

    /**
     * Checks whether any of the given ids matches the optional query, using the set
     * associated with the class.
     *
     * @return the result of the set-name-aware {@code existsByIdsUsingQuery} overload
     */
    @Override
    public <T> Mono<Boolean> existsByIdsUsingQuery(Collection<?> collection, Class<T> cls, @Nullable Query query) {
        String setName = getSetName(cls);
        return existsByIdsUsingQuery(collection, setName, query);
    }

    /**
     * Checks whether any of the given ids in the given set matches the optional query.
     *
     * @return {@code true} when the unmapped lookup yields at least one non-null element
     */
    @Override
    public Mono<Boolean> existsByIdsUsingQuery(Collection<?> collection, String str, @Nullable Query query) {
        return findByIdsUsingQueryWithoutMapping(collection, str, query)
            .filter(Objects::nonNull)
            .hasElements();
    }

    /**
     * Counts the records in the set associated with the given class.
     *
     * @param cls the entity class, must not be null
     * @return the result of {@code count(setName)}
     */
    @Override
    public <T> Mono<Long> count(Class<T> cls) {
        Assert.notNull(cls, "Class must not be null!");
        String setName = getSetName(cls);
        return count(setName);
    }

    /**
     * Counts the records in the given set. The count is computed by {@code countSet} when the
     * returned Mono is subscribed to.
     *
     * @param str the set name, must not be null
     * @return a Mono emitting the record count; errors are passed through {@code translateError}
     */
    @Override
    public Mono<Long> count(String str) {
        Assert.notNull(str, "Set name must not be null!");
        // countSet runs only on subscription, so a try/catch around the Mono's creation can
        // never observe its failures. Translate them inside the reactive chain instead, as the
        // other operations of this template do.
        return Mono.fromCallable(() -> Long.valueOf(countSet(str)))
            .onErrorMap(this::translateError);
    }

    /**
     * Counts how many of the given ids match the optional query, using the set associated
     * with the class.
     *
     * @return the result of the set-name-aware {@code countByIdsUsingQuery} overload
     */
    @Override
    public <T> Mono<Long> countByIdsUsingQuery(Collection<?> collection, Class<T> cls, @Nullable Query query) {
        String setName = getSetName(cls);
        return countByIdsUsingQuery(collection, setName, query);
    }

    /**
     * Counts how many of the given ids in the given set match the optional query.
     *
     * @return the number of non-null elements yielded by the unmapped lookup
     */
    @Override
    public Mono<Long> countByIdsUsingQuery(Collection<?> collection, String str, @Nullable Query query) {
        return findByIdsUsingQueryWithoutMapping(collection, str, query)
            .filter(Objects::nonNull)
            .count();
    }

    /**
     * Sums the per-node object counts for the set and, when the cluster has more than one
     * node, divides the total by the replication factor.
     *
     * @param str the set name
     * @return the computed number of records in the set
     */
    private long countSet(String str) {
        Node[] clusterNodes = this.reactorClient.getAerospikeClient().getNodes();
        int replicationFactor = Utils.getReplicationFactor(this.reactorClient.getAerospikeClient(), clusterNodes, this.namespace);
        long total = 0L;
        for (Node node : clusterNodes) {
            total += Utils.getObjectsCount(this.reactorClient.getAerospikeClient(), node, this.namespace, str);
        }
        if (clusterNodes.length > 1) {
            return total / replicationFactor;
        }
        return total;
    }

    /**
     * Counts the records matching the query in the set associated with the given class.
     *
     * @param query the query
     * @param cls   the entity class, must not be null
     * @return the result of {@code count(query, setName)}
     */
    @Override
    public <T> Mono<Long> count(Query query, Class<T> cls) {
        Assert.notNull(cls, "Class must not be null!");
        String setName = getSetName(cls);
        return count(query, setName);
    }

    /**
     * Counts the records matching the query in the given set.
     *
     * @param query the query
     * @param str   the set name, must not be null
     * @return the number of elements yielded by the key-record lookup
     */
    @Override
    public Mono<Long> count(Query query, String str) {
        Assert.notNull(str, "Set for count must not be null!");
        Flux<KeyRecord> matches = findKeyRecordsUsingQuery(str, query);
        return matches.count();
    }

    /**
     * Finds key records for the query. When the criteria contain an id qualifier, the lookup
     * is done by those ids with the remaining criteria as the query; otherwise the query
     * engine's count selection is used.
     *
     * @param str   the set name, must not be null
     * @param query the query (criteria may be absent)
     * @return the matching key records
     */
    private Flux<KeyRecord> findKeyRecordsUsingQuery(String str, Query query) {
        Assert.notNull(str, "Set name must not be null!");
        Qualifier criteria = QualifierUtils.queryCriteriaIsNotNull(query) ? query.getCriteriaObject() : null;
        if (criteria != null) {
            Qualifier idQualifier = QualifierUtils.getIdQualifier(criteria);
            if (idQualifier != null) {
                return findByIdsWithoutMapping(TemplateUtils.getIdValue(idQualifier), str, null, new Query(TemplateUtils.excludeIdQualifier(criteria)));
            }
        }
        return this.reactorQueryEngine.selectForCount(this.namespace, str, query);
    }

    /**
     * Creates an index for the class's set using {@code IndexCollectionType.DEFAULT}.
     *
     * @param cls       the entity class
     * @param str       the index name
     * @param str2      the bin name
     * @param indexType the index type
     * @return the result of the overload that takes a collection type
     */
    @Override
    public <T> Mono<Void> createIndex(Class<T> cls, String str, String str2, IndexType indexType) {
        IndexCollectionType collectionType = IndexCollectionType.DEFAULT;
        return createIndex(cls, str, str2, indexType, collectionType);
    }

    /**
     * Creates an index for the class's set with the given collection type and no CDT context.
     *
     * @return the result of the overload that takes a context array
     */
    @Override
    public <T> Mono<Void> createIndex(Class<T> cls, String str, String str2, IndexType indexType, IndexCollectionType indexCollectionType) {
        CTX[] noContext = new CTX[0];
        return createIndex(cls, str, str2, indexType, indexCollectionType, noContext);
    }

    /**
     * Creates an index on the set associated with the given class.
     *
     * @return the result of the set-name-based overload
     */
    @Override
    public <T> Mono<Void> createIndex(Class<T> cls, String str, String str2, IndexType indexType, IndexCollectionType indexCollectionType, CTX... ctxArr) {
        String setName = getSetName(cls);
        return createIndex(setName, str, str2, indexType, indexCollectionType, ctxArr);
    }

    /**
     * Creates an index on the given set using {@code IndexCollectionType.DEFAULT}.
     *
     * @param str       the set name
     * @param str2      the index name
     * @param str3      the bin name
     * @param indexType the index type
     * @return the result of the overload that takes a collection type
     */
    @Override
    public Mono<Void> createIndex(String str, String str2, String str3, IndexType indexType) {
        IndexCollectionType collectionType = IndexCollectionType.DEFAULT;
        return createIndex(str, str2, str3, indexType, collectionType);
    }

    /**
     * Creates an index on the given set with the given collection type and no CDT context.
     *
     * @return the result of the overload that takes a context array
     */
    @Override
    public Mono<Void> createIndex(String str, String str2, String str3, IndexType indexType, IndexCollectionType indexCollectionType) {
        CTX[] noContext = new CTX[0];
        return createIndex(str, str2, str3, indexType, indexCollectionType, noContext);
    }

    /**
     * Creates a secondary index in this template's namespace and then refreshes the index cache.
     *
     * @param str                 the set name, must not be null
     * @param str2                the index name, must not be null
     * @param str3                the bin name, must not be null
     * @param indexType           the index type, must not be null
     * @param indexCollectionType the index collection type, must not be null
     * @param ctxArr              the CDT context, must not be null
     * @return a Mono completing after the index refresh; errors are passed through {@code translateError}
     */
    @Override
    public Mono<Void> createIndex(String str, String str2, String str3, IndexType indexType, IndexCollectionType indexCollectionType, CTX... ctxArr) {
        Assert.notNull(str, "Set name must not be null!");
        Assert.notNull(str2, "Index name must not be null!");
        Assert.notNull(str3, "Bin name must not be null!");
        Assert.notNull(indexType, "Index type must not be null!");
        Assert.notNull(indexCollectionType, "Index collection type must not be null!");
        Assert.notNull(ctxArr, "Ctx must not be null!");
        return this.reactorClient
            .createIndex((Policy) null, this.namespace, str, str2, str3, indexType, indexCollectionType, ctxArr)
            .then(this.reactorIndexRefresher.refreshIndexes())
            .onErrorMap(this::translateError);
    }

    /**
     * Deletes an index from the set associated with the given class.
     *
     * @param cls the entity class, must not be null
     * @param str the index name
     * @return the result of {@code deleteIndex(setName, indexName)}
     */
    @Override
    public <T> Mono<Void> deleteIndex(Class<T> cls, String str) {
        Assert.notNull(cls, "Class must not be null!");
        String setName = getSetName(cls);
        return deleteIndex(setName, str);
    }

    /**
     * Drops a secondary index in this template's namespace and then refreshes the index cache.
     *
     * @param str  the set name, must not be null
     * @param str2 the index name, must not be null
     * @return a Mono completing after the index refresh; errors are passed through {@code translateError}
     */
    @Override
    public Mono<Void> deleteIndex(String str, String str2) {
        Assert.notNull(str, "Set name must not be null!");
        Assert.notNull(str2, "Index name must not be null!");
        return this.reactorClient
            .dropIndex((Policy) null, this.namespace, str, str2)
            .then(this.reactorIndexRefresher.refreshIndexes())
            .onErrorMap(this::translateError);
    }

    /**
     * Asks the active cluster nodes, one after another, whether an index with the given name
     * exists in this template's namespace. The info requests are issued while this method
     * runs, and failures are thrown from the method itself rather than emitted by the Mono.
     *
     * @param str the index name, must not be null
     * @return a Mono of the first definite {@code true}/{@code false} answer, or of
     *         {@code false} when no node gave one
     */
    @Override
    public Mono<Boolean> indexExists(String str) {
        Assert.notNull(str, "Index name must not be null!");
        try {
            for (Node node : this.reactorClient.getAerospikeClient().getNodes()) {
                if (!node.isActive()) {
                    continue;
                }
                String response = InfoCommandUtils.request(this.reactorClient.getAerospikeClient(), node, "sindex-exists:ns=" + this.namespace + ";indexname=" + str);
                if (response == null) {
                    throw new AerospikeException("Null node response");
                }
                if (response.equalsIgnoreCase("true")) {
                    return Mono.just(true);
                }
                if (response.equalsIgnoreCase("false")) {
                    return Mono.just(false);
                }
                // Anything else is expected to carry a numeric result code.
                Matcher matcher = INDEX_EXISTS_REGEX_PATTERN.matcher(response);
                if (!matcher.matches()) {
                    throw new AerospikeException("Unexpected node response: " + response);
                }
                int resultCode;
                try {
                    resultCode = Integer.parseInt(matcher.group(1));
                } catch (NumberFormatException e) {
                    throw new AerospikeException("Unexpected node response, unable to parse ResultCode: " + response);
                }
                // Code 20 lets the loop move on to the next node (presumably
                // ResultCode.INVALID_NAMESPACE - confirm); any other code is an error.
                if (resultCode != 20) {
                    throw new AerospikeException(resultCode);
                }
            }
            return Mono.just(false);
        } catch (AerospikeException e) {
            throw translateError(e);
        }
    }

    /**
     * Returns the reactive Aerospike client this template operates on.
     *
     * @return the underlying reactor client
     */
    @Override
    public IAerospikeReactorClient getAerospikeReactorClient() {
        return reactorClient;
    }

    /**
     * Returns the maximum number of query records as reported by the reactive query engine.
     *
     * @return the query engine's max-records value
     */
    @Override
    public long getQueryMaxRecords() {
        long maxRecords = reactorQueryEngine.getQueryMaxRecords();
        return maxRecords;
    }

    /**
     * Executes the operations against the write data's key and emits the original document
     * once the client has produced a result.
     *
     * @return the document passed in; errors are passed through {@code translateError}
     */
    private <T> Mono<T> doPersistAndHandleError(T t, AerospikeWriteData aerospikeWriteData, WritePolicy writePolicy, Operation[] operationArr) {
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, (Policy) writePolicy)
            .flatMap(enriched -> this.reactorClient.operate((WritePolicy) enriched, aerospikeWriteData.getKey(), operationArr))
            .map(ignoredResult -> t)
            .onErrorMap(this::translateError);
    }

    /**
     * Executes the operations, updates the document's version from the returned record and
     * translates Aerospike errors as CAS errors with a message naming the operation type.
     *
     * @return the document after {@code updateVersion}
     */
    private <T> Mono<T> doPersistWithVersionAndHandleCasError(T t, AerospikeWriteData aerospikeWriteData, WritePolicy writePolicy, Operation[] operationArr, BaseAerospikeTemplate.OperationType operationType) {
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, (Policy) writePolicy)
            .flatMap(enriched -> putAndGetHeader(aerospikeWriteData, (WritePolicy) enriched, operationArr))
            .map(header -> updateVersion(t, header))
            .onErrorMap(AerospikeException.class, failure ->
                translateCasError(failure, "Failed to " + operationType.toString() + " record due to versions mismatch"));
    }

    /**
     * Executes the operations, updates the document's version from the returned record and
     * translates Aerospike errors via {@code translateError}.
     *
     * @return the document after {@code updateVersion}
     */
    private <T> Mono<T> doPersistWithVersionAndHandleError(T t, AerospikeWriteData aerospikeWriteData, WritePolicy writePolicy, Operation[] operationArr) {
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, (Policy) writePolicy)
            .flatMap(enriched -> putAndGetHeader(aerospikeWriteData, (WritePolicy) enriched, operationArr))
            .map(header -> updateVersion(t, header))
            .onErrorMap(AerospikeException.class, this::translateError);
    }

    /**
     * Executes the operations against the write data's key and extracts the record from the result.
     *
     * @return the record carried by the operation result
     */
    private Mono<Record> putAndGetHeader(AerospikeWriteData aerospikeWriteData, WritePolicy writePolicy, Operation[] operationArr) {
        Key key = aerospikeWriteData.getKey();
        return this.reactorClient.operate(writePolicy, key, operationArr)
            .map(result -> result.record);
    }

    /**
     * Touches the record and reads it back in a single operate call, using a write policy
     * derived from the default one with the given expiration and, when the query has
     * criteria, a matching filter expression.
     *
     * @param key    the key to touch and read
     * @param i      the expiration to set on the write policy
     * @param strArr the bins to read; {@code null} or empty reads the whole record
     * @param query  optional query whose criteria become a filter expression
     * @return the result of the operate call
     */
    private Mono<KeyRecord> getAndTouch(Key key, int i, String[] strArr, Query query) {
        WritePolicyBuilder policyBuilder = WritePolicyBuilder.builder(this.writePolicyDefault).expiration(i);
        if (QualifierUtils.queryCriteriaIsNotNull(query)) {
            policyBuilder.filterExp(this.reactorQueryEngine.getFilterExpressionsBuilder().build(query.getCriteriaObject()));
        }
        WritePolicy touchPolicy = policyBuilder.build();
        boolean readSelectedBins = strArr != null && strArr.length != 0;
        if (readSelectedBins) {
            // Slot 0 is the touch; each following slot reads one requested bin.
            Operation[] touchThenGets = new Operation[strArr.length + 1];
            touchThenGets[0] = Operation.touch();
            int slot = 1;
            for (String binName : strArr) {
                touchThenGets[slot++] = Operation.get(binName);
            }
            return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, (Policy) touchPolicy)
                .flatMap(enriched -> this.reactorClient.operate((WritePolicy) enriched, key, touchThenGets));
        }
        return TemplateUtils.enrichPolicyWithTransaction(this.reactorClient, (Policy) touchPolicy)
            .flatMap(enriched -> this.reactorClient.operate((WritePolicy) enriched, key, new Operation[]{Operation.touch(), Operation.get()}));
    }

    /**
     * Collects the field names of all non-id properties of the given class's persistent entity.
     *
     * @param cls the class whose persistent entity is inspected
     * @return the collected field names, in the order the properties are visited
     */
    private String[] getBinNamesFromTargetClass(Class<?> cls) {
        AerospikePersistentEntity entity = (AerospikePersistentEntity) this.mappingContext.getRequiredPersistentEntity(cls);
        List<String> fieldNames = new ArrayList<>();
        entity.doWithProperties(property -> {
            // The id property is excluded from the bin list.
            if (!property.isIdProperty()) {
                fieldNames.add(property.getFieldName());
            }
        });
        return fieldNames.toArray(new String[0]);
    }

    /**
     * Translates the throwable when it is an {@link AerospikeException}; any other throwable
     * is handed back untouched.
     *
     * @param th throwable to inspect
     * @return the translated error for Aerospike exceptions, otherwise {@code th} itself
     */
    private Throwable translateError(Throwable th) {
        if (th instanceof AerospikeException) {
            return translateError((AerospikeException) th);
        }
        return th;
    }

    /**
     * Translates an {@link AerospikeException} through {@code translateCasError}, attaching a
     * versions-mismatch message that names the failed operation; other throwables pass through.
     *
     * @param th  throwable to inspect
     * @param str operation name substituted into the message
     * @return the translated error for Aerospike exceptions, otherwise {@code th} itself
     */
    private Throwable translateCasThrowable(Throwable th, String str) {
        if (!(th instanceof AerospikeException)) {
            return th;
        }
        String message = "Failed to %s record due to versions mismatch".formatted(str);
        return translateCasError((AerospikeException) th, message);
    }

    /**
     * Runs the query with its distinct predicate applied, then applies the query's sort,
     * offset and row limit to the results.
     *
     * @param str   set name to query
     * @param cls   class the records are mapped to
     * @param query query supplying criteria, sort, offset and rows
     * @return the post-processed results
     * @throws IllegalArgumentException if the query has a positive offset but no sort order
     */
    private <T> Flux<T> findWithPostProcessing(String str, Class<T> cls, Query query) {
        verifyUnsortedWithOffset(query.getSort(), query.getOffset());
        Flux<T> distinctResults =
            findUsingQueryWithDistinctPredicate(str, cls, CoreUtils.getDistinctPredicate(query), query);
        return applyPostProcessingOnResults(distinctResults, query);
    }

    /**
     * Reads all records of the set without criteria, then applies the given sort, offset and
     * limit to the results.
     *
     * @param str  set name to read
     * @param cls  class the records are mapped to
     * @param sort sort order, may be {@code null}
     * @param j    offset; checked against the sort and used as the number of elements to skip
     * @param j2   maximum number of elements to emit
     * @return the post-processed results
     * @throws IllegalArgumentException if {@code j} is positive while the sort is null or unsorted
     */
    private <T> Flux<T> findWithPostProcessing(String str, Class<T> cls, Sort sort, long j, long j2) {
        verifyUnsortedWithOffset(sort, j);
        Flux<T> allEntities = find(str, cls);
        return applyPostProcessingOnResults(allEntities, sort, j, j2);
    }

    /**
     * Runs the query against the set resolved from {@code cls}, mapping results to
     * {@code cls2}, without applying sort, offset or row limit.
     *
     * @param cls   class used to resolve the set name
     * @param cls2  class the records are mapped to
     * @param query query supplying the criteria and the distinct predicate
     * @return the mapped query results
     * @throws IllegalArgumentException if the query has a positive offset but no sort order
     */
    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T, S> Flux<S> findUsingQueryWithoutPostProcessing(Class<T> cls, Class<S> cls2, Query query) {
        verifyUnsortedWithOffset(query.getSort(), query.getOffset());
        String setName = getSetName((Class) cls);
        return findUsingQueryWithDistinctPredicate(setName, cls2, CoreUtils.getDistinctPredicate(query), query);
    }

    /**
     * Rejects the combination of a positive offset with a missing or unsorted sort order.
     *
     * @param sort sort order, may be {@code null}
     * @param j    requested offset
     * @throws IllegalArgumentException if {@code j > 0} and {@code sort} is null or unsorted
     */
    private void verifyUnsortedWithOffset(Sort sort, long j) {
        boolean unsorted = sort == null || sort.isUnsorted();
        if (!unsorted || j <= 0) {
            return;
        }
        throw new IllegalArgumentException("Unsorted query must not have offset value. For retrieving paged results use sorted query.");
    }

    /**
     * Applies the query's sort, offset and row limit to the results, in that order.
     * Each step is applied only when the query reports it as present.
     *
     * @param flux  results to post-process
     * @param query query supplying sort, offset and rows
     * @return the sorted, skipped and limited results
     */
    private <T> Flux<T> applyPostProcessingOnResults(Flux<T> flux, Query query) {
        Flux<T> processed = flux;
        boolean sorted = query.getSort() != null && query.getSort().isSorted();
        if (sorted) {
            processed = processed.sort(getComparator(query));
        }
        if (query.hasOffset()) {
            processed = processed.skip(query.getOffset());
        }
        if (query.hasRows()) {
            processed = processed.take(query.getRows());
        }
        return processed;
    }

    /**
     * Applies sort, offset and limit to the results, in that order.
     *
     * @param flux results to post-process
     * @param sort sort order; applied only when non-null and sorted
     * @param j    number of leading elements to skip; applied only when positive
     * @param j2   maximum number of elements to emit; applied only when positive
     * @return the sorted, skipped and limited results
     */
    private <T> Flux<T> applyPostProcessingOnResults(Flux<T> flux, Sort sort, long j, long j2) {
        Flux<T> processed = flux;
        boolean sorted = sort != null && sort.isSorted();
        if (sorted) {
            processed = processed.sort(getComparator(sort));
        }
        if (j > 0) {
            processed = processed.skip(j);
        }
        if (j2 > 0) {
            processed = processed.take(j2);
        }
        return processed;
    }

    /**
     * Reads the records of the given set with no query and maps each one to the target class.
     *
     * @param str set name to read
     * @param cls class each record is mapped to
     * @return the mapped entities
     */
    private <T> Flux<T> find(String str, Class<T> cls) {
        return findRecordsUsingQuery(str, cls, null)
            .map(found -> mapToEntity(found, cls));
    }

    /**
     * Runs the query, keeps only the records accepted by the predicate and maps the
     * survivors to the target class.
     *
     * @param str       set name to query
     * @param cls       class each record is mapped to
     * @param predicate filter applied to the raw records before mapping
     * @param query     query to run
     * @return the mapped entities that passed the predicate
     */
    private <T> Flux<T> findUsingQueryWithDistinctPredicate(String str, Class<T> cls, Predicate<KeyRecord> predicate, Query query) {
        Flux<KeyRecord> accepted = findRecordsUsingQuery(str, cls, query).filter(predicate);
        return accepted.map(found -> mapToEntity(found, cls));
    }

    /**
     * Fetches raw records for the query, choosing between a lookup by id and a regular select.
     *
     * <p>When the query criteria contain an id qualifier, the records are fetched by those ids
     * and the remaining criteria (id qualifier excluded) are passed along as a new query.
     * Otherwise the query engine performs a select, restricted to the target class's bin
     * names when a target class is given.
     *
     * @param str   set name to read
     * @param cls   target class, may be {@code null}
     * @param query query to run, may carry no criteria
     * @return the matching raw records
     */
    private <T> Flux<KeyRecord> findRecordsUsingQuery(String str, Class<T> cls, Query query) {
        Qualifier criteria = QualifierUtils.queryCriteriaIsNotNull(query) ? query.getCriteriaObject() : null;
        Qualifier idQualifier = criteria == null ? null : QualifierUtils.getIdQualifier(criteria);

        if (idQualifier != null) {
            Query remainingCriteria = new Query(TemplateUtils.excludeIdQualifier(criteria));
            return findByIdsWithoutMapping(TemplateUtils.getIdValue(idQualifier), str, cls, remainingCriteria);
        }
        if (cls == null) {
            return this.reactorQueryEngine.select(this.namespace, str, null, query);
        }
        return this.reactorQueryEngine.select(this.namespace, str, getBinNamesFromTargetClass(cls), query);
    }

    /**
     * Fetches the raw records for the given ids from one set, dropping results whose
     * record is {@code null}.
     *
     * @param collection ids to look up; must not be {@code null}
     * @param str        set name; must not be {@code null}
     * @param cls        target class passed to the client read
     * @param query      query from which the batch policy filter expression is derived
     * @return the records found; empty when no ids are given
     */
    private <T> Flux<KeyRecord> findByIdsWithoutMapping(Collection<?> collection, String str, Class<T> cls, Query query) {
        Assert.notNull(collection, "List of ids must not be null!");
        Assert.notNull(str, "Set name must not be null!");
        if (collection.isEmpty()) {
            return Flux.empty();
        }

        BatchPolicy filteringPolicy = getBatchPolicyFilterExp(query);
        return Flux.fromIterable(collection)
            .map(id -> getKey(id, str))
            .flatMap(recordKey -> getFromClient(filteringPolicy, recordKey, cls))
            .filter(fetched -> fetched.record != null);
    }

    /**
     * Forwards to {@code super.getBatchWriteForDelete} with both arguments unchanged.
     * NOTE(review): the bridge/synthetic markers suggest compiler-generated code recovered
     * by a decompiler — confirm before maintaining this by hand.
     */
    @Override // org.springframework.data.aerospike.core.BaseAerospikeTemplate
    public /* bridge */ /* synthetic */ BaseAerospikeTemplate.BatchWriteData getBatchWriteForDelete(Object obj, String str) {
        return super.getBatchWriteForDelete(obj, str);
    }

    /**
     * Forwards to {@code super.getBatchWriteForUpdate} with both arguments unchanged.
     * NOTE(review): the bridge/synthetic markers suggest compiler-generated code recovered
     * by a decompiler — confirm before maintaining this by hand.
     */
    @Override // org.springframework.data.aerospike.core.BaseAerospikeTemplate
    public /* bridge */ /* synthetic */ BaseAerospikeTemplate.BatchWriteData getBatchWriteForUpdate(Object obj, String str) {
        return super.getBatchWriteForUpdate(obj, str);
    }

    /**
     * Forwards to {@code super.getBatchWriteForInsert} with both arguments unchanged.
     * NOTE(review): the bridge/synthetic markers suggest compiler-generated code recovered
     * by a decompiler — confirm before maintaining this by hand.
     */
    @Override // org.springframework.data.aerospike.core.BaseAerospikeTemplate
    public /* bridge */ /* synthetic */ BaseAerospikeTemplate.BatchWriteData getBatchWriteForInsert(Object obj, String str) {
        return super.getBatchWriteForInsert(obj, str);
    }

    /**
     * Forwards to {@code super.getBatchWriteForSave} with both arguments unchanged.
     * NOTE(review): the bridge/synthetic markers suggest compiler-generated code recovered
     * by a decompiler — confirm before maintaining this by hand.
     */
    @Override // org.springframework.data.aerospike.core.BaseAerospikeTemplate
    public /* bridge */ /* synthetic */ BaseAerospikeTemplate.BatchWriteData getBatchWriteForSave(Object obj, String str) {
        return super.getBatchWriteForSave(obj, str);
    }

    /**
     * Returns whatever {@code super.getNamespace()} returns.
     * NOTE(review): the bridge/synthetic markers suggest compiler-generated code recovered
     * by a decompiler — confirm before maintaining this by hand.
     */
    @Override // org.springframework.data.aerospike.core.BaseAerospikeTemplate
    public /* bridge */ /* synthetic */ String getNamespace() {
        return super.getNamespace();
    }

    /**
     * Returns whatever {@code super.getServerVersionSupport()} returns.
     * NOTE(review): the bridge/synthetic markers suggest compiler-generated code recovered
     * by a decompiler — confirm before maintaining this by hand.
     */
    @Override // org.springframework.data.aerospike.core.BaseAerospikeTemplate, org.springframework.data.aerospike.core.AerospikeOperations
    public /* bridge */ /* synthetic */ ServerVersionSupport getServerVersionSupport() {
        return super.getServerVersionSupport();
    }

    /**
     * Returns whatever {@code super.getAerospikeConverter()} returns.
     * NOTE(review): the bridge/synthetic markers suggest compiler-generated code recovered
     * by a decompiler — confirm before maintaining this by hand.
     */
    @Override // org.springframework.data.aerospike.core.BaseAerospikeTemplate, org.springframework.data.aerospike.core.AerospikeOperations
    public /* bridge */ /* synthetic */ MappingAerospikeConverter getAerospikeConverter() {
        return super.getAerospikeConverter();
    }

    /**
     * Returns whatever {@code super.getMappingContext()} returns.
     * NOTE(review): the bridge/synthetic markers suggest compiler-generated code recovered
     * by a decompiler — confirm before maintaining this by hand.
     */
    @Override // org.springframework.data.aerospike.core.BaseAerospikeTemplate, org.springframework.data.aerospike.core.AerospikeOperations
    public /* bridge */ /* synthetic */ MappingContext getMappingContext() {
        return super.getMappingContext();
    }

    /**
     * Returns whatever {@code super.getSetName} yields for the given object.
     *
     * <p>The argument is passed through as-is. It was previously cast to
     * {@code ReactiveAerospikeTemplate} before delegating, which would throw
     * {@link ClassCastException} for any argument that is not a template instance.
     *
     * @param obj object whose set name is requested
     * @return the set name reported by the superclass
     */
    @Override // org.springframework.data.aerospike.core.BaseAerospikeTemplate, org.springframework.data.aerospike.core.AerospikeOperations
    public /* bridge */ /* synthetic */ String getSetName(Object obj) {
        return super.getSetName(obj);
    }

    /**
     * Forwards to {@code super.getSetName} with the class argument unchanged.
     * NOTE(review): the bridge/synthetic markers suggest compiler-generated code recovered
     * by a decompiler — confirm before maintaining this by hand.
     */
    @Override // org.springframework.data.aerospike.core.BaseAerospikeTemplate, org.springframework.data.aerospike.core.AerospikeOperations
    public /* bridge */ /* synthetic */ String getSetName(Class cls) {
        return super.getSetName(cls);
    }
}
