/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.flow.impl;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import java.util.Collection;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.OrderedExecutor;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.CoreService;
import org.onosproject.core.IdGenerator;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.event.Event;
import org.onosproject.event.EventListener;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.TableStatisticsEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEvent;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.flow.impl.BucketId;
import org.onosproject.store.flow.impl.DeviceFlowTable;
import org.onosproject.store.flow.impl.DeviceReplicaInfo;
import org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects;
import org.onosproject.store.flow.impl.FlowBucket;
import org.onosproject.store.flow.impl.LifecycleEvent;
import org.onosproject.store.flow.impl.LifecycleEventListener;
import org.onosproject.store.flow.impl.LifecycleManager;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(immediate=true)
@Service
public class ECFlowRuleStore
extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
implements FlowRuleStore {
    // Component logger. Declared first: the executor thread factories created in the
    // field initializers below are handed this logger.
    private final Logger log = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    // Default values mirrored by the @Property declarations below.
    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
    private static final int DEFAULT_MAX_BACKUP_COUNT = 2;
    private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
    private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
    private static final int DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS = 5000;
    // Same value as the literal 5000L used as the timeout of remote calls in this class.
    private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000L;
    // Size of the pool that runs cluster message handlers; re-read in modified().
    @Property(name="msgHandlerPoolSize", intValue={8}, label="Number of threads in the message handler pool")
    private int msgHandlerPoolSize = 8;
    // Backup period in milliseconds; pushed to the flow table when changed in modified().
    @Property(name="backupPeriod", intValue={2000}, label="Delay in ms between successive backup runs")
    private int backupPeriod = 2000;
    // Anti-entropy period in milliseconds; pushed to the flow table when changed in modified().
    @Property(name="antiEntropyPeriod", intValue={5000}, label="Delay in ms between anti-entropy runs")
    private int antiEntropyPeriod = 5000;
    // NOTE(review): not read anywhere in the visible part of this file - confirm it is still used.
    @Property(name="persistenceEnabled", boolValue={false}, label="Indicates whether or not changes in the flow table should be persisted to disk.")
    private boolean persistenceEnabled = false;
    // Upper bound on the number of backup nodes reported per device (see InternalLifecycleManager.onActivate).
    @Property(name="backupCount", intValue={2}, label="Max number of backup copies for each device")
    private volatile int backupCount = 2;
    // Local flow table; also registered as a device listener in activate().
    private InternalFlowTable flowTable = new InternalFlowTable();
    // Service references injected through the bind*/unbind* methods of this class.
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ReplicaInfoService replicaInfoManager;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected DeviceService deviceService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected CoreService coreService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ComponentConfigService configService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected PersistenceService persistenceService;
    // Batch id -> node id. batchOperationComplete() removes the entry and, when one exists,
    // sends the completion event to that node. Entries are added outside the visible code.
    private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
    // Both executors are created in activate(); the message handling one is replaced when the pool size changes.
    private ExecutorService messageHandlingExecutor;
    private ExecutorService eventHandler;
    private final ScheduledExecutorService backupScheduler = Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads((String)"onos/flow", (String)"backup-scheduler", (Logger)this.log));
    // Pool size is twice the processor count, clamped to the range [4, 16].
    private final ExecutorService backupExecutor = Executors.newFixedThreadPool(Math.max(Math.min(Runtime.getRuntime().availableProcessors() * 2, 16), 4), Tools.groupedThreads((String)"onos/flow", (String)"backup-%d", (Logger)this.log));
    // Per-device table statistics, built in activate() as an eventually consistent map.
    private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
    private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener = new InternalTableStatsListener();
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected StorageService storageService;
    // Serializer used for cluster messages and for the mastership term map.
    protected final Serializer serializer = Serializer.using((KryoNamespace)KryoNamespace.newBuilder().register(KryoNamespaces.API).register(new Class[]{BucketId.class}).register(new Class[]{FlowBucket.class}).register(new Class[]{ImmutablePair.class}).build());
    // Namespace builder handed to the table statistics map in activate().
    protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder().register(KryoNamespaces.API).register(new Class[]{BucketId.class}).register(new Class[]{MastershipBasedTimestamp.class});
    // Device id -> mastership term; written by InternalLifecycleManager.activate(long).
    protected AsyncConsistentMap<DeviceId, Long> mastershipTermLifecycles;
    // Source of batch operation ids for storeFlowRule()/deleteFlowRule().
    private IdGenerator idGenerator;
    // Id of the local cluster node, resolved in activate().
    private NodeId local;

    /**
     * Starts the store: registers the configurable properties, resolves the id
     * generator and the local node id, creates the event and message handling
     * executors, subscribes the cluster message handlers, builds the two
     * distributed maps and finally seeds the flow table with every known device.
     *
     * @param context component context supplied by the framework (not read here)
     */
    @Activate
    public void activate(ComponentContext context) {
        configService.registerProperties(getClass());

        idGenerator = coreService.getIdGenerator("flow-ops-ids");
        local = clusterService.getLocalNode().id();

        eventHandler = Executors.newSingleThreadExecutor(
                Tools.groupedThreads("onos/flow", "event-handler", log));
        messageHandlingExecutor = Executors.newFixedThreadPool(
                msgHandlerPoolSize,
                Tools.groupedThreads("onos/store/flow", "message-handlers", log));
        registerMessageHandlers(messageHandlingExecutor);

        // Strongly consistent map holding the mastership term per device.
        mastershipTermLifecycles = ((ConsistentMapBuilder) ((ConsistentMapBuilder) storageService.consistentMapBuilder()
                .withName("onos-flow-store-terms"))
                .withSerializer(serializer))
                .buildAsyncMap();

        // Eventually consistent map holding the table statistics per device.
        deviceTableStats = storageService.eventuallyConsistentMapBuilder()
                .withName("onos-flow-table-stats")
                .withSerializer(serializerBuilder)
                .withAntiEntropyPeriod(5L, TimeUnit.SECONDS)
                .withTimestampProvider((k, v) -> new WallClockTimestamp())
                .withTombstonesDisabled()
                .build();
        deviceTableStats.addListener(tableStatsListener);

        // Listen for device events first, then register the devices already present.
        deviceService.addListener((EventListener) flowTable);
        deviceService.getDevices().forEach(known -> flowTable.addDevice(known.id()));

        logConfig("Started");
    }

    /**
     * Stops the store: unregisters the configurable properties and the cluster
     * message handlers, detaches the listeners, destroys the table statistics
     * map and shuts every executor down immediately.
     *
     * @param context component context supplied by the framework (not read here)
     */
    @Deactivate
    public void deactivate(ComponentContext context) {
        configService.unregisterProperties(getClass(), false);
        unregisterMessageHandlers();

        // Detach listeners before the backing map is destroyed.
        deviceService.removeListener((EventListener) flowTable);
        deviceTableStats.removeListener(tableStatsListener);
        deviceTableStats.destroy();

        // Executors are stopped in the same order as they are listed here.
        eventHandler.shutdownNow();
        messageHandlingExecutor.shutdownNow();
        backupScheduler.shutdownNow();
        backupExecutor.shutdownNow();

        log.info("Stopped");
    }

    /**
     * Applies a new component configuration.
     *
     * <p>Each integer property keeps its current value when it is missing or
     * empty. If any property cannot be parsed, all four fall back to their
     * defaults (8 threads, 2000 ms backup period, 2 backups, 5000 ms
     * anti-entropy period) and the failure is logged. A non-positive
     * {@code msgHandlerPoolSize} or a negative {@code backupCount} is rejected
     * with a warning and the current value is kept: the former would make the
     * thread pool factory throw, the latter would later break the backup
     * sub-list computation.
     *
     * @param context component context carrying the properties; when
     *                {@code null} only the current configuration is logged
     */
    @Modified
    public void modified(ComponentContext context) {
        if (context == null) {
            this.logConfig("Default config");
            return;
        }

        Dictionary<?, ?> properties = context.getProperties();
        int newPoolSize;
        int newBackupPeriod;
        int newBackupCount;
        int newAntiEntropyPeriod;
        try {
            newPoolSize = parseIntProperty(properties, "msgHandlerPoolSize", this.msgHandlerPoolSize);
            newBackupPeriod = parseIntProperty(properties, "backupPeriod", this.backupPeriod);
            newBackupCount = parseIntProperty(properties, "backupCount", this.backupCount);
            newAntiEntropyPeriod = parseIntProperty(properties, "antiEntropyPeriod", this.antiEntropyPeriod);
        } catch (ClassCastException | NumberFormatException e) {
            // Same fallback as before, but no longer silent.
            this.log.warn("Invalid flow rule store configuration; falling back to defaults", e);
            newPoolSize = 8;
            newBackupPeriod = 2000;
            newBackupCount = 2;
            newAntiEntropyPeriod = 5000;
        }

        if (newPoolSize <= 0) {
            // Executors.newFixedThreadPool rejects non-positive sizes.
            this.log.warn("Ignoring invalid msgHandlerPoolSize {}; keeping {}", newPoolSize, this.msgHandlerPoolSize);
            newPoolSize = this.msgHandlerPoolSize;
        }
        if (newBackupCount < 0) {
            // backupCount is used as the upper index of a subList.
            this.log.warn("Ignoring invalid backupCount {}; keeping {}", newBackupCount, this.backupCount);
            newBackupCount = this.backupCount;
        }

        if (newBackupPeriod != this.backupPeriod) {
            this.backupPeriod = newBackupPeriod;
            this.flowTable.setBackupPeriod(newBackupPeriod);
        }
        if (newAntiEntropyPeriod != this.antiEntropyPeriod) {
            this.antiEntropyPeriod = newAntiEntropyPeriod;
            this.flowTable.setAntiEntropyPeriod(newAntiEntropyPeriod);
        }
        if (newPoolSize != this.msgHandlerPoolSize) {
            ExecutorService oldMsgHandler = this.messageHandlingExecutor;
            // Create the replacement before touching any field so a failure leaves the state untouched.
            ExecutorService newMsgHandler = Executors.newFixedThreadPool(newPoolSize,
                    Tools.groupedThreads("onos/store/flow", "message-handlers", this.log));
            this.msgHandlerPoolSize = newPoolSize;
            this.messageHandlingExecutor = newMsgHandler;
            this.registerMessageHandlers(newMsgHandler);
            if (oldMsgHandler != null) {
                // Orderly shutdown: tasks already submitted to the old pool still run.
                oldMsgHandler.shutdown();
            }
        }
        this.backupCount = newBackupCount;
        this.logConfig("Reconfigured");
    }

    /** Reads an integer property, returning {@code current} when it is missing or empty. */
    private static int parseIntProperty(Dictionary<?, ?> properties, String name, int current) {
        String value = Tools.get(properties, name);
        return Strings.isNullOrEmpty(value) ? current : Integer.parseInt(value.trim());
    }

    /**
     * Subscribes this store's handlers to the flow rule store message subjects,
     * running all of them on the given executor.
     *
     * @param executor executor on which incoming messages are handled
     */
    private void registerMessageHandlers(ExecutorService executor) {
        // Batch requests are handled by OnStoreBatch (declared outside the visible part of this file).
        this.clusterCommunicator.addSubscriber(ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS, (ClusterMessageHandler)new OnStoreBatch(), executor);
        // One-way message: the decoded payload is passed straight to notifyDelegate.
        this.clusterCommunicator.addSubscriber(ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED, arg_0 -> ((Serializer)this.serializer).decode(arg_0), arg_0 -> ((ECFlowRuleStore)this).notifyDelegate(arg_0), (Executor)executor);
        // The remaining subscriptions are request/reply: decode, answer locally, encode the result.
        this.clusterCommunicator.addSubscriber(ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY, arg_0 -> ((Serializer)this.serializer).decode(arg_0), this.flowTable::getFlowEntry, arg_0 -> ((Serializer)this.serializer).encode(arg_0), (Executor)executor);
        this.clusterCommunicator.addSubscriber(ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES, arg_0 -> ((Serializer)this.serializer).decode(arg_0), this.flowTable::getFlowEntries, arg_0 -> ((Serializer)this.serializer).encode(arg_0), (Executor)executor);
        // The request payload is a (device id, state) pair, matching what getFlowRuleCount sends.
        this.clusterCommunicator.addSubscriber(ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT, arg_0 -> ((Serializer)this.serializer).decode(arg_0), p -> this.flowTable.getFlowRuleCount((DeviceId)p.getLeft(), (FlowEntry.FlowEntryState)p.getRight()), arg_0 -> ((Serializer)this.serializer).encode(arg_0), (Executor)executor);
        this.clusterCommunicator.addSubscriber(ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY, arg_0 -> ((Serializer)this.serializer).decode(arg_0), this::removeFlowRuleInternal, arg_0 -> ((Serializer)this.serializer).encode(arg_0), (Executor)executor);
    }

    /**
     * Removes the subscriptions for every flow rule store message subject,
     * in a fixed order.
     */
    private void unregisterMessageHandlers() {
        Stream.of(
                ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY,
                ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
                ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT,
                ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
                ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS,
                ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED,
                ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP)
                .forEach(clusterCommunicator::removeSubscriber);
    }

    /**
     * Logs the current configuration at info level.
     *
     * <p>The anti-entropy period is included because it can be changed through
     * {@code modified} like the other values.
     *
     * @param prefix text placed in front of the configuration summary
     */
    private void logConfig(String prefix) {
        this.log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}, antiEntropyPeriod = {}",
                new Object[]{prefix, this.msgHandlerPoolSize, this.backupPeriod, this.backupCount, this.antiEntropyPeriod});
    }

    /**
     * Returns the total number of flow rules over all devices known to the
     * device service. The per-device counts are computed on a parallel stream.
     *
     * @return sum of the per-device flow rule counts
     */
    public int getFlowRuleCount() {
        return Streams.stream(deviceService.getDevices())
                .parallel()
                .mapToInt(device -> getFlowRuleCount(device.id()))
                .sum();
    }

    /**
     * Returns the number of flow rules of a device regardless of their state.
     *
     * @param deviceId device to count flow rules for
     * @return flow rule count, as reported by the state-filtered overload with a {@code null} state
     */
    public int getFlowRuleCount(DeviceId deviceId) {
        final FlowEntry.FlowEntryState anyState = null;
        return getFlowRuleCount(deviceId, anyState);
    }

    /**
     * Returns the number of flow rules of a device, optionally filtered by state.
     *
     * <p>The count is answered locally when this node is the master of the
     * device; otherwise the request is forwarded to the master and awaited for
     * up to 5000 ms, with 0 as the fallback value.
     *
     * @param deviceId device to count flow rules for
     * @param state    state filter passed through to the flow table, may be {@code null}
     * @return the count, or 0 when the device has no master
     */
    public int getFlowRuleCount(DeviceId deviceId, FlowEntry.FlowEntryState state) {
        NodeId master = mastershipService.getMasterFor(deviceId);
        if (master == null) {
            log.debug("Failed to getFlowRuleCount: No master for {}", deviceId);
            return 0;
        }

        if (!Objects.equals(local, master)) {
            log.trace("Forwarding getFlowRuleCount to master {} for device {}", master, deviceId);
            // 5000 ms is the same value as FLOW_RULE_STORE_TIMEOUT_MILLIS.
            return Tools.futureGetOrElse(
                    clusterCommunicator.sendAndReceive(
                            Pair.of(deviceId, state),
                            ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT,
                            serializer::encode,
                            serializer::decode,
                            master),
                    5000L,
                    TimeUnit.MILLISECONDS,
                    0);
        }

        return flowTable.getFlowRuleCount(deviceId, state);
    }

    /**
     * Looks up the stored flow entry matching a rule.
     *
     * <p>Answered from the local flow table when this node is the master of
     * the rule's device; otherwise forwarded to the master and awaited for up
     * to 5000 ms.
     *
     * @param rule rule to look up
     * @return the flow entry, or {@code null} when the device has no master or
     *         the remote call yields the fallback value
     */
    public FlowEntry getFlowEntry(FlowRule rule) {
        NodeId master = mastershipService.getMasterFor(rule.deviceId());
        if (master == null) {
            log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
            return null;
        }

        if (!Objects.equals(local, master)) {
            log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
                    master, rule.deviceId());
            // 5000 ms is the same value as FLOW_RULE_STORE_TIMEOUT_MILLIS.
            return Tools.futureGetOrElse(
                    clusterCommunicator.sendAndReceive(
                            rule,
                            ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
                            serializer::encode,
                            serializer::decode,
                            master),
                    5000L,
                    TimeUnit.MILLISECONDS,
                    null);
        }

        return flowTable.getFlowEntry(rule);
    }

    /**
     * Returns the flow entries of a device.
     *
     * <p>Answered from the local flow table when this node is the master of
     * the device; otherwise forwarded to the master and awaited for up to
     * 5000 ms, with an empty list as the fallback value.
     *
     * @param deviceId device whose flow entries are requested
     * @return the flow entries, or an empty list when the device has no master
     */
    public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
        NodeId master = mastershipService.getMasterFor(deviceId);
        if (master == null) {
            log.debug("Failed to getFlowEntries: No master for {}", deviceId);
            return Collections.emptyList();
        }

        if (!Objects.equals(local, master)) {
            log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
                    master, deviceId);
            // 5000 ms is the same value as FLOW_RULE_STORE_TIMEOUT_MILLIS.
            return Tools.futureGetOrElse(
                    clusterCommunicator.sendAndReceive(
                            deviceId,
                            ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
                            serializer::encode,
                            serializer::decode,
                            master),
                    5000L,
                    TimeUnit.MILLISECONDS,
                    Collections.emptyList());
        }

        return flowTable.getFlowEntries(deviceId);
    }

    /**
     * Stores a single flow rule by wrapping it in a one-entry ADD batch with a
     * freshly generated batch id and handing it to {@code storeBatch}.
     *
     * @param rule rule to add
     */
    public void storeFlowRule(FlowRule rule) {
        FlowRuleBatchEntry addEntry =
                new FlowRuleBatchEntry(FlowRuleBatchEntry.FlowRuleOperation.ADD, rule);
        storeBatch(new FlowRuleBatchOperation(
                Collections.singletonList(addEntry),
                rule.deviceId(),
                idGenerator.getNewId()));
    }

    /**
     * Stores a batch of flow rule operations.
     *
     * <p>An empty batch completes successfully right away. A batch for a
     * device without a master, or one that cannot be sent to the master,
     * completes as failed with every target rule listed as a failure.
     * Otherwise the batch is applied locally when this node is the master, or
     * sent to the master.
     *
     * @param operation batch to store
     */
    public void storeBatch(FlowRuleBatchOperation operation) {
        if (operation.getOperations().isEmpty()) {
            notifyDelegate(FlowRuleBatchEvent.completed(
                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
                    new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
            return;
        }

        DeviceId deviceId = operation.deviceId();

        // Shared failure path: reports every rule of the batch as failed.
        Runnable reportAllFailed = () -> {
            Set<FlowRule> allFailures = operation.getOperations().stream()
                    .map(op -> (FlowRule) op.target())
                    .collect(Collectors.toSet());
            notifyDelegate(FlowRuleBatchEvent.completed(
                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
                    new CompletedBatchOperation(false, allFailures, deviceId)));
        };

        NodeId master = mastershipService.getMasterFor(deviceId);
        if (master == null) {
            log.warn("No master for {} ", deviceId);
            reportAllFailed.run();
        } else if (Objects.equals(local, master)) {
            storeBatchInternal(operation);
        } else {
            log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
                    master, deviceId);
            clusterCommunicator
                    .unicast(operation, ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS,
                            serializer::encode, master)
                    .whenComplete((result, error) -> {
                        if (error == null) {
                            return;
                        }
                        log.warn("Failed to storeBatch: {} to {}", new Object[]{operation, master, error});
                        reportAllFailed.run();
                    });
        }
    }

    /**
     * Applies a batch to the local flow table and notifies the delegate.
     *
     * <p>When no operation was applied the batch is reported as completed
     * successfully; otherwise a "requested" event carrying the applied
     * operations is sent to the delegate.
     *
     * @param operation batch to apply
     */
    private void storeBatchInternal(FlowRuleBatchOperation operation) {
        DeviceId deviceId = operation.deviceId();
        Set<FlowRuleBatchEntry> applied = updateStoreInternal(operation);

        if (!applied.isEmpty()) {
            FlowRuleBatchRequest request = new FlowRuleBatchRequest(operation.id(), applied);
            notifyDelegate(FlowRuleBatchEvent.requested(request, operation.deviceId()));
            return;
        }

        // Nothing was applied: complete the batch immediately.
        batchOperationComplete(FlowRuleBatchEvent.completed(
                new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
                new CompletedBatchOperation(true, Collections.emptySet(), deviceId)));
    }

    /**
     * Applies each operation of a batch to the local flow table.
     *
     * <p>ADD inserts a new entry, MODIFY updates the entry, and REMOVE marks
     * the stored entry as pending removal. An operation is part of the result
     * unless the flow table update for a REMOVE yields {@code null} or the
     * operator is not recognised.
     *
     * @param operation batch whose operations are applied
     * @return the operations that were applied
     */
    private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
        Set<FlowRuleBatchEntry> applied = new HashSet<>();
        for (FlowRuleBatchEntry op : operation.getOperations()) {
            FlowRuleBatchEntry outcome;
            switch (op.operator()) {
                case ADD: {
                    FlowEntry added = new DefaultFlowEntry(op.target());
                    flowTable.add(added);
                    outcome = op;
                    break;
                }
                case MODIFY: {
                    FlowEntry modified = new DefaultFlowEntry(op.target());
                    flowTable.update(modified);
                    outcome = op;
                    break;
                }
                case REMOVE: {
                    outcome = flowTable.update(op.target(), stored -> {
                        stored.setState(FlowEntry.FlowEntryState.PENDING_REMOVE);
                        log.debug("Setting state of rule to pending remove: {}", stored);
                        return op;
                    });
                    break;
                }
                default: {
                    log.warn("Unknown flow operation operator: {}", op.operator());
                    outcome = null;
                    break;
                }
            }
            if (outcome != null) {
                applied.add(outcome);
            }
        }
        return applied;
    }

    /**
     * Requests removal of a single flow rule by wrapping it in a one-entry
     * REMOVE batch with a freshly generated batch id and handing it to
     * {@code storeBatch}.
     *
     * @param rule rule to remove
     */
    public void deleteFlowRule(FlowRule rule) {
        FlowRuleBatchEntry removeEntry =
                new FlowRuleBatchEntry(FlowRuleBatchEntry.FlowRuleOperation.REMOVE, rule);
        storeBatch(new FlowRuleBatchOperation(
                Collections.singletonList(removeEntry),
                rule.deviceId(),
                idGenerator.getNewId()));
    }

    /**
     * Handles a flow entry reported as pending.
     *
     * <p>Only acts when this node is the master of the rule's device and the
     * stored entry is in the PENDING_ADD state; in that case a RULE_UPDATED
     * event is produced.
     *
     * @param rule the pending flow entry
     * @return a RULE_UPDATED event, or {@code null} when nothing was done
     */
    public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
        if (!mastershipService.isLocalMaster(rule.deviceId())) {
            return null;
        }
        return flowTable.update(rule, stored -> {
            if (stored.state() != FlowEntry.FlowEntryState.PENDING_ADD) {
                return null;
            }
            // NOTE(review): the state is already PENDING_ADD here, so this call looks like a
            // no-op; confirm whether the guard was meant to be "!=" before changing it.
            stored.setState(FlowEntry.FlowEntryState.PENDING_ADD);
            return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, rule);
        });
    }

    /**
     * Adds or refreshes a flow entry, but only on the master of its device.
     *
     * @param rule flow entry carrying the latest state
     * @return the resulting event, or {@code null} when this node is not the
     *         master or no event was produced
     */
    public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
        NodeId master = mastershipService.getMasterFor(rule.deviceId());
        if (!Objects.equals(local, master)) {
            log.warn("Tried to update FlowRule {} state, while the Node was not the master.", rule);
            return null;
        }
        return addOrUpdateFlowRuleInternal(rule);
    }

    /**
     * Refreshes the stored copy of a flow entry with the counters of the given
     * one, or adds the entry when the update produced no event.
     *
     * <p>A stored entry in the PENDING_ADD state is moved to ADDED and
     * reported as RULE_ADDED; any other stored entry is reported as
     * RULE_UPDATED.
     *
     * @param rule flow entry carrying the latest counters
     * @return RULE_ADDED or RULE_UPDATED, or {@code null} when the entry had
     *         to be added to the flow table instead
     */
    private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
        FlowRuleEvent updateEvent = flowTable.update(rule, stored -> {
            stored.setBytes(rule.bytes());
            stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
            stored.setLiveType(rule.liveType());
            stored.setPackets(rule.packets());
            stored.setLastSeen();

            boolean wasPendingAdd = stored.state() == FlowEntry.FlowEntryState.PENDING_ADD;
            if (!wasPendingAdd) {
                return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, rule);
            }
            stored.setState(FlowEntry.FlowEntryState.ADDED);
            return new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, rule);
        });

        if (updateEvent == null) {
            // No event from the update: store the entry as-is and report nothing.
            flowTable.add(rule);
        }
        return updateEvent;
    }

    /**
     * Removes a flow entry.
     *
     * <p>Handled locally when this node is the master of the device; otherwise
     * forwarded to the master and awaited for up to 5000 ms.
     *
     * @param rule flow entry to remove
     * @return the removal event, or {@code null} when the device has no master,
     *         nothing was removed, or the remote call yields the fallback value
     */
    public FlowRuleEvent removeFlowRule(FlowEntry rule) {
        final DeviceId deviceId = rule.deviceId();
        final NodeId master = mastershipService.getMasterFor(deviceId);

        if (Objects.equals(local, master)) {
            return removeFlowRuleInternal(rule);
        } else if (master == null) {
            log.warn("Failed to removeFlowRule: No master for {}", deviceId);
            return null;
        }

        log.trace("Forwarding removeFlowRule to {}, which is the master for device {}", master, deviceId);
        // 5000 ms is the same value as FLOW_RULE_STORE_TIMEOUT_MILLIS.
        return Tools.futureGetOrElse(
                clusterCommunicator.sendAndReceive(
                        rule,
                        ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY,
                        serializer::encode,
                        serializer::decode,
                        master),
                5000L,
                TimeUnit.MILLISECONDS,
                null);
    }

    /**
     * Removes a flow entry from the local flow table.
     *
     * @param rule flow entry to remove
     * @return a RULE_REMOVED event for the removed entry, or {@code null} when
     *         the flow table removed nothing
     */
    private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
        FlowEntry removedEntry = flowTable.remove(rule);
        if (removedEntry == null) {
            return null;
        }
        return new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVED, removedEntry);
    }

    /**
     * Purges the flow rules of one device by delegating to the flow table.
     *
     * @param deviceId device whose flow rules are purged
     */
    public void purgeFlowRule(DeviceId deviceId) {
        flowTable.purgeFlowRule(deviceId);
    }

    /**
     * Purges all flow rules by delegating to the flow table.
     */
    public void purgeFlowRules() {
        flowTable.purgeFlowRules();
    }

    /**
     * Reports the completion of a batch.
     *
     * <p>If a node is recorded as waiting for this batch id, the event is sent
     * to that node and the record is dropped; otherwise the local delegate is
     * notified.
     *
     * @param event completion event of the batch
     */
    public void batchOperationComplete(FlowRuleBatchEvent event) {
        NodeId requester = pendingResponses.remove(event.subject().batchId());
        if (requester != null) {
            clusterCommunicator.unicast(
                    event,
                    ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED,
                    serializer::encode,
                    requester);
            return;
        }
        notifyDelegate(event);
    }

    /**
     * Records the latest table statistics of a device in the statistics map.
     *
     * @param deviceId   device the statistics belong to
     * @param tableStats statistics to store
     * @return always {@code null}; no event is produced
     */
    public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
        deviceTableStats.put(deviceId, tableStats);
        final FlowRuleEvent noEvent = null;
        return noEvent;
    }

    /**
     * Returns an immutable snapshot of the table statistics stored for a device.
     *
     * @param deviceId device whose statistics are requested
     * @return the statistics, or an empty list when the device has no master
     *         or nothing is stored for it
     */
    public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
        if (mastershipService.getMasterFor(deviceId) == null) {
            log.debug("Failed to getTableStats: No master for {}", deviceId);
            return Collections.emptyList();
        }

        List<TableStatisticsEntry> stored = deviceTableStats.get(deviceId);
        return stored == null ? Collections.emptyList() : ImmutableList.copyOf(stored);
    }

    /**
     * Sums the active flow entry counts over all table statistics of a device.
     *
     * @param deviceId device whose active flow rules are counted
     * @return total of the active flow entries reported per table
     */
    public long getActiveFlowRuleCount(DeviceId deviceId) {
        long total = 0L;
        for (TableStatisticsEntry tableStats : getTableStatistics(deviceId)) {
            total += tableStats.activeFlowEntries();
        }
        return total;
    }

    // ---- Service reference accessors -------------------------------------------------
    // Each bind method stores the given service in the matching field. Each unbind
    // method clears the field only when it still holds the very instance being unbound.

    /** Stores the replica info service. */
    protected void bindReplicaInfoManager(ReplicaInfoService replicaInfoService) {
        this.replicaInfoManager = replicaInfoService;
    }

    /** Clears the replica info service if it is the given instance. */
    protected void unbindReplicaInfoManager(ReplicaInfoService replicaInfoService) {
        if (this.replicaInfoManager == replicaInfoService) {
            this.replicaInfoManager = null;
        }
    }

    /** Stores the cluster communication service. */
    protected void bindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        this.clusterCommunicator = clusterCommunicationService;
    }

    /** Clears the cluster communication service if it is the given instance. */
    protected void unbindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        if (this.clusterCommunicator == clusterCommunicationService) {
            this.clusterCommunicator = null;
        }
    }

    /** Stores the cluster service. */
    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    /** Clears the cluster service if it is the given instance. */
    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService == clusterService) {
            this.clusterService = null;
        }
    }

    /** Stores the device service. */
    protected void bindDeviceService(DeviceService deviceService) {
        this.deviceService = deviceService;
    }

    /** Clears the device service if it is the given instance. */
    protected void unbindDeviceService(DeviceService deviceService) {
        if (this.deviceService == deviceService) {
            this.deviceService = null;
        }
    }

    /** Stores the core service. */
    protected void bindCoreService(CoreService coreService) {
        this.coreService = coreService;
    }

    /** Clears the core service if it is the given instance. */
    protected void unbindCoreService(CoreService coreService) {
        if (this.coreService == coreService) {
            this.coreService = null;
        }
    }

    /** Stores the component configuration service. */
    protected void bindConfigService(ComponentConfigService componentConfigService) {
        this.configService = componentConfigService;
    }

    /** Clears the component configuration service if it is the given instance. */
    protected void unbindConfigService(ComponentConfigService componentConfigService) {
        if (this.configService == componentConfigService) {
            this.configService = null;
        }
    }

    /** Stores the mastership service. */
    protected void bindMastershipService(MastershipService mastershipService) {
        this.mastershipService = mastershipService;
    }

    /** Clears the mastership service if it is the given instance. */
    protected void unbindMastershipService(MastershipService mastershipService) {
        if (this.mastershipService == mastershipService) {
            this.mastershipService = null;
        }
    }

    /** Stores the persistence service. */
    protected void bindPersistenceService(PersistenceService persistenceService) {
        this.persistenceService = persistenceService;
    }

    /** Clears the persistence service if it is the given instance. */
    protected void unbindPersistenceService(PersistenceService persistenceService) {
        if (this.persistenceService == persistenceService) {
            this.persistenceService = null;
        }
    }

    /** Stores the storage service. */
    protected void bindStorageService(StorageService storageService) {
        this.storageService = storageService;
    }

    /** Clears the storage service if it is the given instance. */
    protected void unbindStorageService(StorageService storageService) {
        if (this.storageService == storageService) {
            this.storageService = null;
        }
    }

    /**
     * Immutable holder pairing a device id with a flow entry state.
     *
     * NOTE(review): not referenced in the visible part of this file (the flow
     * count request is sent as a Pair instead) - confirm whether it is still needed.
     */
    private static class CountMessage {
        // Device the count refers to.
        private final DeviceId deviceId;
        // Flow entry state paired with the device id.
        private final FlowEntry.FlowEntryState state;

        /** Creates a holder for the given device id and state. */
        CountMessage(DeviceId deviceId, FlowEntry.FlowEntryState state) {
            this.deviceId = deviceId;
            this.state = state;
        }
    }

    private final class InternalLifecycleManager
    extends AbstractListenerManager<LifecycleEvent, LifecycleEventListener>
    implements LifecycleManager,
    ReplicaInfoEventListener,
    MapEventListener<DeviceId, Long> {
        private final DeviceId deviceId;
        private volatile DeviceReplicaInfo replicaInfo;

        /**
         * Creates the lifecycle manager for one device, registers it as a listener
         * for replica info changes and for mastership term map updates, and takes
         * an initial snapshot of the device's replica info.
         *
         * @param deviceId device this manager is responsible for
         */
        InternalLifecycleManager(DeviceId deviceId) {
            this.deviceId = deviceId;
            // Listeners are registered before the initial snapshot is taken.
            // NOTE(review): an event delivered in between would be overwritten by the
            // assignment below - confirm this ordering is intended.
            ECFlowRuleStore.this.replicaInfoManager.addListener(this);
            ECFlowRuleStore.this.mastershipTermLifecycles.addListener((MapEventListener)this);
            this.replicaInfo = this.toDeviceReplicaInfo(ECFlowRuleStore.this.replicaInfoManager.getReplicaInfoFor(deviceId));
        }

        /**
         * Returns the most recently recorded replica info of the device.
         *
         * @return current replica info snapshot
         */
        @Override
        public DeviceReplicaInfo getReplicaInfo() {
            final DeviceReplicaInfo snapshot = replicaInfo;
            return snapshot;
        }

        /**
         * Publishes the given term as active for the device, provided it still
         * matches the term of the device's current replica info.
         *
         * @param term mastership term to activate
         */
        @Override
        public void activate(long term) {
            ReplicaInfo current = ECFlowRuleStore.this.replicaInfoManager.getReplicaInfoFor(deviceId);
            if (current == null || current.term() != term) {
                // Stale or unknown term: nothing to publish.
                return;
            }
            ECFlowRuleStore.this.mastershipTermLifecycles.put(deviceId, term);
        }

        public void event(ReplicaInfoEvent event) {
            if (((DeviceId)event.subject()).equals((Object)this.deviceId)) {
                this.onReplicaInfoChange(event.replicaInfo());
            }
        }

        public void event(MapEvent<DeviceId, Long> event) {
            if (((DeviceId)event.key()).equals((Object)this.deviceId) && event.newValue() != null) {
                this.onActivate((Long)event.newValue().value());
            }
        }

        private void onActivate(long term) {
            ReplicaInfo replicaInfo = ECFlowRuleStore.this.replicaInfoManager.getReplicaInfoFor(this.deviceId);
            if (replicaInfo != null && replicaInfo.term() == term) {
                NodeId master = replicaInfo.master().orElse(null);
                List<NodeId> backups = replicaInfo.backups().subList(0, Math.min(replicaInfo.backups().size(), ECFlowRuleStore.this.backupCount));
                this.listenerRegistry.process((Event)new LifecycleEvent(LifecycleEvent.Type.TERM_ACTIVE, new DeviceReplicaInfo(term, master, backups)));
            }
        }

        private synchronized void onReplicaInfoChange(ReplicaInfo replicaInfo) {
            DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
            this.replicaInfo = this.toDeviceReplicaInfo(replicaInfo);
            if (oldReplicaInfo == null || oldReplicaInfo.term() < replicaInfo.term()) {
                if (oldReplicaInfo != null) {
                    this.listenerRegistry.process((Event)new LifecycleEvent(LifecycleEvent.Type.TERM_END, oldReplicaInfo));
                }
                this.listenerRegistry.process((Event)new LifecycleEvent(LifecycleEvent.Type.TERM_START, this.replicaInfo));
            } else if (oldReplicaInfo.term() == replicaInfo.term()) {
                this.listenerRegistry.process((Event)new LifecycleEvent(LifecycleEvent.Type.TERM_UPDATE, this.replicaInfo));
            }
        }

        private DeviceReplicaInfo toDeviceReplicaInfo(ReplicaInfo replicaInfo) {
            NodeId master = replicaInfo.master().orElse(null);
            List<NodeId> backups = replicaInfo.backups().subList(0, Math.min(replicaInfo.backups().size(), ECFlowRuleStore.this.backupCount));
            return new DeviceReplicaInfo(replicaInfo.term(), master, backups);
        }

        @Override
        public void close() {
            ECFlowRuleStore.this.replicaInfoManager.removeListener(this);
            ECFlowRuleStore.this.mastershipTermLifecycles.removeListener((MapEventListener)this);
        }
    }

    /**
     * Listener for table statistics map events. The handler body is empty, so
     * every event delivered to it is dropped without any action.
     */
    private class InternalTableStatsListener
            implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {

        /**
         * Receives a table statistics map event and does nothing with it.
         *
         * @param event map event (unused)
         */
        public void event(EventuallyConsistentMapEvent<DeviceId, List<TableStatisticsEntry>> event) {
            // No action is taken for table statistics updates.
        }
    }

    /**
     * Holds one {@link DeviceFlowTable} per device and forwards flow operations
     * to the table of the affected device. Tables are created when a
     * {@code DEVICE_ADDED} event arrives or on first access, whichever comes
     * first. Mutating operations wait up to five seconds for the table's future.
     */
    private class InternalFlowTable implements DeviceListener {
        /** Maximum time, in milliseconds, to wait for a flow table operation. */
        private static final long OPERATION_TIMEOUT_MILLIS = 5000L;

        /** Flow tables keyed by device identifier. */
        private final Map<DeviceId, DeviceFlowTable> flowTables = Maps.newConcurrentMap();

        /**
         * Creates the flow table for a device as soon as the device is added;
         * every other event type is ignored.
         *
         * @param event device event
         */
        public void event(DeviceEvent event) {
            if (event.type() != DeviceEvent.Type.DEVICE_ADDED) {
                return;
            }
            Device device = event.subject();
            addDevice(device.id());
        }

        /**
         * Ensures a flow table exists for the given device.
         *
         * @param deviceId device identifier
         */
        public void addDevice(DeviceId deviceId) {
            this.flowTables.computeIfAbsent(deviceId, this::createFlowTable);
        }

        /**
         * Applies a new backup period to every existing flow table.
         *
         * @param backupPeriod backup period to set
         */
        void setBackupPeriod(int backupPeriod) {
            for (DeviceFlowTable table : this.flowTables.values()) {
                table.setBackupPeriod(backupPeriod);
            }
        }

        /**
         * Applies a new anti-entropy period to every existing flow table.
         *
         * @param antiEntropyPeriod anti-entropy period to set
         */
        void setAntiEntropyPeriod(int antiEntropyPeriod) {
            for (DeviceFlowTable table : this.flowTables.values()) {
                table.setAntiEntropyPeriod(antiEntropyPeriod);
            }
        }

        /**
         * Builds a flow table for the given device from the store's cluster
         * services, a fresh lifecycle manager, and the backup scheduler,
         * executor and periods currently configured on the store.
         *
         * @param deviceId device identifier
         * @return newly constructed flow table
         */
        private DeviceFlowTable createFlowTable(DeviceId deviceId) {
            return new DeviceFlowTable(
                    deviceId,
                    ECFlowRuleStore.this.clusterService,
                    ECFlowRuleStore.this.clusterCommunicator,
                    new InternalLifecycleManager(deviceId),
                    ECFlowRuleStore.this.backupScheduler,
                    new OrderedExecutor(ECFlowRuleStore.this.backupExecutor),
                    ECFlowRuleStore.this.backupPeriod,
                    ECFlowRuleStore.this.antiEntropyPeriod);
        }

        /**
         * Waits for a flow table operation for at most
         * {@link #OPERATION_TIMEOUT_MILLIS}, passing {@code null} as the
         * fallback value to {@code Tools.futureGetOrElse}.
         *
         * @param pending future returned by the flow table
         * @param <R>     result type
         * @return whatever {@code Tools.futureGetOrElse} yields
         */
        private <R> R await(Future<R> pending) {
            return Tools.futureGetOrElse(pending, OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, null);
        }

        /**
         * Returns the flow table of a device, creating it if none exists yet.
         * A plain lookup is tried first; {@code computeIfAbsent} is only used
         * when that lookup misses.
         *
         * @param deviceId device identifier
         * @return flow table of the device
         */
        private DeviceFlowTable getFlowTable(DeviceId deviceId) {
            DeviceFlowTable existing = this.flowTables.get(deviceId);
            if (existing != null) {
                return existing;
            }
            return this.flowTables.computeIfAbsent(deviceId, this::createFlowTable);
        }

        /**
         * Returns the count reported by the device's flow table.
         *
         * @param deviceId device identifier
         * @return flow rule count
         */
        public int getFlowRuleCount(DeviceId deviceId) {
            return getFlowTable(deviceId).count();
        }

        /**
         * Counts the flow entries of a device that are in the given state.
         * A {@code null} state falls back to the table's total count.
         *
         * @param deviceId device identifier
         * @param state    state to match, or {@code null} for all entries
         * @return number of matching flow entries
         */
        public int getFlowRuleCount(DeviceId deviceId, FlowEntry.FlowEntryState state) {
            if (state == null) {
                return getFlowRuleCount(deviceId);
            }
            int matching = 0;
            for (FlowEntry entry : getFlowTable(deviceId).getFlowEntries()) {
                if (entry.state() == state) {
                    matching++;
                }
            }
            return matching;
        }

        /**
         * Looks up the stored entry for a rule in the table of the rule's device.
         *
         * @param rule flow rule to look up
         * @return stored flow entry as returned by the flow table
         */
        public StoredFlowEntry getFlowEntry(FlowRule rule) {
            DeviceFlowTable table = getFlowTable(rule.deviceId());
            return table.getFlowEntry(rule);
        }

        /**
         * Returns the flow entries held by the device's flow table.
         *
         * @param deviceId device identifier
         * @return flow entries of the device
         */
        public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
            DeviceFlowTable table = getFlowTable(deviceId);
            return table.getFlowEntries();
        }

        /**
         * Adds an entry to the table of its device and waits for completion.
         *
         * @param rule flow entry to add
         */
        public void add(FlowEntry rule) {
            DeviceFlowTable table = getFlowTable(rule.deviceId());
            await(table.add(rule));
        }

        /**
         * Updates an entry in the table of its device and waits for completion.
         *
         * @param rule flow entry to update
         */
        public void update(FlowEntry rule) {
            DeviceFlowTable table = getFlowTable(rule.deviceId());
            await(table.update(rule));
        }

        /**
         * Hands the given function to the flow table's update for the rule and
         * waits for the result.
         *
         * @param rule     flow rule to update
         * @param function function passed to the flow table
         * @param <T>      result type
         * @return result of the wait (see {@link #await(Future)})
         */
        public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function) {
            DeviceFlowTable table = getFlowTable(rule.deviceId());
            return await(table.update(rule, function));
        }

        /**
         * Removes an entry from the table of its device and waits for the result.
         *
         * @param rule flow entry to remove
         * @return result of the wait (see {@link #await(Future)})
         */
        public FlowEntry remove(FlowEntry rule) {
            DeviceFlowTable table = getFlowTable(rule.deviceId());
            return await(table.remove(rule));
        }

        /**
         * Drops the flow table of a device, closing it if one was present.
         *
         * @param deviceId device identifier
         */
        public void purgeFlowRule(DeviceId deviceId) {
            DeviceFlowTable removed = this.flowTables.remove(deviceId);
            if (removed == null) {
                return;
            }
            removed.close();
        }

        /**
         * Closes every flow table and removes it from the map.
         */
        public void purgeFlowRules() {
            for (Iterator<DeviceFlowTable> tables = this.flowTables.values().iterator(); tables.hasNext();) {
                DeviceFlowTable table = tables.next();
                table.close();
                tables.remove();
            }
        }
    }

    /**
     * Cluster message handler for flow rule batch operations received from
     * another node.
     *
     * <p>If the local node is the master of the target device, the sender is
     * recorded in {@code pendingResponses} under the operation id and the batch
     * is handed to {@code storeBatchInternal}. Otherwise the request is answered
     * immediately with a {@link CompletedBatchOperation} that marks every rule
     * of the batch as failed.
     */
    private final class OnStoreBatch implements ClusterMessageHandler {

        /**
         * Decodes the batch operation from the message payload and either
         * stores it or rejects it, depending on local mastership.
         *
         * @param message cluster message carrying an encoded batch operation
         */
        public void handle(ClusterMessage message) {
            FlowRuleBatchOperation operation =
                    (FlowRuleBatchOperation) ECFlowRuleStore.this.serializer.decode(message.payload());
            ECFlowRuleStore.this.log.debug("received batch request {}", operation);

            DeviceId deviceId = operation.deviceId();
            NodeId master = ECFlowRuleStore.this.mastershipService.getMasterFor(deviceId);

            if (Objects.equals(ECFlowRuleStore.this.local, master)) {
                // Remember who asked so the outcome can be routed back to the sender.
                ECFlowRuleStore.this.pendingResponses.put(operation.id(), message.sender());
                ECFlowRuleStore.this.storeBatchInternal(operation);
                return;
            }

            // Not the master: report every rule in the batch as failed.
            Set<FlowRule> failures = new HashSet<>(operation.size());
            for (FlowRuleBatchEntry entry : operation.getOperations()) {
                failures.add(entry.target());
            }
            CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
            message.respond(ECFlowRuleStore.this.serializer.encode(allFailed));
        }
    }
}

