/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.flow.impl;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.Futures;
import java.util.Collection;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.CoreService;
import org.onosproject.core.IdGenerator;
import org.onosproject.event.Event;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.TableStatisticsEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.flow.ReplicaInfoEvent;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(immediate=true)
@Service
public class ECFlowRuleStore
extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
implements FlowRuleStore {
    private final Logger log = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
    private static final int DEFAULT_MAX_BACKUP_COUNT = 2;
    private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
    private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
    private static final int DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS = 5000;
    private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000L;
    private static final int NUM_BUCKETS = 1024;
    @Property(name="msgHandlerPoolSize", intValue={8}, label="Number of threads in the message handler pool")
    private int msgHandlerPoolSize = 8;
    @Property(name="backupPeriod", intValue={2000}, label="Delay in ms between successive backup runs")
    private int backupPeriod = 2000;
    @Property(name="antiEntropyPeriod", intValue={5000}, label="Delay in ms between anti-entropy runs")
    private int antiEntropyPeriod = 5000;
    @Property(name="persistenceEnabled", boolValue={false}, label="Indicates whether or not changes in the flow table should be persisted to disk.")
    private boolean persistenceEnabled = false;
    @Property(name="backupCount", intValue={2}, label="Max number of backup copies for each device")
    private volatile int backupCount = 2;
    private InternalFlowTable flowTable = new InternalFlowTable();
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ReplicaInfoService replicaInfoManager;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected DeviceService deviceService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected CoreService coreService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ComponentConfigService configService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected PersistenceService persistenceService;
    private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
    private ExecutorService messageHandlingExecutor;
    private ExecutorService eventHandler;
    private ScheduledFuture<?> backupTask;
    private ScheduledFuture<?> antiEntropyTask;
    private final ScheduledExecutorService backupSenderExecutor = Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads((String)"onos/flow", (String)"backup-sender", (Logger)this.log));
    private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
    private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener = new InternalTableStatsListener();
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected StorageService storageService;
    protected final Serializer serializer = Serializer.using((KryoNamespace)KryoNamespace.newBuilder().register(KryoNamespaces.API).register(new Class[]{BucketId.class}).register(new Class[]{FlowBucket.class}).build());
    protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder().register(KryoNamespaces.API).register(new Class[]{BucketId.class}).register(new Class[]{MastershipBasedTimestamp.class});
    private EventuallyConsistentMap<BucketId, Integer> flowCounts;
    private IdGenerator idGenerator;
    private NodeId local;

    @Activate
    public void activate(ComponentContext context) {
        this.configService.registerProperties(((Object)((Object)this)).getClass());
        this.idGenerator = this.coreService.getIdGenerator("flow-ops-ids");
        this.local = this.clusterService.getLocalNode().id();
        this.eventHandler = Executors.newSingleThreadExecutor(Tools.groupedThreads((String)"onos/flow", (String)"event-handler", (Logger)this.log));
        this.messageHandlingExecutor = Executors.newFixedThreadPool(this.msgHandlerPoolSize, Tools.groupedThreads((String)"onos/store/flow", (String)"message-handlers", (Logger)this.log));
        this.registerMessageHandlers(this.messageHandlingExecutor);
        this.replicaInfoManager.addListener(this.flowTable);
        this.backupTask = this.backupSenderExecutor.scheduleWithFixedDelay(() -> this.flowTable.backup(), 0L, this.backupPeriod, TimeUnit.MILLISECONDS);
        this.antiEntropyTask = this.backupSenderExecutor.scheduleWithFixedDelay(() -> this.flowTable.runAntiEntropy(), 0L, this.antiEntropyPeriod, TimeUnit.MILLISECONDS);
        this.flowCounts = this.storageService.eventuallyConsistentMapBuilder().withName("onos-flow-counts").withSerializer(this.serializerBuilder).withAntiEntropyPeriod(5L, TimeUnit.SECONDS).withTimestampProvider((k, v) -> new WallClockTimestamp()).withTombstonesDisabled().build();
        this.deviceTableStats = this.storageService.eventuallyConsistentMapBuilder().withName("onos-flow-table-stats").withSerializer(this.serializerBuilder).withAntiEntropyPeriod(5L, TimeUnit.SECONDS).withTimestampProvider((k, v) -> new WallClockTimestamp()).withTombstonesDisabled().build();
        this.deviceTableStats.addListener(this.tableStatsListener);
        this.logConfig("Started");
    }

    @Deactivate
    public void deactivate(ComponentContext context) {
        this.replicaInfoManager.removeListener(this.flowTable);
        this.backupTask.cancel(true);
        this.configService.unregisterProperties(((Object)((Object)this)).getClass(), false);
        this.unregisterMessageHandlers();
        this.deviceTableStats.removeListener(this.tableStatsListener);
        this.deviceTableStats.destroy();
        this.eventHandler.shutdownNow();
        this.messageHandlingExecutor.shutdownNow();
        this.backupSenderExecutor.shutdownNow();
        this.log.info("Stopped");
    }

    @Modified
    public void modified(ComponentContext context) {
        int newAntiEntropyPeriod;
        int newBackupCount;
        int newBackupPeriod;
        int newPoolSize;
        if (context == null) {
            this.logConfig("Default config");
            return;
        }
        Dictionary properties = context.getProperties();
        try {
            String s = Tools.get((Dictionary)properties, (String)"msgHandlerPoolSize");
            newPoolSize = Strings.isNullOrEmpty((String)s) ? this.msgHandlerPoolSize : Integer.parseInt(s.trim());
            s = Tools.get((Dictionary)properties, (String)"backupPeriod");
            newBackupPeriod = Strings.isNullOrEmpty((String)s) ? this.backupPeriod : Integer.parseInt(s.trim());
            s = Tools.get((Dictionary)properties, (String)"backupCount");
            newBackupCount = Strings.isNullOrEmpty((String)s) ? this.backupCount : Integer.parseInt(s.trim());
            s = Tools.get((Dictionary)properties, (String)"antiEntropyPeriod");
            newAntiEntropyPeriod = Strings.isNullOrEmpty((String)s) ? this.antiEntropyPeriod : Integer.parseInt(s.trim());
        }
        catch (ClassCastException | NumberFormatException e) {
            newPoolSize = 8;
            newBackupPeriod = 2000;
            newBackupCount = 2;
            newAntiEntropyPeriod = 5000;
        }
        boolean restartBackupTask = false;
        boolean restartAntiEntropyTask = false;
        if (newBackupPeriod != this.backupPeriod) {
            this.backupPeriod = newBackupPeriod;
            restartBackupTask = true;
        }
        if (newAntiEntropyPeriod != this.antiEntropyPeriod) {
            this.antiEntropyPeriod = newAntiEntropyPeriod;
            restartAntiEntropyTask = true;
        }
        if (restartBackupTask) {
            if (this.backupTask != null) {
                this.backupTask.cancel(false);
            }
            this.backupTask = this.backupSenderExecutor.scheduleWithFixedDelay(() -> this.flowTable.backup(), 0L, this.backupPeriod, TimeUnit.MILLISECONDS);
        }
        if (restartAntiEntropyTask) {
            if (this.antiEntropyTask != null) {
                this.antiEntropyTask.cancel(false);
            }
            this.antiEntropyTask = this.backupSenderExecutor.scheduleWithFixedDelay(() -> this.flowTable.runAntiEntropy(), 0L, this.antiEntropyPeriod, TimeUnit.MILLISECONDS);
        }
        if (newPoolSize != this.msgHandlerPoolSize) {
            this.msgHandlerPoolSize = newPoolSize;
            ExecutorService oldMsgHandler = this.messageHandlingExecutor;
            this.messageHandlingExecutor = Executors.newFixedThreadPool(this.msgHandlerPoolSize, Tools.groupedThreads((String)"onos/store/flow", (String)"message-handlers", (Logger)this.log));
            this.registerMessageHandlers(this.messageHandlingExecutor);
            oldMsgHandler.shutdown();
        }
        if (this.backupCount != newBackupCount) {
            this.backupCount = newBackupCount;
        }
        this.logConfig("Reconfigured");
    }

    private void registerMessageHandlers(ExecutorService executor) {
        this.clusterCommunicator.addSubscriber(ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS, (ClusterMessageHandler)new OnStoreBatch(), executor);
        this.clusterCommunicator.addSubscriber(ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED, arg_0 -> ((Serializer)this.serializer).decode(arg_0), arg_0 -> ((ECFlowRuleStore)this).notifyDelegate(arg_0), (Executor)executor);
        this.clusterCommunicator.addSubscriber(ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY, arg_0 -> ((Serializer)this.serializer).decode(arg_0), this.flowTable::getFlowEntry, arg_0 -> ((Serializer)this.serializer).encode(arg_0), (Executor)executor);
        this.clusterCommunicator.addSubscriber(ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES, arg_0 -> ((Serializer)this.serializer).decode(arg_0), this.flowTable::getFlowEntries, arg_0 -> ((Serializer)this.serializer).encode(arg_0), (Executor)executor);
        this.clusterCommunicator.addSubscriber(ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY, arg_0 -> ((Serializer)this.serializer).decode(arg_0), this::removeFlowRuleInternal, arg_0 -> ((Serializer)this.serializer).encode(arg_0), (Executor)executor);
        this.clusterCommunicator.addSubscriber(ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP, arg_0 -> ((Serializer)this.serializer).decode(arg_0), x$0 -> this.flowTable.onBackup(x$0), arg_0 -> ((Serializer)this.serializer).encode(arg_0), (Executor)executor);
        this.clusterCommunicator.addSubscriber(ECFlowRuleStoreMessageSubjects.FLOW_TABLE_ANTI_ENTROPY, arg_0 -> ((Serializer)this.serializer).decode(arg_0), x$0 -> this.flowTable.onAntiEntropy(x$0), arg_0 -> ((Serializer)this.serializer).encode(arg_0), (Executor)executor);
    }

    private void unregisterMessageHandlers() {
        this.clusterCommunicator.removeSubscriber(ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY);
        this.clusterCommunicator.removeSubscriber(ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES);
        this.clusterCommunicator.removeSubscriber(ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY);
        this.clusterCommunicator.removeSubscriber(ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS);
        this.clusterCommunicator.removeSubscriber(ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED);
        this.clusterCommunicator.removeSubscriber(ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP);
        this.clusterCommunicator.removeSubscriber(ECFlowRuleStoreMessageSubjects.FLOW_TABLE_ANTI_ENTROPY);
    }

    private void logConfig(String prefix) {
        this.log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}", new Object[]{prefix, this.msgHandlerPoolSize, this.backupPeriod, this.backupCount});
    }

    public int getFlowRuleCount() {
        return ((Stream)Streams.stream((Iterable)this.deviceService.getDevices()).parallel()).mapToInt(device -> this.getFlowRuleCount(device.id())).sum();
    }

    public int getFlowRuleCount(DeviceId deviceId) {
        return this.flowCounts.entrySet().stream().filter(entry -> ((BucketId)entry.getKey()).deviceId().equals((Object)deviceId)).mapToInt(entry -> (Integer)entry.getValue()).sum();
    }

    public FlowEntry getFlowEntry(FlowRule rule) {
        NodeId master = this.mastershipService.getMasterFor(rule.deviceId());
        if (master == null) {
            this.log.debug("Failed to getFlowEntry: No master for {}", (Object)rule.deviceId());
            return null;
        }
        if (Objects.equals(this.local, master)) {
            return this.flowTable.getFlowEntry(rule);
        }
        this.log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}", (Object)master, (Object)rule.deviceId());
        return (FlowEntry)Tools.futureGetOrElse((Future)this.clusterCommunicator.sendAndReceive((Object)rule, ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY, arg_0 -> ((Serializer)this.serializer).encode(arg_0), arg_0 -> ((Serializer)this.serializer).decode(arg_0), master), (long)5000L, (TimeUnit)TimeUnit.MILLISECONDS, null);
    }

    public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
        NodeId master = this.mastershipService.getMasterFor(deviceId);
        if (master == null) {
            this.log.debug("Failed to getFlowEntries: No master for {}", (Object)deviceId);
            return Collections.emptyList();
        }
        if (Objects.equals(this.local, master)) {
            return this.flowTable.getFlowEntries(deviceId);
        }
        this.log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}", (Object)master, (Object)deviceId);
        return (Iterable)Tools.futureGetOrElse((Future)this.clusterCommunicator.sendAndReceive((Object)deviceId, ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES, arg_0 -> ((Serializer)this.serializer).encode(arg_0), arg_0 -> ((Serializer)this.serializer).decode(arg_0), master), (long)5000L, (TimeUnit)TimeUnit.MILLISECONDS, Collections.emptyList());
    }

    public void storeFlowRule(FlowRule rule) {
        this.storeBatch(new FlowRuleBatchOperation(Collections.singletonList(new FlowRuleBatchEntry(FlowRuleBatchEntry.FlowRuleOperation.ADD, rule)), rule.deviceId(), this.idGenerator.getNewId()));
    }

    public void storeBatch(FlowRuleBatchOperation operation) {
        if (operation.getOperations().isEmpty()) {
            this.notifyDelegate((Event)FlowRuleBatchEvent.completed((FlowRuleBatchRequest)new FlowRuleBatchRequest(operation.id(), Collections.emptySet()), (CompletedBatchOperation)new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
            return;
        }
        DeviceId deviceId = operation.deviceId();
        NodeId master = this.mastershipService.getMasterFor(deviceId);
        if (master == null) {
            this.log.warn("No master for {} ", (Object)deviceId);
            this.updateStoreInternal(operation);
            this.notifyDelegate((Event)FlowRuleBatchEvent.completed((FlowRuleBatchRequest)new FlowRuleBatchRequest(operation.id(), Collections.emptySet()), (CompletedBatchOperation)new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
            return;
        }
        if (Objects.equals(this.local, master)) {
            this.storeBatchInternal(operation);
            return;
        }
        this.log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}", (Object)master, (Object)deviceId);
        this.clusterCommunicator.unicast((Object)operation, ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS, arg_0 -> ((Serializer)this.serializer).encode(arg_0), master).whenComplete((result, error) -> {
            if (error != null) {
                this.log.warn("Failed to storeBatch: {} to {}", new Object[]{operation, master, error});
                Set allFailures = operation.getOperations().stream().map(op -> (FlowRule)op.target()).collect(Collectors.toSet());
                this.notifyDelegate((Event)FlowRuleBatchEvent.completed((FlowRuleBatchRequest)new FlowRuleBatchRequest(operation.id(), Collections.emptySet()), (CompletedBatchOperation)new CompletedBatchOperation(false, allFailures, deviceId)));
            }
        });
    }

    private void storeBatchInternal(FlowRuleBatchOperation operation) {
        DeviceId did = operation.deviceId();
        Set<FlowRuleBatchEntry> currentOps = this.updateStoreInternal(operation);
        if (currentOps.isEmpty()) {
            this.batchOperationComplete(FlowRuleBatchEvent.completed((FlowRuleBatchRequest)new FlowRuleBatchRequest(operation.id(), Collections.emptySet()), (CompletedBatchOperation)new CompletedBatchOperation(true, Collections.emptySet(), did)));
            return;
        }
        this.notifyDelegate((Event)FlowRuleBatchEvent.requested((FlowRuleBatchRequest)new FlowRuleBatchRequest(operation.id(), currentOps), (DeviceId)operation.deviceId()));
    }

    private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
        return operation.getOperations().stream().map(op -> {
            switch ((FlowRuleBatchEntry.FlowRuleOperation)op.operator()) {
                case ADD: {
                    DefaultFlowEntry entry = new DefaultFlowEntry((FlowRule)op.target());
                    this.flowTable.add((FlowEntry)entry);
                    return op;
                }
                case MODIFY: {
                    DefaultFlowEntry entry = new DefaultFlowEntry((FlowRule)op.target());
                    this.flowTable.update((FlowEntry)entry);
                    return op;
                }
                case REMOVE: {
                    StoredFlowEntry entry = this.flowTable.getFlowEntry((FlowRule)op.target());
                    if (entry == null) break;
                    entry.setState(FlowEntry.FlowEntryState.PENDING_REMOVE);
                    this.flowTable.update((FlowEntry)entry);
                    this.log.debug("Setting state of rule to pending remove: {}", (Object)entry);
                    return op;
                }
                default: {
                    this.log.warn("Unknown flow operation operator: {}", (Object)op.operator());
                }
            }
            return null;
        }).filter(Objects::nonNull).collect(Collectors.toSet());
    }

    public void deleteFlowRule(FlowRule rule) {
        this.storeBatch(new FlowRuleBatchOperation(Collections.singletonList(new FlowRuleBatchEntry(FlowRuleBatchEntry.FlowRuleOperation.REMOVE, rule)), rule.deviceId(), this.idGenerator.getNewId()));
    }

    public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
        StoredFlowEntry stored;
        if (this.mastershipService.isLocalMaster(rule.deviceId()) && (stored = this.flowTable.getFlowEntry((FlowRule)rule)) != null && stored.state() != FlowEntry.FlowEntryState.PENDING_ADD) {
            stored.setState(FlowEntry.FlowEntryState.PENDING_ADD);
            return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, (FlowRule)rule);
        }
        return null;
    }

    public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
        NodeId master = this.mastershipService.getMasterFor(rule.deviceId());
        if (Objects.equals(this.local, master)) {
            return this.addOrUpdateFlowRuleInternal(rule);
        }
        this.log.warn("Tried to update FlowRule {} state, while the Node was not the master.", (Object)rule);
        return null;
    }

    private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
        StoredFlowEntry stored = this.flowTable.getFlowEntry((FlowRule)rule);
        if (stored != null) {
            stored.setBytes(rule.bytes());
            stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
            stored.setLiveType(rule.liveType());
            stored.setPackets(rule.packets());
            stored.setLastSeen();
            if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
                stored.setState(FlowEntry.FlowEntryState.ADDED);
                this.flowTable.update((FlowEntry)stored);
                return new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, (FlowRule)rule);
            }
            return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, (FlowRule)rule);
        }
        this.flowTable.add(rule);
        return null;
    }

    public FlowRuleEvent removeFlowRule(FlowEntry rule) {
        DeviceId deviceId = rule.deviceId();
        NodeId master = this.mastershipService.getMasterFor(deviceId);
        if (Objects.equals(this.local, master)) {
            return this.removeFlowRuleInternal(rule);
        }
        if (master == null) {
            this.log.warn("Failed to removeFlowRule: No master for {}", (Object)deviceId);
            return null;
        }
        this.log.trace("Forwarding removeFlowRule to {}, which is the master for device {}", (Object)master, (Object)deviceId);
        return (FlowRuleEvent)Futures.getUnchecked((Future)this.clusterCommunicator.sendAndReceive((Object)rule, ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY, arg_0 -> ((Serializer)this.serializer).encode(arg_0), arg_0 -> ((Serializer)this.serializer).decode(arg_0), master));
    }

    private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
        FlowEntry removed = this.flowTable.remove(rule);
        return removed != null ? new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVED, (FlowRule)removed) : null;
    }

    public void purgeFlowRule(DeviceId deviceId) {
        this.flowTable.purgeFlowRule(deviceId);
    }

    public void purgeFlowRules() {
        this.flowTable.purgeFlowRules();
    }

    public void batchOperationComplete(FlowRuleBatchEvent event) {
        NodeId nodeId = this.pendingResponses.remove(((FlowRuleBatchRequest)event.subject()).batchId());
        if (nodeId == null) {
            this.notifyDelegate((Event)event);
        } else {
            this.clusterCommunicator.unicast((Object)event, ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED, arg_0 -> ((Serializer)this.serializer).encode(arg_0), nodeId);
        }
    }

    public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
        this.deviceTableStats.put((Object)deviceId, tableStats);
        return null;
    }

    public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
        NodeId master = this.mastershipService.getMasterFor(deviceId);
        if (master == null) {
            this.log.debug("Failed to getTableStats: No master for {}", (Object)deviceId);
            return Collections.emptyList();
        }
        List tableStats = (List)this.deviceTableStats.get((Object)deviceId);
        if (tableStats == null) {
            return Collections.emptyList();
        }
        return ImmutableList.copyOf((Collection)tableStats);
    }

    public long getActiveFlowRuleCount(DeviceId deviceId) {
        return Streams.stream(this.getTableStatistics(deviceId)).mapToLong(TableStatisticsEntry::activeFlowEntries).sum();
    }

    protected void bindReplicaInfoManager(ReplicaInfoService replicaInfoService) {
        this.replicaInfoManager = replicaInfoService;
    }

    protected void unbindReplicaInfoManager(ReplicaInfoService replicaInfoService) {
        if (this.replicaInfoManager == replicaInfoService) {
            this.replicaInfoManager = null;
        }
    }

    protected void bindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        this.clusterCommunicator = clusterCommunicationService;
    }

    protected void unbindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        if (this.clusterCommunicator == clusterCommunicationService) {
            this.clusterCommunicator = null;
        }
    }

    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService == clusterService) {
            this.clusterService = null;
        }
    }

    protected void bindDeviceService(DeviceService deviceService) {
        this.deviceService = deviceService;
    }

    protected void unbindDeviceService(DeviceService deviceService) {
        if (this.deviceService == deviceService) {
            this.deviceService = null;
        }
    }

    protected void bindCoreService(CoreService coreService) {
        this.coreService = coreService;
    }

    protected void unbindCoreService(CoreService coreService) {
        if (this.coreService == coreService) {
            this.coreService = null;
        }
    }

    protected void bindConfigService(ComponentConfigService componentConfigService) {
        this.configService = componentConfigService;
    }

    protected void unbindConfigService(ComponentConfigService componentConfigService) {
        if (this.configService == componentConfigService) {
            this.configService = null;
        }
    }

    protected void bindMastershipService(MastershipService mastershipService) {
        this.mastershipService = mastershipService;
    }

    protected void unbindMastershipService(MastershipService mastershipService) {
        if (this.mastershipService == mastershipService) {
            this.mastershipService = null;
        }
    }

    protected void bindPersistenceService(PersistenceService persistenceService) {
        this.persistenceService = persistenceService;
    }

    protected void unbindPersistenceService(PersistenceService persistenceService) {
        if (this.persistenceService == persistenceService) {
            this.persistenceService = null;
        }
    }

    protected void bindStorageService(StorageService storageService) {
        this.storageService = storageService;
    }

    protected void unbindStorageService(StorageService storageService) {
        if (this.storageService == storageService) {
            this.storageService = null;
        }
    }

    private class InternalTableStatsListener
    implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
        private InternalTableStatsListener() {
        }

        public void event(EventuallyConsistentMapEvent<DeviceId, List<TableStatisticsEntry>> event) {
        }
    }

    private class InternalFlowTable
    implements ReplicaInfoEventListener {
        private final Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>> flowEntries = Maps.newConcurrentMap();
        private final Map<BackupOperation, Long> lastBackupTimes = Maps.newConcurrentMap();
        private final Map<BucketId, Long> lastUpdateTimes = Maps.newConcurrentMap();
        private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();

        private InternalFlowTable() {
        }

        public void event(ReplicaInfoEvent event) {
            ECFlowRuleStore.this.eventHandler.execute(() -> this.handleEvent(event));
        }

        private void handleEvent(ReplicaInfoEvent event) {
            DeviceId deviceId = (DeviceId)event.subject();
            if (!ECFlowRuleStore.this.mastershipService.isLocalMaster(deviceId)) {
                return;
            }
            if (event.type() == ReplicaInfoEvent.Type.MASTER_CHANGED) {
                for (int bucket = 0; bucket < 1024; ++bucket) {
                    this.recordUpdate(new BucketId(deviceId, bucket));
                }
            }
            ECFlowRuleStore.this.backupSenderExecutor.execute(this::backup);
        }

        private Set<DeviceId> getDevices() {
            return this.flowEntries.keySet();
        }

        private Set<FlowBucketDigest> getDigests(DeviceId deviceId) {
            return IntStream.range(0, 1024).mapToObj(bucket -> {
                BucketId bucketId = new BucketId(deviceId, bucket);
                long timestamp = this.lastUpdateTimes.getOrDefault(bucketId, 0L);
                return new FlowBucketDigest(bucketId, timestamp);
            }).collect(Collectors.toSet());
        }

        private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
            if (ECFlowRuleStore.this.persistenceEnabled) {
                Map flowTable = this.flowEntries.get(deviceId);
                return flowTable != null ? flowTable : this.flowEntries.computeIfAbsent(deviceId, id -> ECFlowRuleStore.this.persistenceService.persistentMapBuilder().withName("FlowTable:" + deviceId.toString()).withSerializer(ECFlowRuleStore.this.serializer).build());
            }
            Map flowTable = this.flowEntries.get(deviceId);
            return flowTable != null ? flowTable : this.flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
        }

        private FlowBucket getFlowBucket(BucketId bucketId) {
            long timestamp = this.lastUpdateTimes.getOrDefault(bucketId, 0L);
            return new FlowBucket(bucketId, this.getFlowTable(bucketId.deviceId()).entrySet().stream().filter(entry -> this.isInBucket((FlowId)entry.getKey(), bucketId.bucket())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)), timestamp);
        }

        private Map<StoredFlowEntry, StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
            Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = this.getFlowTable(deviceId);
            Map flowEntries = flowTable.get(flowId);
            return flowEntries != null ? flowEntries : flowTable.computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
        }

        private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
            return this.getFlowEntriesInternal(rule.deviceId(), rule.id()).get(rule);
        }

        private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
            return this.getFlowTable(deviceId).values().stream().flatMap(m -> m.values().stream()).collect(Collectors.toSet());
        }

        public StoredFlowEntry getFlowEntry(FlowRule rule) {
            return this.getFlowEntryInternal(rule);
        }

        public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
            return this.getFlowEntriesInternal(deviceId);
        }

        private boolean isInBucket(FlowId flowId, int bucket) {
            return this.bucket(flowId) == bucket;
        }

        private int bucket(FlowId flowId) {
            return (int)((Long)flowId.id() % 1024L);
        }

        private void recordUpdate(BucketId bucketId) {
            this.lastUpdateTimes.put(bucketId, System.currentTimeMillis());
        }

        public void add(FlowEntry rule) {
            this.getFlowEntriesInternal(rule.deviceId(), rule.id()).compute((StoredFlowEntry)rule, (k, stored) -> (StoredFlowEntry)rule);
            this.recordUpdate(new BucketId(rule.deviceId(), this.bucket(rule.id())));
        }

        public void update(FlowEntry rule) {
            this.getFlowEntriesInternal(rule.deviceId(), rule.id()).computeIfPresent((StoredFlowEntry)rule, (k, stored) -> {
                if (rule instanceof DefaultFlowEntry) {
                    DefaultFlowEntry updated = (DefaultFlowEntry)rule;
                    if (stored instanceof DefaultFlowEntry) {
                        DefaultFlowEntry storedEntry = (DefaultFlowEntry)stored;
                        if (updated.created() >= storedEntry.created()) {
                            this.recordUpdate(new BucketId(rule.deviceId(), this.bucket(rule.id())));
                            return updated;
                        }
                        ECFlowRuleStore.this.log.debug("Trying to update more recent flow entry {} (stored: {})", (Object)updated, stored);
                        return stored;
                    }
                }
                return stored;
            });
        }

        public FlowEntry remove(FlowEntry rule) {
            AtomicReference removedRule = new AtomicReference();
            Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = this.getFlowTable(rule.deviceId());
            flowTable.computeIfPresent(rule.id(), (flowId, flowEntries) -> {
                flowEntries.computeIfPresent((StoredFlowEntry)rule, (k, stored) -> {
                    if (rule instanceof DefaultFlowEntry) {
                        DefaultFlowEntry toRemove = (DefaultFlowEntry)rule;
                        if (stored instanceof DefaultFlowEntry) {
                            DefaultFlowEntry storedEntry = (DefaultFlowEntry)stored;
                            if (toRemove.created() < storedEntry.created()) {
                                ECFlowRuleStore.this.log.debug("Trying to remove more recent flow entry {} (stored: {})", (Object)toRemove, stored);
                                return stored;
                            }
                        }
                    }
                    removedRule.set(stored);
                    return null;
                });
                return flowEntries.isEmpty() ? null : flowEntries;
            });
            if (removedRule.get() != null) {
                this.recordUpdate(new BucketId(rule.deviceId(), this.bucket(rule.id())));
                return (FlowEntry)removedRule.get();
            }
            return null;
        }

        public void purgeFlowRule(DeviceId deviceId) {
            this.flowEntries.remove(deviceId);
        }

        public void purgeFlowRules() {
            this.flowEntries.clear();
        }

        private boolean isMasterNode(DeviceId deviceId) {
            NodeId master = ECFlowRuleStore.this.replicaInfoManager.getReplicaInfoFor(deviceId).master().orElse(null);
            return Objects.equals(master, ECFlowRuleStore.this.clusterService.getLocalNode().id());
        }

        private void backup() {
            for (DeviceId deviceId : this.getDevices()) {
                this.backup(deviceId);
            }
        }

        private void backup(DeviceId deviceId) {
            if (!this.isMasterNode(deviceId)) {
                return;
            }
            List<NodeId> backupNodes = ECFlowRuleStore.this.replicaInfoManager.getReplicaInfoFor(deviceId).backups();
            int availableBackupCount = Math.min(ECFlowRuleStore.this.backupCount, backupNodes.size());
            if (availableBackupCount == 0) {
                this.updateDeviceFlowCounts(deviceId);
            } else {
                for (int index = 0; index < availableBackupCount; ++index) {
                    NodeId backupNode = backupNodes.get(index);
                    try {
                        this.backup(deviceId, backupNode);
                        continue;
                    }
                    catch (Exception e) {
                        ECFlowRuleStore.this.log.error("Backup of " + deviceId + " to " + backupNode + " failed", (Throwable)e);
                    }
                }
            }
        }

        private void backup(DeviceId deviceId, NodeId nodeId) {
            long timestamp = System.currentTimeMillis();
            for (int bucket = 0; bucket < 1024; ++bucket) {
                BucketId bucketId = new BucketId(deviceId, bucket);
                BackupOperation operation = new BackupOperation(nodeId, bucketId);
                if (!this.startBackup(operation)) continue;
                this.backup(operation).whenCompleteAsync((succeeded, error) -> {
                    if (error == null && succeeded.booleanValue()) {
                        this.succeedBackup(operation, timestamp);
                    } else {
                        this.failBackup(operation);
                    }
                    this.backup(deviceId, nodeId);
                }, (Executor)ECFlowRuleStore.this.backupSenderExecutor);
            }
        }

        private boolean startBackup(BackupOperation operation) {
            long lastBackupTime = this.lastBackupTimes.getOrDefault(operation, 0L);
            long lastUpdateTime = this.lastUpdateTimes.getOrDefault(operation.bucketId(), 0L);
            return lastUpdateTime > 0L && lastBackupTime <= lastUpdateTime && this.inFlightUpdates.add(operation);
        }

        private void failBackup(BackupOperation operation) {
            this.inFlightUpdates.remove(operation);
        }

        private void succeedBackup(BackupOperation operation, long timestamp) {
            this.lastBackupTimes.put(operation, timestamp);
            this.inFlightUpdates.remove(operation);
        }

        private CompletableFuture<Boolean> backup(BackupOperation operation) {
            ECFlowRuleStore.this.log.debug("Sending flowEntries in bucket {} for device {} to {} for backup.", new Object[]{operation.bucketId().bucket(), operation.bucketId().deviceId(), operation.nodeId()});
            FlowBucket flowBucket = this.getFlowBucket(operation.bucketId());
            CompletableFuture<Boolean> future = new CompletableFuture<Boolean>();
            ECFlowRuleStore.this.clusterCommunicator.sendAndReceive((Object)flowBucket, ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP, arg_0 -> ((Serializer)ECFlowRuleStore.this.serializer).encode(arg_0), arg_0 -> ((Serializer)ECFlowRuleStore.this.serializer).decode(arg_0), operation.nodeId()).whenComplete((backedupFlows, error) -> {
                Sets.SetView flowsNotBackedUp;
                Sets.SetView setView = flowsNotBackedUp = error != null ? flowBucket.table().keySet() : Sets.difference(flowBucket.table().keySet(), (Set)backedupFlows);
                if (flowsNotBackedUp.size() > 0) {
                    ECFlowRuleStore.this.log.warn("Failed to backup flows: {}. Reason: {}, Node: {}", new Object[]{flowsNotBackedUp, error != null ? error.getMessage() : "none", operation.nodeId()});
                }
                future.complete(backedupFlows != null);
            });
            this.updateFlowCounts(flowBucket);
            return future;
        }

        private Set<FlowId> onBackup(FlowBucket flowBucket) {
            ECFlowRuleStore.this.log.debug("Received flowEntries for {} bucket {} to backup", (Object)flowBucket.bucketId().deviceId(), (Object)flowBucket.bucketId);
            HashSet backedupFlows = Sets.newHashSet();
            try {
                NodeId master = ECFlowRuleStore.this.replicaInfoManager.getReplicaInfoFor(flowBucket.bucketId().deviceId()).master().orElse(null);
                if (!Objects.equals(ECFlowRuleStore.this.local, master)) {
                    Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable = this.getFlowTable(flowBucket.bucketId().deviceId());
                    backupFlowTable.putAll(flowBucket.table());
                    backupFlowTable.entrySet().removeIf(entry -> this.isInBucket((FlowId)entry.getKey(), flowBucket.bucketId().bucket()) && !flowBucket.table().containsKey(entry.getKey()));
                    backedupFlows.addAll(flowBucket.table().keySet());
                    this.lastUpdateTimes.put(flowBucket.bucketId(), flowBucket.timestamp());
                }
            }
            catch (Exception e) {
                ECFlowRuleStore.this.log.warn("Failure processing backup request", (Throwable)e);
            }
            return backedupFlows;
        }

        private void runAntiEntropy() {
            for (DeviceId deviceId : this.getDevices()) {
                this.runAntiEntropy(deviceId);
            }
        }

        private void runAntiEntropy(DeviceId deviceId) {
            if (!this.isMasterNode(deviceId)) {
                return;
            }
            Set<FlowBucketDigest> digests = this.getDigests(deviceId);
            List<NodeId> backupNodes = ECFlowRuleStore.this.replicaInfoManager.getReplicaInfoFor(deviceId).backups();
            int availableBackupCount = Math.min(ECFlowRuleStore.this.backupCount, backupNodes.size());
            for (int index = 0; index < availableBackupCount; ++index) {
                NodeId backupNode = backupNodes.get(index);
                try {
                    this.runAntiEntropy(deviceId, backupNode, digests);
                    continue;
                }
                catch (Exception e) {
                    ECFlowRuleStore.this.log.error("Anti-entropy for " + deviceId + " to " + backupNode + " failed", (Throwable)e);
                }
            }
        }

        private void runAntiEntropy(DeviceId deviceId, NodeId nodeId, Set<FlowBucketDigest> digests) {
            ECFlowRuleStore.this.log.trace("Sending anti-entropy advertisement for device {} to {}", (Object)deviceId, (Object)nodeId);
            ECFlowRuleStore.this.clusterCommunicator.sendAndReceive(digests, ECFlowRuleStoreMessageSubjects.FLOW_TABLE_ANTI_ENTROPY, arg_0 -> ((Serializer)ECFlowRuleStore.this.serializer).encode(arg_0), arg_0 -> ((Serializer)ECFlowRuleStore.this.serializer).decode(arg_0), nodeId).whenComplete((missingBuckets, error) -> {
                if (error == null) {
                    ECFlowRuleStore.this.log.debug("Detected {} missing buckets on node {} for device {}", new Object[]{missingBuckets.size(), nodeId, deviceId});
                } else {
                    ECFlowRuleStore.this.log.trace("Anti-entropy advertisement for device {} to {} failed", new Object[]{deviceId, nodeId, error});
                }
            });
        }

        private Set<BucketId> onAntiEntropy(DeviceDigest digest) {
            NodeId master = ECFlowRuleStore.this.replicaInfoManager.getReplicaInfoFor(digest.deviceId()).master().orElse(null);
            if (Objects.equals(master, ECFlowRuleStore.this.local)) {
                return ImmutableSet.of();
            }
            HashSet<BucketId> missingBuckets = new HashSet<BucketId>();
            for (FlowBucketDigest flowBucketDigest : digest.digests()) {
                long lastUpdated = this.lastUpdateTimes.getOrDefault(flowBucketDigest.bucketId(), 0L);
                if (lastUpdated >= flowBucketDigest.timestamp()) continue;
                missingBuckets.add(flowBucketDigest.bucketId());
            }
            return missingBuckets;
        }

        private void updateDeviceFlowCounts(DeviceId deviceId) {
            for (int bucket = 0; bucket < 1024; ++bucket) {
                BucketId bucketId = new BucketId(deviceId, bucket);
                FlowBucket flowBucket = this.getFlowBucket(bucketId);
                this.updateFlowCounts(flowBucket);
            }
        }

        private void updateFlowCounts(FlowBucket flowBucket) {
            int flowCount = flowBucket.table().entrySet().stream().mapToInt(e -> ((Map)e.getValue()).values().size()).sum();
            ECFlowRuleStore.this.flowCounts.put((Object)flowBucket.bucketId(), (Object)flowCount);
        }
    }

    private class FlowBucketDigest {
        private final BucketId bucketId;
        private final long timestamp;

        FlowBucketDigest(BucketId bucketId, long timestamp) {
            this.bucketId = bucketId;
            this.timestamp = timestamp;
        }

        BucketId bucketId() {
            return this.bucketId;
        }

        long timestamp() {
            return this.timestamp;
        }

        public int hashCode() {
            return Objects.hash(this.bucketId);
        }

        public boolean equals(Object object) {
            return object instanceof FlowBucketDigest && ((FlowBucketDigest)object).bucketId.equals(this.bucketId);
        }
    }

    private class DeviceDigest {
        private final DeviceId deviceId;
        private final Set<FlowBucketDigest> digests;

        DeviceDigest(DeviceId deviceId, Set<FlowBucketDigest> digests) {
            this.deviceId = deviceId;
            this.digests = digests;
        }

        DeviceId deviceId() {
            return this.deviceId;
        }

        Set<FlowBucketDigest> digests() {
            return this.digests;
        }

        public int hashCode() {
            return Objects.hash(this.deviceId, this.digests);
        }

        public boolean equals(Object object) {
            return object instanceof DeviceDigest && ((DeviceDigest)object).deviceId.equals((Object)this.deviceId);
        }
    }

    private class FlowBucket {
        private final BucketId bucketId;
        private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table;
        private final long timestamp;

        BucketId bucketId() {
            return this.bucketId;
        }

        Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table() {
            return this.table;
        }

        long timestamp() {
            return this.timestamp;
        }

        FlowBucket(BucketId bucketId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table, long timestamp) {
            this.bucketId = bucketId;
            this.table = table;
            this.timestamp = timestamp;
        }
    }

    private class BucketId {
        private final DeviceId deviceId;
        private final int bucket;

        BucketId(DeviceId deviceId, int bucket) {
            this.deviceId = deviceId;
            this.bucket = bucket;
        }

        DeviceId deviceId() {
            return this.deviceId;
        }

        int bucket() {
            return this.bucket;
        }

        public int hashCode() {
            return Objects.hash(this.deviceId, this.bucket);
        }

        public boolean equals(Object other) {
            if (other != null && other instanceof BucketId) {
                BucketId that = (BucketId)other;
                return this.deviceId.equals((Object)that.deviceId) && this.bucket == that.bucket;
            }
            return false;
        }
    }

    private class BackupOperation {
        private final NodeId nodeId;
        private final BucketId bucketId;

        BackupOperation(NodeId nodeId, BucketId bucketId) {
            this.nodeId = nodeId;
            this.bucketId = bucketId;
        }

        NodeId nodeId() {
            return this.nodeId;
        }

        BucketId bucketId() {
            return this.bucketId;
        }

        public int hashCode() {
            return Objects.hash(this.nodeId, this.bucketId);
        }

        public boolean equals(Object other) {
            if (other != null && other instanceof BackupOperation) {
                BackupOperation that = (BackupOperation)other;
                return this.nodeId.equals((Object)that.nodeId) && this.bucketId.equals(that.bucketId);
            }
            return false;
        }
    }

    private final class OnStoreBatch
    implements ClusterMessageHandler {
        private OnStoreBatch() {
        }

        public void handle(ClusterMessage message) {
            FlowRuleBatchOperation operation = (FlowRuleBatchOperation)ECFlowRuleStore.this.serializer.decode(message.payload());
            ECFlowRuleStore.this.log.debug("received batch request {}", (Object)operation);
            DeviceId deviceId = operation.deviceId();
            NodeId master = ECFlowRuleStore.this.mastershipService.getMasterFor(deviceId);
            if (!Objects.equals(ECFlowRuleStore.this.local, master)) {
                HashSet<Object> failures = new HashSet<Object>(operation.size());
                for (FlowRuleBatchEntry op : operation.getOperations()) {
                    failures.add(op.target());
                }
                CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
                message.respond(ECFlowRuleStore.this.serializer.encode((Object)allFailed));
                return;
            }
            ECFlowRuleStore.this.pendingResponses.put(operation.id(), message.sender());
            ECFlowRuleStore.this.storeBatchInternal(operation);
        }
    }
}

