/*
 * Decompiled with CFR 0.152.
 */
package me.ahoo.wow.r2dbc;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.R2dbcDataIntegrityViolationException;
import io.r2dbc.spi.Readable;
import io.r2dbc.spi.Result;
import kotlin.Metadata;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import kotlin.text.StringsKt;
import me.ahoo.wow.api.modeling.AggregateId;
import me.ahoo.wow.command.DuplicateRequestIdException;
import me.ahoo.wow.event.DomainEventStream;
import me.ahoo.wow.eventsourcing.AbstractEventStore;
import me.ahoo.wow.eventsourcing.EventVersionConflictException;
import me.ahoo.wow.r2dbc.Database;
import me.ahoo.wow.r2dbc.EventStreamSchema;
import me.ahoo.wow.r2dbc.R2dbcEventStore;
import me.ahoo.wow.serialization.JsonSerializer;
import me.ahoo.wow.serialization.JsonSerializerKt;
import me.ahoo.wow.serialization.event.EventStreamRecord;
import me.ahoo.wow.serialization.event.EventStreamRecordKt;
import me.ahoo.wow.serialization.event.FlatEventStreamRecord;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Metadata(mv={2, 0, 0}, k=1, xi=48, d1={"\u0000<\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\b\n\u0002\b\u0002\u0018\u00002\u00020\u0001B\u0017\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u00a2\u0006\u0004\b\u0006\u0010\u0007J\u0016\u0010\b\u001a\b\u0012\u0004\u0012\u00020\n0\t2\u0006\u0010\u000b\u001a\u00020\fH\u0016J&\u0010\r\u001a\b\u0012\u0004\u0012\u00020\f0\u000e2\u0006\u0010\u000f\u001a\u00020\u00102\u0006\u0010\u0011\u001a\u00020\u00122\u0006\u0010\u0013\u001a\u00020\u0012H\u0016R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006\u0014"}, d2={"Lme/ahoo/wow/r2dbc/R2dbcEventStore;", "Lme/ahoo/wow/eventsourcing/AbstractEventStore;", "database", "Lme/ahoo/wow/r2dbc/Database;", "eventStreamSchema", "Lme/ahoo/wow/r2dbc/EventStreamSchema;", "<init>", "(Lme/ahoo/wow/r2dbc/Database;Lme/ahoo/wow/r2dbc/EventStreamSchema;)V", "appendStream", "Lreactor/core/publisher/Mono;", "Ljava/lang/Void;", "eventStream", "Lme/ahoo/wow/event/DomainEventStream;", "loadStream", "Lreactor/core/publisher/Flux;", "aggregateId", "Lme/ahoo/wow/api/modeling/AggregateId;", "headVersion", "", "tailVersion", "wow-r2dbc"})
public final class R2dbcEventStore
extends AbstractEventStore {
    @NotNull
    private final Database database;
    @NotNull
    private final EventStreamSchema eventStreamSchema;

    public R2dbcEventStore(@NotNull Database database, @NotNull EventStreamSchema eventStreamSchema) {
        Intrinsics.checkNotNullParameter((Object)database, (String)"database");
        Intrinsics.checkNotNullParameter((Object)eventStreamSchema, (String)"eventStreamSchema");
        this.database = database;
        this.eventStreamSchema = eventStreamSchema;
    }

    @NotNull
    public Mono<Void> appendStream(@NotNull DomainEventStream eventStream) {
        Intrinsics.checkNotNullParameter((Object)eventStream, (String)"eventStream");
        Mono mono = Flux.usingWhen(this.database.createConnection(eventStream.getAggregateId()), arg_0 -> R2dbcEventStore.appendStream$lambda$0(new Function1(eventStream, this){
            final /* synthetic */ DomainEventStream $eventStream;
            final /* synthetic */ R2dbcEventStore this$0;
            {
                this.$eventStream = $eventStream;
                this.this$0 = $receiver;
            }

            public final Publisher<? extends Result> invoke(Connection it) {
                JsonNode jsonNode = JsonSerializer.INSTANCE.valueToTree((Object)this.$eventStream);
                Intrinsics.checkNotNullExpressionValue((Object)jsonNode, (String)"valueToTree(...)");
                EventStreamRecord eventStreamRecord = EventStreamRecordKt.toEventStreamRecord((ObjectNode)((ObjectNode)jsonNode));
                return it.createStatement(R2dbcEventStore.access$getEventStreamSchema$p(this.this$0).append(this.$eventStream.getAggregateId())).bind(0, (Object)eventStreamRecord.getId()).bind(1, (Object)this.$eventStream.getAggregateId().getId()).bind(2, (Object)this.$eventStream.getAggregateId().getTenantId()).bind(3, (Object)eventStreamRecord.getRequestId()).bind(4, (Object)eventStreamRecord.getCommandId()).bind(5, (Object)eventStreamRecord.getVersion()).bind(6, (Object)JsonSerializerKt.toJsonString((Object)eventStreamRecord.getHeader())).bind(7, (Object)JsonSerializerKt.toJsonString((Object)eventStreamRecord.getBody())).bind(8, (Object)this.$eventStream.getSize()).bind(9, (Object)eventStreamRecord.getCreateTime()).execute();
            }
        }, arg_0), arg_0 -> R2dbcEventStore.appendStream$lambda$1(appendStream.2.INSTANCE, arg_0)).flatMap(arg_0 -> R2dbcEventStore.appendStream$lambda$3(R2dbcEventStore::appendStream$lambda$2, arg_0)).onErrorMap(R2dbcDataIntegrityViolationException.class, arg_0 -> R2dbcEventStore.appendStream$lambda$5(arg_0 -> R2dbcEventStore.appendStream$lambda$4(this, eventStream, arg_0), arg_0)).then();
        Intrinsics.checkNotNullExpressionValue((Object)mono, (String)"then(...)");
        return mono;
    }

    @NotNull
    public Flux<DomainEventStream> loadStream(@NotNull AggregateId aggregateId, int headVersion, int tailVersion) {
        Intrinsics.checkNotNullParameter((Object)aggregateId, (String)"aggregateId");
        Flux flux = Flux.usingWhen(this.database.createConnection(aggregateId), arg_0 -> R2dbcEventStore.loadStream$lambda$6(new Function1(this, aggregateId, headVersion, tailVersion){
            final /* synthetic */ R2dbcEventStore this$0;
            final /* synthetic */ AggregateId $aggregateId;
            final /* synthetic */ int $headVersion;
            final /* synthetic */ int $tailVersion;
            {
                this.this$0 = $receiver;
                this.$aggregateId = $aggregateId;
                this.$headVersion = $headVersion;
                this.$tailVersion = $tailVersion;
            }

            public final Publisher<? extends Result> invoke(Connection it) {
                return it.createStatement(R2dbcEventStore.access$getEventStreamSchema$p(this.this$0).load(this.$aggregateId)).bind(0, (Object)this.$aggregateId.getId()).bind(1, (Object)this.$headVersion).bind(2, (Object)this.$tailVersion).execute();
            }
        }, arg_0), arg_0 -> R2dbcEventStore.loadStream$lambda$7(loadStream.2.INSTANCE, arg_0)).flatMap(arg_0 -> R2dbcEventStore.loadStream$lambda$12(arg_0 -> R2dbcEventStore.loadStream$lambda$11(aggregateId, arg_0), arg_0));
        Intrinsics.checkNotNullExpressionValue((Object)flux, (String)"flatMap(...)");
        return flux;
    }

    private static final Publisher appendStream$lambda$0(Function1 $tmp0, Object p0) {
        return (Publisher)$tmp0.invoke(p0);
    }

    private static final Publisher appendStream$lambda$1(Function1 $tmp0, Object p0) {
        return (Publisher)$tmp0.invoke(p0);
    }

    private static final Publisher appendStream$lambda$2(Result it) {
        return it.getRowsUpdated();
    }

    private static final Publisher appendStream$lambda$3(Function1 $tmp0, Object p0) {
        return (Publisher)$tmp0.invoke(p0);
    }

    private static final Throwable appendStream$lambda$4(R2dbcEventStore this$0, DomainEventStream $eventStream, R2dbcDataIntegrityViolationException it) {
        Throwable throwable;
        String string = it.getMessage();
        Intrinsics.checkNotNull((Object)string);
        if (StringsKt.contains$default((CharSequence)string, (CharSequence)this$0.eventStreamSchema.getAggregateIdVersionUniqueIndexName(), (boolean)false, (int)2, null)) {
            throwable = (Throwable)new EventVersionConflictException($eventStream, null, (Throwable)it, 2, null);
        } else {
            String string2 = it.getMessage();
            Intrinsics.checkNotNull((Object)string2);
            throwable = StringsKt.contains$default((CharSequence)string2, (CharSequence)this$0.eventStreamSchema.getRequestIdUniqueIndexName(), (boolean)false, (int)2, null) ? (Throwable)new DuplicateRequestIdException($eventStream.getAggregateId(), $eventStream.getRequestId(), null, (Throwable)it, 4, null) : (Throwable)it;
        }
        return throwable;
    }

    private static final Throwable appendStream$lambda$5(Function1 $tmp0, Object p0) {
        return (Throwable)$tmp0.invoke(p0);
    }

    private static final Publisher loadStream$lambda$6(Function1 $tmp0, Object p0) {
        return (Publisher)$tmp0.invoke(p0);
    }

    private static final Publisher loadStream$lambda$7(Function1 $tmp0, Object p0) {
        return (Publisher)$tmp0.invoke(p0);
    }

    private static final DomainEventStream loadStream$lambda$11$lambda$9(AggregateId $aggregateId, Readable readable) {
        String actualAggregateId = (String)readable.get("aggregate_id", String.class);
        if (!Intrinsics.areEqual((Object)$aggregateId.getId(), (Object)actualAggregateId)) {
            String string = "Failed requirement.";
            throw new IllegalArgumentException(string.toString());
        }
        Object object = readable.get("id", String.class);
        if (object == null) {
            String string = "Required value was null.";
            throw new IllegalStateException(string.toString());
        }
        String id = (String)object;
        Object object2 = readable.get("request_id", String.class);
        if (object2 == null) {
            String string = "Required value was null.";
            throw new IllegalStateException(string.toString());
        }
        String requestId = (String)object2;
        Object object3 = readable.get("tenant_id", String.class);
        if (object3 == null) {
            String string = "Required value was null.";
            throw new IllegalStateException(string.toString());
        }
        String tenantId = (String)object3;
        if (!Intrinsics.areEqual((Object)tenantId, (Object)$aggregateId.getTenantId())) {
            boolean $i$a$-require-R2dbcEventStore$loadStream$3$1$22 = false;
            String $i$a$-require-R2dbcEventStore$loadStream$3$1$22 = "The aggregated tenantId[" + $aggregateId.getTenantId() + "] does not match the tenantId:[" + tenantId + "] stored in the eventStore";
            throw new IllegalArgumentException($i$a$-require-R2dbcEventStore$loadStream$3$1$22.toString());
        }
        Object object4 = readable.get("command_id", String.class);
        if (object4 == null) {
            String string = "Required value was null.";
            throw new IllegalStateException(string.toString());
        }
        String commandId = (String)object4;
        Object object5 = readable.get("version", Integer.TYPE);
        if (object5 == null) {
            String string = "Required value was null.";
            throw new IllegalStateException(string.toString());
        }
        int version = ((Number)object5).intValue();
        Object object6 = readable.get("header", String.class);
        if (object6 == null) {
            String string = "Required value was null.";
            throw new IllegalStateException(string.toString());
        }
        String header = (String)object6;
        Object object7 = readable.get("body", String.class);
        if (object7 == null) {
            String string = "Required value was null.";
            throw new IllegalStateException(string.toString());
        }
        String body = (String)object7;
        Object object8 = readable.get("create_time", Long.TYPE);
        if (object8 == null) {
            String string = "Required value was null.";
            throw new IllegalStateException(string.toString());
        }
        long createTime = ((Number)object8).longValue();
        JsonNode jsonNode = JsonSerializerKt.toJsonNode((String)header);
        Intrinsics.checkNotNull((Object)jsonNode, (String)"null cannot be cast to non-null type com.fasterxml.jackson.databind.node.ObjectNode");
        ObjectNode objectNode = (ObjectNode)jsonNode;
        JsonNode jsonNode2 = JsonSerializerKt.toJsonNode((String)body);
        return new FlatEventStreamRecord(id, $aggregateId, objectNode, version, commandId, requestId, jsonNode2, createTime).toDomainEventStream();
    }

    private static final DomainEventStream loadStream$lambda$11$lambda$10(Function1 $tmp0, Object p0) {
        return (DomainEventStream)$tmp0.invoke(p0);
    }

    private static final Publisher loadStream$lambda$11(AggregateId $aggregateId, Result it) {
        return it.map(arg_0 -> R2dbcEventStore.loadStream$lambda$11$lambda$10(arg_0 -> R2dbcEventStore.loadStream$lambda$11$lambda$9($aggregateId, arg_0), arg_0));
    }

    private static final Publisher loadStream$lambda$12(Function1 $tmp0, Object p0) {
        return (Publisher)$tmp0.invoke(p0);
    }

    public static final /* synthetic */ EventStreamSchema access$getEventStreamSchema$p(R2dbcEventStore $this) {
        return $this.eventStreamSchema;
    }
}

