/*
 * Decompiled with CFR 0.152.
 */
package me.ahoo.wow.elasticsearch.eventsourcing;

import co.elastic.clients.elasticsearch._types.OpType;
import co.elastic.clients.elasticsearch._types.Refresh;
import co.elastic.clients.elasticsearch._types.SortOptions;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.ResponseBody;
import co.elastic.clients.util.ObjectBuilder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.Ref;
import kotlin.jvm.internal.SourceDebugExtension;
import me.ahoo.wow.api.modeling.AggregateId;
import me.ahoo.wow.api.modeling.NamedAggregate;
import me.ahoo.wow.api.query.Condition;
import me.ahoo.wow.elasticsearch.IndexNameConverter;
import me.ahoo.wow.elasticsearch.query.ElasticsearchSortConverter;
import me.ahoo.wow.elasticsearch.query.event.EventStreamConditionConverter;
import me.ahoo.wow.event.DomainEventStream;
import me.ahoo.wow.eventsourcing.AbstractEventStore;
import me.ahoo.wow.eventsourcing.EventVersionConflictException;
import me.ahoo.wow.query.dsl.ConditionDsl;
import me.ahoo.wow.query.dsl.DslKt;
import me.ahoo.wow.query.dsl.SortDsl;
import org.elasticsearch.client.ResponseException;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import org.springframework.data.elasticsearch.client.elc.ReactiveElasticsearchClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kotlin.core.publisher.MonoExtensionsKt;

@Metadata(mv={2, 1, 0}, k=1, xi=48, d1={"\u0000X\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\b\n\u0002\b\u0003\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0010 \n\u0000\n\u0002\u0010\t\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0003\u0018\u0000 !2\u00020\u0001:\u0001!B#\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\b\b\u0002\u0010\u0004\u001a\u00020\u0005\u0012\b\b\u0002\u0010\u0006\u001a\u00020\u0007\u00a2\u0006\u0004\b\b\u0010\tJ\f\u0010\n\u001a\u00020\u000b*\u00020\fH\u0002J\u0016\u0010\r\u001a\b\u0012\u0004\u0012\u00020\u000f0\u000e2\u0006\u0010\u0010\u001a\u00020\fH\u0014J&\u0010\u0011\u001a\b\u0012\u0004\u0012\u00020\f0\u00122\u0006\u0010\u0013\u001a\u00020\u00142\u0006\u0010\u0015\u001a\u00020\u00072\u0006\u0010\u0016\u001a\u00020\u0007H\u0014J&\u0010\u0017\u001a\b\u0012\u0004\u0012\u00020\f0\u00122\u0006\u0010\u0013\u001a\u00020\u00142\u0006\u0010\u0015\u001a\u00020\u00072\u0006\u0010\u0016\u001a\u00020\u0007H\u0002J,\u0010\u0018\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020\f0\u00190\u000e2\u0006\u0010\u0013\u001a\u00020\u00142\u0006\u0010\u0015\u001a\u00020\u00072\u0006\u0010\u0016\u001a\u00020\u0007H\u0002J&\u0010\u0011\u001a\b\u0012\u0004\u0012\u00020\f0\u00122\u0006\u0010\u0013\u001a\u00020\u00142\u0006\u0010\u001a\u001a\u00020\u001b2\u0006\u0010\u001c\u001a\u00020\u001bH\u0014J.\u0010\u001d\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020\f0\u00190\u000e2\u0006\u0010\u0013\u001a\u00020\u00142\u0006\u0010\u001e\u001a\u00020\u001f2\b\b\u0002\u0010 \u001a\u00020\u0007H\u0002R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006\""}, d2={"Lme/ahoo/wow/elasticsearch/eventsourcing/ElasticsearchEventStore;", "Lme/ahoo/wow/eventsourcing/AbstractEventStore;", "elasticsearchClient", "Lorg/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchClient;", "refreshPolicy", "Lco/elastic/clients/elasticsearch/_types/Refresh;", "batchSize", "", "<init>", "(Lorg/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchClient;Lco/elastic/clients/elasticsearch/_types/Refresh;I)V", "toDocId", "", "Lme/ahoo/wow/event/DomainEventStream;", "appendStream", "Lreactor/core/publisher/Mono;", "Ljava/lang/Void;", "eventStream", "loadStream", "Lreactor/core/publisher/Flux;", "aggregateId", "Lme/ahoo/wow/api/modeling/AggregateId;", "headVersion", "tailVersion", "loopStream", "findEventStream", "", "headEventTime", "", "tailEventTime", "searchEventStream", "condition", "Lme/ahoo/wow/api/query/Condition;", "size", "Companion", "wow-elasticsearch"})
@SourceDebugExtension(value={"SMAP\nElasticsearchEventStore.kt\nKotlin\n*S Kotlin\n*F\n+ 1 ElasticsearchEventStore.kt\nme/ahoo/wow/elasticsearch/eventsourcing/ElasticsearchEventStore\n+ 2 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n*L\n1#1,147:1\n1563#2:148\n1634#2,3:149\n*S KotlinDebug\n*F\n+ 1 ElasticsearchEventStore.kt\nme/ahoo/wow/elasticsearch/eventsourcing/ElasticsearchEventStore\n*L\n143#1:148\n143#1:149,3\n*E\n"})
public final class ElasticsearchEventStore
extends AbstractEventStore {
    @NotNull
    public static final Companion Companion = new Companion(null);
    @NotNull
    private final ReactiveElasticsearchClient elasticsearchClient;
    @NotNull
    private final Refresh refreshPolicy;
    private final int batchSize;
    private static final int VERSION_CONFLICT_CODE = 409;
    private static final int DEFAULT_BATCH_SIZE = 10000;

    public ElasticsearchEventStore(@NotNull ReactiveElasticsearchClient elasticsearchClient, @NotNull Refresh refreshPolicy, int batchSize) {
        Intrinsics.checkNotNullParameter((Object)elasticsearchClient, (String)"elasticsearchClient");
        Intrinsics.checkNotNullParameter((Object)refreshPolicy, (String)"refreshPolicy");
        this.elasticsearchClient = elasticsearchClient;
        this.refreshPolicy = refreshPolicy;
        this.batchSize = batchSize;
    }

    public /* synthetic */ ElasticsearchEventStore(ReactiveElasticsearchClient reactiveElasticsearchClient, Refresh refresh, int n, int n2, DefaultConstructorMarker defaultConstructorMarker) {
        if ((n2 & 2) != 0) {
            refresh = Refresh.True;
        }
        if ((n2 & 4) != 0) {
            n = 10000;
        }
        this(reactiveElasticsearchClient, refresh, n);
    }

    private final String toDocId(DomainEventStream $this$toDocId) {
        return $this$toDocId.getAggregateId().getId() + "-" + $this$toDocId.getVersion();
    }

    @NotNull
    protected Mono<Void> appendStream(@NotNull DomainEventStream eventStream) {
        Intrinsics.checkNotNullParameter((Object)eventStream, (String)"eventStream");
        Mono mono = this.elasticsearchClient.index(arg_0 -> ElasticsearchEventStore.appendStream$lambda$0(eventStream, this, arg_0)).onErrorResume(arg_0 -> ElasticsearchEventStore.appendStream$lambda$2(arg_0 -> ElasticsearchEventStore.appendStream$lambda$1(eventStream, arg_0), arg_0)).then();
        Intrinsics.checkNotNullExpressionValue((Object)mono, (String)"then(...)");
        return mono;
    }

    @NotNull
    protected Flux<DomainEventStream> loadStream(@NotNull AggregateId aggregateId, int headVersion, int tailVersion) {
        Intrinsics.checkNotNullParameter((Object)aggregateId, (String)"aggregateId");
        return this.loopStream(aggregateId, headVersion, tailVersion);
    }

    private final Flux<DomainEventStream> loopStream(AggregateId aggregateId, int headVersion, int tailVersion) {
        Ref.IntRef endVersion = new Ref.IntRef();
        endVersion.element = headVersion + this.batchSize - 1;
        if (tailVersion < endVersion.element) {
            endVersion.element = tailVersion;
        }
        Flux flux = this.findEventStream(aggregateId, headVersion, endVersion.element).flatMapMany(arg_0 -> ElasticsearchEventStore.loopStream$lambda$4(arg_0 -> ElasticsearchEventStore.loopStream$lambda$3(endVersion, headVersion, tailVersion, this, aggregateId, arg_0), arg_0));
        Intrinsics.checkNotNullExpressionValue((Object)flux, (String)"flatMapMany(...)");
        return flux;
    }

    private final Mono<List<DomainEventStream>> findEventStream(AggregateId aggregateId, int headVersion, int tailVersion) {
        Condition condition = DslKt.condition(arg_0 -> ElasticsearchEventStore.findEventStream$lambda$5(aggregateId, headVersion, tailVersion, arg_0));
        int size = tailVersion - headVersion + 1;
        return this.searchEventStream(aggregateId, condition, size);
    }

    @NotNull
    protected Flux<DomainEventStream> loadStream(@NotNull AggregateId aggregateId, long headEventTime, long tailEventTime) {
        Intrinsics.checkNotNullParameter((Object)aggregateId, (String)"aggregateId");
        Condition condition = DslKt.condition(arg_0 -> ElasticsearchEventStore.loadStream$lambda$6(aggregateId, headEventTime, tailEventTime, arg_0));
        Flux flux = ElasticsearchEventStore.searchEventStream$default(this, aggregateId, condition, 0, 4, null).flatMapIterable(arg_0 -> ElasticsearchEventStore.loadStream$lambda$8(ElasticsearchEventStore::loadStream$lambda$7, arg_0));
        Intrinsics.checkNotNullExpressionValue((Object)flux, (String)"flatMapIterable(...)");
        return flux;
    }

    private final Mono<List<DomainEventStream>> searchEventStream(AggregateId aggregateId, Condition condition, int size) {
        Query query = (Query)EventStreamConditionConverter.INSTANCE.convert(condition);
        List<SortOptions> sort = ElasticsearchSortConverter.INSTANCE.toSortOptions(DslKt.sort(ElasticsearchEventStore::searchEventStream$lambda$9));
        Mono mono = this.elasticsearchClient.search(arg_0 -> ElasticsearchEventStore.searchEventStream$lambda$10(aggregateId, query, size, sort, arg_0), DomainEventStream.class).map(arg_0 -> ElasticsearchEventStore.searchEventStream$lambda$13(ElasticsearchEventStore::searchEventStream$lambda$12, arg_0));
        Intrinsics.checkNotNullExpressionValue((Object)mono, (String)"map(...)");
        return mono;
    }

    static /* synthetic */ Mono searchEventStream$default(ElasticsearchEventStore elasticsearchEventStore, AggregateId aggregateId, Condition condition, int n, int n2, Object object) {
        if ((n2 & 4) != 0) {
            n = elasticsearchEventStore.batchSize;
        }
        return elasticsearchEventStore.searchEventStream(aggregateId, condition, n);
    }

    private static final ObjectBuilder appendStream$lambda$0(DomainEventStream $eventStream, ElasticsearchEventStore this$0, IndexRequest.Builder it) {
        return (ObjectBuilder)it.index(IndexNameConverter.INSTANCE.toEventStreamIndexName((NamedAggregate)$eventStream.getAggregateId())).id(this$0.toDocId($eventStream)).document((Object)$eventStream).routing($eventStream.getAggregateId().getId()).opType(OpType.Create).refresh(this$0.refreshPolicy);
    }

    private static final Mono appendStream$lambda$1(DomainEventStream $eventStream, Throwable it) {
        if (it instanceof ResponseException && ((ResponseException)it).getResponse().getStatusLine().getStatusCode() == 409) {
            return MonoExtensionsKt.toMono((Throwable)((Throwable)new EventVersionConflictException($eventStream, null, it, 2, null)));
        }
        return Mono.error((Throwable)it);
    }

    private static final Mono appendStream$lambda$2(Function1 $tmp0, Object p0) {
        return (Mono)$tmp0.invoke(p0);
    }

    private static final Publisher loopStream$lambda$3(Ref.IntRef $endVersion, int $headVersion, int $tailVersion, ElasticsearchEventStore this$0, AggregateId $aggregateId, List it) {
        Flux flux = Flux.fromIterable((Iterable)it);
        Intrinsics.checkNotNullExpressionValue((Object)flux, (String)"fromIterable(...)");
        Flux previousStreams = flux;
        int requestSize = $endVersion.element - $headVersion + 1;
        if (it.size() < requestSize) {
            return (Publisher)previousStreams;
        }
        Intrinsics.checkNotNull((Object)it);
        int lastVersion = ((DomainEventStream)CollectionsKt.last((List)it)).getVersion();
        if (lastVersion >= $tailVersion) {
            return (Publisher)previousStreams;
        }
        Flux<DomainEventStream> nextStreams = this$0.loopStream($aggregateId, lastVersion + 1, $tailVersion);
        return (Publisher)previousStreams.concatWith((Publisher)nextStreams);
    }

    private static final Publisher loopStream$lambda$4(Function1 $tmp0, Object p0) {
        return (Publisher)$tmp0.invoke(p0);
    }

    private static final Unit findEventStream$lambda$5(AggregateId $aggregateId, int $headVersion, int $tailVersion, ConditionDsl $this$condition) {
        Intrinsics.checkNotNullParameter((Object)$this$condition, (String)"$this$condition");
        $this$condition.tenantId($aggregateId.getTenantId());
        $this$condition.eq("aggregateId", (Object)$aggregateId.getId());
        $this$condition.to($this$condition.between("version", (Object)$headVersion), (Object)$tailVersion);
        return Unit.INSTANCE;
    }

    private static final Unit loadStream$lambda$6(AggregateId $aggregateId, long $headEventTime, long $tailEventTime, ConditionDsl $this$condition) {
        Intrinsics.checkNotNullParameter((Object)$this$condition, (String)"$this$condition");
        $this$condition.tenantId($aggregateId.getTenantId());
        $this$condition.eq("aggregateId", (Object)$aggregateId.getId());
        $this$condition.to($this$condition.between("createTime", (Object)$headEventTime), (Object)$tailEventTime);
        return Unit.INSTANCE;
    }

    private static final Iterable loadStream$lambda$7(List it) {
        return it;
    }

    private static final Iterable loadStream$lambda$8(Function1 $tmp0, Object p0) {
        return (Iterable)$tmp0.invoke(p0);
    }

    private static final Unit searchEventStream$lambda$9(SortDsl $this$sort) {
        Intrinsics.checkNotNullParameter((Object)$this$sort, (String)"$this$sort");
        $this$sort.asc("version");
        return Unit.INSTANCE;
    }

    private static final ObjectBuilder searchEventStream$lambda$10(AggregateId $aggregateId, Query $query, int $size, List $sort, SearchRequest.Builder it) {
        return (ObjectBuilder)it.index(IndexNameConverter.INSTANCE.toEventStreamIndexName((NamedAggregate)$aggregateId), new String[0]).query($query).size(Integer.valueOf($size)).routing($aggregateId.getId()).sort($sort);
    }

    /*
     * WARNING - void declaration
     */
    private static final List searchEventStream$lambda$12(ResponseBody it) {
        void $this$mapTo$iv$iv;
        List list = it.hits().hits();
        Intrinsics.checkNotNullExpressionValue((Object)list, (String)"hits(...)");
        Iterable $this$map$iv = list;
        boolean $i$f$map = false;
        Iterable iterable = $this$map$iv;
        Collection destination$iv$iv = new ArrayList(CollectionsKt.collectionSizeOrDefault((Iterable)$this$map$iv, (int)10));
        boolean $i$f$mapTo = false;
        for (Object item$iv$iv : $this$mapTo$iv$iv) {
            void hit;
            Hit hit2 = (Hit)item$iv$iv;
            Collection collection = destination$iv$iv;
            boolean bl = false;
            Object object = hit.source();
            Intrinsics.checkNotNull((Object)object, (String)"null cannot be cast to non-null type me.ahoo.wow.event.DomainEventStream");
            collection.add((DomainEventStream)object);
        }
        return (List)destination$iv$iv;
    }

    private static final List searchEventStream$lambda$13(Function1 $tmp0, Object p0) {
        return (List)$tmp0.invoke(p0);
    }

    @Metadata(mv={2, 1, 0}, k=1, xi=48, d1={"\u0000\u0014\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0002\b\u0003\n\u0002\u0010\b\n\u0002\b\u0002\b\u0086\u0003\u0018\u00002\u00020\u0001B\t\b\u0002\u00a2\u0006\u0004\b\u0002\u0010\u0003R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082T\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0006\u001a\u00020\u0005X\u0082T\u00a2\u0006\u0002\n\u0000\u00a8\u0006\u0007"}, d2={"Lme/ahoo/wow/elasticsearch/eventsourcing/ElasticsearchEventStore$Companion;", "", "<init>", "()V", "VERSION_CONFLICT_CODE", "", "DEFAULT_BATCH_SIZE", "wow-elasticsearch"})
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker $constructor_marker) {
            this();
        }
    }
}

