/*
 * Decompiled with CFR 0.152.
 */
package net.glorat.ledger;

import cakesolutions.kafka.KafkaProducer;
import cakesolutions.kafka.KafkaProducer$;
import cakesolutions.kafka.KafkaProducerRecord;
import cakesolutions.kafka.KafkaProducerRecord$;
import cakesolutions.kafka.KafkaSerializer$;
import java.io.Serializable;
import java.util.UUID;
import net.glorat.cqrs.AggregateRoot;
import net.glorat.cqrs.CommittedEvent;
import net.glorat.cqrs.DomainEvent;
import net.glorat.cqrs.EventStreamReceiver;
import net.glorat.cqrs.Repository;
import net.glorat.ledger.BinarySerializer$;
import net.glorat.ledger.ConcurrencyException;
import net.glorat.ledger.EntityView;
import net.glorat.ledger.KafkaEventDispatcher;
import net.glorat.ledger.Logging;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Some;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.reflect.ClassTag;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;

@ScalaSignature(bytes="\u0006\u0001\u0005Ee\u0001\u0002\n\u0014\u0001iA\u0001b\u000b\u0001\u0003\u0002\u0003\u0006I\u0001\f\u0005\to\u0001\u0011\t\u0011)A\u0005Y!A\u0001\b\u0001B\u0001B\u0003%\u0011\b\u0003\u0005P\u0001\t\u0005\t\u0015!\u0003Q\u0011!9\u0006A!A!\u0002\u0017A\u0006\"\u00020\u0001\t\u0003y\u0006bB4\u0001\u0005\u0004%\t\u0001\u001b\u0005\u0007Y\u0002\u0001\u000b\u0011B5\t\u000f5\u0004!\u0019!C\u0001]\"1!\u000f\u0001Q\u0001\n=DQa\u001d\u0001\u0005BQDaa \u0001\u0005\u0002\u0005\u0005\u0001bBA\u0019\u0001\u0011\u0005\u00131\u0007\u0005\n\u0003\u000f\u0002!\u0019!C\u0005\u0003\u0013B\u0001\"!\u0017\u0001A\u0003%\u00111\n\u0005\u000b\u00037\u0002\u0001R1A\u0005\u0002\u0005u\u0003bBA@\u0001\u0011\u0005\u0011\u0011\u0011\u0002\f\u0017\u000647.\u0019'fI\u001e,'O\u0003\u0002\u0015+\u00051A.\u001a3hKJT!AF\f\u0002\r\u001ddwN]1u\u0015\u0005A\u0012a\u00018fi\u000e\u00011\u0003\u0002\u0001\u001cC\u001d\u0002\"\u0001H\u0010\u000e\u0003uQ\u0011AH\u0001\u0006g\u000e\fG.Y\u0005\u0003Au\u0011a!\u00118z%\u00164\u0007C\u0001\u0012&\u001b\u0005\u0019#B\u0001\u0013\u0016\u0003\u0011\u0019\u0017O]:\n\u0005\u0019\u001a#A\u0003*fa>\u001c\u0018\u000e^8ssB\u0011\u0001&K\u0007\u0002'%\u0011!f\u0005\u0002\b\u0019><w-\u001b8h\u0003\u001d\u0019XM\u001d<feN\u0004\"!\f\u001b\u000f\u00059\u0012\u0004CA\u0018\u001e\u001b\u0005\u0001$BA\u0019\u001a\u0003\u0019a$o\\8u}%\u00111'H\u0001\u0007!J,G-\u001a4\n\u0005U2$AB*ue&twM\u0003\u00024;\u0005)Ao\u001c9jG\u0006\u00012\u000f\u001e:fC6$vNU3wSNLwN\u001c\t\u00049ib\u0014BA\u001e\u001e\u0005\u0019y\u0005\u000f^5p]B!A$P M\u0013\tqTDA\u0005Gk:\u001cG/[8ocA\u0011\u0001)\u0013\b\u0003\u0003\u001es!A\u0011$\u000f\u0005\r+eBA\u0018E\u0013\u0005A\u0012B\u0001\f\u0018\u0013\t!S#\u0003\u0002IG\u00059\u0001/Y2lC\u001e,\u0017B\u0001&L\u0005\u00119U+\u0013#\u000b\u0005!\u001b\u0003C\u0001\u000fN\u0013\tqUDA\u0002J]R\f\u0001B]3hSN$(/\u001f\t\u00059u\nF\u000b\u0005\u0002#%&\u00111k\t\u0002\f\t>l\u0017-\u001b8Fm\u0016tG\u000f\u0005\u0002#+&\u0011ak\t\u0002\u000e\u0003\u001e<'/Z4bi\u0016\u0014vn\u001c;\u0002\u0005\u0015\u001c\u0007CA-]\u001b\u0005Q&BA.\u001e\u0003)\u0019wN\\2veJ,g\u000e^\u0005\u0003;j\u0013\u0001#\u0012=fGV$\u0018n\u001c8D_:$X\r\u001f;\u0002\rqJg.\u001b;?)\u0015\u00017\rZ3g)\t\t'\r\u0005\u0002)\u0001!)qK\u0002a\u00021\")1F\u0002a\u0001Y!)qG\u0002a\u0001Y!)\u0001H\u0002a\u0001s!)qJ\u0002a\u0001!\u0006QQM\u001c;jif4\u0016.Z<\u0016\u0003%\u0004\"\u0001\u000b6\n\u0005-\u001c\"AC#oi&$\u0018PV5fo\u0006YQM\u001c;jif4\u0016.Z<!\u0003!!\u0017n\u001d9bi\u000eDW#A8\u0011\u0005!\u0002\u0018BA9\u0014\u0005QY\u0015MZ6b\u000bZ,g\u000e\u001e#jgB\fGo\u00195fe\u0006IA-[:qCR\u001c\u0007\u000eI\u0001\u0005g\u00064X\rF\u0002vwv\u00042!\u0017<y\u0013\t9(L\u0001\u0004GkR,(/\u001a\t\u00039eL!A_\u000f\u0003\tUs\u0017\u000e\u001e\u0005\u0006y.\u0001\r\u0001V\u0001\nC\u001e<'/Z4bi\u0016DQA`\u0006A\u00021\u000bq\"\u001a=qK\u000e$X\r\u001a,feNLwN\\\u0001\u000bO\u0016$()_%e\u001fB$X\u0003BA\u0002\u0003\u001b!b!!\u0002\u0002*\u00055B\u0003BA\u0004\u00033\u0001B\u0001\b\u001e\u0002\nA!\u00111BA\u0007\u0019\u0001!q!a\u0004\r\u0005\u0004\t\tBA\u0001U#\r\t\u0019\u0002\u0016\t\u00049\u0005U\u0011bAA\f;\t9aj\u001c;iS:<\u0007\"CA\u000e\u0019\u0005\u0005\t9AA\u000f\u0003))g/\u001b3f]\u000e,G%\r\t\u0007\u0003?\t)#!\u0003\u000e\u0005\u0005\u0005\"bAA\u0012;\u00059!/\u001a4mK\u000e$\u0018\u0002BA\u0014\u0003C\u0011\u0001b\u00117bgN$\u0016m\u001a\u0005\u0007\u0003Wa\u0001\u0019A \u0002\u0005%$\u0007bBA\u0018\u0019\u0001\u0007\u0011\u0011B\u0001\u0005i6\u0004H.A\u0004hKR\u0014\u00150\u00133\u0016\t\u0005U\u00121\b\u000b\u0007\u0003o\t\u0019%!\u0012\u0015\t\u0005e\u0012Q\b\t\u0005\u0003\u0017\tY\u0004B\u0004\u0002\u00105\u0011\r!!\u0005\t\u0013\u0005}R\"!AA\u0004\u0005\u0005\u0013AC3wS\u0012,gnY3%eA1\u0011qDA\u0013\u0003sAa!a\u000b\u000e\u0001\u0004y\u0004bBA\u0018\u001b\u0001\u0007\u0011\u0011H\u0001\u0011gR\u0014\u0018N\\4TKJL\u0017\r\\5{KJ,\"!a\u0013\u0011\u000bqiD&!\u0014\u0011\u000bq\ty%a\u0015\n\u0007\u0005ESDA\u0003BeJ\f\u0017\u0010E\u0002\u001d\u0003+J1!a\u0016\u001e\u0005\u0011\u0011\u0015\u0010^3\u0002#M$(/\u001b8h'\u0016\u0014\u0018.\u00197ju\u0016\u0014\b%A\u0006p]\u0016\u0004&o\u001c3vG\u0016\u0014XCAA0!\u001d\t\t'a\u001b-\u0003_j!!a\u0019\u000b\t\u0005\u0015\u0014qM\u0001\u0006W\u000647.\u0019\u0006\u0003\u0003S\nQbY1lKN|G.\u001e;j_:\u001c\u0018\u0002BA7\u0003G\u0012QbS1gW\u0006\u0004&o\u001c3vG\u0016\u0014\b\u0003BA9\u0003wj!!a\u001d\u000b\t\u0005U\u0014qO\u0001\u0005Y\u0006twM\u0003\u0002\u0002z\u0005!!.\u0019<b\u0013\u0011\ti(a\u001d\u0003\r=\u0013'.Z2u\u00039\u0001(o\u001c3vG\u0016\u00148i\u001c8gS\u001e,\"!a!\u0011\u000f\u0005\u0015\u00151\u0012\u0017\u0002p9!\u0011\u0011MAD\u0013\u0011\tI)a\u0019\u0002\u001b-\u000bgm[1Qe>$WoY3s\u0013\u0011\ti)a$\u0003\t\r{gN\u001a\u0006\u0005\u0003\u0013\u000b\u0019\u0007")
public class KafkaLedger
implements Repository,
Logging {
    private KafkaProducer<String, Object> oneProducer;
    private final String servers;
    private final String topic;
    private final Option<Function1<UUID, Object>> streamToRevision;
    private final ExecutionContext ec;
    private final EntityView entityView;
    private final KafkaEventDispatcher dispatch;
    private final Function1<String, byte[]> stringSerializer;
    private Logger log;
    private volatile byte bitmap$0;

    private Logger log$lzycompute() {
        KafkaLedger kafkaLedger = this;
        synchronized (kafkaLedger) {
            if ((byte)(this.bitmap$0 & 2) == 0) {
                this.log = Logging.log$(this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.log;
    }

    @Override
    public Logger log() {
        return (byte)(this.bitmap$0 & 2) == 0 ? this.log$lzycompute() : this.log;
    }

    public EntityView entityView() {
        return this.entityView;
    }

    public KafkaEventDispatcher dispatch() {
        return this.dispatch;
    }

    @Override
    public Future<BoxedUnit> save(AggregateRoot aggregate, int expectedVersion) {
        int latestVersion;
        if (this.streamToRevision.isDefined() && expectedVersion < (latestVersion = BoxesRunTime.unboxToInt((Object)((Function1)this.streamToRevision.get()).apply((Object)aggregate.id())))) {
            throw new ConcurrencyException(new StringBuilder(58).append("Trying to save aggregate from version ").append(expectedVersion).append(" when ").append(latestVersion).append(" already in DB").toString());
        }
        Iterable<DomainEvent> evs = aggregate.getUncommittedChanges();
        IntRef i = IntRef.create((int)expectedVersion);
        Iterable cevs = (Iterable)evs.map((Function1 & Serializable & scala.Serializable)ev -> {
            ++i$1.elem;
            return new CommittedEvent((DomainEvent)ev, aggregate.id(), i$1.elem);
        }, Iterable$.MODULE$.canBuildFrom());
        this.log().debug(new StringBuilder(30).append("Persisting ").append(cevs.size()).append(" events to Kafka...").toString());
        Iterable futs = (Iterable)cevs.map((Function1 & Serializable & scala.Serializable)cev -> this.oneProducer().send(KafkaProducerRecord$.MODULE$.apply(KafkaProducerRecord.Destination$.MODULE$.apply($this.topic, 0), (Option)new Some((Object)"key"), cev, KafkaProducerRecord$.MODULE$.apply$default$4())), Iterable$.MODULE$.canBuildFrom());
        Future future = Future$.MODULE$.sequence((TraversableOnce)futs, Iterable$.MODULE$.canBuildFrom(), this.ec).map((Function1 & Serializable & scala.Serializable)x -> {
            KafkaLedger.$anonfun$save$3(x);
            return BoxedUnit.UNIT;
        }, this.ec);
        this.oneProducer().flush();
        return this.dispatch().pollEventStream();
    }

    @Override
    public <T extends AggregateRoot> Option<T> getByIdOpt(UUID id, T tmpl, ClassTag<T> evidence$1) {
        return this.entityView().getByIdOpt(id, tmpl, evidence$1);
    }

    @Override
    public <T extends AggregateRoot> T getById(UUID id, T tmpl, ClassTag<T> evidence$2) {
        return (T)((AggregateRoot)this.getByIdOpt(id, tmpl, evidence$2).getOrElse((Function0 & Serializable & scala.Serializable)() -> tmpl));
    }

    private Function1<String, byte[]> stringSerializer() {
        return this.stringSerializer;
    }

    private KafkaProducer<String, Object> oneProducer$lzycompute() {
        KafkaLedger kafkaLedger = this;
        synchronized (kafkaLedger) {
            if ((byte)(this.bitmap$0 & 1) == 0) {
                this.oneProducer = KafkaProducer$.MODULE$.apply(this.producerConfig());
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.oneProducer;
    }

    public KafkaProducer<String, Object> oneProducer() {
        return (byte)(this.bitmap$0 & 1) == 0 ? this.oneProducer$lzycompute() : this.oneProducer;
    }

    public KafkaProducer.Conf<String, Object> producerConfig() {
        return KafkaProducer.Conf$.MODULE$.apply(KafkaSerializer$.MODULE$.apply(this.stringSerializer()), KafkaSerializer$.MODULE$.apply(BinarySerializer$.MODULE$.serializer()), this.servers, KafkaProducer.Conf$.MODULE$.apply$default$4(), KafkaProducer.Conf$.MODULE$.apply$default$5(), KafkaProducer.Conf$.MODULE$.apply$default$6(), KafkaProducer.Conf$.MODULE$.apply$default$7(), KafkaProducer.Conf$.MODULE$.apply$default$8(), KafkaProducer.Conf$.MODULE$.apply$default$9(), KafkaProducer.Conf$.MODULE$.apply$default$10());
    }

    public static final /* synthetic */ void $anonfun$save$3(Iterable x) {
    }

    public KafkaLedger(String servers, String topic, Option<Function1<UUID, Object>> streamToRevision, Function1<DomainEvent, AggregateRoot> registry, ExecutionContext ec) {
        this.servers = servers;
        this.topic = topic;
        this.streamToRevision = streamToRevision;
        this.ec = ec;
        Repository.$init$(this);
        Logging.$init$(this);
        this.entityView = new EntityView(registry);
        this.dispatch = new KafkaEventDispatcher(servers, topic, (Seq<EventStreamReceiver>)((Seq)new .colon.colon((Object)this.entityView(), (List)Nil$.MODULE$)), ec);
        this.stringSerializer = (Function1 & Serializable & scala.Serializable)msg -> msg.getBytes();
    }
}

