/*
 * Decompiled with CFR 0.152.
 */
package replication;

import channels.Abort;
import channels.Connection;
import channels.LatentConnection;
import channels.LatentConnection$;
import channels.MessageBuffer;
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec;
import de.rmgk.delay;
import java.io.Serializable;
import java.lang.invoke.LambdaMetafactory;
import rdts.base.LocalUid;
import rdts.base.Uid;
import rdts.time.Dot;
import rdts.time.Dots;
import rdts.time.Dots$;
import replication.Aead;
import replication.CachedMessage;
import replication.DeltaDissemination$;
import replication.DeltaStorage;
import replication.DeltaTreeContext;
import replication.DiscardingHistory;
import replication.ProtocolMessage;
import replication.ProtocolMessage$Payload$;
import replication.ProtocolMessage$Ping$;
import replication.ProtocolMessage$Pong$;
import replication.ProtocolMessage$Request$;
import replication.ReceivedCachedMessage;
import replication.SentCachedMessage;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2$;
import scala.collection.immutable.List;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.concurrent.ExecutionContext;
import scala.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.ScalaRunTime$;
import scala.runtime.function.JProcedure1;
import scala.util.Failure;
import scala.util.Failure$;
import scala.util.Success;
import scala.util.Success$;
import scala.util.Try;
import scala.util.control.NonFatal$;

public class DeltaDissemination<State> {
    // Identity of the local replica; set from the constructor and exposed via replicaId().
    private final LocalUid replicaId;
    // Applied by handleMessage to the data of a payload that contributed non-redundant dots.
    private final Function1<State, BoxedUnit> receiveCallback;
    // When true, handleMessage re-disseminates an accepted payload to all connections except its sender.
    private final boolean immediateForward;
    // Executor on which send() schedules the actual connection writes.
    private final ExecutionContext sendingActor;
    // Once closeRequest() reports true, handleMessage and send return without doing anything.
    private final Abort globalAbort;
    // Stored and exposed via deltaStorage(); not otherwise read in this class.
    private final DeltaStorage<State> deltaStorage;
    // Codec for State, passed to DeltaDissemination$.pmscodec to obtain the protocol-message codec.
    // NOTE(review): the name looks compiler-generated (presumably a Scala context parameter) — confirm.
    private final JsonValueCodec<State> x$8;
    // Currently registered connections. The list is replaced as a whole (filter / prepend), never mutated in place.
    private volatile List<Connection<CachedMessage<ProtocolMessage<State>>>> connections;
    // Callback used when activating connections: prints failures, ignores successes.
    private final delay.Callback<Object> printExceptionHandler;
    // Monitor used by several methods around updates to connections and around treeContext access.
    private final Object lock;
    // Dot / peer-knowledge bookkeeping, created from this replica's uid.
    private final DeltaTreeContext<State> treeContext;

    /**
     * Static forwarder to {@code executeImmediately()} on the companion singleton.
     *
     * @return the execution context supplied by {@code DeltaDissemination$.MODULE$}
     */
    public static ExecutionContext executeImmediately() {
        final DeltaDissemination$ companion = DeltaDissemination$.MODULE$;
        return companion.executeImmediately();
    }

    /**
     * Static forwarder to {@code pmscodec} on the companion singleton.
     *
     * @param jsonValueCodec codec for the replicated state type
     * @return the protocol-message codec the companion derives from {@code jsonValueCodec}
     */
    public static <State, T extends ProtocolMessage<State>> JsonValueCodec<T> pmscodec(JsonValueCodec<State> jsonValueCodec) {
        final DeltaDissemination$ companion = DeltaDissemination$.MODULE$;
        return companion.pmscodec(jsonValueCodec);
    }

    /**
     * Static forwarder to the companion's {@code $lessinit$greater$default$3()}.
     * NOTE(review): by Scala's naming scheme this is presumably the default for the
     * constructor's third parameter ({@code crypto}) — confirm against the Scala source.
     *
     * @return the value supplied by the companion singleton
     */
    public static <State> Option<Aead> $lessinit$greater$default$3() {
        final DeltaDissemination$ companion = DeltaDissemination$.MODULE$;
        return companion.$lessinit$greater$default$3();
    }

    /**
     * Static forwarder to the companion's {@code $lessinit$greater$default$4()}.
     * NOTE(review): presumably the default for the constructor's fourth parameter
     * ({@code immediateForward}) — confirm against the Scala source.
     *
     * @return the value supplied by the companion singleton
     */
    public static <State> boolean $lessinit$greater$default$4() {
        final DeltaDissemination$ companion = DeltaDissemination$.MODULE$;
        return companion.$lessinit$greater$default$4();
    }

    /**
     * Static forwarder to the companion's {@code $lessinit$greater$default$5()}.
     * NOTE(review): presumably the default for the constructor's fifth parameter
     * ({@code sendingActor}) — confirm against the Scala source.
     *
     * @return the value supplied by the companion singleton
     */
    public static <State> ExecutionContext $lessinit$greater$default$5() {
        final DeltaDissemination$ companion = DeltaDissemination$.MODULE$;
        return companion.$lessinit$greater$default$5();
    }

    /**
     * Static forwarder to the companion's {@code $lessinit$greater$default$6()}.
     * NOTE(review): presumably the default for the constructor's sixth parameter
     * ({@code globalAbort}) — confirm against the Scala source.
     *
     * @return the value supplied by the companion singleton
     */
    public static <State> Abort $lessinit$greater$default$6() {
        final DeltaDissemination$ companion = DeltaDissemination$.MODULE$;
        return companion.$lessinit$greater$default$6();
    }

    /**
     * Static forwarder to the companion's {@code $lessinit$greater$default$7()}.
     * NOTE(review): presumably the default for the constructor's seventh parameter
     * ({@code deltaStorage}) — confirm against the Scala source.
     *
     * @return the value supplied by the companion singleton
     */
    public static <State> DiscardingHistory<State> $lessinit$greater$default$7() {
        final DeltaDissemination$ companion = DeltaDissemination$.MODULE$;
        return companion.$lessinit$greater$default$7();
    }

    /**
     * Creates a dissemination endpoint for one replica, starting with no connections.
     *
     * @param replicaId        identity of the local replica; its uid seeds the tree context
     * @param receiveCallback  stored for use by {@code handleMessage}
     * @param crypto           accepted but not read anywhere in this constructor
     * @param immediateForward stored for use by {@code handleMessage}
     * @param sendingActor     executor that {@code send} schedules writes on
     * @param globalAbort      abort handle consulted by {@code handleMessage} and {@code send}
     * @param deltaStorage     stored and exposed through {@code deltaStorage()}
     * @param x$8              codec for the state type
     */
    public DeltaDissemination(LocalUid replicaId, Function1<State, BoxedUnit> receiveCallback, Option<Aead> crypto, boolean immediateForward, ExecutionContext sendingActor, Abort globalAbort, DeltaStorage<State> deltaStorage, JsonValueCodec<State> x$8) {
        // Values taken over verbatim from the arguments.
        this.replicaId = replicaId;
        this.receiveCallback = receiveCallback;
        this.immediateForward = immediateForward;
        this.sendingActor = sendingActor;
        this.globalAbort = globalAbort;
        this.deltaStorage = deltaStorage;
        this.x$8 = x$8;

        // Internal state.
        this.connections = package$.MODULE$.Nil();
        this.printExceptionHandler = outcome -> {
            Try result = outcome;
            if (result instanceof Success) {
                // Successful activation needs no reporting.
                return;
            }
            if (!(result instanceof Failure)) {
                throw new MatchError((Object)result);
            }
            Throwable cause = ((Failure)result).exception();
            Predef$.MODULE$.println((Object)"exception during connection activation");
            cause.printStackTrace();
        };
        this.lock = new Object(){};
        this.treeContext = new DeltaTreeContext(replicaId.uid());
    }

    /**
     * @return the local replica identity passed to the constructor
     */
    public LocalUid replicaId() {
        return replicaId;
    }

    /**
     * @return the abort handle passed to the constructor
     */
    public Abort globalAbort() {
        return globalAbort;
    }

    /**
     * @return the delta storage passed to the constructor
     */
    public DeltaStorage<State> deltaStorage() {
        return deltaStorage;
    }

    /**
     * Alias for {@link #replicaId()}; goes through the accessor rather than the field.
     *
     * @return the value returned by {@code replicaId()}
     */
    public final LocalUid given_LocalUid() {
        final LocalUid self = replicaId();
        return self;
    }

    /**
     * Adapts a raw message-buffer connection into one carrying cached protocol messages.
     * Incoming buffers are wrapped in a {@code ReceivedCachedMessage}; outgoing cached
     * messages are turned back into buffers via {@code messageBuffer()}.
     *
     * @param conn the buffer-level latent connection to adapt
     * @return the adapted latent connection
     */
    public LatentConnection<CachedMessage<ProtocolMessage<State>>> cachedMessages(LatentConnection<MessageBuffer> conn) {
        return LatentConnection$.MODULE$.adapt(
            // inbound: buffer -> cached message (the codec is looked up per message)
            (Function1 & Serializable)buffer -> new ReceivedCachedMessage((MessageBuffer)buffer, DeltaDissemination$.MODULE$.pmscodec(this.x$8)),
            // outbound: cached message -> buffer
            (Function1 & Serializable)cached -> cached.messageBuffer(),
            conn);
    }

    /**
     * @return the current value of the volatile connection list
     */
    public List<Connection<CachedMessage<ProtocolMessage<State>>>> connections() {
        return connections;
    }

    /**
     * Replaces the connection list. This method does no locking of its own.
     *
     * @param x$1 the new connection list
     */
    public void connections_$eq(List<Connection<CachedMessage<ProtocolMessage<State>>>> x$1) {
        connections = x$1;
    }

    /**
     * Builds the completion callback used for a send on {@code con}.
     * A success is ignored. On failure the connection is dropped from the
     * connection list (under {@code lock()}), a message is printed and the
     * stack trace of the failure is dumped.
     *
     * @param con the connection the callback is associated with
     * @return a callback that removes {@code con} when it receives a failure
     * @throws MatchError (from the callback) if it is handed something that is neither
     *         a {@code Success} nor a {@code Failure}
     */
    public delay.Callback<Object> debugCallbackAndRemoveCon(Connection<CachedMessage<ProtocolMessage<State>>> con) {
        return x$1 -> {
            Try try_ = x$1;
            if (try_ instanceof Success) {
                // Nothing to do; the success value is deliberately not read.
                return;
            }
            if (try_ instanceof Failure) {
                Throwable exception = ((Failure)try_).exception();
                synchronized (this.lock()) {
                    // Keep every connection that is not equal to `con` (null-safe inequality).
                    this.connections_$eq(this.connections().filter((Function1 & Serializable)cc -> cc == null ? con != null : !cc.equals(con)));
                }
                Predef$.MODULE$.println((Object)("exception during message handling, removing connection " + con + " from list of connections"));
                exception.printStackTrace();
                return;
            }
            throw new MatchError((Object)try_);
        };
    }

    /**
     * Sends one {@code Request} message, carrying this replica's uid and the tree
     * context's self knowledge, to every connection currently in the list.
     */
    public void requestData() {
        SentCachedMessage<ProtocolMessage.Request> request = new SentCachedMessage<ProtocolMessage.Request>(
            ProtocolMessage$Request$.MODULE$.apply(this.replicaId().uid(), this.treeContext().getSelfKnowledge()),
            DeltaDissemination$.MODULE$.pmscodec(this.x$8));
        // The same message instance is reused for all targets.
        this.connections().foreach((Function1)(JProcedure1 & Serializable)target -> this.send((Connection<CachedMessage<ProtocolMessage<State>>>)target, (CachedMessage<ProtocolMessage<State>>)request));
    }

    /**
     * Sends one {@code Ping} message, stamped with {@code System.nanoTime()} taken once
     * up front, to every connection currently in the list.
     */
    public void pingAll() {
        SentCachedMessage<ProtocolMessage.Ping> ping = new SentCachedMessage<ProtocolMessage.Ping>(
            ProtocolMessage$Ping$.MODULE$.apply(System.nanoTime()),
            DeltaDissemination$.MODULE$.pmscodec(this.x$8));
        // The same message instance is reused for all targets.
        this.connections().foreach((Function1)(JProcedure1 & Serializable)target -> this.send((Connection<CachedMessage<ProtocolMessage<State>>>)target, (CachedMessage<ProtocolMessage<State>>)ping));
    }

    /**
     * @return the callback created in the constructor that prints activation failures
     */
    public delay.Callback<Object> printExceptionHandler() {
        return printExceptionHandler;
    }

    /**
     * Prepares a buffer-level connection and starts it right away, reporting the
     * outcome to {@link #printExceptionHandler()}.
     *
     * @param latentConnection the buffer-level latent connection to add
     */
    public void addBinaryConnection(LatentConnection<MessageBuffer> latentConnection) {
        delay.Async<Object, BoxedUnit> activation = this.prepareBinaryConnection(latentConnection);
        // Run the async with a unit context, then hand it the completion callback.
        Function1 start = (Function1)activation.handleInCtx().apply((Object)BoxedUnit.UNIT);
        start.apply(this.printExceptionHandler());
    }

    /**
     * Prepares an object-level connection and starts it right away, reporting the
     * outcome to {@link #printExceptionHandler()}.
     *
     * @param latentConnection the protocol-message latent connection to add
     */
    public void addObjectConnection(LatentConnection<ProtocolMessage<State>> latentConnection) {
        delay.Async<Object, BoxedUnit> activation = this.prepareObjectConnection(latentConnection);
        // Run the async with a unit context, then hand it the completion callback.
        Function1 start = (Function1)activation.handleInCtx().apply((Object)BoxedUnit.UNIT);
        start.apply(this.printExceptionHandler());
    }

    /**
     * Wraps a buffer-level connection with {@link #cachedMessages} and prepares it.
     *
     * @param latentConnection the buffer-level latent connection
     * @return the async produced by {@code prepareLatentConnection}
     */
    public delay.Async<Object, BoxedUnit> prepareBinaryConnection(LatentConnection<MessageBuffer> latentConnection) {
        LatentConnection<CachedMessage<ProtocolMessage<State>>> adapted = this.cachedMessages(latentConnection);
        return this.prepareLatentConnection(adapted);
    }

    /**
     * Adapts a connection that carries plain protocol messages into one carrying cached
     * messages, then prepares it. One adapter wraps a protocol message in a
     * {@code SentCachedMessage}; the other extracts {@code payload()} from a cached message.
     *
     * @param latentConnection the protocol-message latent connection
     * @return the async produced by {@code prepareLatentConnection}
     */
    public delay.Async<Object, BoxedUnit> prepareObjectConnection(LatentConnection<ProtocolMessage<State>> latentConnection) {
        LatentConnection<CachedMessage<ProtocolMessage<State>>> adapted = LatentConnection$.MODULE$.adapt(
            (Function1 & Serializable)message -> new SentCachedMessage<ProtocolMessage>((ProtocolMessage)message, (JsonValueCodec<ProtocolMessage>)DeltaDissemination$.MODULE$.pmscodec(this.x$8)),
            (Function1 & Serializable)cached -> (ProtocolMessage)cached.payload(),
            latentConnection);
        return this.prepareLatentConnection(adapted);
    }

    /**
     * Prepares a latent connection so that every received message is routed to
     * {@link #handleMessage}, and returns an async that, when run, establishes the
     * connection under {@link #globalAbort()} and registers it (see
     * {@code $anonfun$2$$anonfun$1$$anonfun$1}).
     *
     * Receive failures are only printed; they do not remove the connection here.
     *
     * @param latentConnection the cached-message latent connection to prepare
     * @return an async whose callback is completed once the connection was registered,
     *         or completed with a failure for non-fatal errors
     */
    public delay.Async<Object, BoxedUnit> prepareLatentConnection(LatentConnection<CachedMessage<ProtocolMessage<State>>> latentConnection) {
        delay.Async<Abort, Connection<CachedMessage<ProtocolMessage<State>>>> preparedConnection = latentConnection.prepare(from -> x$1 -> {
            Try try_ = x$1;
            if (try_ instanceof Success) {
                CachedMessage msg = (CachedMessage)((Success)try_).value();
                this.handleMessage(msg, from);
                return;
            }
            if (try_ instanceof Failure) {
                Throwable error = ((Failure)try_).exception();
                Predef$.MODULE$.println((Object)"exception during message handling");
                error.printStackTrace();
                return;
            }
            throw new MatchError((Object)try_);
        });
        return new delay.Async((Function1 & Serializable)ctx -> (JProcedure1 & Serializable)cb -> {
            Abort abort = this.globalAbort();
            try {
                // The inner lambda parameters carry their own names: Java does not allow a lambda
                // parameter to redeclare a parameter of an enclosing lambda.
                delay.Async async = new delay.Async((Function1 & Serializable)innerCtx -> (JProcedure1 & Serializable)innerCb -> ((Function1)preparedConnection.handleInCtx().apply(innerCtx)).apply(arg_0 -> this.$anonfun$2$$anonfun$1$$anonfun$1(innerCb, arg_0)));
                // The outer context argument is ignored; the connection always runs under globalAbort.
                ((Function1)async.handleInCtx().apply((Object)abort)).apply(cb);
            }
            catch (Throwable throwable) {
                if (NonFatal$.MODULE$.apply(throwable)) {
                    delay.Callback failureSink = cb;
                    failureSink.complete((Try)Failure$.MODULE$.apply(throwable));
                }
                // NOTE(review): as written, the throwable is rethrown even after a non-fatal
                // failure was delivered to the callback. This may be decompiler residue — confirm
                // against the Scala source before changing it.
                throw throwable;
            }
        });
    }

    /**
     * Sends a {@code Request} carrying this replica's uid and current self knowledge
     * to a single connection.
     *
     * @param conn the connection to send the request on
     */
    private void sendInitialSyncRequest(Connection<CachedMessage<ProtocolMessage<State>>> conn) {
        SentCachedMessage<ProtocolMessage<State>> syncRequest = new SentCachedMessage<ProtocolMessage<State>>(
            ProtocolMessage$Request$.MODULE$.apply(this.replicaId().uid(), this.treeContext().getSelfKnowledge()),
            DeltaDissemination$.MODULE$.pmscodec(this.x$8));
        this.send(conn, syncRequest);
    }

    /**
     * @return the monitor object this class synchronizes on
     */
    public Object lock() {
        return lock;
    }

    /**
     * @return the tree context created in the constructor
     */
    public DeltaTreeContext<State> treeContext() {
        return treeContext;
    }

    /**
     * Publishes a locally produced delta. Under the lock, it takes the next dot, wraps
     * the delta in a {@code Payload} together with the current self knowledge, and stores
     * the message as outgoing in the tree context. After releasing the lock, it
     * disseminates the message with the default exclusion set.
     *
     * @param delta the local state change to publish
     */
    public void applyDelta(State delta) {
        final SentCachedMessage<ProtocolMessage.Payload<State>> outgoing;
        synchronized (this.lock()) {
            Dot dot = this.treeContext().getNextDot();
            ProtocolMessage.Payload<State> body = ProtocolMessage$Payload$.MODULE$.apply(this.replicaId().uid(), Dots$.MODULE$.single(dot), delta, this.treeContext().getSelfKnowledge());
            outgoing = new SentCachedMessage<ProtocolMessage.Payload<State>>(body, DeltaDissemination$.MODULE$.pmscodec(this.x$8));
            this.treeContext().storeOutgoingMessage(dot, outgoing);
        }
        // Sending happens outside the lock.
        this.disseminatePayload(outgoing, this.disseminatePayload$default$2());
    }

    /**
     * Reads all payloads from the tree context while holding the lock.
     *
     * @return whatever {@code treeContext().getAllPayloads()} returns
     */
    public List<CachedMessage<ProtocolMessage.Payload<State>>> allPayloads() {
        synchronized (this.lock()) {
            return this.treeContext().getAllPayloads();
        }
    }

    /*
     * Unable to fully structure code
     *
     * NOTE: this method is raw decompiler output (undeclared varN_M locals, "** GOTO",
     * an unrendered LambdaMetafactory call) and is not valid Java as it stands.
     *
     * Dispatches one incoming message by the runtime type of msg.payload():
     *   - Ping:    replies to `from` with a Pong carrying the same time value.
     *   - Pong:    extracts the time value and does nothing else.
     *   - Request: under the lock, computes the dots the peer does not know and the
     *              matching payloads, then hands each payload to handleMessage$$anonfun$1
     *              together with the requester uid and `from`.
     *   - Payload: under the lock, updates peer knowledge (or prints why it cannot),
     *              determines the non-redundant dots and stores the message; if anything
     *              was new, invokes receiveCallback with the data and, when
     *              immediateForward is set, disseminates to all connections except `from`.
     * Any other payload type ends in a MatchError. Nothing is processed once
     * globalAbort().closeRequest() is true.
     */
    public void handleMessage(CachedMessage<ProtocolMessage<State>> msg, Connection<CachedMessage<ProtocolMessage<State>>> from) {
        block15: {
            block14: {
                // Drop all traffic once a close was requested.
                if (this.globalAbort().closeRequest()) {
                    return;
                }
                var3_3 = msg.payload();
                // --- Ping: answer with a Pong that echoes the time value.
                if (var3_3 instanceof ProtocolMessage.Ping) {
                    var4_4 = ProtocolMessage$Ping$.MODULE$.unapply((ProtocolMessage.Ping)var3_3);
                    time = var5_5 = var4_4._1();
                    this.send(from, new SentCachedMessage<ProtocolMessage<State>>(ProtocolMessage$Pong$.MODULE$.apply(time), DeltaDissemination$.MODULE$.pmscodec(this.x$8)));
                    return;
                }
                // --- Pong: the time value is extracted but not used.
                if (var3_3 instanceof ProtocolMessage.Pong) {
                    var9_7 = ProtocolMessage$Pong$.MODULE$.unapply((ProtocolMessage.Pong)var3_3);
                    time = var10_8 = var9_7._1();
                    return;
                }
                // --- Request: send the requester the payloads for the dots it is missing.
                if (var3_3 instanceof ProtocolMessage.Request) {
                    var14_10 = ProtocolMessage$Request$.MODULE$.unapply((ProtocolMessage.Request)var3_3);
                    var15_11 = var14_10._1();
                    var16_12 = var14_10._2();
                    uid = var15_11;
                    knows = var16_12;
                    var21_15 = this.lock();
                    // Tree context is only consulted while holding the lock.
                    synchronized (var21_15) {
                        unknownDots = this.treeContext().getUnknownDotsForPeer(uid, knows);
                        payloads = this.treeContext().getPayloads(unknownDots);
                        var22_18 = Tuple2$.MODULE$.apply(payloads, (Object)unknownDots);
                    }
                    var20_19 = var22_18;
                    relevant = (List)var20_19._1();
                    context = (Dots)var20_19._2();
                    var19_22 = Tuple2$.MODULE$.apply((Object)relevant, (Object)context);
                    relevant = (List)var19_22._1();
                    context = (Dots)var19_22._2();
                    // Unrendered lambda: for each payload, calls handleMessage$$anonfun$1(uid, from, payload).
                    relevant.foreach((Function1)(JProcedure1 & Serializable)LambdaMetafactory.altMetafactory(null, null, null, (Ljava/lang/Object;)V, handleMessage$$anonfun$1(rdts.base.Uid channels.Connection replication.CachedMessage ), (Lreplication/CachedMessage;)V)((DeltaDissemination)this, (Uid)uid, from));
                    return;
                }
                // Anything that is not a Payload at this point falls through to the MatchError below.
                if (!(var3_3 instanceof ProtocolMessage.Payload)) break block15;
                // --- Payload
                var29_25 = (ProtocolMessage.Payload)var3_3;
                var30_26 = ProtocolMessage$Payload$.MODULE$.unapply(var29_25);
                var31_27 = var30_26._1();
                var32_28 = var30_26._2();
                var33_29 = var30_26._3();
                var34_30 = var30_26._4();
                var35_31 = var30_26._5();
                // NOTE: `uid` here is used with size()/head(), i.e. it is a collection of sender ids.
                uid = var31_27;
                dots = var32_28;
                data = var33_29;
                causalPredecessors = var34_30;
                lastKnownDots = var35_31;
                payload = var29_25;
                var42_38 = this.lock();
                synchronized (var42_38) {
                    block13: {
                        block17: {
                            block16: {
                                // Exactly one sender that is not this replica: record what that peer knows.
                                if (uid.size() != 1) break block16;
                                v1 = uid.head();
                                var43_39 = this.replicaId().uid();
                                if (!(v1 == null ? var43_39 != null : v1.equals(var43_39) == false)) break block16;
                                this.treeContext().updateKnowledgeOfPeer((Uid)uid.head(), lastKnownDots);
                                break block17;
                            }
                            // Otherwise only report why the peer knowledge could not be updated.
                            if (uid.size() != 1) ** GOTO lbl-1000
                            v2 = uid.head();
                            var44_40 = this.replicaId().uid();
                            if (!(v2 != null ? v2.equals(var44_40) == false : var44_40 != null)) {
                                Predef$.MODULE$.println((Object)("cannot update knowledge of peer, received message from self: " + uid));
                            } else lbl-1000:
                            // 2 sources

                            {
                                Predef$.MODULE$.println((Object)("cannot update knowledge of peer, received message from ambiguous peers: " + uid));
                            }
                        }
                        nonRedundantDots = this.treeContext().addNonRedundant(dots, causalPredecessors);
                        // No non-redundant dots: leave block13 and return without storing or delivering.
                        if (nonRedundantDots.isEmpty()) break block13;
                        this.treeContext().storeMessage(dots, msg);
                        break block14;
                    }
                    return;
                }
            }
            // Reached only for payloads that carried new dots; runs outside the lock.
            this.receiveCallback.apply(data);
            if (this.immediateForward) {
                // Forward to every connection except the one the message arrived on.
                this.disseminatePayload(msg, (Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Connection[]{from})));
                return;
            }
            return;
        }
        throw new MatchError(var3_3);
    }

    /**
     * Schedules a send of {@code payload} on {@code con} via the sending executor,
     * unless a close was already requested. The send's outcome is reported to
     * {@link #debugCallbackAndRemoveCon}, which is obtained inside the scheduled task.
     *
     * @param con     the connection to send on
     * @param payload the cached message to send
     */
    public void send(Connection<CachedMessage<ProtocolMessage<State>>> con, CachedMessage<ProtocolMessage<State>> payload) {
        if (!this.globalAbort().closeRequest()) {
            this.sendingActor.execute(() -> {
                // Run the send async with a unit context, then attach the completion callback.
                Function1 dispatch = (Function1)con.send(payload).handleInCtx().apply((Object)BoxedUnit.UNIT);
                dispatch.apply(this.debugCallbackAndRemoveCon(con));
            });
        }
    }

    /**
     * Sends {@code payload} unchanged to every current connection that is not in
     * {@code except}. The connection list is read under the lock; filtering and sending
     * happen after the lock is released.
     *
     * @param payload the message to send
     * @param except  connections to skip
     */
    public void disseminate(CachedMessage<ProtocolMessage<State>> payload, Set<Connection<CachedMessage<ProtocolMessage<State>>>> except) {
        final List<Connection<CachedMessage<ProtocolMessage<State>>>> snapshot;
        synchronized (this.lock()) {
            snapshot = this.connections();
        }
        snapshot.filterNot((Function1 & Serializable)candidate -> except.contains(candidate)).foreach((Function1)(JProcedure1 & Serializable)target -> this.send((Connection<CachedMessage<ProtocolMessage<State>>>)target, payload));
    }

    /**
     * @return an empty set, used when {@code disseminate} is called without exclusions
     */
    public Set<Connection<CachedMessage<ProtocolMessage<State>>>> disseminate$default$2() {
        final Set<Connection<CachedMessage<ProtocolMessage<State>>>> nobody = Predef$.MODULE$.Set().empty();
        return nobody;
    }

    /**
     * Sends a payload message to every current connection that is not in {@code except}.
     * When a connection reports an authenticated peer replica id, the payload is first
     * passed through {@code augmentPayloadWithLastKnownDot} for that peer; otherwise the
     * original message is sent as is. The connection list is read under the lock;
     * everything else runs after the lock is released.
     *
     * @param payload the payload message to send
     * @param except  connections to skip
     */
    public void disseminatePayload(CachedMessage<ProtocolMessage.Payload<State>> payload, Set<Connection<CachedMessage<ProtocolMessage<State>>>> except) {
        final List<Connection<CachedMessage<ProtocolMessage<State>>>> snapshot;
        synchronized (this.lock()) {
            snapshot = this.connections();
        }
        snapshot.filterNot((Function1 & Serializable)candidate -> except.contains(candidate)).foreach((Function1)(JProcedure1 & Serializable)target -> {
            Option<Uid> peer = target.authenticatedPeerReplicaId();
            // Default: forward the message untouched.
            CachedMessage<ProtocolMessage<State>> outgoing = payload;
            if (peer instanceof Some) {
                Uid receiver = (Uid)((Some)peer).value();
                outgoing = this.augmentPayloadWithLastKnownDot((ProtocolMessage.Payload)payload.payload(), receiver);
            }
            this.send((Connection<CachedMessage<ProtocolMessage<State>>>)target, outgoing);
        });
    }

    /**
     * @return an empty set, used when {@code disseminatePayload} is called without exclusions
     */
    public Set<Connection<CachedMessage<ProtocolMessage<State>>>> disseminatePayload$default$2() {
        final Set<Connection<CachedMessage<ProtocolMessage<State>>>> nobody = Predef$.MODULE$.Set().empty();
        return nobody;
    }

    /**
     * Wraps {@code payload} in a fresh {@code SentCachedMessage}. If the tree context has
     * a leaf dot for {@code receiver}, that dot is first added to the payload through
     * {@code addLastKnownDot}.
     *
     * @param payload  the payload to wrap
     * @param receiver the peer whose leaf dot is looked up
     * @return a new cached message around the (possibly extended) payload
     * @throws MatchError if the leaf lookup yields neither {@code Some} nor {@code None}
     */
    private CachedMessage<ProtocolMessage<State>> augmentPayloadWithLastKnownDot(ProtocolMessage.Payload<State> payload, Uid receiver) {
        Option<Dot> knownLeaf = this.treeContext().getLeaf(receiver);
        final ProtocolMessage.Payload<State> enriched;
        if (None$.MODULE$.equals(knownLeaf)) {
            // No leaf recorded for this peer: send the payload unchanged.
            enriched = payload;
        } else if (knownLeaf instanceof Some) {
            Dot leaf = (Dot)((Some)knownLeaf).value();
            enriched = payload.addLastKnownDot(leaf);
        } else {
            throw new MatchError(knownLeaf);
        }
        return new SentCachedMessage<ProtocolMessage<State>>(enriched, DeltaDissemination$.MODULE$.pmscodec(this.x$8));
    }

    /*
     * Unable to fully structure code
     *
     * NOTE: raw decompiler output (undeclared locals, "** GOTO"); not valid Java as it stands.
     *
     * Synthetic lambda body used by prepareLatentConnection: receives the result of
     * establishing a connection, registers the connection, sends the initial sync request
     * and completes cb$1 with a unit success. Non-fatal errors complete cb$1 with a failure.
     */
    private final /* synthetic */ void $anonfun$2$$anonfun$1$$anonfun$1(delay.Callback cb$1, Try res) {
        try {
            // presumably res.get() throws when res is a Failure, landing in the catch below — confirm
            conn = var3_3 = (Connection)res.get();
            var5_5 = this.lock();
            synchronized (var5_5) {
                // Put the new connection in front of the existing list.
                this.connections_$eq(this.connections().$colon$colon((Object)conn));
            }
            Callback_this = cb$1;
            // Ask the new peer for data right away (outside the lock).
            this.sendInitialSyncRequest(conn);
            value$proxy1 = BoxedUnit.UNIT;
            Callback_this.complete((Try)Success$.MODULE$.apply((Object)BoxedUnit.UNIT));
        }
        catch (Throwable var8_8) {
            block7: {
                e = var9_9 = var8_8;
                // Errors that are not NonFatal skip the callback and are only rethrown.
                if (!NonFatal$.MODULE$.apply(e)) break block7;
                var11_11 = res;
                if (!(var11_11 instanceof Failure)) ** GOTO lbl-1000
                v1 = exception = ((Failure)var11_11).exception();
                var13_13 = e;
                // If res was itself a Failure with a different exception, keep it as suppressed.
                if (v1 == null ? var13_13 != null : v1.equals(var13_13) == false) {
                    e.addSuppressed(exception);
                    v2 = BoxedUnit.UNIT;
                } else lbl-1000:
                // 2 sources

                {
                    v2 = other = var11_11;
                }
                Callback_this = cb$1;
                Callback_this.complete((Try)Failure$.MODULE$.apply(e));
            }
            // NOTE(review): as rendered, this rethrow is reached on the non-fatal path too
            // (after the callback was completed) — may be a decompiler artifact; confirm.
            throw var8_8;
        }
    }

    /**
     * Synthetic lambda body used by the Request branch of {@code handleMessage}: adds this
     * replica's uid as sender to a stored payload, passes it through
     * {@code augmentPayloadWithLastKnownDot} for the requesting peer and sends the result
     * back on the connection the request came from.
     *
     * @param uid$1  uid of the requesting peer
     * @param from$2 connection the request arrived on
     * @param msg    stored cached message whose payload is a {@code Payload}
     */
    private final /* synthetic */ void handleMessage$$anonfun$1(Uid uid$1, Connection from$2, CachedMessage msg) {
        ProtocolMessage.Payload stamped = ((ProtocolMessage.Payload)msg.payload()).addSender(this.replicaId().uid());
        CachedMessage<ProtocolMessage<State>> reply = this.augmentPayloadWithLastKnownDot(stamped, uid$1);
        this.send(from$2, reply);
    }
}

