/*
 * Decompiled with CFR 0.152.
 */
package replication;

import channels.Abort;
import channels.Connection;
import channels.LatentConnection;
import channels.LatentConnection$;
import channels.MessageBuffer;
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec;
import de.rmgk.delay;
import java.io.Serializable;
import rdts.base.Lattice;
import rdts.base.Lattice$;
import rdts.base.LocalUid;
import rdts.base.Uid;
import rdts.time.Dot;
import rdts.time.Dots;
import rdts.time.Dots$;
import replication.Aead;
import replication.CachedMessage;
import replication.DeltaDissemination$;
import replication.DeltaStorage;
import replication.DiscardingHistory;
import replication.ProtocolMessage;
import replication.ProtocolMessage$Payload$;
import replication.ProtocolMessage$Ping$;
import replication.ProtocolMessage$Pong$;
import replication.ProtocolMessage$Request$;
import replication.ReceivedCachedMessage;
import replication.SentCachedMessage;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Some$;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.concurrent.ExecutionContext;
import scala.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.ScalaRunTime$;
import scala.runtime.function.JProcedure1;
import scala.util.Failure;
import scala.util.Failure$;
import scala.util.Success;
import scala.util.Success$;
import scala.util.Try;
import scala.util.control.NonFatal$;

public class DeltaDissemination<State> {
    // Identity of the local replica; exposed through replicaId() and given_LocalUid().
    private final LocalUid replicaId;
    // Invoked by handleMessage with the data of every payload it accepts as new.
    private final Function1<State, BoxedUnit> receiveCallback;
    // When true, handleMessage re-disseminates accepted payloads to every connection except the one they arrived on.
    private final boolean immediateForward;
    // Executor on which send() schedules the actual connection writes.
    private final ExecutionContext sendingActor;
    // Once closeRequest() reports true, handleMessage and send return without doing anything.
    private final Abort globalAbort;
    // Store of payload messages; written by rememberPayload, read by allPayloads.
    private final DeltaStorage<State> deltaStorage;
    // Codec for State (compiler-generated name); passed to DeltaDissemination$.pmscodec wherever a message codec is needed.
    private final JsonValueCodec<State> x$8;
    // Currently registered connections. The list is replaced as a whole; the writers inside this class do so while holding `lock`.
    private volatile List<Connection<CachedMessage<ProtocolMessage<State>>>> connections;
    // Callback used by addBinaryConnection/addObjectConnection: prints failures, ignores successes.
    private final delay.Callback<Object> printExceptionHandler;
    // Monitor used by this class around updates to `contexts`, `connections` and the delta storage.
    private final Object lock;
    // Dots recorded per replica uid; reassigned only in updateContext, which holds `lock`.
    private Map<Uid, Dots> contexts;

    /**
     * Static forwarder to the companion object's {@code executeImmediately}.
     *
     * @return whatever execution context the companion returns
     */
    public static ExecutionContext executeImmediately() {
        DeltaDissemination$ companion = DeltaDissemination$.MODULE$;
        return companion.executeImmediately();
    }

    /**
     * Static forwarder to the companion object's {@code pmscodec}.
     *
     * @param jsonValueCodec codec for the replicated state type
     * @return the protocol-message codec the companion derives from {@code jsonValueCodec}
     */
    public static <State, T extends ProtocolMessage<State>> JsonValueCodec<T> pmscodec(JsonValueCodec<State> jsonValueCodec) {
        DeltaDissemination$ companion = DeltaDissemination$.MODULE$;
        return companion.pmscodec(jsonValueCodec);
    }

    /**
     * Static forwarder to the companion's accessor of the same name.
     * Presumably the compiler-generated default for the third constructor
     * parameter ({@code crypto}) -- inferred from the name and return type only.
     */
    public static <State> Option<Aead> $lessinit$greater$default$3() {
        DeltaDissemination$ companion = DeltaDissemination$.MODULE$;
        return companion.$lessinit$greater$default$3();
    }

    /**
     * Static forwarder to the companion's accessor of the same name.
     * Presumably the compiler-generated default for the fourth constructor
     * parameter ({@code immediateForward}) -- inferred from the name and return type only.
     */
    public static <State> boolean $lessinit$greater$default$4() {
        DeltaDissemination$ companion = DeltaDissemination$.MODULE$;
        return companion.$lessinit$greater$default$4();
    }

    /**
     * Static forwarder to the companion's accessor of the same name.
     * Presumably the compiler-generated default for the fifth constructor
     * parameter ({@code sendingActor}) -- inferred from the name and return type only.
     */
    public static <State> ExecutionContext $lessinit$greater$default$5() {
        DeltaDissemination$ companion = DeltaDissemination$.MODULE$;
        return companion.$lessinit$greater$default$5();
    }

    /**
     * Static forwarder to the companion's accessor of the same name.
     * Presumably the compiler-generated default for the sixth constructor
     * parameter ({@code globalAbort}) -- inferred from the name and return type only.
     */
    public static <State> Abort $lessinit$greater$default$6() {
        DeltaDissemination$ companion = DeltaDissemination$.MODULE$;
        return companion.$lessinit$greater$default$6();
    }

    /**
     * Static forwarder to the companion's accessor of the same name.
     * Presumably the compiler-generated default for the seventh constructor
     * parameter ({@code deltaStorage}) -- inferred from the name and return type only.
     */
    public static <State> DiscardingHistory<State> $lessinit$greater$default$7() {
        DeltaDissemination$ companion = DeltaDissemination$.MODULE$;
        return companion.$lessinit$greater$default$7();
    }

    /**
     * Creates an instance with no connections and an empty per-replica context map.
     *
     * @param replicaId        identity of the local replica
     * @param receiveCallback  invoked with the data of each newly accepted payload
     * @param crypto           accepted but not read anywhere in this constructor
     * @param immediateForward whether accepted payloads are forwarded to the other connections
     * @param sendingActor     executor on which sends are scheduled
     * @param globalAbort      abort handle consulted before handling or sending messages
     * @param deltaStorage     storage for payload messages
     * @param x$8              codec for the state type
     */
    public DeltaDissemination(LocalUid replicaId, Function1<State, BoxedUnit> receiveCallback, Option<Aead> crypto, boolean immediateForward, ExecutionContext sendingActor, Abort globalAbort, DeltaStorage<State> deltaStorage, JsonValueCodec<State> x$8) {
        // Constructor arguments are stored verbatim.
        this.replicaId = replicaId;
        this.receiveCallback = receiveCallback;
        this.immediateForward = immediateForward;
        this.sendingActor = sendingActor;
        this.globalAbort = globalAbort;
        this.deltaStorage = deltaStorage;
        this.x$8 = x$8;

        // Mutable state starts out empty.
        this.connections = package$.MODULE$.Nil();
        this.printExceptionHandler = outcome -> {
            if (outcome instanceof Success) {
                // Nothing to report.
                return;
            }
            if (!(outcome instanceof Failure)) {
                throw new MatchError((Object)outcome);
            }
            Throwable cause = ((Failure)outcome).exception();
            Predef$.MODULE$.println((Object)"exception during connection activation");
            cause.printStackTrace();
        };
        this.lock = new Object(){};
        this.contexts = Predef$.MODULE$.Map().empty();
    }

    /** @return the local replica identity supplied at construction */
    public LocalUid replicaId() {
        return replicaId;
    }

    /** @return the abort handle supplied at construction */
    public Abort globalAbort() {
        return globalAbort;
    }

    /** @return the delta storage supplied at construction */
    public DeltaStorage<State> deltaStorage() {
        return deltaStorage;
    }

    /** @return the same value as {@link #replicaId()} */
    public final LocalUid given_LocalUid() {
        LocalUid self = this.replicaId();
        return self;
    }

    /**
     * Adapts a connection carrying raw message buffers into one carrying cached protocol messages.
     * Incoming buffers are wrapped in a {@code ReceivedCachedMessage} together with the protocol
     * codec; outgoing messages are reduced to their {@code messageBuffer()}.
     *
     * @param conn the buffer-level connection to adapt
     * @return the adapted connection
     */
    public LatentConnection<CachedMessage<ProtocolMessage<State>>> cachedMessages(LatentConnection<MessageBuffer> conn) {
        return LatentConnection$.MODULE$.adapt(
            (Function1 & Serializable)buffer -> new ReceivedCachedMessage((MessageBuffer)buffer, DeltaDissemination$.MODULE$.pmscodec(this.x$8)),
            (Function1 & Serializable)cached -> cached.messageBuffer(),
            conn);
    }

    /** @return the current list of registered connections */
    public List<Connection<CachedMessage<ProtocolMessage<State>>>> connections() {
        return connections;
    }

    /**
     * Replaces the list of registered connections.
     *
     * @param x$1 the new connection list
     */
    public void connections_$eq(List<Connection<CachedMessage<ProtocolMessage<State>>>> x$1) {
        connections = x$1;
    }

    /**
     * Builds a completion callback for an operation on {@code con}.
     * A success is ignored. A failure removes every connection equal to {@code con} from the
     * connection list (under the lock), then prints a notice and the stack trace.
     *
     * @param con the connection the operation was performed on
     * @return the callback
     */
    public delay.Callback<Object> debugCallbackAndRemoveCon(Connection<CachedMessage<ProtocolMessage<State>>> con) {
        return outcome -> {
            if (outcome instanceof Success) {
                return;
            }
            if (!(outcome instanceof Failure)) {
                throw new MatchError((Object)outcome);
            }
            Throwable cause = ((Failure)outcome).exception();
            synchronized (this.lock()) {
                // Keep only the connections that are not equal to the failed one (null-safe).
                this.connections_$eq(this.connections().filter((Function1 & Serializable)candidate -> {
                    if (candidate == null) {
                        return con != null;
                    }
                    return !candidate.equals(con);
                }));
            }
            Predef$.MODULE$.println((Object)("exception during message handling, removing connection " + con + " from list of connections"));
            cause.printStackTrace();
        };
    }

    /**
     * Sends a Request message, carrying the local uid and the current {@link #selfContext()},
     * to every registered connection.
     */
    public void requestData() {
        ProtocolMessage.Request request = ProtocolMessage$Request$.MODULE$.apply(this.replicaId().uid(), this.selfContext());
        SentCachedMessage<ProtocolMessage.Request> msg = new SentCachedMessage<ProtocolMessage.Request>(request, DeltaDissemination$.MODULE$.pmscodec(this.x$8));
        this.connections().foreach((Function1)(JProcedure1 & Serializable)target -> this.send((Connection<CachedMessage<ProtocolMessage<State>>>)target, (CachedMessage<ProtocolMessage<State>>)msg));
    }

    /**
     * Sends a Ping message stamped with the current {@code System.nanoTime()} to every
     * registered connection.
     */
    public void pingAll() {
        long sentAt = System.nanoTime();
        ProtocolMessage.Ping ping = ProtocolMessage$Ping$.MODULE$.apply(sentAt);
        SentCachedMessage<ProtocolMessage.Ping> msg = new SentCachedMessage<ProtocolMessage.Ping>(ping, DeltaDissemination$.MODULE$.pmscodec(this.x$8));
        this.connections().foreach((Function1)(JProcedure1 & Serializable)target -> this.send((Connection<CachedMessage<ProtocolMessage<State>>>)target, (CachedMessage<ProtocolMessage<State>>)msg));
    }

    /** @return the callback created in the constructor that prints failures and ignores successes */
    public delay.Callback<Object> printExceptionHandler() {
        return printExceptionHandler;
    }

    /**
     * Prepares the given buffer-level connection and runs the preparation right away,
     * reporting its outcome to {@link #printExceptionHandler()}.
     *
     * @param latentConnection the connection to add
     */
    public void addBinaryConnection(LatentConnection<MessageBuffer> latentConnection) {
        delay.Async<Object, BoxedUnit> prepared = this.prepareBinaryConnection(latentConnection);
        Function1 start = (Function1)prepared.handleInCtx().apply((Object)BoxedUnit.UNIT);
        start.apply(this.printExceptionHandler());
    }

    /**
     * Prepares the given object-level connection and runs the preparation right away,
     * reporting its outcome to {@link #printExceptionHandler()}.
     *
     * @param latentConnection the connection to add
     */
    public void addObjectConnection(LatentConnection<ProtocolMessage<State>> latentConnection) {
        delay.Async<Object, BoxedUnit> prepared = this.prepareObjectConnection(latentConnection);
        Function1 start = (Function1)prepared.handleInCtx().apply((Object)BoxedUnit.UNIT);
        start.apply(this.printExceptionHandler());
    }

    /**
     * Wraps a buffer-level connection via {@link #cachedMessages} and hands it to
     * {@link #prepareLatentConnection}.
     *
     * @param latentConnection the buffer-level connection
     * @return the async preparation step; nothing is run by this method itself
     */
    public delay.Async<Object, BoxedUnit> prepareBinaryConnection(LatentConnection<MessageBuffer> latentConnection) {
        LatentConnection<CachedMessage<ProtocolMessage<State>>> adapted = this.cachedMessages(latentConnection);
        return this.prepareLatentConnection(adapted);
    }

    /**
     * Adapts an object-level connection so that it carries cached messages and hands it to
     * {@link #prepareLatentConnection}. One adapter function wraps a protocol message in a
     * {@code SentCachedMessage}; the other extracts the {@code payload()} of a cached message.
     *
     * @param latentConnection the object-level connection
     * @return the async preparation step; nothing is run by this method itself
     */
    public delay.Async<Object, BoxedUnit> prepareObjectConnection(LatentConnection<ProtocolMessage<State>> latentConnection) {
        return this.prepareLatentConnection(LatentConnection$.MODULE$.adapt(
            (Function1 & Serializable)message -> new SentCachedMessage<ProtocolMessage>((ProtocolMessage)message, (JsonValueCodec<ProtocolMessage>)DeltaDissemination$.MODULE$.pmscodec(this.x$8)),
            (Function1 & Serializable)cached -> (ProtocolMessage)cached.payload(),
            latentConnection));
    }

    /**
     * Builds the async step that activates a latent connection.
     *
     * The receive handler given to {@code latentConnection.prepare} forwards each successfully
     * received message to {@link #handleMessage} together with the connection it came from, and
     * prints failures. The returned {@code Async}, when run, executes the prepared connection
     * with {@link #globalAbort()} as its context and passes the result to
     * {@code $anonfun$2$$anonfun$1$$anonfun$1}, which registers the connection and completes
     * the caller's callback.
     *
     * @param latentConnection the not-yet-established connection
     * @return the async activation step; nothing is run by this method itself
     */
    public delay.Async<Object, BoxedUnit> prepareLatentConnection(LatentConnection<CachedMessage<ProtocolMessage<State>>> latentConnection) {
        // Receive handler: `from` is the connection a message arrived on, `x$1` the receive result.
        delay.Async<Abort, Connection<CachedMessage<ProtocolMessage<State>>>> preparedConnection = latentConnection.prepare(from -> x$1 -> {
            Try try_ = x$1;
            if (try_ instanceof Success) {
                CachedMessage msg = (CachedMessage)((Success)try_).value();
                this.handleMessage(msg, from);
                return;
            }
            if (try_ instanceof Failure) {
                // Receive errors are only printed; the connection is not removed here.
                Throwable error = ((Failure)try_).exception();
                Predef$.MODULE$.println((Object)"exception during message handling");
                error.printStackTrace();
                return;
            }
            throw new MatchError((Object)try_);
        });
        // The outer ctx argument is not used; the inner Async is run with globalAbort() instead.
        return new delay.Async((Function1 & Serializable)ctx -> (JProcedure1 & Serializable)cb -> {
            Abort abort = this.globalAbort();
            try {
                // NOTE(review): the nested lambdas reuse the names ctx/cb (decompiler output); the inner ones shadow the outer ones.
                delay.Async async = new delay.Async((Function1 & Serializable)ctx -> (JProcedure1 & Serializable)cb -> ((Function1)preparedConnection.handleInCtx().apply(ctx)).apply(arg_0 -> this.$anonfun$2$$anonfun$1$$anonfun$1(cb, arg_0)));
                ((Function1)async.handleInCtx().apply((Object)abort)).apply(cb);
            }
            catch (Throwable throwable) {
                Throwable throwable2;
                Throwable e = throwable2 = throwable;
                // Non-fatal errors are reported to the callback as a Failure.
                if (NonFatal$.MODULE$.apply(e)) {
                    delay.Callback Callback_this = cb;
                    Callback_this.complete((Try)Failure$.MODULE$.apply(e));
                }
                // NOTE(review): as decompiled, the throwable is rethrown even after the callback was completed -- confirm against the original source.
                throw throwable;
            }
        });
    }

    /**
     * Sends a Request message, carrying the local uid and the current {@link #selfContext()},
     * to a single connection.
     *
     * @param conn the connection to send the request to
     */
    private void sendInitialSyncRequest(Connection<CachedMessage<ProtocolMessage<State>>> conn) {
        SentCachedMessage<ProtocolMessage<State>> request = new SentCachedMessage<ProtocolMessage<State>>(
            ProtocolMessage$Request$.MODULE$.apply(this.replicaId().uid(), this.selfContext()),
            DeltaDissemination$.MODULE$.pmscodec(this.x$8));
        this.send(conn, request);
    }

    /** @return the monitor object this class synchronizes on */
    public Object lock() {
        return lock;
    }

    /**
     * Returns the dots recorded for the local replica, or the empty set when nothing has been
     * recorded yet.
     *
     * The context map is a plain (non-volatile) field that is only reassigned while holding
     * {@link #lock()}, so it is read under the same monitor to make the latest update visible.
     * The monitor is reentrant, so callers that already hold it are unaffected.
     *
     * @return the current context of the local replica
     */
    public Dots selfContext() {
        synchronized (this.lock()) {
            return (Dots)this.contexts.getOrElse((Object)this.replicaId().uid(), DeltaDissemination::selfContext$$anonfun$1);
        }
    }

    /**
     * Publishes a locally produced delta.
     * Under the lock: takes the next dot for the local replica, wraps the delta in a Payload
     * tagged with that dot, records the dot in the local context and remembers the message.
     * After releasing the lock the message is disseminated to all connections.
     *
     * @param delta the local state change to publish
     */
    public void applyDelta(State delta) {
        SentCachedMessage<ProtocolMessage<State>> outgoing;
        synchronized (this.lock()) {
            Dot fresh = this.selfContext().nextDot(this.replicaId().uid());
            ProtocolMessage.Payload<State> wrapped = ProtocolMessage$Payload$.MODULE$.apply(this.replicaId().uid(), Dots$.MODULE$.single(fresh), delta);
            this.updateContext(this.replicaId().uid(), wrapped.dots());
            outgoing = new SentCachedMessage<ProtocolMessage<State>>(wrapped, DeltaDissemination$.MODULE$.pmscodec(this.x$8));
            this.rememberPayload(outgoing);
        }
        // Sending happens outside the lock.
        this.disseminate(outgoing, this.disseminate$default$2());
    }

    /**
     * Merges {@code dots} into the context recorded for replica {@code rr}, under the lock.
     * The existing entry (if any) and {@code Some(dots)} are combined with the option lattice
     * over the dots context lattice.
     *
     * @param rr   the replica whose context is updated
     * @param dots the dots to merge in
     */
    public void updateContext(Uid rr, Dots dots) {
        synchronized (this.lock()) {
            Function1 mergeIn = (Function1 & Serializable)existing -> {
                Lattice optionMerge = Lattice$.MODULE$.optionLattice(Dots$.MODULE$.contextLattice());
                return (Option)optionMerge.merge(existing, (Object)Some$.MODULE$.apply((Object)dots));
            };
            this.contexts = (Map)this.contexts.updatedWith((Object)rr, mergeIn);
        }
    }

    /**
     * Reads the history of the delta storage while holding the lock.
     *
     * @return the payload messages returned by {@code deltaStorage().getHistory()}
     */
    public List<CachedMessage<ProtocolMessage.Payload<State>>> allPayloads() {
        synchronized (this.lock()) {
            return this.deltaStorage().getHistory();
        }
    }

    /**
     * Hands a payload message to the delta storage while holding the lock.
     *
     * @param message the payload message to remember
     */
    public void rememberPayload(CachedMessage<ProtocolMessage.Payload<State>> message) {
        synchronized (this.lock()) {
            DeltaStorage<State> storage = this.deltaStorage();
            storage.remember(message);
        }
    }

    /**
     * Handles one incoming protocol message. Does nothing once the global abort has been requested.
     *
     * <ul>
     *   <li>Ping: answers with a Pong carrying the same timestamp.</li>
     *   <li>Pong: prints the elapsed time since the timestamp and the current local context.</li>
     *   <li>Request: sends the sender every remembered payload whose dots are not already covered
     *       by what the sender reports to know, then records the sender's resulting context.</li>
     *   <li>Payload: if its dots are not yet covered by the local context, records the dots for all
     *       senders and for the local replica, remembers the message, invokes the receive callback
     *       and, when immediate forwarding is enabled, disseminates it to every connection except
     *       {@code from}.</li>
     * </ul>
     *
     * The duplicate check for payloads and the recording of their dots happen in a single
     * critical section, so two threads receiving the same payload concurrently cannot both
     * treat it as new. The receive callback is invoked outside the lock.
     *
     * @param msg2 the received message
     * @param from the connection the message arrived on
     * @throws IllegalStateException if a Request cannot be answered because the remembered
     *                               payloads do not cover the local context
     * @throws MatchError            if the message is none of the four known kinds
     */
    public void handleMessage(CachedMessage<ProtocolMessage<State>> msg2, Connection<CachedMessage<ProtocolMessage<State>>> from) {
        if (this.globalAbort().closeRequest()) {
            return;
        }
        ProtocolMessage<State> protocolMessage = msg2.payload();
        if (protocolMessage instanceof ProtocolMessage.Ping) {
            long time = ProtocolMessage$Ping$.MODULE$.unapply((ProtocolMessage.Ping)protocolMessage)._1();
            this.send(from, new SentCachedMessage<ProtocolMessage<State>>(ProtocolMessage$Pong$.MODULE$.apply(time), DeltaDissemination$.MODULE$.pmscodec(this.x$8)));
            return;
        }
        if (protocolMessage instanceof ProtocolMessage.Pong) {
            long time = ProtocolMessage$Pong$.MODULE$.unapply((ProtocolMessage.Pong)protocolMessage)._1();
            Predef$.MODULE$.println((Object)("ping took " + Predef$.MODULE$.long2Long(System.nanoTime() - time).doubleValue() / (double)1000000 + "ms"));
            Predef$.MODULE$.println((Object)("current state is " + this.selfContext()));
            return;
        }
        if (protocolMessage instanceof ProtocolMessage.Request) {
            ProtocolMessage.Request request = ProtocolMessage$Request$.MODULE$.unapply((ProtocolMessage.Request)protocolMessage);
            Uid requester = request._1();
            Dots knows = request._2();
            List relevant;
            Dots context;
            synchronized (this.lock()) {
                // Payloads the requester does not already have.
                relevant = this.allPayloads().filterNot((Function1 & Serializable)dt -> ((ProtocolMessage.Payload)dt.payload()).dots().lessThanEquals(knows));
                Dots served = (Dots)relevant.map((Function1 & Serializable)dt -> ((ProtocolMessage.Payload)dt.payload()).dots()).reduceOption((Function2 & Serializable)(left, right) -> (Dots)Lattice$.MODULE$.merge(left, right, Dots$.MODULE$.contextLattice())).getOrElse(DeltaDissemination::$anonfun$6);
                Dots newknowledge = (Dots)Dots$.MODULE$.contextLattice().merge((Object)knows, (Object)served);
                context = this.selfContext();
                // Anything in the local context that the requester would still lack afterwards
                // means the storage no longer holds the deltas needed to answer.
                Dots diff = context.subtract(newknowledge);
                if (!diff.isEmpty()) {
                    throw new IllegalStateException("could not answer request, missing deltas for: " + diff + "\n  relevant: " + relevant.map((Function1 & Serializable)_$1 -> (ProtocolMessage.Payload)_$1.payload()) + "\n knows: " + knows + "\n  selfcontext: " + this.selfContext() + "}");
                }
            }
            // Sending happens outside the lock; each payload is re-tagged with the local uid as a sender.
            relevant.foreach((Function1)(JProcedure1 & Serializable)msg -> this.send(from, new SentCachedMessage<ProtocolMessage<State>>(((ProtocolMessage.Payload)msg.payload()).addSender(this.replicaId().uid()), DeltaDissemination$.MODULE$.pmscodec(this.x$8))));
            this.updateContext(requester, (Dots)Dots$.MODULE$.contextLattice().merge((Object)context, (Object)knows));
            return;
        }
        if (protocolMessage instanceof ProtocolMessage.Payload) {
            ProtocolMessage.Payload payload = ProtocolMessage$Payload$.MODULE$.unapply((ProtocolMessage.Payload)protocolMessage);
            Set<Uid> senders = payload._1();
            Dots context = payload._2();
            Object data = payload._3();
            boolean alreadyKnown;
            synchronized (this.lock()) {
                // Check-and-record must be atomic: otherwise the same payload arriving on two
                // connections at once would be remembered and delivered twice.
                alreadyKnown = context.lessThanEquals(this.selfContext());
                if (!alreadyKnown) {
                    senders.foreach((Function1)(JProcedure1 & Serializable)sender -> this.updateContext((Uid)sender, context));
                    this.updateContext(this.replicaId().uid(), context);
                    this.rememberPayload(msg2);
                }
            }
            if (alreadyKnown) {
                return;
            }
            // User code runs outside the lock.
            this.receiveCallback.apply(data);
            if (this.immediateForward) {
                this.disseminate(msg2, (Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Connection[]{from})));
            }
            return;
        }
        throw new MatchError(protocolMessage);
    }

    /**
     * Schedules sending {@code payload} over {@code con} on the sending executor, unless the
     * global abort has been requested. The outcome of the send is reported to
     * {@link #debugCallbackAndRemoveCon}.
     *
     * @param con     the connection to send on
     * @param payload the message to send
     */
    public void send(Connection<CachedMessage<ProtocolMessage<State>>> con, CachedMessage<ProtocolMessage<State>> payload) {
        if (!this.globalAbort().closeRequest()) {
            this.sendingActor.execute(() -> {
                Function1 deliver = (Function1)con.send(payload).handleInCtx().apply((Object)BoxedUnit.UNIT);
                deliver.apply(this.debugCallbackAndRemoveCon(con));
            });
        }
    }

    /**
     * Sends {@code payload} to every registered connection that is not contained in {@code except}.
     * The connection list is read while holding the lock; the sends are issued afterwards.
     *
     * @param payload the message to send
     * @param except  connections to skip
     */
    public void disseminate(CachedMessage<ProtocolMessage<State>> payload, Set<Connection<CachedMessage<ProtocolMessage<State>>>> except) {
        List<Connection<CachedMessage<ProtocolMessage<State>>>> snapshot;
        synchronized (this.lock()) {
            snapshot = this.connections();
        }
        snapshot
            .filterNot((Function1 & Serializable)candidate -> except.contains(candidate))
            .foreach((Function1)(JProcedure1 & Serializable)target -> this.send((Connection<CachedMessage<ProtocolMessage<State>>>)target, payload));
    }

    /** @return an empty set, used by {@code applyDelta} as the {@code except} argument of {@code disseminate} */
    public Set<Connection<CachedMessage<ProtocolMessage<State>>>> disseminate$default$2() {
        Set<Connection<CachedMessage<ProtocolMessage<State>>>> nobody = Predef$.MODULE$.Set().empty();
        return nobody;
    }

    /*
     * Completion step used by prepareLatentConnection once the connection attempt has a result.
     *
     * On success: prepends the new connection to the connection list (under the lock), sends it
     * the initial sync request and completes cb$1 with Success(unit).
     * On a non-fatal throwable (including the one thrown by res.get() for a failed attempt):
     * completes cb$1 with Failure(e); if res is a Failure whose exception is not equal to e,
     * that exception is first attached to e as suppressed.
     *
     * Unable to fully structure code
     * (decompiler note: the body below contains goto labels and undeclared locals and is not
     * valid Java as written.)
     */
    private final /* synthetic */ void $anonfun$2$$anonfun$1$$anonfun$1(delay.Callback cb$1, Try res) {
        try {
            // res.get() throws if the connection attempt failed; that is handled by the catch below.
            conn = var3_3 = (Connection)res.get();
            var5_5 = this.lock();
            synchronized (var5_5) {
                this.connections_$eq(this.connections().$colon$colon((Object)conn));
            }
            Callback_this = cb$1;
            this.sendInitialSyncRequest(conn);
            value$proxy1 = BoxedUnit.UNIT;
            Callback_this.complete((Try)Success$.MODULE$.apply((Object)BoxedUnit.UNIT));
        }
        catch (Throwable var8_8) {
            block7: {
                e = var9_9 = var8_8;
                // Fatal throwables skip the callback and go straight to the rethrow.
                if (!NonFatal$.MODULE$.apply(e)) break block7;
                var11_11 = res;
                if (!(var11_11 instanceof Failure)) ** GOTO lbl-1000
                v1 = exception = ((Failure)var11_11).exception();
                var13_13 = e;
                // Only attach the original failure when it is a different exception than the one caught.
                if (v1 == null ? var13_13 != null : v1.equals(var13_13) == false) {
                    e.addSuppressed(exception);
                    v2 = BoxedUnit.UNIT;
                } else lbl-1000:
                // 2 sources

                {
                    v2 = other = var11_11;
                }
                Callback_this = cb$1;
                Callback_this.complete((Try)Failure$.MODULE$.apply(e));
            }
            // NOTE(review): as decompiled, the throwable is rethrown even after the callback was completed -- confirm against the original source.
            throw var8_8;
        }
    }

    /** Fallback for {@code selfContext()}: the empty dot set. */
    private static final Dots selfContext$$anonfun$1() {
        Dots none = Dots$.MODULE$.empty();
        return none;
    }

    /** Fallback used by {@code handleMessage} when no payload is relevant for a request: the empty dot set. */
    private static final Dots $anonfun$6() {
        Dots none = Dots$.MODULE$.empty();
        return none;
    }
}

