/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.flow.impl;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.Futures;
import java.util.Collection;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.CoreService;
import org.onosproject.core.IdGenerator;
import org.onosproject.event.Event;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.TableStatisticsEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.flow.ReplicaInfoEvent;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(enabled=false)
@Service
public class ECFlowRuleStore
extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
implements FlowRuleStore {
    // Logger named after the runtime class of this instance.
    private final Logger log = LoggerFactory.getLogger(((Object)((Object)this)).getClass());

    // Default values for the configurable properties declared below.
    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
    private static final int DEFAULT_MAX_BACKUP_COUNT = 2;
    private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
    private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
    // NOTE(review): presumably the 5000 ms timeout used by the forwarded read calls
    // (the literal appears inlined in getFlowEntry/getFlowEntries) - confirm.
    private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000L;
    // NOTE(review): presumably the partition size used by sendBackups (literal 1 there) - confirm.
    private static final int FLOW_TABLE_BACKUP_BATCH_SIZE = 1;

    // Size of the fixed thread pool that serves cluster message handlers.
    @Property(name="msgHandlerPoolSize", intValue={8}, label="Number of threads in the message handler pool")
    private int msgHandlerPoolSize = 8;
    // Delay, in milliseconds, between two runs of the scheduled flow table backup task.
    @Property(name="backupPeriod", intValue={2000}, label="Delay in ms between successive backup runs")
    private int backupPeriod = 2000;
    // When true, per-device flow tables are created through the persistence service (see getFlowTable).
    @Property(name="persistenceEnabled", boolValue={false}, label="Indicates whether or not changes in the flow table should be persisted to disk.")
    private boolean persistenceEnabled = false;
    // Volatile: written by modified(); readers are outside the visible part of this file.
    @Property(name="backupCount", intValue={2}, label="Max number of backup copies for each device")
    private volatile int backupCount = 2;

    // In-memory flow table; also registered as a replica info listener in activate().
    private InternalFlowTable flowTable = new InternalFlowTable();

    // Mandatory service references.
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ReplicaInfoService replicaInfoManager;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected DeviceService deviceService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected CoreService coreService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ComponentConfigService configService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected PersistenceService persistenceService;

    // Batch id -> node to notify when the batch completes (consumed in batchOperationComplete).
    // NOTE(review): entries are presumably added by the remote batch handler, which is not visible here.
    private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
    // Executor for cluster message handlers; created in activate() and replaced in modified().
    private ExecutorService messageHandlingExecutor;
    // Single-threaded executor on which replica info events are handled.
    private ExecutorService eventHandler;
    // Handle of the currently scheduled periodic backup task.
    private ScheduledFuture<?> backupTask;
    // Single-threaded scheduler that runs flow table backups.
    private final ScheduledExecutorService backupSenderExecutor = Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads((String)"onos/flow", (String)"backup-sender", (Logger)this.log));
    // Eventually consistent map of per-device table statistics; built in activate().
    private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
    private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener = new InternalTableStatsListener();
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected StorageService storageService;
    // Serializer used to encode/decode cluster messages exchanged by this store.
    protected final Serializer serializer = Serializer.using((KryoNamespace)KryoNamespaces.API);
    // Namespace builder (API namespace plus MastershipBasedTimestamp) passed to the table statistics map.
    protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder().register(KryoNamespaces.API).register(new Class[]{MastershipBasedTimestamp.class});
    // Generator for batch operation ids; obtained in activate().
    private IdGenerator idGenerator;
    // Identifier of the local cluster node; obtained in activate().
    private NodeId local;

    /**
     * Starts the store: registers the configurable properties, creates the
     * executors, subscribes the cluster message handlers, schedules the
     * periodic flow table backup and builds the table statistics map.
     *
     * @param context component context (not used by this method)
     */
    @Activate
    public void activate(ComponentContext context) {
        configService.registerProperties(getClass());

        idGenerator = coreService.getIdGenerator("flow-ops-ids");
        local = clusterService.getLocalNode().id();

        eventHandler = Executors.newSingleThreadExecutor(
                Tools.groupedThreads("onos/flow", "event-handler", log));
        messageHandlingExecutor = Executors.newFixedThreadPool(
                msgHandlerPoolSize,
                Tools.groupedThreads("onos/store/flow", "message-handlers", log));
        registerMessageHandlers(messageHandlingExecutor);

        replicaInfoManager.addListener(flowTable);

        // First backup run starts immediately; later runs are spaced by backupPeriod.
        backupTask = backupSenderExecutor.scheduleWithFixedDelay(
                () -> flowTable.backup(),
                0L,
                backupPeriod,
                TimeUnit.MILLISECONDS);

        deviceTableStats = storageService
                .<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
                .withName("onos-flow-table-stats")
                .withSerializer(serializerBuilder)
                .withAntiEntropyPeriod(5L, TimeUnit.SECONDS)
                .withTimestampProvider((key, value) -> new WallClockTimestamp())
                .withTombstonesDisabled()
                .build();
        deviceTableStats.addListener(tableStatsListener);

        logConfig("Started");
    }

    /**
     * Stops the store: detaches listeners and message handlers, cancels the
     * backup task, destroys the table statistics map and shuts down all
     * executors.
     *
     * @param context component context (not used by this method)
     */
    @Deactivate
    public void deactivate(ComponentContext context) {
        replicaInfoManager.removeListener(flowTable);
        backupTask.cancel(true);
        configService.unregisterProperties(getClass(), false);
        unregisterMessageHandlers();

        deviceTableStats.removeListener(tableStatsListener);
        deviceTableStats.destroy();

        eventHandler.shutdownNow();
        messageHandlingExecutor.shutdownNow();
        backupSenderExecutor.shutdownNow();

        log.info("Stopped");
    }

    /**
     * Applies a configuration change.
     *
     * <p>Reads {@code msgHandlerPoolSize}, {@code backupPeriod} and
     * {@code backupCount} from the component properties. A property that is
     * missing or empty keeps its current value. If any value cannot be parsed,
     * all three fall back to their defaults and the failure is logged.
     * Values that the executors would reject (non-positive pool size or backup
     * period, negative backup count) are ignored with a warning, so that a bad
     * setting cannot leave the store without a backup task or handler pool.
     *
     * @param context component context carrying the new properties; when
     *                {@code null} the current configuration is only logged
     */
    @Modified
    public void modified(ComponentContext context) {
        if (context == null) {
            logConfig("Default config");
            return;
        }

        Dictionary<?, ?> properties = context.getProperties();
        int newPoolSize;
        int newBackupPeriod;
        int newBackupCount;
        try {
            String s = Tools.get(properties, "msgHandlerPoolSize");
            newPoolSize = Strings.isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());

            s = Tools.get(properties, "backupPeriod");
            newBackupPeriod = Strings.isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());

            s = Tools.get(properties, "backupCount");
            newBackupCount = Strings.isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
        } catch (NumberFormatException | ClassCastException e) {
            log.warn("Unable to parse flow rule store configuration; falling back to defaults", e);
            newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
            newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
            newBackupCount = DEFAULT_MAX_BACKUP_COUNT;
        }

        // Validate before touching any state: newFixedThreadPool and
        // scheduleWithFixedDelay both throw IllegalArgumentException on
        // non-positive arguments.
        if (newPoolSize <= 0) {
            log.warn("Ignoring invalid msgHandlerPoolSize {}; keeping {}", newPoolSize, msgHandlerPoolSize);
            newPoolSize = msgHandlerPoolSize;
        }
        if (newBackupPeriod <= 0) {
            log.warn("Ignoring invalid backupPeriod {}; keeping {}", newBackupPeriod, backupPeriod);
            newBackupPeriod = backupPeriod;
        }
        if (newBackupCount < 0) {
            log.warn("Ignoring invalid backupCount {}; keeping {}", newBackupCount, backupCount);
            newBackupCount = backupCount;
        }

        if (newBackupPeriod != backupPeriod) {
            backupPeriod = newBackupPeriod;
            // Let a running backup finish, then reschedule with the new period.
            if (backupTask != null) {
                backupTask.cancel(false);
            }
            backupTask = backupSenderExecutor.scheduleWithFixedDelay(
                    () -> flowTable.backup(),
                    0L,
                    backupPeriod,
                    TimeUnit.MILLISECONDS);
        }

        if (newPoolSize != msgHandlerPoolSize) {
            msgHandlerPoolSize = newPoolSize;
            ExecutorService oldMsgHandler = messageHandlingExecutor;
            messageHandlingExecutor = Executors.newFixedThreadPool(
                    msgHandlerPoolSize,
                    Tools.groupedThreads("onos/store/flow", "message-handlers", log));
            // Re-subscribe the handlers on the new pool before retiring the old one.
            registerMessageHandlers(messageHandlingExecutor);
            if (oldMsgHandler != null) {
                oldMsgHandler.shutdown();
            }
        }

        backupCount = newBackupCount;
        logConfig("Reconfigured");
    }

    /**
     * Subscribes this store's handlers for all flow rule store message
     * subjects, dispatching them on the given executor.
     *
     * @param executor executor on which incoming messages are handled
     */
    private void registerMessageHandlers(ExecutorService executor) {
        clusterCommunicator.addSubscriber(
                ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS,
                new OnStoreBatch(),
                executor);
        clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
                ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED,
                serializer::decode,
                this::notifyDelegate,
                executor);
        clusterCommunicator.addSubscriber(
                ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
                serializer::decode,
                flowTable::getFlowEntry,
                serializer::encode,
                executor);
        clusterCommunicator.addSubscriber(
                ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
                serializer::decode,
                flowTable::getFlowEntries,
                serializer::encode,
                executor);
        clusterCommunicator.addSubscriber(
                ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY,
                serializer::decode,
                this::removeFlowRuleInternal,
                serializer::encode,
                executor);
        clusterCommunicator.addSubscriber(
                ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP,
                serializer::decode,
                flowTable::onBackupReceipt,
                serializer::encode,
                executor);
    }

    /**
     * Removes the subscriptions for every message subject registered by
     * {@code registerMessageHandlers}.
     */
    private void unregisterMessageHandlers() {
        Stream.of(
                ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY,
                ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
                ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
                ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS,
                ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED,
                ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP)
                .forEachOrdered(subject -> clusterCommunicator.removeSubscriber(subject));
    }

    /**
     * Logs the current configuration values at info level.
     *
     * @param prefix text placed at the start of the log line
     */
    private void logConfig(String prefix) {
        log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
                 prefix, msgHandlerPoolSize, backupPeriod, backupCount);
    }

    /**
     * Counts the flow entries of all devices known to the device service.
     *
     * @return sum of the number of flow entries returned by
     *         {@code getFlowEntries} for each device
     */
    public int getFlowRuleCount() {
        return Streams.stream(deviceService.getDevices())
                .parallel()
                .mapToInt(device -> Iterables.size(getFlowEntries(device.id())))
                .sum();
    }

    /**
     * Looks up the stored flow entry matching the given rule.
     *
     * <p>The lookup is served from the local flow table when this node is the
     * master of the rule's device; otherwise it is forwarded to the master
     * and waited on for at most 5000 ms.
     *
     * @param rule rule to look up
     * @return the flow entry, or {@code null} if the device has no master or
     *         the forwarded request yields no result
     */
    public FlowEntry getFlowEntry(FlowRule rule) {
        NodeId master = mastershipService.getMasterFor(rule.deviceId());
        if (master == null) {
            log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
            return null;
        }

        if (!Objects.equals(local, master)) {
            log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
                      master, rule.deviceId());
            return Tools.futureGetOrElse(
                    clusterCommunicator.sendAndReceive(
                            rule,
                            ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
                            serializer::encode,
                            serializer::decode,
                            master),
                    5000L,
                    TimeUnit.MILLISECONDS,
                    null);
        }
        return flowTable.getFlowEntry(rule);
    }

    /**
     * Returns the flow entries of a device.
     *
     * <p>Served locally when this node is the device's master; otherwise the
     * request is forwarded to the master and waited on for at most 5000 ms.
     *
     * @param deviceId device whose entries are requested
     * @return the flow entries, or an empty list if the device has no master
     *         or the forwarded request yields no result
     */
    public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
        NodeId master = mastershipService.getMasterFor(deviceId);
        if (master == null) {
            log.debug("Failed to getFlowEntries: No master for {}", deviceId);
            return Collections.emptyList();
        }

        if (!Objects.equals(local, master)) {
            log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
                      master, deviceId);
            return Tools.futureGetOrElse(
                    clusterCommunicator.sendAndReceive(
                            deviceId,
                            ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
                            serializer::encode,
                            serializer::decode,
                            master),
                    5000L,
                    TimeUnit.MILLISECONDS,
                    Collections.emptyList());
        }
        return flowTable.getFlowEntries(deviceId);
    }

    /**
     * Stores a single rule by submitting a one-element ADD batch.
     *
     * @param rule rule to add
     */
    public void storeFlowRule(FlowRule rule) {
        FlowRuleBatchEntry addEntry =
                new FlowRuleBatchEntry(FlowRuleBatchEntry.FlowRuleOperation.ADD, rule);
        storeBatch(new FlowRuleBatchOperation(
                Collections.singletonList(addEntry),
                rule.deviceId(),
                idGenerator.getNewId()));
    }

    /**
     * Stores a batch of flow rule operations.
     *
     * <p>An empty batch is completed immediately. When the device has no
     * master the operations are applied to the local table and the batch is
     * reported as successful. When this node is the master the batch is
     * processed locally; otherwise it is sent to the master, and a failed
     * send is reported to the delegate with every rule marked as failed.
     *
     * @param operation batch to store
     */
    public void storeBatch(FlowRuleBatchOperation operation) {
        if (operation.getOperations().isEmpty()) {
            notifyDelegate(FlowRuleBatchEvent.completed(
                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
                    new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
            return;
        }

        DeviceId deviceId = operation.deviceId();
        NodeId master = mastershipService.getMasterFor(deviceId);

        if (master == null) {
            log.warn("No master for {} ", deviceId);
            updateStoreInternal(operation);
            notifyDelegate(FlowRuleBatchEvent.completed(
                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
                    new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
            return;
        }

        if (Objects.equals(local, master)) {
            storeBatchInternal(operation);
            return;
        }

        log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
                  master, deviceId);
        clusterCommunicator
                .unicast(operation,
                         ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS,
                         serializer::encode,
                         master)
                .whenComplete((result, error) -> {
                    if (error == null) {
                        return;
                    }
                    // The trailing throwable is logged as the exception of the message.
                    log.warn("Failed to storeBatch: {} to {}", operation, master, error);
                    Set<FlowRule> allFailures = operation.getOperations().stream()
                            .map(op -> op.target())
                            .collect(Collectors.toSet());
                    notifyDelegate(FlowRuleBatchEvent.completed(
                            new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
                            new CompletedBatchOperation(false, allFailures, deviceId)));
                });
    }

    /**
     * Applies a batch to the local flow table and hands the resulting
     * operations to the delegate.
     *
     * <p>If no operation took effect, the batch is completed right away as
     * successful; otherwise a "requested" event carrying the effective
     * operations is emitted.
     *
     * @param operation batch to apply
     */
    private void storeBatchInternal(FlowRuleBatchOperation operation) {
        DeviceId did = operation.deviceId();
        Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);

        if (!currentOps.isEmpty()) {
            notifyDelegate(FlowRuleBatchEvent.requested(
                    new FlowRuleBatchRequest(operation.id(), currentOps),
                    operation.deviceId()));
            return;
        }

        batchOperationComplete(FlowRuleBatchEvent.completed(
                new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
                new CompletedBatchOperation(true, Collections.emptySet(), did)));
    }

    /**
     * Applies each operation of the batch to the local flow table.
     *
     * <ul>
     *   <li>ADD: replaces any existing entry with a fresh one built from the rule.</li>
     *   <li>REMOVE: marks the stored entry as pending removal; skipped when no
     *       entry is stored.</li>
     *   <li>MODIFY: ignored.</li>
     * </ul>
     *
     * @param operation batch to apply
     * @return the operations that took effect
     */
    private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
        Set<FlowRuleBatchEntry> applied = new HashSet<>();
        for (FlowRuleBatchEntry op : operation.getOperations()) {
            switch (op.operator()) {
                case ADD: {
                    DefaultFlowEntry fresh = new DefaultFlowEntry(op.target());
                    flowTable.remove(fresh.deviceId(), fresh);
                    flowTable.add(fresh);
                    applied.add(op);
                    break;
                }
                case REMOVE: {
                    StoredFlowEntry existing = flowTable.getFlowEntry(op.target());
                    if (existing != null) {
                        existing.setState(FlowEntry.FlowEntryState.PENDING_REMOVE);
                        log.debug("Setting state of rule to pending remove: {}", existing);
                        applied.add(op);
                    }
                    break;
                }
                case MODIFY: {
                    // Not handled: the operation is dropped from the result.
                    break;
                }
                default: {
                    log.warn("Unknown flow operation operator: {}", op.operator());
                    break;
                }
            }
        }
        return applied;
    }

    /**
     * Requests removal of a single rule by submitting a one-element REMOVE batch.
     *
     * @param rule rule to remove
     */
    public void deleteFlowRule(FlowRule rule) {
        FlowRuleBatchEntry removeEntry =
                new FlowRuleBatchEntry(FlowRuleBatchEntry.FlowRuleOperation.REMOVE, rule);
        storeBatch(new FlowRuleBatchOperation(
                Collections.singletonList(removeEntry),
                rule.deviceId(),
                idGenerator.getNewId()));
    }

    /**
     * Moves the stored entry for the given rule back to PENDING_ADD.
     *
     * @param rule flow entry to mark as pending
     * @return a RULE_UPDATED event if the state was changed; {@code null} if
     *         this node is not the device's master, no entry is stored, or the
     *         entry is already pending
     */
    public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
        if (!mastershipService.isLocalMaster(rule.deviceId())) {
            return null;
        }

        StoredFlowEntry stored = flowTable.getFlowEntry(rule);
        if (stored == null || stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
            return null;
        }

        stored.setState(FlowEntry.FlowEntryState.PENDING_ADD);
        return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, rule);
    }

    /**
     * Adds or refreshes a flow entry, provided this node is the master of the
     * entry's device.
     *
     * @param rule flow entry reported for the device
     * @return the resulting event, or {@code null} when nothing is to be
     *         reported or this node is not the master
     */
    public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
        NodeId master = mastershipService.getMasterFor(rule.deviceId());
        if (!Objects.equals(local, master)) {
            log.warn("Tried to update FlowRule {} state, while the Node was not the master.", rule);
            return null;
        }
        return addOrUpdateFlowRuleInternal(rule);
    }

    /**
     * Updates the stored entry with the counters of the given entry, or adds
     * the entry when none is stored.
     *
     * @param rule flow entry carrying the latest statistics
     * @return RULE_ADDED when a pending entry becomes ADDED, RULE_UPDATED when
     *         an existing entry is refreshed, {@code null} when the entry was
     *         newly inserted
     */
    private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
        StoredFlowEntry stored = flowTable.getFlowEntry(rule);
        if (stored == null) {
            // Unknown entry: record it without emitting an event.
            flowTable.add(rule);
            return null;
        }

        stored.setBytes(rule.bytes());
        stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
        stored.setLiveType(rule.liveType());
        stored.setPackets(rule.packets());
        stored.setLastSeen();

        if (stored.state() != FlowEntry.FlowEntryState.PENDING_ADD) {
            return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, rule);
        }
        stored.setState(FlowEntry.FlowEntryState.ADDED);
        return new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, rule);
    }

    /**
     * Removes a flow entry from the store.
     *
     * <p>The removal is performed locally when this node is the master of the
     * entry's device. Otherwise the request is forwarded to the master and
     * waited on for at most {@code FLOW_RULE_STORE_TIMEOUT_MILLIS}, matching
     * the other forwarded calls of this store, so that an unresponsive master
     * cannot block the caller indefinitely.
     *
     * @param rule flow entry to remove
     * @return the removal event, or {@code null} if nothing was removed, the
     *         device has no master, or the forwarded request failed or timed out
     */
    public FlowRuleEvent removeFlowRule(FlowEntry rule) {
        final DeviceId deviceId = rule.deviceId();
        NodeId master = mastershipService.getMasterFor(deviceId);

        if (Objects.equals(local, master)) {
            return removeFlowRuleInternal(rule);
        }

        if (master == null) {
            log.warn("Failed to removeFlowRule: No master for {}", deviceId);
            return null;
        }

        log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
                  master, deviceId);
        return Tools.futureGetOrElse(
                clusterCommunicator.sendAndReceive(
                        rule,
                        ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY,
                        serializer::encode,
                        serializer::decode,
                        master),
                FLOW_RULE_STORE_TIMEOUT_MILLIS,
                TimeUnit.MILLISECONDS,
                null);
    }

    /**
     * Removes the entry from the local flow table.
     *
     * @param rule flow entry to remove
     * @return a RULE_REMOVED event for the removed entry, or {@code null} if
     *         the table reported nothing removed
     */
    private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
        FlowEntry removed = flowTable.remove(rule.deviceId(), rule);
        if (removed == null) {
            return null;
        }
        return new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVED, removed);
    }

    /**
     * Delegates to the flow table to purge the rules of one device.
     *
     * @param deviceId device whose rules are purged
     */
    public void purgeFlowRule(DeviceId deviceId) {
        flowTable.purgeFlowRule(deviceId);
    }

    /**
     * Delegates to the flow table to purge all rules.
     */
    public void purgeFlowRules() {
        flowTable.purgeFlowRules();
    }

    /**
     * Reports the completion of a batch.
     *
     * <p>If a node is recorded as waiting for this batch id, the event is sent
     * to that node; otherwise the local delegate is notified.
     *
     * @param event completion event of the batch
     */
    public void batchOperationComplete(FlowRuleBatchEvent event) {
        // Removing the entry also clears the pending response for this batch.
        NodeId requester = pendingResponses.remove(event.subject().batchId());
        if (requester != null) {
            clusterCommunicator.unicast(
                    event,
                    ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED,
                    serializer::encode,
                    requester);
            return;
        }
        notifyDelegate(event);
    }

    /**
     * Records the latest table statistics of a device in the statistics map.
     *
     * @param deviceId   device the statistics belong to
     * @param tableStats statistics entries to store
     * @return always {@code null}
     */
    public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
        deviceTableStats.put(deviceId, tableStats);
        return null;
    }

    /**
     * Returns the stored table statistics of a device.
     *
     * @param deviceId device whose statistics are requested
     * @return an immutable copy of the stored statistics, or an empty list if
     *         the device has no master or no statistics are stored
     */
    public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
        NodeId master = mastershipService.getMasterFor(deviceId);
        if (master == null) {
            log.debug("Failed to getTableStats: No master for {}", deviceId);
            return Collections.emptyList();
        }

        List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
        return tableStats == null
                ? Collections.emptyList()
                : ImmutableList.copyOf(tableStats);
    }

    /**
     * Sums the active flow entry counts over the table statistics of a device.
     *
     * @param deviceId device whose statistics are summed
     * @return total of {@code activeFlowEntries()} across the device's
     *         table statistics entries
     */
    public long getActiveFlowRuleCount(DeviceId deviceId) {
        Iterable<TableStatisticsEntry> stats = getTableStatistics(deviceId);
        return Streams.stream(stats)
                .mapToLong(entry -> entry.activeFlowEntries())
                .sum();
    }

    /**
     * Stores the replica info service reference.
     *
     * @param replicaInfoService service instance to use
     */
    protected void bindReplicaInfoManager(ReplicaInfoService replicaInfoService) {
        replicaInfoManager = replicaInfoService;
    }

    /**
     * Clears the replica info service reference if it is the given instance.
     *
     * @param replicaInfoService service instance being withdrawn
     */
    protected void unbindReplicaInfoManager(ReplicaInfoService replicaInfoService) {
        if (replicaInfoManager != replicaInfoService) {
            return;
        }
        replicaInfoManager = null;
    }

    /**
     * Stores the cluster communication service reference.
     *
     * @param clusterCommunicationService service instance to use
     */
    protected void bindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        clusterCommunicator = clusterCommunicationService;
    }

    /**
     * Clears the cluster communication service reference if it is the given instance.
     *
     * @param clusterCommunicationService service instance being withdrawn
     */
    protected void unbindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        if (clusterCommunicator != clusterCommunicationService) {
            return;
        }
        clusterCommunicator = null;
    }

    /**
     * Stores the cluster service reference.
     *
     * @param clusterService service instance to use
     */
    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    /**
     * Clears the cluster service reference if it is the given instance.
     *
     * @param clusterService service instance being withdrawn
     */
    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService != clusterService) {
            return;
        }
        this.clusterService = null;
    }

    /**
     * Stores the device service reference.
     *
     * @param deviceService service instance to use
     */
    protected void bindDeviceService(DeviceService deviceService) {
        this.deviceService = deviceService;
    }

    /**
     * Clears the device service reference if it is the given instance.
     *
     * @param deviceService service instance being withdrawn
     */
    protected void unbindDeviceService(DeviceService deviceService) {
        if (this.deviceService != deviceService) {
            return;
        }
        this.deviceService = null;
    }

    /**
     * Stores the core service reference.
     *
     * @param coreService service instance to use
     */
    protected void bindCoreService(CoreService coreService) {
        this.coreService = coreService;
    }

    /**
     * Clears the core service reference if it is the given instance.
     *
     * @param coreService service instance being withdrawn
     */
    protected void unbindCoreService(CoreService coreService) {
        if (this.coreService != coreService) {
            return;
        }
        this.coreService = null;
    }

    /**
     * Stores the component configuration service reference.
     *
     * @param componentConfigService service instance to use
     */
    protected void bindConfigService(ComponentConfigService componentConfigService) {
        configService = componentConfigService;
    }

    /**
     * Clears the component configuration service reference if it is the given instance.
     *
     * @param componentConfigService service instance being withdrawn
     */
    protected void unbindConfigService(ComponentConfigService componentConfigService) {
        if (configService != componentConfigService) {
            return;
        }
        configService = null;
    }

    /**
     * Stores the mastership service reference.
     *
     * @param mastershipService service instance to use
     */
    protected void bindMastershipService(MastershipService mastershipService) {
        this.mastershipService = mastershipService;
    }

    /**
     * Clears the mastership service reference if it is the given instance.
     *
     * @param mastershipService service instance being withdrawn
     */
    protected void unbindMastershipService(MastershipService mastershipService) {
        if (this.mastershipService != mastershipService) {
            return;
        }
        this.mastershipService = null;
    }

    /**
     * Stores the persistence service reference.
     *
     * @param persistenceService service instance to use
     */
    protected void bindPersistenceService(PersistenceService persistenceService) {
        this.persistenceService = persistenceService;
    }

    /**
     * Clears the persistence service reference if it is the given instance.
     *
     * @param persistenceService service instance being withdrawn
     */
    protected void unbindPersistenceService(PersistenceService persistenceService) {
        if (this.persistenceService != persistenceService) {
            return;
        }
        this.persistenceService = null;
    }

    /**
     * Stores the storage service reference.
     *
     * @param storageService service instance to use
     */
    protected void bindStorageService(StorageService storageService) {
        this.storageService = storageService;
    }

    /**
     * Clears the storage service reference if it is the given instance.
     *
     * @param storageService service instance being withdrawn
     */
    protected void unbindStorageService(StorageService storageService) {
        if (this.storageService != storageService) {
            return;
        }
        this.storageService = null;
    }

    /**
     * Listener attached to the table statistics map. It currently takes no
     * action on any event.
     */
    private class InternalTableStatsListener
            implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {

        /**
         * Receives a change notification from the table statistics map.
         *
         * @param event map event; ignored
         */
        public void event(EventuallyConsistentMapEvent<DeviceId, List<TableStatisticsEntry>> event) {
            // Nothing to do: no event is generated for table statistics changes.
        }
    }

    private class InternalFlowTable
    implements ReplicaInfoEventListener {
        // Per-device flow tables: device id -> flow id -> stored entries (each entry maps to itself).
        private final Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>> flowEntries = Maps.newConcurrentMap();
        // Wall-clock time (ms) at which a device was last acknowledged as backed up to a node.
        private final Map<BackupOperation, Long> lastBackupTimes = Maps.newConcurrentMap();
        // Wall-clock time (ms) of the last recorded change for a device (entry added or master changed).
        private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();

        // Instantiated only by the enclosing store.
        private InternalFlowTable() {
        }

        /**
         * Hands a replica info event over to the store's event handler
         * executor for processing.
         *
         * @param event replica info event
         */
        public void event(ReplicaInfoEvent event) {
            eventHandler.execute(() -> handleEvent(event));
        }

        /**
         * Reacts to a replica info event for a device mastered by this node:
         * a master change refreshes the device's last-update time, and in all
         * cases an immediate backup run is scheduled.
         *
         * @param event replica info event
         */
        private void handleEvent(ReplicaInfoEvent event) {
            DeviceId deviceId = event.subject();
            if (mastershipService.isLocalMaster(deviceId)) {
                if (event.type() == ReplicaInfoEvent.Type.MASTER_CHANGED) {
                    lastUpdateTimes.put(deviceId, System.currentTimeMillis());
                }
                backupSenderExecutor.schedule(this::backup, 0L, TimeUnit.SECONDS);
            }
        }

        /**
         * Sends the flow tables of the given devices to a node, one backup
         * message per partition of size 1.
         *
         * @param nodeId    node receiving the backups
         * @param deviceIds devices whose flow tables are sent
         */
        private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
            for (List<DeviceId> batch : Iterables.partition(deviceIds, 1)) {
                backupFlowEntries(nodeId, Sets.newHashSet(batch));
            }
        }

        /**
         * Sends copies of the flow tables of the given devices to a node and
         * records the backup time of every device the node acknowledges.
         *
         * <p>Devices are considered not backed up when the request fails, when
         * the reply is {@code null}, or when they are missing from the reply;
         * such devices are logged at warn level.
         *
         * @param nodeId    node receiving the backup
         * @param deviceIds devices to back up; nothing is sent when empty
         */
        private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
            if (deviceIds.isEmpty()) {
                return;
            }
            log.debug("Sending flowEntries for devices {} to {} for backup.", deviceIds, nodeId);

            Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>> deviceFlowEntries =
                    Maps.newConcurrentMap();
            deviceIds.forEach(id -> deviceFlowEntries.put(id, getFlowTableCopy(id)));

            clusterCommunicator
                    .<Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>, Set<DeviceId>>sendAndReceive(
                            deviceFlowEntries,
                            ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP,
                            serializer::encode,
                            serializer::decode,
                            nodeId)
                    .whenComplete((backedupDevices, error) -> {
                        // A null reply without an error is treated like a failure;
                        // Sets.difference would otherwise throw on the null set.
                        Set<DeviceId> devicesNotBackedup = (error != null || backedupDevices == null)
                                ? deviceFlowEntries.keySet()
                                : Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
                        if (!devicesNotBackedup.isEmpty()) {
                            log.warn("Failed to backup devices: {}. Reason: {}, Node: {}",
                                     devicesNotBackedup,
                                     error != null ? error.getMessage() : "none",
                                     nodeId);
                        }
                        if (backedupDevices != null) {
                            backedupDevices.forEach(id ->
                                    lastBackupTimes.put(new BackupOperation(nodeId, id),
                                                        System.currentTimeMillis()));
                        }
                    });
        }

        /**
         * Returns the live flow table of a device, creating it on first use.
         *
         * <p>With persistence enabled the table is built through the
         * persistence service under the name {@code "FlowTable:" + deviceId};
         * otherwise a new concurrent map is used.
         *
         * @param deviceId device whose table is requested
         * @return the device's flow table, keyed by flow id
         */
        private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
            if (!persistenceEnabled) {
                return flowEntries.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap());
            }
            return flowEntries.computeIfAbsent(deviceId, key -> persistenceService
                    .<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
                    .withName("FlowTable:" + deviceId.toString())
                    .withSerializer(new Serializer() {
                        // Every call is forwarded to the store's serializer.

                        public <T> byte[] encode(T object) {
                            return ECFlowRuleStore.this.serializer.encode(object);
                        }

                        public <T> T decode(byte[] bytes) {
                            return (T) ECFlowRuleStore.this.serializer.decode(bytes);
                        }

                        public <T> T copy(T object) {
                            return (T) ECFlowRuleStore.this.serializer.copy(object);
                        }
                    })
                    .build());
        }

        /**
         * Returns a detached copy of a device's flow table.
         *
         * <p>Both the outer map and each per-flow-id map are copied into new
         * hash maps; the stored entries themselves are shared with the live
         * table. The copy is made whether or not persistence is enabled
         * (previously the persistence-enabled path returned the live table
         * itself), so callers always get a plain map that is not affected by
         * later changes to the table's structure.
         *
         * @param deviceId device whose table is copied
         * @return a new map holding a snapshot of the device's flow table
         */
        private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTableCopy(DeviceId deviceId) {
            Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = Maps.newHashMap();
            // getFlowTable creates the table on first use, persistent or in-memory.
            getFlowTable(deviceId).forEach((flowId, entries) -> copy.put(flowId, Maps.newHashMap(entries)));
            return copy;
        }

        /**
         * Returns the entry map for one flow id on a device, creating an empty
         * concurrent map for that flow id if none is registered yet.
         *
         * @param deviceId device owning the flow
         * @param flowId   flow identifier to look up
         * @return the map held in the device's flow table for {@code flowId}
         */
        private Map<StoredFlowEntry, StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
            Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> deviceTable = this.getFlowTable(deviceId);
            return deviceTable.computeIfAbsent(flowId, unused -> Maps.newConcurrentMap());
        }

        /**
         * Looks up the stored entry matching a flow rule.
         *
         * @param rule rule whose device id and flow id select the entry map and which
         *             is then used as the lookup key
         * @return the stored entry, or {@code null} if the map has no mapping for the rule
         */
        private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
            Map<StoredFlowEntry, StoredFlowEntry> entriesForFlow =
                    this.getFlowEntriesInternal(rule.deviceId(), rule.id());
            return entriesForFlow.get(rule);
        }

        /**
         * Collects every stored flow entry of a device into a single set.
         *
         * @param deviceId device whose entries are gathered
         * @return a new set holding the values of all per-flow maps in the device's table
         */
        private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
            Set<FlowEntry> collected = new HashSet<>();
            for (Map<StoredFlowEntry, StoredFlowEntry> entriesForFlow : this.getFlowTable(deviceId).values()) {
                collected.addAll(entriesForFlow.values());
            }
            return collected;
        }

        /**
         * Returns the stored flow entry matching the given rule.
         *
         * @param rule rule to look up
         * @return whatever {@code getFlowEntryInternal} yields for the rule
         */
        public StoredFlowEntry getFlowEntry(FlowRule rule) {
            StoredFlowEntry match = this.getFlowEntryInternal(rule);
            return match;
        }

        /**
         * Returns all flow entries stored for a device.
         *
         * @param deviceId device whose entries are requested
         * @return the set produced by {@code getFlowEntriesInternal(deviceId)}
         */
        public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
            Set<FlowEntry> entries = this.getFlowEntriesInternal(deviceId);
            return entries;
        }

        /**
         * Stores a flow entry, replacing any value already mapped for it, and records
         * the current time as the device's last update time.
         *
         * @param rule entry to store; it is cast to {@code StoredFlowEntry}
         */
        public void add(FlowEntry rule) {
            // Resolve the target map before casting, matching the original evaluation order.
            Map<StoredFlowEntry, StoredFlowEntry> entriesForFlow =
                    this.getFlowEntriesInternal(rule.deviceId(), rule.id());
            StoredFlowEntry entry = (StoredFlowEntry) rule;
            entriesForFlow.compute(entry, (key, previous) -> entry);

            long now = System.currentTimeMillis();
            this.lastUpdateTimes.put(rule.deviceId(), now);
        }

        /**
         * Removes the stored entry matching {@code rule}, unless the stored entry is newer.
         *
         * <p>When both the requested and the stored entry are {@code DefaultFlowEntry}
         * instances and the requested one has an earlier {@code created()} value, the
         * stored entry is kept and nothing is removed. On a successful removal the
         * current time is recorded as the last update time for {@code deviceId}.
         *
         * <p>NOTE(review): the lookup uses {@code rule.deviceId()} while the update
         * timestamp is keyed by the {@code deviceId} argument - presumably always the
         * same device; confirm against callers.
         *
         * @param deviceId device under which the update time is recorded
         * @param rule     entry to remove; it is cast to {@code StoredFlowEntry}
         * @return the entry that was removed, or {@code null} if nothing was removed
         */
        public FlowEntry remove(DeviceId deviceId, FlowEntry rule) {
            // Typed holder so the removed value can leave the lambda without unchecked casts.
            AtomicReference<StoredFlowEntry> removedRule = new AtomicReference<>();
            this.getFlowEntriesInternal(rule.deviceId(), rule.id()).computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
                if (rule instanceof DefaultFlowEntry && stored instanceof DefaultFlowEntry) {
                    DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
                    DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
                    if (toRemove.created() < storedEntry.created()) {
                        ECFlowRuleStore.this.log.debug("Trying to remove more recent flow entry {} (stored: {})", toRemove, stored);
                        // Returning the existing value keeps the mapping in place.
                        return stored;
                    }
                }
                removedRule.set(stored);
                // Returning null from computeIfPresent removes the mapping.
                return null;
            });
            StoredFlowEntry removed = removedRule.get();
            if (removed != null) {
                this.lastUpdateTimes.put(deviceId, System.currentTimeMillis());
            }
            return removed;
        }

        /**
         * Drops the whole flow table registered for a device.
         *
         * @param deviceId device whose table is removed from {@code flowEntries}
         */
        public void purgeFlowRule(DeviceId deviceId) {
            flowEntries.remove(deviceId);
        }

        /**
         * Drops the flow tables of all devices by clearing {@code flowEntries}.
         */
        public void purgeFlowRules() {
            flowEntries.clear();
        }

        /**
         * Returns the nodes that should hold backups for a device.
         *
         * @param deviceId device whose backup nodes are requested
         * @return an immutable list with at most {@code backupCount} leading entries of
         *         the backups reported by the replica info manager
         */
        private List<NodeId> getBackupNodes(DeviceId deviceId) {
            List<NodeId> candidates = ECFlowRuleStore.this.replicaInfoManager.getReplicaInfoFor(deviceId).backups();
            ImmutableList<NodeId> snapshot = ImmutableList.copyOf(candidates);
            int limit = Math.min(candidates.size(), ECFlowRuleStore.this.backupCount);
            return snapshot.subList(0, limit);
        }

        /**
         * Sends flow tables that changed since their last backup to the backup nodes.
         *
         * <p>For every device in {@code flowEntries} and each of its backup nodes, the
         * device is scheduled when the last backup time recorded for that
         * (node, device) pair is older than the device's last update time; a missing
         * timestamp counts as {@code 0}. The per-node device sets are then passed to
         * {@code sendBackups}. Any exception is logged and not rethrown.
         */
        private void backup() {
            try {
                // Backup node -> devices whose flow entries it should receive.
                Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
                this.flowEntries.keySet().forEach(deviceId -> {
                    List<NodeId> backupNodes = this.getBackupNodes(deviceId);
                    backupNodes.forEach(backupNode -> {
                        if (this.lastBackupTimes.getOrDefault(new BackupOperation(backupNode, deviceId), 0L) < this.lastUpdateTimes.getOrDefault(deviceId, 0L)) {
                            devicesToBackupByNode.computeIfAbsent(backupNode, nodeId -> Sets.newHashSet()).add(deviceId);
                        }
                    });
                });
                devicesToBackupByNode.forEach(this::sendBackups);
            }
            catch (Exception e) {
                ECFlowRuleStore.this.log.error("Backup failed.", (Throwable)e);
            }
        }

        /**
         * Applies flow tables received from another node as local backups.
         *
         * <p>For each received device whose master (per the mastership service) is not
         * the local node, the local table is cleared and refilled with the received
         * one, and the device is reported as backed up. Devices mastered locally are
         * skipped. If an exception occurs it is logged and the devices handled up to
         * that point are still returned.
         *
         * @param flowTables received flow tables keyed by device
         * @return the devices whose local table was replaced
         */
        private Set<DeviceId> onBackupReceipt(Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>> flowTables) {
            ECFlowRuleStore.this.log.debug("Received flowEntries for {} to backup", flowTables.keySet());
            Set<DeviceId> backedupDevices = Sets.newHashSet();
            try {
                flowTables.forEach((deviceId, deviceFlowTable) -> {
                    if (!Objects.equals(ECFlowRuleStore.this.local, ECFlowRuleStore.this.mastershipService.getMasterFor(deviceId))) {
                        Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable = this.getFlowTable(deviceId);
                        // Replace rather than merge, so entries missing from the received table are dropped.
                        backupFlowTable.clear();
                        backupFlowTable.putAll(deviceFlowTable);
                        backedupDevices.add(deviceId);
                    }
                });
            }
            catch (Exception e) {
                ECFlowRuleStore.this.log.warn("Failure processing backup request", (Throwable)e);
            }
            return backedupDevices;
        }
    }

    /**
     * Key identifying the backup of one device's flow table to one node.
     *
     * <p>Two operations are equal when both their node id and device id are equal;
     * {@code hashCode} is derived from the same two fields.
     */
    private class BackupOperation {
        private final NodeId nodeId;
        private final DeviceId deviceId;

        /**
         * Creates a key for backing up {@code deviceId} to {@code nodeId}.
         *
         * @param nodeId   node receiving the backup
         * @param deviceId device whose flow table is backed up
         */
        public BackupOperation(NodeId nodeId, DeviceId deviceId) {
            this.nodeId = nodeId;
            this.deviceId = deviceId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.nodeId, this.deviceId);
        }

        @Override
        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            // instanceof is already false for null, so no separate null check is needed.
            if (!(other instanceof BackupOperation)) {
                return false;
            }
            BackupOperation that = (BackupOperation) other;
            // Null-safe comparison, matching the null tolerance of Objects.hash in hashCode().
            return Objects.equals(this.nodeId, that.nodeId)
                    && Objects.equals(this.deviceId, that.deviceId);
        }
    }

    /**
     * Cluster message handler for flow rule batch operations sent by other nodes.
     */
    private final class OnStoreBatch
    implements ClusterMessageHandler {
        private OnStoreBatch() {
        }

        /**
         * Decodes a batch operation from the message and processes it.
         *
         * <p>If the local node is not the master of the operation's device, the sender
         * is answered with an unsuccessful {@code CompletedBatchOperation} listing the
         * target of every entry as failed. Otherwise the sender is recorded in
         * {@code pendingResponses} under the operation id and the batch is handed to
         * {@code storeBatchInternal}.
         *
         * @param message cluster message whose payload encodes a {@code FlowRuleBatchOperation}
         */
        public void handle(ClusterMessage message) {
            FlowRuleBatchOperation operation = (FlowRuleBatchOperation)ECFlowRuleStore.this.serializer.decode(message.payload());
            ECFlowRuleStore.this.log.debug("received batch request {}", operation);
            DeviceId deviceId = operation.deviceId();
            NodeId master = ECFlowRuleStore.this.mastershipService.getMasterFor(deviceId);
            if (!Objects.equals(ECFlowRuleStore.this.local, master)) {
                // Not the master: report every rule in the batch as failed.
                Set<FlowRule> failures = new HashSet<>(operation.size());
                for (FlowRuleBatchEntry op : operation.getOperations()) {
                    failures.add(op.target());
                }
                CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
                message.respond(ECFlowRuleStore.this.serializer.encode(allFailed));
                return;
            }
            ECFlowRuleStore.this.pendingResponses.put(operation.id(), message.sender());
            ECFlowRuleStore.this.storeBatchInternal(operation);
        }
    }
}

