/*
 * Decompiled with CFR 0.152.
 */
package replication;

import channels.Abort;
import channels.Abort$;
import channels.Connection;
import channels.LatentConnection;
import channels.MessageBuffer;
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec;
import de.rmgk.delay;
import java.io.Serializable;
import rdts.base.Lattice;
import rdts.base.Lattice$;
import rdts.base.LocalUid;
import rdts.base.Uid;
import rdts.time.Dot;
import rdts.time.Dots;
import rdts.time.Dots$;
import replication.Aead;
import replication.DeltaDissemination$;
import replication.ProtocolMessage;
import replication.ProtocolMessage$Payload$;
import replication.ProtocolMessage$Ping$;
import replication.ProtocolMessage$Pong$;
import replication.ProtocolMessage$Request$;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Some$;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.ScalaRunTime$;
import scala.runtime.function.JProcedure1;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

public class DeltaDissemination<State> {
    // Identity of the local replica; its uid() keys this replica's own entry in contexts.
    private final LocalUid replicaId;
    // Invoked by handleMessage with the data of every incoming Payload it accepts.
    private final Function1<State, BoxedUnit> receiveCallback;
    // When true, handleMessage re-disseminates accepted payloads to all connections except the one they arrived on.
    private final boolean immediateForward;
    // Context handed to prepared latent connections when they are activated in addLatentConnection.
    private final Abort globalAbort;
    // Currently known connections; within this class it is reassigned only while holding lock.
    private List<Connection<ProtocolMessage<State>>> connections;
    // Monitor this class synchronizes on when updating connections, pastPayloads and contexts.
    private final Object lock;
    // Payloads created by applyDelta or accepted by handleMessage; rememberPayload prepends new entries.
    private List<ProtocolMessage.Payload<State>> pastPayloads;
    // Dots recorded per Uid; modified through updateContext.
    private Map<Uid, Dots> contexts;

    /**
     * Static forwarder to {@code DeltaDissemination$.MODULE$.jsoniterMessages}.
     *
     * @param latentConnection latent connection that carries {@link MessageBuffer}s
     * @param jsonValueCodec   codec for the payload type {@code T}
     * @return the latent connection produced by the companion for these arguments
     */
    public static <T> LatentConnection<ProtocolMessage<T>> jsoniterMessages(LatentConnection<MessageBuffer> latentConnection, JsonValueCodec<T> jsonValueCodec) {
        DeltaDissemination$ companion = DeltaDissemination$.MODULE$;
        return companion.jsoniterMessages(latentConnection, jsonValueCodec);
    }

    /**
     * Static forwarder to the companion's {@code $lessinit$greater$default$3}.
     * NOTE(review): the name looks like the compiler-generated default for the third
     * constructor parameter ({@code crypto}) - confirm against the Scala source.
     *
     * @return the value supplied by the companion object
     */
    public static <State> Option<Aead> $lessinit$greater$default$3() {
        DeltaDissemination$ companion = DeltaDissemination$.MODULE$;
        return companion.$lessinit$greater$default$3();
    }

    /**
     * Static forwarder to the companion's {@code $lessinit$greater$default$4}.
     * NOTE(review): the name looks like the compiler-generated default for the fourth
     * constructor parameter ({@code immediateForward}) - confirm against the Scala source.
     *
     * @return the value supplied by the companion object
     */
    public static <State> boolean $lessinit$greater$default$4() {
        DeltaDissemination$ companion = DeltaDissemination$.MODULE$;
        return companion.$lessinit$greater$default$4();
    }

    /**
     * Creates a dissemination instance with no connections, no remembered payloads and
     * an empty context map.
     *
     * @param replicaId        identity of the local replica
     * @param receiveCallback  stored and later applied to the data of accepted payloads
     * @param crypto           accepted for signature compatibility; not referenced in this constructor body
     * @param immediateForward stored flag controlling whether accepted payloads are forwarded right away
     */
    public DeltaDissemination(LocalUid replicaId, Function1<State, BoxedUnit> receiveCallback, Option<Aead> crypto, boolean immediateForward) {
        // Caller-supplied configuration (parameters shadow the fields, so qualify with this).
        this.replicaId = replicaId;
        this.receiveCallback = receiveCallback;
        this.immediateForward = immediateForward;

        // Internally created collaborators.
        globalAbort = new Abort(Abort$.MODULE$.$lessinit$greater$default$1());
        lock = new Object(){};

        // Mutable state starts out empty.
        connections = package$.MODULE$.Nil();
        pastPayloads = package$.MODULE$.Nil();
        contexts = Predef$.MODULE$.Map().empty();
    }

    /**
     * @return the local replica identity passed to the constructor
     */
    public LocalUid replicaId() {
        return replicaId;
    }

    /**
     * Exposes {@link #replicaId()} under the name {@code given_LocalUid}.
     *
     * @return the same value as {@link #replicaId()}
     */
    public final LocalUid given_LocalUid() {
        LocalUid self = replicaId();
        return self;
    }

    /**
     * @return the {@link Abort} instance created in the constructor
     */
    public Abort globalAbort() {
        return globalAbort;
    }

    /**
     * @return the current list of connections (the field value, read without taking {@link #lock()})
     */
    public List<Connection<ProtocolMessage<State>>> connections() {
        return connections;
    }

    /**
     * Replaces the list of connections. This method does not synchronize; the callers
     * inside this class invoke it while holding {@link #lock()}.
     *
     * @param x$1 the new connection list
     */
    public void connections_$eq(List<Connection<ProtocolMessage<State>>> x$1) {
        connections = x$1;
    }

    /**
     * Builds a callback that reacts to the outcome of an operation on {@code con}.
     * A {@link Success} is ignored. A {@link Failure} removes {@code con} from the
     * connection list (under {@link #lock()}) and prints the stack trace of the
     * exception. Any other value raises a {@link MatchError}.
     *
     * @param con the connection to drop when the observed outcome is a failure
     * @return the callback described above
     */
    public delay.Callback<Object> debugCallbackAndRemoveCon(Connection<ProtocolMessage<State>> con) {
        return outcome -> {
            if (outcome instanceof Failure) {
                Throwable cause = ((Failure)outcome).exception();
                synchronized (this.lock()) {
                    // Keep every connection that is not equal to con (null-safe inequality).
                    this.connections_$eq(this.connections().filter((Function1 & Serializable)candidate -> candidate == null ? con != null : !candidate.equals(con)));
                }
                cause.printStackTrace();
            } else if (!(outcome instanceof Success)) {
                throw new MatchError((Object)outcome);
            }
        };
    }

    /**
     * Sends a {@code Request} carrying this replica's uid and {@link #selfContext()} to
     * every current connection. The outcome of each send is handled by
     * {@link #debugCallbackAndRemoveCon}.
     */
    public void requestData() {
        this.connections().foreach((Function1)(JProcedure1 & Serializable)con -> {
            Object request = ProtocolMessage$Request$.MODULE$.apply(this.replicaId().uid(), this.selfContext());
            Function1 runSend = (Function1)con.send(request).handleInCtx().apply((Object)BoxedUnit.UNIT);
            runSend.apply(this.debugCallbackAndRemoveCon((Connection<ProtocolMessage<State>>)con));
        });
    }

    /**
     * Sends a {@code Ping} stamped with {@link System#nanoTime()} to every current
     * connection. The outcome of each send is handled by
     * {@link #debugCallbackAndRemoveCon}.
     */
    public void pingAll() {
        this.connections().foreach((Function1)(JProcedure1 & Serializable)conn -> {
            Object ping = ProtocolMessage$Ping$.MODULE$.apply(System.nanoTime());
            Function1 runSend = (Function1)conn.send(ping).handleInCtx().apply((Object)this.given_LocalUid());
            runSend.apply(this.debugCallbackAndRemoveCon((Connection<ProtocolMessage<State>>)conn));
        });
    }

    /**
     * Converts a {@link MessageBuffer} connection with
     * {@code DeltaDissemination$.MODULE$.jsoniterMessages} and registers the result via
     * {@link #addLatentConnection(LatentConnection)}.
     *
     * @param latentConnection latent connection carrying message buffers
     * @param x$2              codec passed on to {@code jsoniterMessages}
     */
    public void addLatentConnection(LatentConnection<MessageBuffer> latentConnection, JsonValueCodec<State> x$2) {
        LatentConnection<ProtocolMessage<State>> typed = DeltaDissemination$.MODULE$.jsoniterMessages(latentConnection, x$2);
        this.addLatentConnection(typed);
    }

    /**
     * Prepares and activates a latent connection.
     * <p>
     * Messages that arrive successfully are passed to {@link #handleMessage}; receive
     * failures only print their stack trace. When activation succeeds the new connection
     * is prepended to the connection list under {@link #lock()} and a {@code Request}
     * with this replica's uid and {@link #selfContext()} is sent over it. When activation
     * fails, a message and the stack trace are printed.
     *
     * @param latentConnection the connection to prepare and activate
     */
    public void addLatentConnection(LatentConnection<ProtocolMessage<State>> latentConnection) {
        // Receive side: one handler per established connection ("from").
        delay.Async prepared = latentConnection.prepare(from -> incoming -> {
            if (incoming instanceof Success) {
                this.handleMessage((ProtocolMessage)((Success)incoming).value(), from);
            } else if (incoming instanceof Failure) {
                ((Failure)incoming).exception().printStackTrace();
            } else {
                throw new MatchError((Object)incoming);
            }
        });

        // Activation: run the prepared connection with the global abort as context.
        Function1 activate = (Function1)prepared.handleInCtx().apply((Object)this.globalAbort());
        activate.apply(established -> {
            if (established instanceof Success) {
                Connection conn = (Connection)((Success)established).value();
                synchronized (this.lock()) {
                    this.connections_$eq(this.connections().$colon$colon((Object)conn));
                }
                // Immediately ask the new peer for everything this replica has not seen yet.
                Object request = ProtocolMessage$Request$.MODULE$.apply(this.replicaId().uid(), this.selfContext());
                Function1 runSend = (Function1)conn.send(request).handleInCtx().apply((Object)BoxedUnit.UNIT);
                runSend.apply(this.debugCallbackAndRemoveCon(conn));
            } else if (established instanceof Failure) {
                Throwable ex = ((Failure)established).exception();
                Predef$.MODULE$.println((Object)"exception during connection activation");
                ex.printStackTrace();
            } else {
                throw new MatchError((Object)established);
            }
        });
    }

    /**
     * @return the monitor object this class synchronizes on when it updates its state
     */
    public Object lock() {
        return lock;
    }

    /**
     * Returns the payloads remembered so far (new entries are prepended, so the most
     * recently remembered payload comes first).
     * <p>
     * {@code pastPayloads} is a plain, non-volatile field that this class reassigns only
     * while holding {@link #lock()}. The read is therefore done under the same monitor,
     * exactly as the Request branch of {@code handleMessage} does, so that a caller on
     * another thread is guaranteed to observe the latest published list. The monitor is
     * reentrant, so calling this while already holding the lock is fine.
     *
     * @return the immutable list of remembered payloads at the time of the call
     */
    public List<ProtocolMessage.Payload<State>> allPayloads() {
        Object monitor = this.lock();
        synchronized (monitor) {
            return this.pastPayloads;
        }
    }

    /**
     * Prepends {@code payload} to the remembered payloads. This method does not
     * synchronize itself; both callers in this class invoke it while holding
     * {@link #lock()}.
     *
     * @param payload the payload to remember
     */
    private void rememberPayload(ProtocolMessage.Payload<State> payload) {
        List<ProtocolMessage.Payload<State>> extended = this.pastPayloads.$colon$colon(payload);
        this.pastPayloads = extended;
    }

    /**
     * Looks up the {@link Dots} recorded for this replica's own uid.
     *
     * @return the recorded dots, or the result of {@code selfContext$$anonfun$1()} when
     *         the context map has no entry for this replica
     */
    public Dots selfContext() {
        Map<Uid, Dots> known = this.contexts;
        Object self = this.replicaId().uid();
        return (Dots)known.getOrElse(self, DeltaDissemination::selfContext$$anonfun$1);
    }

    /**
     * Publishes a locally produced delta.
     * <p>
     * Under {@link #lock()} the delta is wrapped in a payload tagged with the next dot of
     * this replica, the replica's own context is updated with the payload's dots and the
     * payload is remembered. After the lock is released the payload is disseminated to
     * the connections, using the default exclusion set.
     *
     * @param delta the state delta to wrap and send
     */
    public void applyDelta(State delta) {
        ProtocolMessage.Payload<State> created;
        synchronized (this.lock()) {
            Uid self = this.replicaId().uid();
            Dots freshDots = Dots$.MODULE$.single(this.selfContext().nextDot(self));
            created = ProtocolMessage$Payload$.MODULE$.apply(self, freshDots, delta);
            this.updateContext(self, created.dots());
            this.rememberPayload(created);
        }
        // Network sends happen outside the critical section.
        this.disseminate(created, this.disseminate$default$2());
    }

    /**
     * Merges {@code dots} into the context recorded for {@code rr}, holding
     * {@link #lock()} while the context map is replaced. The current entry (an
     * {@link Option}) is merged with {@code Some(dots)} using the option lattice over
     * {@code Dots$.MODULE$.contextLattice()}.
     *
     * @param rr   the uid whose context entry is updated
     * @param dots the dots to merge into that entry
     */
    public void updateContext(Uid rr, Dots dots) {
        synchronized (this.lock()) {
            Function1 mergeIn = (Function1 & Serializable)current -> {
                Lattice optionMerge = Lattice$.MODULE$.optionLattice(Dots$.MODULE$.contextLattice());
                Some incoming = Some$.MODULE$.apply((Object)dots);
                return (Option)optionMerge.merge(current, (Object)incoming);
            };
            this.contexts = (Map)this.contexts.updatedWith((Object)rr, mergeIn);
        }
    }

    /**
     * Handles one protocol message received on {@code from}.
     * <ul>
     *   <li><b>Ping</b>: answers with a {@code Pong} carrying the same timestamp.</li>
     *   <li><b>Pong</b>: prints the elapsed time since the carried timestamp in milliseconds.</li>
     *   <li><b>Request</b>: sends back every remembered payload whose dots are not already
     *       covered by the requester's known dots (adding this replica's uid as sender),
     *       then records {@code selfContext merge knows} as the requester's context.</li>
     *   <li><b>Payload</b>: if its dots are already covered by {@link #selfContext()} it is
     *       dropped. Otherwise the contexts of all listed senders and of this replica are
     *       updated, the payload is remembered, {@code receiveCallback} is applied to its
     *       data and, when {@code immediateForward} is set, it is disseminated to every
     *       connection except {@code from}.</li>
     * </ul>
     * The "already known?" check for payloads is performed inside the same critical
     * section as the context update. With the check outside the lock, two threads that
     * receive the same payload concurrently could both pass it and then remember,
     * deliver and forward the payload twice. The callback and the forwarding still run
     * outside the lock.
     *
     * @param msg2 the received message
     * @param from the connection the message arrived on
     * @throws MatchError if the message is none of the four known kinds
     */
    public void handleMessage(ProtocolMessage<State> msg2, Connection<ProtocolMessage<State>> from) {
        ProtocolMessage<State> protocolMessage = msg2;
        if (protocolMessage instanceof ProtocolMessage.Ping) {
            ProtocolMessage.Ping ping = ProtocolMessage$Ping$.MODULE$.unapply((ProtocolMessage.Ping)protocolMessage);
            long time = ping._1();
            // Echo the timestamp so the sender can compute the round-trip time.
            ((Function1)from.send((Object)ProtocolMessage$Pong$.MODULE$.apply(time)).handleInCtx().apply((Object)this.given_LocalUid())).apply(this.debugCallbackAndRemoveCon(from));
            return;
        }
        if (protocolMessage instanceof ProtocolMessage.Pong) {
            ProtocolMessage.Pong pong = ProtocolMessage$Pong$.MODULE$.unapply((ProtocolMessage.Pong)protocolMessage);
            long time = pong._1();
            Predef$.MODULE$.println((Object)("ping took " + Predef$.MODULE$.long2Long(System.nanoTime() - time).doubleValue() / (double)1000000 + "ms"));
            return;
        }
        if (protocolMessage instanceof ProtocolMessage.Request) {
            ProtocolMessage.Request request = ProtocolMessage$Request$.MODULE$.unapply((ProtocolMessage.Request)protocolMessage);
            Uid requester = request._1();
            Dots knows = request._2();
            // Take a consistent snapshot of the (immutable) payload list under the lock.
            List<ProtocolMessage.Payload<State>> snapshot;
            Object monitor = this.lock();
            synchronized (monitor) {
                snapshot = this.pastPayloads;
            }
            List relevant = snapshot.filterNot((Function1 & Serializable)dt -> dt.dots().lessThanEquals(knows));
            relevant.foreach((Function1)(JProcedure1 & Serializable)msg -> ((Function1)from.send(msg.addSender(this.replicaId().uid())).handleInCtx().apply((Object)BoxedUnit.UNIT)).apply(this.debugCallbackAndRemoveCon(from)));
            Lattice Lattice_this = Dots$.MODULE$.contextLattice();
            Dots left$proxy1 = this.selfContext();
            this.updateContext(requester, (Dots)Lattice_this.merge((Object)left$proxy1, (Object)knows));
            return;
        }
        if (protocolMessage instanceof ProtocolMessage.Payload) {
            ProtocolMessage.Payload payload = (ProtocolMessage.Payload)protocolMessage;
            ProtocolMessage.Payload unapplied = ProtocolMessage$Payload$.MODULE$.unapply(payload);
            Set<Uid> senders = unapplied._1();
            Dots context = unapplied._2();
            Object data = unapplied._3();
            boolean alreadyKnown;
            Object monitor = this.lock();
            synchronized (monitor) {
                // Check and update atomically so a payload is accepted at most once.
                alreadyKnown = context.lessThanEquals(this.selfContext());
                if (!alreadyKnown) {
                    senders.foreach((Function1)(JProcedure1 & Serializable)uid -> this.updateContext((Uid)uid, context));
                    this.updateContext(this.replicaId().uid(), context);
                    this.rememberPayload(payload);
                }
            }
            if (alreadyKnown) {
                return;
            }
            // User callback and network sends are deliberately kept outside the lock.
            this.receiveCallback.apply(data);
            if (this.immediateForward) {
                this.disseminate(payload, (Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Connection[]{from})));
            }
            return;
        }
        throw new MatchError(protocolMessage);
    }

    /**
     * Sends {@code payload} to every current connection that is not contained in
     * {@code except}. The outcome of each send is handled by
     * {@link #debugCallbackAndRemoveCon}.
     *
     * @param payload the payload to send
     * @param except  connections that must not receive the payload
     */
    public void disseminate(ProtocolMessage.Payload<State> payload, Set<Connection<ProtocolMessage<State>>> except) {
        List recipients = this.connections().filterNot((Function1 & Serializable)con -> except.contains(con));
        recipients.foreach((Function1)(JProcedure1 & Serializable)con -> {
            Function1 runSend = (Function1)con.send((Object)payload).handleInCtx().apply((Object)BoxedUnit.UNIT);
            runSend.apply(this.debugCallbackAndRemoveCon((Connection<ProtocolMessage<State>>)con));
        });
    }

    /**
     * Default value for the {@code except} argument of {@code disseminate}.
     *
     * @return an empty set, i.e. no connection is excluded
     */
    public Set<Connection<ProtocolMessage<State>>> disseminate$default$2() {
        Set<Connection<ProtocolMessage<State>>> nobody = Predef$.MODULE$.Set().empty();
        return nobody;
    }

    /**
     * Fallback supplier used by {@code selfContext()} when no context is recorded yet.
     *
     * @return {@code Dots$.MODULE$.empty()}
     */
    private static final Dots selfContext$$anonfun$1() {
        Dots$ dotsCompanion = Dots$.MODULE$;
        return dotsCompanion.empty();
    }
}

