/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.primitives.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.BoundedThreadPool;
import org.onlab.util.KryoNamespace;
import org.onlab.util.SlidingWindowCounter;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.primitives.impl.AntiEntropyAdvertisement;
import org.onosproject.store.primitives.impl.AntiEntropyResponse;
import org.onosproject.store.primitives.impl.MapValue;
import org.onosproject.store.primitives.impl.UpdateEntry;
import org.onosproject.store.primitives.impl.UpdateRequest;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventuallyConsistentMapImpl<K, V>
implements EventuallyConsistentMap<K, V> {
    private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
    // Local replica: key -> timestamped value. Entries for which MapValue.isTombstone() is true
    // are hidden from readers (see get/size) and are removed later by purgeTombstones().
    private final Map<K, MapValue<V>> items;
    private final ClusterService clusterService;
    private final ClusterCommunicationService clusterCommunicator;
    // Serializer built by createSerializer(); used for message encoding/decoding and object copies.
    private final StoreSerializer serializer;
    private final NodeId localNodeId;
    private final PersistenceService persistenceService;
    // Supplies the timestamp attached to each locally originated put/remove/compute.
    private final BiFunction<K, V, Timestamp> timestampProvider;
    // Message subjects this map subscribes to; all are derived from the map name in the constructor.
    private final MessageSubject updateMessageSubject;
    private final MessageSubject antiEntropyAdvertisementSubject;
    private final MessageSubject updateRequestSubject;
    private final Set<EventuallyConsistentMapListener<K, V>> listeners = Sets.newCopyOnWriteArraySet();
    // Executor on which incoming update messages are processed (see the constructor subscriptions).
    private final ExecutorService executor;
    // Runs the periodic advertisement and tombstone purge, and handles anti-entropy/update-request messages.
    private final ScheduledExecutorService backgroundExecutor;
    // Chooses the peers that receive an update for a given key/value; defaults to all other nodes.
    private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
    // Executor on which EventAccumulator sends batched updates.
    private final ExecutorService communicationExecutor;
    // One accumulator of pending outbound updates per peer (populated lazily by queueUpdate).
    private final Map<NodeId, EventAccumulator> senderPending;
    // Purge bound used by the previous purgeTombstones() run; lets an unchanged bound be skipped.
    private long previousTombstonePurgeTime;
    // Per peer: creation time of the last advertisement that the peer answered with PROCESSED.
    private final Map<NodeId, Long> antiEntropyTimes = Maps.newConcurrentMap();
    private final String mapName;
    // Set by destroy(); checked at the start of the public operations.
    private volatile boolean destroyed = false;
    private static final String ERROR_DESTROYED = " map is already destroyed";
    // mapName + ERROR_DESTROYED, precomputed for the state checks.
    private final String destroyedMessage;
    private static final String ERROR_NULL_KEY = "Key cannot be null";
    private static final String ERROR_NULL_VALUE = "Null values are not allowed";
    // NOTE(review): never read in the visible code; the scheduling calls pass the literal 5L instead.
    private final long initialDelaySec = 5L;
    // Inverse of the constructor's convergeFaster flag; bootstrap() contacts only half the peers when set.
    private final boolean lightweightAntiEntropy;
    // When true, removals store no tombstone (see removeAndNotify) and no purge task is scheduled.
    private final boolean tombstonesDisabled;
    // Presumably the sliding-window size and the load-check parameters used by `counter` and
    // underHighLoad() (the values match the numbers used there) - confirm.
    private static final int WINDOW_SIZE = 5;
    private static final int HIGH_LOAD_THRESHOLD = 2;
    private static final int LOAD_WINDOW = 2;
    // Incremented on every putInternal/removeInternal; read by underHighLoad().
    private SlidingWindowCounter counter = new SlidingWindowCounter(5);
    private final boolean persistent;
    // Name given to the persistent backing map when `persistent` is true.
    private static final String PERSISTENT_LOCAL_MAP_NAME = "itemsMap";
    // Presumably the batching limits of EventAccumulator (the values match the numbers it passes
    // to its superclass) - confirm.
    private static final int DEFAULT_MAX_EVENTS = 1000;
    private static final int DEFAULT_MAX_IDLE_MS = 10;
    private static final int DEFAULT_MAX_BATCH_MS = 50;
    // Single timer shared by every EventAccumulator created by any instance of this class.
    private static final Timer TIMER = new Timer("onos-ecm-sender-events");

    /**
     * Creates the map, registers its cluster message handlers, schedules the
     * background tasks and sends the initial advertisements.
     *
     * @param mapName               name of the map; also used to derive message subjects and thread names
     * @param clusterService        source of cluster membership and the local node id
     * @param clusterCommunicator   messaging service used for all peer communication
     * @param ns                    Kryo namespace for the key and value types
     * @param timestampProvider     supplies the timestamp for each local update
     * @param peerUpdateFunction    selects peers to notify per update; {@code null} means all other nodes
     * @param eventExecutor         executor for incoming updates; {@code null} creates an 8-thread pool
     * @param communicationExecutor executor for outgoing batches; {@code null} creates an 8-thread bounded pool
     * @param backgroundExecutor    executor for periodic tasks; {@code null} creates a single-thread scheduler
     * @param tombstonesDisabled    when true, no tombstones are kept and no purge task is scheduled
     * @param antiEntropyPeriod     period between anti-entropy advertisements
     * @param antiEntropyTimeUnit   unit of {@code antiEntropyPeriod}
     * @param convergeFaster        when true, bootstrap advertises to every active peer instead of half
     * @param persistent            when true, entries are kept in a map built by {@code persistenceService}
     * @param persistenceService    provider of the persistent map; only dereferenced when {@code persistent}
     */
    EventuallyConsistentMapImpl(String mapName, ClusterService clusterService, ClusterCommunicationService clusterCommunicator, KryoNamespace ns, BiFunction<K, V, Timestamp> timestampProvider, BiFunction<K, V, Collection<NodeId>> peerUpdateFunction, ExecutorService eventExecutor, ExecutorService communicationExecutor, ScheduledExecutorService backgroundExecutor, boolean tombstonesDisabled, long antiEntropyPeriod, TimeUnit antiEntropyTimeUnit, boolean convergeFaster, boolean persistent, PersistenceService persistenceService) {
        // mapName must be assigned first: createSerializer() reads it through name().
        this.mapName = mapName;
        this.serializer = this.createSerializer(ns);
        this.persistenceService = persistenceService;
        this.persistent = persistent;
        // NOTE(review): the persistent map is always built with the fixed name "itemsMap", not
        // mapName; presumably several persistent maps would share one store - confirm against
        // PersistenceService before relying on persistence for more than one map.
        this.items = persistent ? this.persistenceService.persistentMapBuilder().withName(PERSISTENT_LOCAL_MAP_NAME).withSerializer((Serializer)this.serializer).build() : Maps.newConcurrentMap();
        this.senderPending = Maps.newConcurrentMap();
        this.destroyedMessage = mapName + ERROR_DESTROYED;
        this.clusterService = clusterService;
        this.clusterCommunicator = clusterCommunicator;
        this.localNodeId = clusterService.getLocalNode().id();
        this.timestampProvider = timestampProvider;
        // Default peer selection: every cluster node except the local one.
        this.peerUpdateFunction = peerUpdateFunction != null ? peerUpdateFunction : (key, value) -> clusterService.getNodes().stream().map(ControllerNode::id).filter(nodeId -> !nodeId.equals((Object)this.localNodeId)).collect(Collectors.toList());
        // Fall back to internally created executors when the caller supplied none.
        this.executor = eventExecutor != null ? eventExecutor : Executors.newFixedThreadPool(8, Tools.groupedThreads((String)"onos/ecm", (String)(mapName + "-fg-%d"), (Logger)log));
        this.communicationExecutor = communicationExecutor != null ? communicationExecutor : BoundedThreadPool.newFixedThreadPool((int)8, (ThreadFactory)Tools.groupedThreads((String)"onos/ecm", (String)(mapName + "-publish-%d"), (Logger)log));
        this.backgroundExecutor = backgroundExecutor != null ? backgroundExecutor : Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads((String)"onos/ecm", (String)(mapName + "-bg-%d"), (Logger)log));
        // NOTE(review): the initial delay of 5 is interpreted in antiEntropyTimeUnit, so it is
        // 5 seconds only when the caller passes SECONDS - confirm this is intended.
        this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement, 5L, antiEntropyPeriod, antiEntropyTimeUnit);
        // Incoming batched updates from peers.
        this.updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
        clusterCommunicator.addSubscriber(this.updateMessageSubject, arg_0 -> ((StoreSerializer)this.serializer).decode(arg_0), this::processUpdates, (Executor)this.executor);
        // Incoming anti-entropy advertisements; the handler's response is encoded and sent back.
        this.antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
        clusterCommunicator.addSubscriber(this.antiEntropyAdvertisementSubject, arg_0 -> ((StoreSerializer)this.serializer).decode(arg_0), this::handleAntiEntropyAdvertisement, arg_0 -> ((StoreSerializer)this.serializer).encode(arg_0), (Executor)this.backgroundExecutor);
        // Incoming requests from peers asking for the current value of specific keys.
        this.updateRequestSubject = new MessageSubject("ecm-" + mapName + "-update-request");
        clusterCommunicator.addSubscriber(this.updateRequestSubject, arg_0 -> ((StoreSerializer)this.serializer).decode(arg_0), this::handleUpdateRequests, (Executor)this.backgroundExecutor);
        if (!tombstonesDisabled) {
            this.previousTombstonePurgeTime = 0L;
            // NOTE(review): the purge is scheduled in SECONDS regardless of antiEntropyTimeUnit,
            // so its period differs from the advertisement period for any other unit - confirm.
            this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones, 5L, antiEntropyPeriod, TimeUnit.SECONDS);
        }
        this.tombstonesDisabled = tombstonesDisabled;
        this.lightweightAntiEntropy = !convergeFaster;
        // Must run last: bootstrap() reads lightweightAntiEntropy and sends messages.
        this.bootstrap();
    }

    /**
     * Builds the serializer used by this map.
     *
     * The caller-supplied namespace is registered first, then (starting at
     * id 600) the basic namespace and the classes this map exchanges with peers.
     *
     * @param ns namespace covering the key and value types
     * @return serializer backed by a namespace named {@code <mapName>-ecmap}
     */
    private StoreSerializer createSerializer(KryoNamespace ns) {
        KryoNamespace namespace = KryoNamespace.newBuilder()
                .register(ns)
                .nextId(600)
                .register(KryoNamespaces.BASIC)
                .register(LogicalTimestamp.class)
                .register(WallClockTimestamp.class)
                .register(AntiEntropyAdvertisement.class)
                .register(AntiEntropyResponse.class)
                .register(UpdateEntry.class)
                .register(MapValue.class)
                .register(MapValue.Digest.class)
                .register(UpdateRequest.class)
                .build(name() + "-ecmap");
        return StoreSerializer.using(namespace);
    }

    /**
     * Returns the name this map was created with.
     *
     * @return the map name
     */
    public String name() {
        return mapName;
    }

    /**
     * Counts the entries whose value is alive (tombstones are excluded).
     *
     * @return number of live entries
     * @throws IllegalStateException if the map has been destroyed
     */
    public int size() {
        Preconditions.checkState(!destroyed, (Object) destroyedMessage);
        Map<K, MapValue<V>> liveEntries = Maps.filterValues(items, MapValue::isAlive);
        return liveEntries.size();
    }

    /**
     * Reports whether the map holds no live entries.
     *
     * @return {@code true} when {@link #size()} is zero
     * @throws IllegalStateException if the map has been destroyed
     */
    public boolean isEmpty() {
        Preconditions.checkState(!destroyed, (Object) destroyedMessage);
        int liveCount = size();
        return liveCount == 0;
    }

    /**
     * Reports whether a live value is stored under the given key.
     *
     * @param key key to look up; must not be null
     * @return {@code true} when {@link #get(Object)} yields a value
     * @throws IllegalStateException if the map has been destroyed
     * @throws NullPointerException  if {@code key} is null
     */
    public boolean containsKey(K key) {
        Preconditions.checkState(!destroyed, (Object) destroyedMessage);
        Preconditions.checkNotNull(key, (Object) ERROR_NULL_KEY);
        V current = get(key);
        return current != null;
    }

    /**
     * Reports whether any live entry holds a value equal to the given one.
     *
     * @param value value to search for; must not be null
     * @return {@code true} when at least one live entry matches
     * @throws IllegalStateException if the map has been destroyed
     * @throws NullPointerException  if {@code value} is null
     */
    public boolean containsValue(V value) {
        Preconditions.checkState(!destroyed, (Object) destroyedMessage);
        Preconditions.checkNotNull(value, (Object) ERROR_NULL_VALUE);
        for (MapValue<V> candidate : items.values()) {
            if (candidate.isAlive() && value.equals(candidate.get())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Looks up the live value stored under the given key.
     *
     * @param key key to look up; must not be null
     * @return the value, or {@code null} when the key is absent or only a tombstone remains
     * @throws IllegalStateException if the map has been destroyed
     * @throws NullPointerException  if {@code key} is null
     */
    public V get(K key) {
        Preconditions.checkState(!destroyed, (Object) destroyedMessage);
        Preconditions.checkNotNull(key, (Object) ERROR_NULL_KEY);
        MapValue<V> stored = items.get(key);
        if (stored == null || stored.isTombstone()) {
            return null;
        }
        return stored.get();
    }

    /**
     * Stores a value under the given key, stamped by the timestamp provider.
     *
     * Peers and listeners are notified only when the new value actually
     * replaced the local state (see {@code putInternal}).
     *
     * @param key   key to write; must not be null
     * @param value value to store; must not be null
     * @throws IllegalStateException if the map has been destroyed
     * @throws NullPointerException  if {@code key} or {@code value} is null
     */
    public void put(K key, V value) {
        Preconditions.checkState(!destroyed, (Object) destroyedMessage);
        Preconditions.checkNotNull(key, (Object) ERROR_NULL_KEY);
        Preconditions.checkNotNull(value, (Object) ERROR_NULL_VALUE);
        Timestamp timestamp = timestampProvider.apply(key, value);
        MapValue<V> newValue = new MapValue<V>(value, timestamp);
        if (!putInternal(key, newValue)) {
            // An equal or newer value is already stored; nothing to announce.
            return;
        }
        notifyPeers(new UpdateEntry<K, V>(key, newValue), peerUpdateFunction.apply(key, value));
        notifyListeners(new EventuallyConsistentMapEvent<K, V>(mapName, EventuallyConsistentMapEvent.Type.PUT, key, value));
    }

    /**
     * Removes whatever value is stored under the given key.
     *
     * @param key key to remove; must not be null
     * @return the result of {@code removeAndNotify(key, null)}
     * @throws IllegalStateException if the map has been destroyed
     * @throws NullPointerException  if {@code key} is null
     */
    public V remove(K key) {
        Preconditions.checkState(!destroyed, (Object) destroyedMessage);
        Preconditions.checkNotNull(key, (Object) ERROR_NULL_KEY);
        // A null expected value means "remove regardless of the current value".
        V removed = removeAndNotify(key, null);
        return removed;
    }

    /**
     * Removes the entry for the given key, passing {@code value} on as the
     * expected current value.
     *
     * @param key   key to remove; must not be null
     * @param value expected current value; must not be null
     * @throws IllegalStateException if the map has been destroyed
     * @throws NullPointerException  if {@code key} or {@code value} is null
     */
    public void remove(K key, V value) {
        Preconditions.checkState(!destroyed, (Object) destroyedMessage);
        Preconditions.checkNotNull(key, (Object) ERROR_NULL_KEY);
        Preconditions.checkNotNull(value, (Object) ERROR_NULL_VALUE);
        removeAndNotify(key, value);
    }

    /**
     * Performs a local removal and announces it to peers and listeners.
     *
     * A tombstone is created unless tombstones are disabled or the timestamp
     * provider returns null. Peers are notified whenever a previous value was
     * displaced; listeners only when that previous value was alive.
     *
     * @param key   key to remove
     * @param value expected current value, or {@code null} to remove unconditionally
     * @return the raw content of the displaced value, or {@code null} if nothing was displaced
     */
    private V removeAndNotify(K key, V value) {
        Timestamp timestamp = timestampProvider.apply(key, value);
        Optional<MapValue<V>> tombstone;
        if (tombstonesDisabled || timestamp == null) {
            tombstone = Optional.empty();
        } else {
            tombstone = Optional.of(MapValue.tombstone(timestamp));
        }
        MapValue<V> displaced = removeInternal(key, Optional.ofNullable(value), tombstone);
        if (displaced == null) {
            return null;
        }
        notifyPeers(new UpdateEntry<K, V>(key, tombstone.orElse(null)), peerUpdateFunction.apply(key, displaced.get()));
        if (displaced.isAlive()) {
            notifyListeners(new EventuallyConsistentMapEvent<K, V>(mapName, EventuallyConsistentMapEvent.Type.REMOVE, key, displaced.get()));
        }
        return displaced.get();
    }

    private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) {
        Preconditions.checkState((!this.destroyed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        Preconditions.checkNotNull(key, (Object)ERROR_NULL_KEY);
        Preconditions.checkNotNull(value, (Object)ERROR_NULL_VALUE);
        tombstone.ifPresent(v -> Preconditions.checkState((boolean)v.isTombstone()));
        this.counter.incrementCount();
        AtomicBoolean updated = new AtomicBoolean(false);
        AtomicReference previousValue = new AtomicReference();
        this.items.compute(key, (? super K k, ? super V existing) -> {
            boolean valueMatches = true;
            if (value.isPresent() && existing != null && existing.isAlive()) {
                valueMatches = Objects.equals(value.get(), existing.get());
            }
            if (existing == null) {
                log.trace("ECMap Remove: Existing value for key {} is already null", k);
            }
            if (valueMatches) {
                if (existing == null) {
                    updated.set(tombstone.isPresent());
                } else {
                    updated.set(!tombstone.isPresent() || ((MapValue)tombstone.get()).isNewerThan(existing));
                }
            }
            if (updated.get()) {
                previousValue.set(existing);
                return tombstone.orElse(null);
            }
            return existing;
        });
        return (MapValue)previousValue.get();
    }

    public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
        Preconditions.checkState((!this.destroyed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        Preconditions.checkNotNull(key, (Object)ERROR_NULL_KEY);
        Preconditions.checkNotNull(recomputeFunction, (Object)"Recompute function cannot be null");
        AtomicBoolean updated = new AtomicBoolean(false);
        AtomicReference previousValue = new AtomicReference();
        MapValue computedValue = this.items.compute(this.serializer.copy(key), (? super K k, ? super V mv) -> {
            previousValue.set(mv);
            Object newRawValue = recomputeFunction.apply(key, mv == null ? null : (Object)mv.get());
            MapValue newValue = new MapValue(newRawValue, this.timestampProvider.apply(key, newRawValue));
            if (mv == null || newValue.isNewerThan(mv)) {
                updated.set(true);
                return (MapValue)this.serializer.copy(newValue);
            }
            return mv;
        });
        if (updated.get()) {
            Object value;
            EventuallyConsistentMapEvent.Type updateType;
            this.notifyPeers(new UpdateEntry(key, computedValue), this.peerUpdateFunction.apply(key, computedValue.get()));
            EventuallyConsistentMapEvent.Type type = updateType = computedValue.isTombstone() ? EventuallyConsistentMapEvent.Type.REMOVE : EventuallyConsistentMapEvent.Type.PUT;
            Object object = computedValue.isTombstone() ? (previousValue.get() == null ? null : ((MapValue)previousValue.get()).get()) : (value = computedValue.get());
            if (value != null) {
                this.notifyListeners(new EventuallyConsistentMapEvent(this.mapName, updateType, key, value));
            }
        }
        return computedValue.get();
    }

    /**
     * Stores every mapping of the given map through {@link #put(Object, Object)}.
     *
     * @param m mappings to store
     * @throws IllegalStateException if the map has been destroyed
     */
    public void putAll(Map<? extends K, ? extends V> m) {
        Preconditions.checkState(!destroyed, (Object) destroyedMessage);
        m.forEach((key, value) -> put(key, value));
    }

    /**
     * Removes every live entry, one {@link #remove(Object)} call per key.
     *
     * @throws IllegalStateException if the map has been destroyed
     */
    public void clear() {
        Preconditions.checkState(!destroyed, (Object) destroyedMessage);
        Map<K, MapValue<V>> liveEntries = Maps.filterValues(items, MapValue::isAlive);
        liveEntries.forEach((key, ignored) -> remove(key));
    }

    /**
     * Returns the keys of all live entries.
     *
     * @return key set of the map filtered to live values
     * @throws IllegalStateException if the map has been destroyed
     */
    public Set<K> keySet() {
        Preconditions.checkState(!destroyed, (Object) destroyedMessage);
        Map<K, MapValue<V>> liveEntries = Maps.filterValues(items, MapValue::isAlive);
        return liveEntries.keySet();
    }

    /**
     * Returns the raw values of all live entries.
     *
     * @return collection of unwrapped live values
     * @throws IllegalStateException if the map has been destroyed
     */
    public Collection<V> values() {
        Preconditions.checkState(!destroyed, (Object) destroyedMessage);
        Collection<MapValue<V>> liveValues = Maps.filterValues(items, MapValue::isAlive).values();
        return Collections2.transform(liveValues, MapValue::get);
    }

    /**
     * Returns a snapshot of the live entries as key / raw-value pairs.
     *
     * The result is a newly collected set; it is not a view of the map.
     *
     * @return set of entries for all live values
     * @throws IllegalStateException if the map has been destroyed
     */
    public Set<Map.Entry<K, V>> entrySet() {
        Preconditions.checkState(!destroyed, (Object) destroyedMessage);
        // No raw MapValue cast here: it would erase V and break the Set<Map.Entry<K, V>> result type.
        return Maps.filterValues(items, MapValue::isAlive)
                .entrySet()
                .stream()
                .map(e -> Pair.of(e.getKey(), e.getValue().get()))
                .collect(Collectors.toSet());
    }

    private boolean putInternal(K key, MapValue<V> newValue) {
        Preconditions.checkState((!this.destroyed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        Preconditions.checkNotNull(key, (Object)ERROR_NULL_KEY);
        Preconditions.checkNotNull(newValue, (Object)ERROR_NULL_VALUE);
        Preconditions.checkState((boolean)newValue.isAlive());
        this.counter.incrementCount();
        AtomicBoolean updated = new AtomicBoolean(false);
        this.items.compute(key, (? super K k, ? super V existing) -> {
            if (existing == null || newValue.isNewerThan((MapValue)existing)) {
                updated.set(true);
                return newValue;
            }
            return existing;
        });
        return updated.get();
    }

    /**
     * Registers a listener for map events.
     *
     * @param listener listener to add; must not be null
     * @throws IllegalStateException if the map has been destroyed
     * @throws NullPointerException  if {@code listener} is null
     */
    public void addListener(EventuallyConsistentMapListener<K, V> listener) {
        Preconditions.checkState(!destroyed, (Object) destroyedMessage);
        EventuallyConsistentMapListener<K, V> checked = Preconditions.checkNotNull(listener);
        listeners.add(checked);
    }

    /**
     * Unregisters a previously added listener.
     *
     * @param listener listener to remove; must not be null
     * @throws IllegalStateException if the map has been destroyed
     * @throws NullPointerException  if {@code listener} is null
     */
    public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
        Preconditions.checkState(!destroyed, (Object) destroyedMessage);
        EventuallyConsistentMapListener<K, V> checked = Preconditions.checkNotNull(listener);
        listeners.remove(checked);
    }

    /**
     * Marks the map as destroyed, shuts down its executors, drops all
     * listeners and unsubscribes from the cluster message subjects.
     *
     * @return an already completed future
     */
    public CompletableFuture<Void> destroy() {
        // Set the flag first so operations that check it start failing immediately.
        destroyed = true;

        executor.shutdown();
        backgroundExecutor.shutdown();
        communicationExecutor.shutdown();

        listeners.clear();

        clusterCommunicator.removeSubscriber(updateMessageSubject);
        clusterCommunicator.removeSubscriber(updateRequestSubject);
        clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);

        return CompletableFuture.completedFuture(null);
    }

    /**
     * Delivers an event to every registered listener, in the calling thread.
     *
     * @param event event to deliver
     */
    private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
        for (EventuallyConsistentMapListener<K, V> listener : listeners) {
            listener.event(event);
        }
    }

    /**
     * Queues an update for delivery to the given peers.
     *
     * @param event update to send
     * @param peers recipients; {@code null} means nobody
     */
    private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
        queueUpdate(event, peers);
    }

    /**
     * Adds an update to the pending accumulator of each given peer, creating
     * the accumulator on first use.
     *
     * @param event update to queue
     * @param peers recipients; {@code null} is treated as no recipients
     */
    private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
        if (peers == null) {
            return;
        }
        for (NodeId peer : peers) {
            EventAccumulator accumulator = senderPending.computeIfAbsent(peer, unusedKey -> new EventAccumulator(peer));
            accumulator.add(event);
        }
    }

    /**
     * Reports whether the mutation counter over the last {@code LOAD_WINDOW}
     * slots exceeds {@code HIGH_LOAD_THRESHOLD}.
     *
     * @return {@code true} when the map is considered under high load
     */
    private boolean underHighLoad() {
        long recentUpdates = counter.get(LOAD_WINDOW);
        return recentUpdates > HIGH_LOAD_THRESHOLD;
    }

    /**
     * Periodic task: sends an anti-entropy advertisement to one randomly
     * chosen active peer, unless the map is under high load or destroyed.
     *
     * All exceptions are caught and logged so they never escape to the scheduler.
     */
    private void sendAdvertisement() {
        try {
            if (!underHighLoad() && !destroyed) {
                pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
            }
        } catch (Exception e) {
            log.error("Exception thrown while sending advertisement", (Throwable) e);
        }
    }

    /**
     * Picks one active cluster node other than the local node, at random.
     *
     * @return the chosen peer, or empty when no other node is active
     */
    private Optional<NodeId> pickRandomActivePeer() {
        // The list is typed so the element can be wrapped as Optional<NodeId>.
        List<NodeId> activePeers = clusterService.getNodes().stream()
                .map(ControllerNode::id)
                .filter(id -> !localNodeId.equals(id))
                .filter(id -> clusterService.getState(id).isActive())
                .collect(Collectors.toList());
        if (activePeers.isEmpty()) {
            return Optional.empty();
        }
        Collections.shuffle(activePeers);
        return Optional.of(activePeers.get(0));
    }

    /**
     * Sends an advertisement of the local digests to one peer and, when the
     * peer answers {@code PROCESSED}, records the advertisement's creation time
     * for that peer.
     *
     * @param peer node to advertise to
     */
    private void sendAdvertisementToPeer(NodeId peer) {
        // Taken before the advertisement is built; purgeTombstones() later uses this time as a bound.
        long adCreationTime = System.currentTimeMillis();
        AntiEntropyAdvertisement<K> ad = createAdvertisement();
        clusterCommunicator.sendAndReceive(
                ad,
                antiEntropyAdvertisementSubject,
                message -> serializer.encode(message),
                bytes -> serializer.decode(bytes),
                peer
        ).whenComplete((result, error) -> {
            if (error != null) {
                log.debug("Failed to send anti-entropy advertisement to {}", (Object) peer, error);
                return;
            }
            if (result == AntiEntropyResponse.PROCESSED) {
                antiEntropyTimes.put(peer, adCreationTime);
            }
        });
    }

    /**
     * Asks a peer to send its current values for the given keys.
     *
     * A send failure is only logged at debug level.
     *
     * @param peer node to ask
     * @param keys keys whose values are requested
     */
    private void sendUpdateRequestToPeer(NodeId peer, Set<K> keys) {
        UpdateRequest<K> request = new UpdateRequest<K>(localNodeId, keys);
        clusterCommunicator.unicast(
                request,
                updateRequestSubject,
                message -> serializer.encode(message),
                peer
        ).whenComplete((result, error) -> {
            if (error != null) {
                log.debug("Failed to send update request to {}", (Object) peer, error);
            }
        });
    }

    /**
     * Builds an advertisement holding an immutable copy of the digest of every
     * local entry, tombstones included.
     *
     * @return advertisement originating from the local node
     */
    private AntiEntropyAdvertisement<K> createAdvertisement() {
        Map<K, MapValue.Digest> digests = ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest));
        return new AntiEntropyAdvertisement<K>(localNodeId, digests);
    }

    /**
     * Handles an advertisement received from a peer by reconciling it with
     * the local entries and notifying listeners of resulting removals.
     *
     * @param ad advertisement received
     * @return {@code IGNORED} when destroyed or under high load, {@code FAILED}
     *         when reconciliation threw, otherwise {@code PROCESSED}
     */
    private AntiEntropyResponse handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
        if (destroyed || underHighLoad()) {
            return AntiEntropyResponse.IGNORED;
        }
        try {
            if (log.isTraceEnabled()) {
                log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
                        ad.sender(), mapName, ad.digest().size());
            }
            List<EventuallyConsistentMapEvent<K, V>> events = antiEntropyCheckLocalItems(ad);
            events.forEach(this::notifyListeners);
            return AntiEntropyResponse.PROCESSED;
        } catch (Exception e) {
            log.warn("Error handling anti-entropy advertisement", (Throwable) e);
            return AntiEntropyResponse.FAILED;
        }
    }

    /**
     * Compares every local entry with the sender's advertisement, then
     * requests from the sender all keys that are stale or missing locally.
     *
     * The per-entry comparison is done by
     * {@code lambda$antiEntropyCheckLocalItems$224}. The update request is sent
     * even when the key set is empty.
     *
     * @param ad advertisement received
     * @return REMOVE events produced while applying remote tombstones
     */
    private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(AntiEntropyAdvertisement<K> ad) {
        List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
        NodeId sender = ad.sender();
        List<NodeId> peers = ImmutableList.of(sender);
        Set<K> staleOrMissing = new HashSet<K>();
        // Starts as every advertised key; each key found locally is struck off during the scan.
        Set<K> locallyUnknown = new HashSet<K>(ad.digest().keySet());
        items.forEach((key, localValue) ->
                this.lambda$antiEntropyCheckLocalItems$224(locallyUnknown, ad, peers, externalEvents, staleOrMissing, key, localValue));
        // Whatever is left was advertised by the sender but is absent here.
        staleOrMissing.addAll(locallyUnknown);
        sendUpdateRequestToPeer(sender, staleOrMissing);
        return externalEvents;
    }

    /**
     * Answers a peer's update request by queueing, for each requested key,
     * the locally stored value for delivery to the requester.
     *
     * @param request request naming the sender and the keys it wants
     */
    private void handleUpdateRequests(UpdateRequest<K> request) {
        NodeId requester = request.sender();
        List<NodeId> replyTo = ImmutableList.of(requester);
        request.keys().forEach(key -> this.lambda$handleUpdateRequests$225(replyTo, key));
    }

    /**
     * Periodic task: deletes tombstones that every other node has had a
     * chance to learn about.
     *
     * The safe bound is the minimum, over all other cluster nodes, of the
     * last advertisement time recorded for that node (0 when none is
     * recorded). Tombstones created at or before the bound are removed. The
     * run is skipped when the bound has not moved since the previous run.
     *
     * Exceptions are caught and logged: this method runs under
     * {@code scheduleWithFixedDelay}, where an escaping exception would
     * suppress every later run.
     */
    private void purgeTombstones() {
        try {
            long currentSafeTombstonePurgeTime = clusterService.getNodes().stream()
                    .map(ControllerNode::id)
                    .filter(id -> !id.equals(localNodeId))
                    .map(id -> antiEntropyTimes.getOrDefault(id, 0L))
                    .reduce(Math::min)
                    .orElse(0L);
            if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) {
                return;
            }
            List<Map.Entry<K, MapValue<V>>> tombStonesToDelete = items.entrySet().stream()
                    .filter(e -> e.getValue().isTombstone())
                    .filter(e -> e.getValue().creationTime() <= currentSafeTombstonePurgeTime)
                    .collect(Collectors.toList());
            previousTombstonePurgeTime = currentSafeTombstonePurgeTime;
            // Conditional remove: an entry replaced since the scan is left untouched.
            tombStonesToDelete.forEach(entry -> items.remove(entry.getKey(), entry.getValue()));
        } catch (Exception e) {
            log.error("Exception thrown while purging tombstones", (Throwable) e);
        }
    }

    /**
     * Applies a batch of updates received from a peer.
     *
     * An update with no value or a tombstone value is applied as a removal;
     * any other update is applied as a put. Listeners are notified only when
     * the local state actually changed (for removals, only when a live value
     * was displaced). Nothing is done once the map is destroyed.
     *
     * @param updates updates received
     */
    private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
        if (destroyed) {
            return;
        }
        updates.forEach(update -> {
            K key = update.key();
            // Work on a copy so the stored value is independent of the received message object.
            MapValue<V> value = update.value() == null ? null : update.value().copy();
            if (value == null || value.isTombstone()) {
                MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
                if (previousValue != null && previousValue.isAlive()) {
                    notifyListeners(new EventuallyConsistentMapEvent<K, V>(mapName, EventuallyConsistentMapEvent.Type.REMOVE, key, previousValue.get()));
                }
            } else if (putInternal(key, value)) {
                notifyListeners(new EventuallyConsistentMapEvent<K, V>(mapName, EventuallyConsistentMapEvent.Type.PUT, key, value.get()));
            }
        });
    }

    /**
     * Sends an initial advertisement to active peers when the map is created.
     *
     * With lightweight anti-entropy only the first half of the active peers
     * (integer division) is contacted; otherwise all of them are.
     */
    private void bootstrap() {
        List<NodeId> activePeers = clusterService.getNodes().stream()
                .map(ControllerNode::id)
                .filter(id -> !localNodeId.equals(id))
                .filter(id -> clusterService.getState(id).isActive())
                .collect(Collectors.toList());
        if (activePeers.isEmpty()) {
            return;
        }
        int peersToContact = activePeers.size();
        if (lightweightAntiEntropy) {
            peersToContact /= 2;
        }
        for (NodeId peer : activePeers.subList(0, peersToContact)) {
            sendAdvertisementToPeer(peer);
        }
    }

    /**
     * Decompiler-extracted body of the per-key lambda in
     * {@code handleUpdateRequests}: queues the locally stored value of one key
     * (possibly {@code null} when the key is absent) for the given peers.
     *
     * The raw signature is kept so the existing call site still compiles; the
     * caller always passes a key of type {@code K} and a list of node ids.
     *
     * @param peers recipients of the update
     * @param key   requested key
     */
    @SuppressWarnings("unchecked")
    private /* synthetic */ void lambda$handleUpdateRequests$225(List peers, Object key) {
        // Safe: the only caller iterates a Set<K>.
        K typedKey = (K) key;
        this.queueUpdate(new UpdateEntry<K, V>(typedKey, this.items.get(typedKey)), (List<NodeId>) peers);
    }

    /**
     * Decompiler-extracted body of the per-entry lambda in
     * {@code antiEntropyCheckLocalItems}: reconciles one local entry with the
     * sender's advertisement.
     *
     * <ul>
     *   <li>Sender lacks the key, or the local value is newer: queue the local value for the sender.</li>
     *   <li>Sender's digest is newer and is a tombstone: apply the removal locally and record a
     *       REMOVE event if a live value was displaced.</li>
     *   <li>Sender's digest is newer and alive: mark the key as stale so it gets requested.</li>
     * </ul>
     *
     * The raw signature is kept so the existing call site still compiles.
     *
     * @param locallyUnknown advertised keys not yet seen locally; {@code key} is removed from it
     * @param ad             advertisement received
     * @param peers          list holding the sender, used as the update recipient
     * @param externalEvents output list receiving REMOVE events
     * @param staleOrMissing output set receiving keys whose remote value is newer
     * @param key            local key being examined
     * @param localValue     local value stored under {@code key}
     */
    @SuppressWarnings("unchecked")
    private /* synthetic */ void lambda$antiEntropyCheckLocalItems$224(Set locallyUnknown, AntiEntropyAdvertisement ad, List peers, List externalEvents, Set staleOrMissing, Object key, MapValue localValue) {
        // Safe: the only caller passes entries of Map<K, MapValue<V>> and typed collections.
        K typedKey = (K) key;
        MapValue<V> local = (MapValue<V>) localValue;
        AntiEntropyAdvertisement<K> advertisement = (AntiEntropyAdvertisement<K>) ad;

        locallyUnknown.remove(key);
        MapValue.Digest remoteValueDigest = advertisement.digest().get(typedKey);
        if (remoteValueDigest == null || local.isNewerThan(remoteValueDigest.timestamp())) {
            // Local value is more recent (or unknown to the sender): push it.
            this.queueUpdate(new UpdateEntry<K, V>(typedKey, local), (List<NodeId>) peers);
            return;
        }
        // remoteValueDigest is non-null from here on.
        if (!remoteValueDigest.isNewerThan(local.digest())) {
            return;
        }
        if (remoteValueDigest.isTombstone()) {
            MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
            MapValue<V> previousValue = this.removeInternal(typedKey, Optional.empty(), Optional.of(tombstone));
            if (previousValue != null && previousValue.isAlive()) {
                externalEvents.add(new EventuallyConsistentMapEvent<K, V>(this.mapName, EventuallyConsistentMapEvent.Type.REMOVE, typedKey, previousValue.get()));
            }
        } else {
            // Remote value is newer and alive: its content must be requested from the sender.
            staleOrMissing.add(key);
        }
    }

    private final class EventAccumulator
    extends AbstractAccumulator<UpdateEntry<K, V>> {
        private final NodeId peer;

        private EventAccumulator(NodeId peer) {
            super(TIMER, 1000, 50, 10);
            this.peer = peer;
        }

        public void processItems(List<UpdateEntry<K, V>> items) {
            HashMap map = Maps.newHashMap();
            items.forEach(item -> map.compute(item.key(), (key, existing) -> item.isNewerThan(existing) ? item : existing));
            EventuallyConsistentMapImpl.this.communicationExecutor.execute(() -> EventuallyConsistentMapImpl.this.clusterCommunicator.unicast((Object)ImmutableList.copyOf(map.values()), EventuallyConsistentMapImpl.this.updateMessageSubject, arg_0 -> ((StoreSerializer)EventuallyConsistentMapImpl.this.serializer).encode(arg_0), this.peer).whenComplete((result, error) -> {
                if (error != null) {
                    log.debug("Failed to send to {}", (Object)this.peer, error);
                }
            }));
        }
    }
}

