/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.ecmap;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.BoundedThreadPool;
import org.onlab.util.KryoNamespace;
import org.onlab.util.SlidingWindowCounter;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.ecmap.AbstractEntry;
import org.onosproject.store.ecmap.AntiEntropyAdvertisement;
import org.onosproject.store.ecmap.MapDbPersistentStore;
import org.onosproject.store.ecmap.PersistentStore;
import org.onosproject.store.ecmap.PutEntry;
import org.onosproject.store.ecmap.RemoveEntry;
import org.onosproject.store.impl.LogicalTimestamp;
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.ClockService;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventuallyConsistentMapImpl<K, V>
implements EventuallyConsistentMap<K, V> {
    private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
    // Live entries: key -> value wrapped together with the timestamp it was written with.
    private final ConcurrentMap<K, Timestamped<V>> items;
    // Tombstones: key -> timestamp of its removal. Written by removeInternal only when
    // tombstones are enabled; may also be filled by the persistent store at construction.
    private final ConcurrentMap<K, Timestamp> removedItems;
    private final ClusterService clusterService;
    private final ClusterCommunicationService clusterCommunicator;
    // Serializer used to encode outgoing and decode incoming cluster messages.
    private final KryoSerializer serializer;
    // Supplies the timestamp attached to every local put/remove.
    private final ClockService<K, V> clockService;
    // Subject for batched put/remove updates exchanged between peers.
    private final MessageSubject updateMessageSubject;
    // Subject for anti-entropy advertisements exchanged between peers.
    private final MessageSubject antiEntropyAdvertisementSubject;
    private final Set<EventuallyConsistentMapListener<K, V>> listeners = new CopyOnWriteArraySet<EventuallyConsistentMapListener<K, V>>();
    // Executor handed to the messaging layer for incoming update messages.
    private final ExecutorService executor;
    // Runs the periodic advertisement task and handles incoming advertisements.
    private final ScheduledExecutorService backgroundExecutor;
    // Chooses which peers receive the update for a given key/value.
    private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
    // Executor on which batched updates are actually sent to a peer.
    private final ExecutorService communicationExecutor;
    // One accumulator per destination peer, created lazily by queueUpdate.
    private final Map<NodeId, EventAccumulator> senderPending;
    // Set once by destroy(); the public operations reject calls afterwards.
    private volatile boolean destroyed = false;
    private static final String ERROR_DESTROYED = " map is already destroyed";
    // Map name + ERROR_DESTROYED, used as the checkState failure message.
    private final String destroyedMessage;
    private static final String ERROR_NULL_KEY = "Key cannot be null";
    private static final String ERROR_NULL_VALUE = "Null values are not allowed";
    // NOTE(review): not referenced by name in this decompiled file; the constructor passes the
    // literal 5L as the initial delay, expressed in the anti-entropy time unit rather than seconds.
    private final long initialDelaySec = 5L;
    // True when the map was NOT asked to converge faster; limits what an advertisement triggers.
    private final boolean lightweightAntiEntropy;
    // When true, removeInternal does not record tombstones in removedItems.
    private final boolean tombstonesDisabled;
    // NOTE(review): the three constants below appear only as inlined literals in this file
    // (new SlidingWindowCounter(5), counter.get(2) > 0L) -- presumably decompiler inlining.
    private static final int WINDOW_SIZE = 5;
    private static final int HIGH_LOAD_THRESHOLD = 0;
    private static final int LOAD_WINDOW = 2;
    // Incremented on every internal put/remove; read by underHighLoad().
    private SlidingWindowCounter counter = new SlidingWindowCounter(5);
    // Whether entries are mirrored into persistentStore.
    private final boolean persistent;
    // Backing store; null when persistent is false.
    private final PersistentStore<K, V> persistentStore;
    // NOTE(review): the EventAccumulator constructor passes the literals 1000, 50, 10 --
    // presumably these three constants after inlining (max events, max batch ms, max idle ms).
    private static final int DEFAULT_MAX_EVENTS = 1000;
    private static final int DEFAULT_MAX_IDLE_MS = 10;
    private static final int DEFAULT_MAX_BATCH_MS = 50;
    // Single timer shared by every EventAccumulator of every map instance.
    private static final Timer TIMER = new Timer("onos-ecm-sender-events");

    /**
     * Creates a new eventually consistent map and connects it to the cluster.
     *
     * <p>All instance state is assigned before the periodic advertisement task is
     * scheduled and before the message handlers are subscribed, so that neither
     * can observe a partially initialised map (unset message subject, default
     * {@code false} for the tombstone / anti-entropy flags).
     *
     * @param mapName              name used in message subjects, thread names and the persistence file
     * @param clusterService       source of the local node and the cluster membership
     * @param clusterCommunicator  messaging service used to talk to peers
     * @param serializerBuilder    namespace builder that the map's serializer is derived from
     * @param clockService         supplies timestamps for local updates
     * @param peerUpdateFunction   selects the peers to notify per update; when {@code null}
     *                             every node except the local one is notified
     * @param eventExecutor        executor for incoming updates; a fixed pool of 8 is created when {@code null}
     * @param communicationExecutor executor for outgoing updates; a bounded pool of 8 is created when {@code null}
     * @param backgroundExecutor   executor for anti-entropy work; a single-thread scheduler is created when {@code null}
     * @param tombstonesDisabled   when {@code true} removals are not remembered as tombstones
     * @param antiEntropyPeriod    period between advertisement rounds
     * @param antiEntropyTimeUnit  unit of {@code antiEntropyPeriod} (and of the initial delay)
     * @param convergeFaster       when {@code true} the heavier anti-entropy handling is enabled
     * @param persistent           when {@code true} entries are loaded from and mirrored to a MapDB file
     */
    EventuallyConsistentMapImpl(String mapName,
                                ClusterService clusterService,
                                ClusterCommunicationService clusterCommunicator,
                                KryoNamespace.Builder serializerBuilder,
                                ClockService<K, V> clockService,
                                BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
                                ExecutorService eventExecutor,
                                ExecutorService communicationExecutor,
                                ScheduledExecutorService backgroundExecutor,
                                boolean tombstonesDisabled,
                                long antiEntropyPeriod,
                                TimeUnit antiEntropyTimeUnit,
                                boolean convergeFaster,
                                boolean persistent) {
        // ---- plain state: everything a handler or the periodic task may read ----
        this.items = new ConcurrentHashMap<K, Timestamped<V>>();
        this.removedItems = new ConcurrentHashMap<K, Timestamp>();
        this.senderPending = Maps.newConcurrentMap();
        this.destroyedMessage = mapName + ERROR_DESTROYED;
        this.clusterService = clusterService;
        this.clusterCommunicator = clusterCommunicator;
        this.serializer = this.createSerializer(serializerBuilder);
        this.clockService = clockService;
        this.tombstonesDisabled = tombstonesDisabled;
        this.lightweightAntiEntropy = !convergeFaster;
        this.persistent = persistent;
        this.updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
        this.antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");

        // Default peer selection: every cluster node except the local one.
        this.peerUpdateFunction = peerUpdateFunction != null
                ? peerUpdateFunction
                : (key, value) -> clusterService.getNodes().stream()
                        .map(ControllerNode::id)
                        .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
                        .collect(Collectors.toList());

        // ---- executors (created only when the caller did not supply one) ----
        this.executor = eventExecutor != null
                ? eventExecutor
                : Executors.newFixedThreadPool(8, Tools.groupedThreads("onos/ecm", mapName + "-fg-%d"));
        this.communicationExecutor = communicationExecutor != null
                ? communicationExecutor
                : BoundedThreadPool.newFixedThreadPool(8, Tools.groupedThreads("onos/ecm", mapName + "-publish-%d"));

        // ---- optional persistence: reload previously stored entries and tombstones ----
        if (this.persistent) {
            String dataDirectory = System.getProperty("karaf.data", "./data");
            String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
            ExecutorService dbExecutor = BoundedThreadPool.newFixedThreadPool(1, Tools.groupedThreads("onos/ecm", mapName + "-dbwriter"));
            this.persistentStore = new MapDbPersistentStore(filename, dbExecutor, this.serializer);
            this.persistentStore.readInto(this.items, this.removedItems);
        } else {
            this.persistentStore = null;
        }

        this.backgroundExecutor = backgroundExecutor != null
                ? backgroundExecutor
                : Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads("onos/ecm", mapName + "-bg-%d"));

        // ---- publication: from here on other threads may call back into this instance ----
        // NOTE(review): the initial delay (5) is interpreted in antiEntropyTimeUnit, not in
        // seconds as the field name suggests; kept as-is to preserve existing timing.
        this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(), this.initialDelaySec, antiEntropyPeriod, antiEntropyTimeUnit);
        clusterCommunicator.addSubscriber(this.updateMessageSubject, new InternalEventListener(), this.executor);
        clusterCommunicator.addSubscriber(this.antiEntropyAdvertisementSubject, new InternalAntiEntropyListener(), this.backgroundExecutor);
    }

    /**
     * Builds the serializer used for all cluster messages of this map.
     *
     * <p>The caller-supplied namespace builder is extended with the timestamp,
     * entry, advertisement and collection classes this map sends on the wire.
     *
     * @param builder namespace builder provided by the map's creator
     * @return serializer whose pool is built from the extended namespace
     */
    private KryoSerializer createSerializer(final KryoNamespace.Builder builder) {
        return new KryoSerializer() {

            protected void setupKryoPool() {
                // Registration order is kept exactly as-is; it may determine type ids.
                serializerPool = builder
                        .register(new Class[]{LogicalTimestamp.class})
                        .register(new Class[]{WallClockTimestamp.class})
                        .register(new Class[]{PutEntry.class})
                        .register(new Class[]{RemoveEntry.class})
                        .register(new Class[]{ArrayList.class})
                        .register(new Class[]{AntiEntropyAdvertisement.class})
                        .register(new Class[]{HashMap.class})
                        .register(new Class[]{Timestamped.class})
                        .build();
            }
        };
    }

    /**
     * Returns the number of live entries held locally.
     *
     * @return size of the local item map
     * @throws IllegalStateException if the map has been destroyed
     */
    public int size() {
        Preconditions.checkState(!destroyed, destroyedMessage);
        return items.size();
    }

    /**
     * Tells whether the map currently holds no live entries.
     *
     * @return {@code true} if the local item map is empty
     * @throws IllegalStateException if the map has been destroyed
     */
    public boolean isEmpty() {
        Preconditions.checkState(!destroyed, destroyedMessage);
        return items.isEmpty();
    }

    /**
     * Tells whether a live entry exists for the given key.
     *
     * @param key key to look up; must not be {@code null}
     * @return {@code true} if the local item map contains the key
     * @throws IllegalStateException if the map has been destroyed
     * @throws NullPointerException  if {@code key} is {@code null}
     */
    public boolean containsKey(K key) {
        Preconditions.checkState(!destroyed, destroyedMessage);
        Preconditions.checkNotNull(key, ERROR_NULL_KEY);
        return items.containsKey(key);
    }

    /**
     * Tells whether any live entry carries a value equal to the given one.
     *
     * @param value value to search for; must not be {@code null}
     * @return {@code true} as soon as one stored value {@code equals} the argument
     * @throws IllegalStateException if the map has been destroyed
     * @throws NullPointerException  if {@code value} is {@code null}
     */
    public boolean containsValue(V value) {
        Preconditions.checkState(!destroyed, destroyedMessage);
        Preconditions.checkNotNull(value, ERROR_NULL_VALUE);
        for (Timestamped<V> candidate : items.values()) {
            if (candidate.value().equals(value)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Looks up the value currently stored for a key.
     *
     * @param key key to look up; must not be {@code null}
     * @return the stored value, or {@code null} when there is no live entry
     * @throws IllegalStateException if the map has been destroyed
     * @throws NullPointerException  if {@code key} is {@code null}
     */
    public V get(K key) {
        Preconditions.checkState(!destroyed, destroyedMessage);
        Preconditions.checkNotNull(key, ERROR_NULL_KEY);
        final Timestamped<V> stamped = items.get(key);
        return stamped == null ? null : stamped.value();
    }

    /**
     * Stores a value locally and, if the write was accepted, queues it for
     * the selected peers and notifies local listeners with a PUT event.
     *
     * @param key   key to write; must not be {@code null}
     * @param value value to write; must not be {@code null}
     * @throws IllegalStateException if the map has been destroyed
     * @throws NullPointerException  if {@code key} or {@code value} is {@code null}
     */
    public void put(K key, V value) {
        Preconditions.checkState(!destroyed, destroyedMessage);
        Preconditions.checkNotNull(key, ERROR_NULL_KEY);
        Preconditions.checkNotNull(value, ERROR_NULL_VALUE);

        final Timestamp stamp = clockService.getTimestamp(key, value);
        if (!putInternal(key, value, stamp)) {
            // A newer local entry or tombstone won; nothing to propagate.
            return;
        }
        notifyPeers(new PutEntry<K, V>(key, value, stamp), peerUpdateFunction.apply(key, value));
        notifyListeners(new EventuallyConsistentMapEvent<K, V>(EventuallyConsistentMapEvent.Type.PUT, key, value));
    }

    /**
     * Applies a put to the local state if it is not superseded.
     *
     * <p>The write is rejected when a tombstone newer than {@code timestamp}
     * exists, or when the existing entry is newer than {@code timestamp}.
     * On success the tombstone that was observed (if any) is dropped and the
     * entry is mirrored to the persistent store when persistence is enabled.
     *
     * @param key       key being written
     * @param value     value being written
     * @param timestamp timestamp of the write
     * @return {@code true} if the local entry was replaced or created
     */
    private boolean putInternal(K key, V value, Timestamp timestamp) {
        counter.incrementCount();

        final Timestamp tombstone = removedItems.get(key);
        if (tombstone != null && tombstone.isNewerThan(timestamp)) {
            log.debug("ecmap - removed was newer {}", value);
            return false;
        }

        // compute() runs atomically per key; the flag carries its decision out of the lambda.
        final MutableBoolean accepted = new MutableBoolean(false);
        items.compute(key, (k, current) -> {
            final boolean currentWins = current != null && current.isNewerThan(timestamp);
            accepted.setValue(!currentWins);
            return currentWins ? current : new Timestamped<>(value, timestamp);
        });

        if (!accepted.booleanValue()) {
            log.debug("ecmap - existing was newer {}", value);
            return false;
        }

        if (tombstone != null) {
            // Conditional remove: only clears the tombstone we actually compared against.
            removedItems.remove(key, tombstone);
        }
        if (persistent) {
            persistentStore.put(key, value, timestamp);
        }
        return true;
    }

    /**
     * Removes the entry for a key and, if the removal was applied, queues it
     * for the selected peers and notifies local listeners with a REMOVE event.
     *
     * @param key key to remove; must not be {@code null}
     * @throws IllegalStateException if the map has been destroyed
     * @throws NullPointerException  if {@code key} is {@code null}
     */
    public void remove(K key) {
        Preconditions.checkState(!destroyed, destroyedMessage);
        Preconditions.checkNotNull(key, ERROR_NULL_KEY);

        // No value is available here, so the clock is asked with a null value.
        final Timestamp stamp = clockService.getTimestamp(key, null);
        if (!removeInternal(key, stamp)) {
            return;
        }
        notifyPeers(new RemoveEntry<K, V>(key, stamp), peerUpdateFunction.apply(key, null));
        notifyListeners(new EventuallyConsistentMapEvent<K, V>(EventuallyConsistentMapEvent.Type.REMOVE, key, null));
    }

    /**
     * Applies a removal to the local state if it is not superseded.
     *
     * <p>The removal is rejected when {@code timestamp} is {@code null} or when
     * the existing entry is newer than {@code timestamp}. Otherwise the entry
     * (if any) is dropped, a tombstone is recorded or advanced unless
     * tombstones are disabled, and the removal is mirrored to the persistent
     * store when persistence is enabled.
     *
     * @param key       key being removed
     * @param timestamp timestamp of the removal; {@code null} is treated as "nothing to do"
     * @return {@code true} whenever the local entry was not newer than the
     *         removal, including the case where no entry existed
     */
    private boolean removeInternal(K key, Timestamp timestamp) {
        if (timestamp == null) {
            return false;
        }
        counter.incrementCount();

        // compute() runs atomically per key; the flag carries its decision out of the lambda.
        final MutableBoolean applied = new MutableBoolean(false);
        items.compute(key, (k, current) -> {
            final boolean currentWins = current != null && current.isNewerThan(timestamp);
            applied.setValue(!currentWins);
            return currentWins ? current : null;
        });
        if (!applied.booleanValue()) {
            return false;
        }

        if (!tombstonesDisabled) {
            final Timestamp previous = removedItems.get(key);
            if (previous == null) {
                removedItems.putIfAbsent(key, timestamp);
            } else if (timestamp.isNewerThan(previous)) {
                // Conditional replace: loses quietly if another thread changed the tombstone.
                removedItems.replace(key, previous, timestamp);
            }
        }

        if (persistent) {
            persistentStore.remove(key, timestamp);
        }
        // Past the guard above the removal counts as applied regardless of the tombstone outcome.
        return true;
    }

    /**
     * Removes the entry for a key, using the given value only to obtain the
     * timestamp and to select peers; the stored value is not compared.
     *
     * @param key   key to remove; must not be {@code null}
     * @param value value passed to the clock service, the peer selector and the event; must not be {@code null}
     * @throws IllegalStateException if the map has been destroyed
     * @throws NullPointerException  if {@code key} or {@code value} is {@code null}
     */
    public void remove(K key, V value) {
        Preconditions.checkState(!destroyed, destroyedMessage);
        Preconditions.checkNotNull(key, ERROR_NULL_KEY);
        Preconditions.checkNotNull(value, ERROR_NULL_VALUE);

        final Timestamp stamp = clockService.getTimestamp(key, value);
        if (!removeInternal(key, stamp)) {
            return;
        }
        notifyPeers(new RemoveEntry<K, V>(key, stamp), peerUpdateFunction.apply(key, value));
        notifyListeners(new EventuallyConsistentMapEvent<K, V>(EventuallyConsistentMapEvent.Type.REMOVE, key, value));
    }

    /**
     * Puts every mapping of the given map, one {@link #put} call per entry.
     *
     * @param m mappings to store
     * @throws IllegalStateException if the map has been destroyed
     */
    public void putAll(Map<? extends K, ? extends V> m) {
        Preconditions.checkState(!destroyed, destroyedMessage);
        m.forEach((key, value) -> put(key, value));
    }

    /**
     * Removes every live entry by calling {@link #remove(Object)} per key, so
     * each removal is timestamped, propagated and reported individually.
     *
     * @throws IllegalStateException if the map has been destroyed
     */
    public void clear() {
        Preconditions.checkState(!destroyed, destroyedMessage);
        for (K key : items.keySet()) {
            remove(key);
        }
    }

    /**
     * Returns the keys of all live entries.
     *
     * <p>The returned set is the key view of the underlying item map itself,
     * not a copy.
     *
     * @return key view of the local item map
     * @throws IllegalStateException if the map has been destroyed
     */
    public Set<K> keySet() {
        Preconditions.checkState(!destroyed, destroyedMessage);
        final Set<K> keys = items.keySet();
        return keys;
    }

    /**
     * Returns a copy of the values of all live entries, without timestamps.
     *
     * @return newly built list of the stored values
     * @throws IllegalStateException if the map has been destroyed
     */
    public Collection<V> values() {
        Preconditions.checkState(!destroyed, destroyedMessage);
        final List<V> snapshot = new ArrayList<>();
        for (Timestamped<V> stamped : items.values()) {
            snapshot.add(stamped.value());
        }
        return snapshot;
    }

    /**
     * Returns a copy of all live entries as key/value pairs, without timestamps.
     *
     * @return newly built set of key/value pairs
     * @throws IllegalStateException if the map has been destroyed
     */
    public Set<Map.Entry<K, V>> entrySet() {
        Preconditions.checkState(!destroyed, destroyedMessage);
        return items.entrySet()
                .stream()
                .map(entry -> Pair.of(entry.getKey(), entry.getValue().value()))
                .collect(Collectors.toSet());
    }

    /**
     * Registers a listener for map events.
     *
     * @param listener listener to add; must not be {@code null}
     * @throws IllegalStateException if the map has been destroyed
     * @throws NullPointerException  if {@code listener} is {@code null}
     */
    public void addListener(EventuallyConsistentMapListener<K, V> listener) {
        Preconditions.checkState(!destroyed, destroyedMessage);
        final EventuallyConsistentMapListener<K, V> checked = Preconditions.checkNotNull(listener);
        listeners.add(checked);
    }

    /**
     * Unregisters a previously added listener.
     *
     * @param listener listener to remove; must not be {@code null}
     * @throws IllegalStateException if the map has been destroyed
     * @throws NullPointerException  if {@code listener} is {@code null}
     */
    public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
        Preconditions.checkState(!destroyed, destroyedMessage);
        final EventuallyConsistentMapListener<K, V> checked = Preconditions.checkNotNull(listener);
        listeners.remove(checked);
    }

    /**
     * Shuts this map down: marks it destroyed, shuts down its three executors,
     * drops all listeners and unsubscribes both message subjects.
     *
     * <p>After this call the public operations fail their destroyed check and
     * the update handler and advertisement task return early.
     */
    public void destroy() {
        // Set first so concurrent callers and handlers stop doing work.
        this.destroyed = true;
        // NOTE(review): executors are shut down even when they were supplied by the caller,
        // and before the subscriptions using them are removed -- confirm this is intended.
        this.executor.shutdown();
        this.backgroundExecutor.shutdown();
        this.communicationExecutor.shutdown();
        this.listeners.clear();
        this.clusterCommunicator.removeSubscriber(this.updateMessageSubject);
        this.clusterCommunicator.removeSubscriber(this.antiEntropyAdvertisementSubject);
    }

    /**
     * Delivers an event to every registered listener, on the calling thread.
     *
     * @param event event to deliver
     */
    private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
        listeners.forEach(registered -> registered.event(event));
    }

    /**
     * Queues a put for delivery to the given peers.
     *
     * @param event put to propagate
     * @param peers destination nodes; {@code null} means "nobody"
     */
    private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
        queueUpdate(event, peers);
    }

    /**
     * Queues a removal for delivery to the given peers.
     *
     * @param event removal to propagate
     * @param peers destination nodes; {@code null} means "nobody"
     */
    private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
        queueUpdate(event, peers);
    }

    /**
     * Adds an update to the per-peer accumulator of every destination,
     * creating the accumulator on first use.
     *
     * @param event update to send
     * @param peers destination nodes; a {@code null} collection is ignored
     */
    private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
        if (peers == null) {
            return;
        }
        for (NodeId destination : peers) {
            EventAccumulator pending =
                    senderPending.computeIfAbsent(destination, unusedKey -> new EventAccumulator(destination));
            pending.add(event);
        }
    }

    /**
     * Tells whether the map is currently considered busy with updates.
     *
     * <p>Anti-entropy sending and handling are skipped while this is true.
     *
     * @return {@code true} if the counter reading for the load window exceeds the threshold
     */
    private boolean underHighLoad() {
        // presumably the number of put/remove operations counted over the last LOAD_WINDOW slots
        final long recentOperations = counter.get(LOAD_WINDOW);
        return recentOperations > HIGH_LOAD_THRESHOLD;
    }

    /**
     * Builds an advertisement describing the local state: the timestamp of
     * every live entry plus a copy of all tombstones.
     *
     * @return advertisement stamped with the local node id
     */
    private AntiEntropyAdvertisement<K> createAdvertisement() {
        final NodeId self = clusterService.getLocalNode().id();

        final HashMap<K, Timestamp> liveTimestamps = new HashMap<>(items.size());
        for (Map.Entry<K, Timestamped<V>> entry : items.entrySet()) {
            liveTimestamps.put(entry.getKey(), entry.getValue().timestamp());
        }

        final HashMap<K, Timestamp> tombstones = new HashMap<>(removedItems);
        return new AntiEntropyAdvertisement<>(self, liveTimestamps, tombstones);
    }

    /**
     * Reconciles local state with a peer's advertisement.
     *
     * <p>Always: pushes local entries and removals the peer is missing or
     * behind on. When lightweight anti-entropy is off, additionally applies
     * the peer's tombstones and, if the peer advertises any key that is not
     * live locally, replies once with the local advertisement so the peer
     * can push what is missing here. Listener events are delivered last.
     *
     * @param ad advertisement received from a peer
     */
    private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
        final List<EventuallyConsistentMapEvent<K, V>> pendingEvents = antiEntropyCheckLocalItems(ad);
        antiEntropyCheckLocalRemoved(ad);

        if (!lightweightAntiEntropy) {
            pendingEvents.addAll(antiEntropyCheckRemoteRemoved(ad));

            final boolean peerHasUnknownKey = ad.timestamps().keySet().stream()
                    .anyMatch(key -> !items.containsKey(key));
            if (peerHasUnknownKey) {
                final NodeId sender = ad.sender();
                final AntiEntropyAdvertisement<K> reply = createAdvertisement();
                clusterCommunicator.unicast(reply, antiEntropyAdvertisementSubject, serializer::encode, sender)
                        .whenComplete((result, error) -> {
                            if (error != null) {
                                log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
                            }
                        });
            }
        }

        pendingEvents.forEach(this::notifyListeners);
    }

    /**
     * Compares every local live entry against the peer's advertisement.
     *
     * <p>For each entry: if the peer knows nothing about the key, or only
     * something older, the local value is queued for the peer. If the peer
     * holds a tombstone newer than the local entry, the entry is removed
     * locally and a REMOVE event is collected.
     *
     * @param ad advertisement received from a peer
     * @return REMOVE events for entries deleted locally during this check
     */
    private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(AntiEntropyAdvertisement<K> ad) {
        final List<EventuallyConsistentMapEvent<K, V>> collected = new LinkedList<>();
        final NodeId sender = ad.sender();

        for (Map.Entry<K, Timestamped<V>> entry : items.entrySet()) {
            final K key = entry.getKey();
            final Timestamped<V> local = entry.getValue();

            // What the peer last saw for this key: a live timestamp, else a tombstone, else nothing.
            Timestamp remoteKnown = ad.timestamps().get(key);
            if (remoteKnown == null) {
                remoteKnown = ad.tombstones().get(key);
            }
            final boolean peerIsBehind = remoteKnown == null || local.isNewerThan(remoteKnown);
            if (peerIsBehind) {
                queueUpdate(new PutEntry<K, V>(key, local.value(), local.timestamp()), ImmutableList.of(sender));
            }

            final Timestamp remoteTombstone = ad.tombstones().get(key);
            final boolean peerRemovalWins =
                    remoteTombstone != null && remoteTombstone.isNewerThan(local.timestamp());
            if (peerRemovalWins && removeInternal(key, remoteTombstone)) {
                collected.add(new EventuallyConsistentMapEvent<K, V>(EventuallyConsistentMapEvent.Type.REMOVE, key, null));
            }
        }
        return collected;
    }

    /**
     * Pushes local tombstones to a peer that still advertises the key as
     * live with an older timestamp.
     *
     * @param ad advertisement received from a peer
     */
    private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
        final NodeId sender = ad.sender();

        for (Map.Entry<K, Timestamp> tombstone : removedItems.entrySet()) {
            final K key = tombstone.getKey();
            final Timestamp removedAt = tombstone.getValue();
            final Timestamp peerLive = ad.timestamps().get(key);

            if (peerLive != null && removedAt.isNewerThan(peerLive)) {
                queueUpdate(new RemoveEntry<K, V>(key, removedAt), ImmutableList.of(sender));
            }
        }
    }

    /**
     * Applies the peer's tombstones to the local state.
     *
     * <p>A tombstone newer than the local live entry removes that entry and
     * yields a REMOVE event. Otherwise, a tombstone newer than the local
     * tombstone is applied silently (no event).
     *
     * @param ad advertisement received from a peer
     * @return REMOVE events for live entries deleted during this check
     */
    private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
        final List<EventuallyConsistentMapEvent<K, V>> collected = new LinkedList<>();

        for (Map.Entry<K, Timestamp> remote : ad.tombstones().entrySet()) {
            final K key = remote.getKey();
            final Timestamp remoteRemovedAt = remote.getValue();
            // Both local reads happen up front, before any local mutation for this key.
            final Timestamped<V> localLive = items.get(key);
            final Timestamp localRemovedAt = removedItems.get(key);

            if (localLive != null && remoteRemovedAt.isNewerThan(localLive.timestamp())) {
                if (removeInternal(key, remoteRemovedAt)) {
                    collected.add(new EventuallyConsistentMapEvent<K, V>(EventuallyConsistentMapEvent.Type.REMOVE, key, null));
                }
            } else if (localRemovedAt != null && remoteRemovedAt.isNewerThan(localRemovedAt)) {
                removeInternal(key, remoteRemovedAt);
            }
        }
        return collected;
    }

    private final class EventAccumulator
    extends AbstractAccumulator<AbstractEntry<K, V>> {
        private final NodeId peer;

        private EventAccumulator(NodeId peer) {
            super(TIMER, 1000, 50, 10);
            this.peer = peer;
        }

        public void processItems(List<AbstractEntry<K, V>> items) {
            HashMap map = Maps.newHashMap();
            items.forEach(item -> map.compute(item.key(), (key, oldValue) -> oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue));
            EventuallyConsistentMapImpl.this.communicationExecutor.submit(() -> EventuallyConsistentMapImpl.this.clusterCommunicator.unicast((Object)Lists.newArrayList(map.values()), EventuallyConsistentMapImpl.this.updateMessageSubject, arg_0 -> ((KryoSerializer)EventuallyConsistentMapImpl.this.serializer).encode(arg_0), this.peer).whenComplete((result, error) -> {
                if (error != null) {
                    log.debug("Failed to send to {}", (Object)this.peer);
                }
            }));
        }
    }

    /**
     * Handles batches of put/remove updates received from peers.
     */
    private final class InternalEventListener
            implements ClusterMessageHandler {
        private InternalEventListener() {
        }

        /**
         * Applies every entry of the received batch to the local state and
         * notifies listeners for each entry that was actually applied.
         *
         * <p>Does nothing once the map is destroyed. An exception while
         * applying an entry is logged and aborts the rest of the batch;
         * decoding happens outside that guard.
         *
         * @param message message whose payload is a serialized collection of entries
         */
        public void handle(ClusterMessage message) {
            if (destroyed) {
                return;
            }
            log.debug("Received update event from peer: {}", message.sender());

            @SuppressWarnings("unchecked")
            final Collection<AbstractEntry<K, V>> batch =
                    (Collection<AbstractEntry<K, V>>) serializer.decode(message.payload());
            try {
                for (AbstractEntry<K, V> entry : batch) {
                    final K key = entry.key();
                    final Timestamp timestamp = entry.timestamp();

                    final EventuallyConsistentMapEvent.Type type;
                    final V value;
                    final boolean applied;
                    if (entry instanceof PutEntry) {
                        value = ((PutEntry<K, V>) entry).value();
                        type = EventuallyConsistentMapEvent.Type.PUT;
                        applied = putInternal(key, value, timestamp);
                    } else if (entry instanceof RemoveEntry) {
                        value = null;
                        type = EventuallyConsistentMapEvent.Type.REMOVE;
                        applied = removeInternal(key, timestamp);
                    } else {
                        throw new IllegalStateException("Unknown entry type " + entry.getClass());
                    }

                    if (applied) {
                        notifyListeners(new EventuallyConsistentMapEvent<K, V>(type, key, value));
                    }
                }
            } catch (Exception e) {
                log.warn("Exception thrown handling put", e);
            }
        }
    }

    /**
     * Handles anti-entropy advertisements received from peers.
     */
    private final class InternalAntiEntropyListener
            implements ClusterMessageHandler {
        private InternalAntiEntropyListener() {
        }

        /**
         * Decodes the advertisement and reconciles against it, unless the map
         * is under high load, in which case the advertisement is dropped.
         * Exceptions raised during reconciliation are logged, not propagated.
         *
         * @param message message whose payload is a serialized advertisement
         */
        public void handle(ClusterMessage message) {
            log.trace("Received anti-entropy advertisement from peer: {}", message.sender());

            @SuppressWarnings("unchecked")
            final AntiEntropyAdvertisement<K> received =
                    (AntiEntropyAdvertisement<K>) serializer.decode(message.payload());
            try {
                if (underHighLoad()) {
                    return;
                }
                handleAntiEntropyAdvertisement(received);
            } catch (Exception e) {
                log.warn("Exception thrown handling advertisements", e);
            }
        }
    }

    private final class SendAdvertisementTask
    implements Runnable {
        private SendAdvertisementTask() {
        }

        @Override
        public void run() {
            if (Thread.currentThread().isInterrupted()) {
                log.info("Interrupted, quitting");
                return;
            }
            if (EventuallyConsistentMapImpl.this.underHighLoad() || EventuallyConsistentMapImpl.this.destroyed) {
                return;
            }
            try {
                int idx;
                NodeId peer;
                NodeId self = EventuallyConsistentMapImpl.this.clusterService.getLocalNode().id();
                Set nodes = EventuallyConsistentMapImpl.this.clusterService.getNodes();
                List nodeIds = nodes.stream().map(ControllerNode::id).collect(Collectors.toList());
                if (nodeIds.size() == 1 && ((NodeId)nodeIds.get(0)).equals((Object)self)) {
                    log.trace("No other peers in the cluster.");
                    return;
                }
                while ((peer = (NodeId)nodeIds.get(idx = RandomUtils.nextInt((int)0, (int)nodeIds.size()))).equals((Object)self)) {
                }
                if (Thread.currentThread().isInterrupted()) {
                    log.info("Interrupted, quitting");
                    return;
                }
                AntiEntropyAdvertisement ad = EventuallyConsistentMapImpl.this.createAdvertisement();
                NodeId destination = peer;
                EventuallyConsistentMapImpl.this.clusterCommunicator.unicast((Object)ad, EventuallyConsistentMapImpl.this.antiEntropyAdvertisementSubject, arg_0 -> ((KryoSerializer)EventuallyConsistentMapImpl.this.serializer).encode(arg_0), peer).whenComplete((result, error) -> {
                    if (error != null) {
                        log.debug("Failed to send anti-entropy advertisement to {}", (Object)destination);
                    }
                });
            }
            catch (Exception e) {
                log.error("Exception thrown while sending advertisement", (Throwable)e);
            }
        }
    }
}

