/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.flow.impl;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.flow.impl.BackupOperation;
import org.onosproject.store.flow.impl.BucketId;
import org.onosproject.store.flow.impl.DeviceReplicaInfo;
import org.onosproject.store.flow.impl.FlowBucket;
import org.onosproject.store.flow.impl.FlowBucketDigest;
import org.onosproject.store.flow.impl.LifecycleEvent;
import org.onosproject.store.flow.impl.LifecycleEventListener;
import org.onosproject.store.flow.impl.LifecycleManager;
import org.onosproject.store.flow.impl.LogicalClock;
import org.onosproject.store.flow.impl.Timestamped;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeviceFlowTable {
    private static final int NUM_BUCKETS = 1024;
    private static final Serializer SERIALIZER = Serializer.using((KryoNamespace)KryoNamespace.newBuilder().register(KryoNamespaces.API).register(new Class[]{BucketId.class}).register(new Class[]{FlowBucket.class}).register(new Class[]{FlowBucketDigest.class}).register(new Class[]{LogicalTimestamp.class}).register(new Class[]{Timestamped.class}).build());
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final MessageSubject getDigestsSubject;
    private final MessageSubject getBucketSubject;
    private final MessageSubject backupSubject;
    private final DeviceId deviceId;
    private final ClusterCommunicationService clusterCommunicator;
    private final LifecycleManager lifecycleManager;
    private final ScheduledExecutorService executorService;
    private final NodeId localNodeId;
    private final LogicalClock clock = new LogicalClock();
    private volatile DeviceReplicaInfo replicaInfo;
    private volatile long activeTerm;
    private final LifecycleEventListener lifecycleEventListener = new LifecycleEventListener(){

        public void event(LifecycleEvent event) {
            DeviceFlowTable.this.executorService.execute(() -> DeviceFlowTable.this.onLifecycleEvent(event));
        }
    };
    private ScheduledFuture<?> backupFuture;
    private ScheduledFuture<?> antiEntropyFuture;
    private final Map<Integer, Queue<Runnable>> flowTasks = Maps.newConcurrentMap();
    private final Map<Integer, FlowBucket> flowBuckets = Maps.newConcurrentMap();
    private final Map<BackupOperation, LogicalTimestamp> lastBackupTimes = Maps.newConcurrentMap();
    private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();

    DeviceFlowTable(DeviceId deviceId, ClusterService clusterService, ClusterCommunicationService clusterCommunicator, LifecycleManager lifecycleManager, ScheduledExecutorService executorService, long backupPeriod, long antiEntropyPeriod) {
        this.deviceId = deviceId;
        this.clusterCommunicator = clusterCommunicator;
        this.lifecycleManager = lifecycleManager;
        this.executorService = executorService;
        this.localNodeId = clusterService.getLocalNode().id();
        this.addListeners();
        for (int i = 0; i < 1024; ++i) {
            this.flowBuckets.put(i, new FlowBucket(new BucketId(deviceId, i)));
        }
        this.getDigestsSubject = new MessageSubject(String.format("flow-store-%s-digests", deviceId));
        this.getBucketSubject = new MessageSubject(String.format("flow-store-%s-bucket", deviceId));
        this.backupSubject = new MessageSubject(String.format("flow-store-%s-backup", deviceId));
        this.setBackupPeriod(backupPeriod);
        this.setAntiEntropyPeriod(antiEntropyPeriod);
        this.registerSubscribers();
        this.startTerm(lifecycleManager.getReplicaInfo());
    }

    synchronized void setBackupPeriod(long backupPeriod) {
        ScheduledFuture<?> backupFuture = this.backupFuture;
        if (backupFuture != null) {
            backupFuture.cancel(false);
        }
        this.backupFuture = this.executorService.scheduleAtFixedRate(this::backup, backupPeriod, backupPeriod, TimeUnit.MILLISECONDS);
    }

    synchronized void setAntiEntropyPeriod(long antiEntropyPeriod) {
        ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
        if (antiEntropyFuture != null) {
            antiEntropyFuture.cancel(false);
        }
        this.antiEntropyFuture = this.executorService.scheduleAtFixedRate(this::runAntiEntropy, antiEntropyPeriod, antiEntropyPeriod, TimeUnit.MILLISECONDS);
    }

    public int count() {
        return this.flowBuckets.values().stream().mapToInt(FlowBucket::count).sum();
    }

    public StoredFlowEntry getFlowEntry(FlowRule rule) {
        return this.getBucket(rule.id()).getFlowEntries(rule.id()).get(rule);
    }

    public Set<FlowEntry> getFlowEntries() {
        return this.flowBuckets.values().stream().flatMap(bucket -> bucket.getFlowBucket().values().stream()).flatMap(entries -> entries.values().stream()).collect(Collectors.toSet());
    }

    private FlowBucket getBucket(FlowId flowId) {
        return this.getBucket(this.bucket(flowId));
    }

    private FlowBucket getBucket(int bucketId) {
        return this.flowBuckets.get(bucketId);
    }

    private int bucket(FlowId flowId) {
        return Math.abs((int)((Long)flowId.id() % 1024L));
    }

    private Set<FlowBucketDigest> getDigests() {
        return this.flowBuckets.values().stream().map(bucket -> bucket.getDigest()).collect(Collectors.toSet());
    }

    private FlowBucketDigest getDigest(int bucket) {
        return this.flowBuckets.get(bucket).getDigest();
    }

    public CompletableFuture<Void> add(FlowEntry rule) {
        return this.runInTerm(rule.id(), (bucket, term) -> {
            bucket.add(rule, (long)term, this.clock);
            return null;
        });
    }

    public CompletableFuture<Void> update(FlowEntry rule) {
        return this.runInTerm(rule.id(), (bucket, term) -> {
            bucket.update(rule, (long)term, this.clock);
            return null;
        });
    }

    public <T> CompletableFuture<T> update(FlowRule rule, Function<StoredFlowEntry, T> function) {
        return this.runInTerm(rule.id(), (bucket, term) -> bucket.update(rule, function, (long)term, this.clock));
    }

    public CompletableFuture<FlowEntry> remove(FlowEntry rule) {
        return this.runInTerm(rule.id(), (bucket, term) -> bucket.remove(rule, (long)term, this.clock));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> CompletableFuture<T> runInTerm(FlowId flowId, BiFunction<FlowBucket, Long, T> function) {
        DeviceReplicaInfo replicaInfo = this.lifecycleManager.getReplicaInfo();
        if (!replicaInfo.isMaster(this.localNodeId)) {
            return Tools.exceptionalFuture((Throwable)new IllegalStateException());
        }
        FlowBucket bucket = this.getBucket(flowId);
        long term = replicaInfo.term();
        if (this.activeTerm < term) {
            this.log.debug("Enqueueing operation for device {}", (Object)this.deviceId);
            Map<Integer, Queue<Runnable>> map = this.flowTasks;
            synchronized (map) {
                if (this.activeTerm < term) {
                    CompletableFuture future = new CompletableFuture();
                    this.flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList()).add(() -> future.complete(function.apply(bucket, term)));
                    return future;
                }
            }
        }
        return CompletableFuture.completedFuture(function.apply(bucket, term));
    }

    private void backup() {
        DeviceReplicaInfo replicaInfo = this.lifecycleManager.getReplicaInfo();
        if (!replicaInfo.isMaster(this.localNodeId)) {
            return;
        }
        for (NodeId nodeId : replicaInfo.backups()) {
            try {
                this.backup(nodeId, replicaInfo.term());
            }
            catch (Exception e) {
                this.log.error("Backup of " + this.deviceId + " to " + nodeId + " failed", (Throwable)e);
            }
        }
    }

    private void backup(NodeId nodeId, long term) {
        for (FlowBucket bucket : this.flowBuckets.values()) {
            if (bucket.term() != term) continue;
            LogicalTimestamp timestamp = bucket.timestamp();
            BackupOperation operation = new BackupOperation(nodeId, bucket.bucketId().bucket());
            if (!this.startBackup(operation, timestamp)) continue;
            this.backup(bucket.copy(), nodeId).whenCompleteAsync((succeeded, error) -> {
                if (error != null) {
                    this.log.debug("Backup operation {} failed", (Object)operation, error);
                    this.failBackup(operation);
                } else if (succeeded.booleanValue()) {
                    this.succeedBackup(operation, timestamp);
                    this.backup(nodeId, term);
                } else {
                    this.log.debug("Backup operation {} failed: term mismatch", (Object)operation);
                    this.failBackup(operation);
                }
            }, (Executor)this.executorService);
        }
    }

    private boolean startBackup(BackupOperation operation, LogicalTimestamp timestamp) {
        LogicalTimestamp lastBackupTime = this.lastBackupTimes.get(operation);
        return timestamp != null && (lastBackupTime == null || lastBackupTime.isOlderThan((Timestamp)timestamp)) && this.inFlightUpdates.add(operation);
    }

    private void failBackup(BackupOperation operation) {
        this.inFlightUpdates.remove(operation);
    }

    private void succeedBackup(BackupOperation operation, LogicalTimestamp timestamp) {
        this.lastBackupTimes.put(operation, timestamp);
        this.inFlightUpdates.remove(operation);
    }

    private void resetBackup(BackupOperation operation) {
        this.lastBackupTimes.remove(operation);
    }

    private CompletableFuture<Boolean> backup(FlowBucket bucket, NodeId nodeId) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Backing up {} flow entries in bucket {} to {}", new Object[]{bucket.count(), bucket.bucketId(), nodeId});
        }
        return this.sendWithTimestamp(bucket, this.backupSubject, nodeId);
    }

    private boolean onBackup(FlowBucket flowBucket) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("{} - Received {} flow entries in bucket {} to backup", new Object[]{this.deviceId, flowBucket.count(), flowBucket.bucketId()});
        }
        try {
            DeviceReplicaInfo replicaInfo = this.lifecycleManager.getReplicaInfo();
            if (flowBucket.term() != replicaInfo.term()) {
                this.log.debug("Term mismatch for device {}: {} != {}", new Object[]{this.deviceId, flowBucket.term(), replicaInfo});
                return false;
            }
            this.flowBuckets.compute(flowBucket.bucketId().bucket(), (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
            return true;
        }
        catch (Exception e) {
            this.log.warn("Failure processing backup request", (Throwable)e);
            return false;
        }
    }

    private void runAntiEntropy() {
        DeviceReplicaInfo replicaInfo = this.lifecycleManager.getReplicaInfo();
        if (!replicaInfo.isMaster(this.localNodeId)) {
            return;
        }
        for (NodeId nodeId : replicaInfo.backups()) {
            this.runAntiEntropy(nodeId);
        }
    }

    private void runAntiEntropy(NodeId nodeId) {
        this.requestDigests(nodeId).thenAcceptAsync(digests -> {
            for (FlowBucketDigest remoteDigest : digests) {
                FlowBucket localBucket = this.getBucket(remoteDigest.bucket());
                if (!localBucket.getDigest().isNewerThan(remoteDigest)) continue;
                this.log.debug("Detected missing flow entries on node {} in bucket {}/{}", new Object[]{nodeId, this.deviceId, remoteDigest.bucket()});
                this.resetBackup(new BackupOperation(nodeId, remoteDigest.bucket()));
            }
        }, (Executor)this.executorService);
    }

    private CompletableFuture<Set<FlowBucketDigest>> requestDigests(NodeId nodeId) {
        return this.sendWithTimestamp(this.deviceId, this.getDigestsSubject, nodeId);
    }

    private void syncFlows(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
        if (prevReplicaInfo == null) {
            this.activateMaster(newReplicaInfo);
        } else if (prevReplicaInfo.master() != null && !prevReplicaInfo.master().equals((Object)this.localNodeId)) {
            this.syncFlowsOnMaster(prevReplicaInfo, newReplicaInfo);
        } else {
            this.syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
        }
    }

    private void syncFlowsOnMaster(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
        this.syncFlowsOn(prevReplicaInfo.master()).whenCompleteAsync((result, error) -> {
            if (error != null) {
                this.log.debug("Failed to synchronize flows on previous master {}", (Object)prevReplicaInfo.master(), error);
                this.syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
            } else {
                this.activateMaster(newReplicaInfo);
            }
        }, (Executor)this.executorService);
    }

    private void syncFlowsOnBackups(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
        List<NodeId> backups = prevReplicaInfo.backups().stream().filter(nodeId -> !nodeId.equals((Object)this.localNodeId)).collect(Collectors.toList());
        this.syncFlowsOn(backups).whenCompleteAsync((result, error) -> {
            if (error != null) {
                this.log.debug("Failed to synchronize flows on previous backup nodes {}", (Object)backups, error);
            }
            this.activateMaster(newReplicaInfo);
        }, (Executor)this.executorService);
    }

    private CompletableFuture<Void> syncFlowsOn(Collection<NodeId> nodes) {
        return nodes.isEmpty() ? CompletableFuture.completedFuture(null) : Tools.firstOf(nodes.stream().map(node -> this.syncFlowsOn((NodeId)node)).collect(Collectors.toList())).thenApply(v -> null);
    }

    private CompletableFuture<Void> syncFlowsOn(NodeId nodeId) {
        return ((CompletableFuture)this.requestDigests(nodeId).thenCompose(digests -> Tools.allOf(digests.stream().filter(digest -> digest.isNewerThan(this.getDigest(digest.bucket()))).map(digest -> this.syncBucketOn(nodeId, digest.bucket())).collect(Collectors.toList())))).thenApply(v -> null);
    }

    private CompletableFuture<Void> syncBucketOn(NodeId nodeId, int bucketNumber) {
        return this.requestBucket(nodeId, bucketNumber).thenAcceptAsync(flowBucket -> this.flowBuckets.compute(flowBucket.bucketId().bucket(), (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket), (Executor)this.executorService);
    }

    private CompletableFuture<FlowBucket> requestBucket(NodeId nodeId, int bucket) {
        this.log.debug("Requesting flow bucket {} from {}", (Object)bucket, (Object)nodeId);
        return this.sendWithTimestamp(bucket, this.getBucketSubject, nodeId);
    }

    private FlowBucket onGetBucket(int bucketId) {
        return this.flowBuckets.get(bucketId).copy();
    }

    private void activateMaster(DeviceReplicaInfo replicaInfo) {
        this.log.debug("Activating term {} for device {}", (Object)replicaInfo.term(), (Object)this.deviceId);
        for (int i = 0; i < 1024; ++i) {
            this.activateBucket(i);
        }
        this.lifecycleManager.activate(replicaInfo.term());
        this.activeTerm = replicaInfo.term();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void activateBucket(int bucket) {
        Queue<Runnable> tasks;
        Map<Integer, Queue<Runnable>> map = this.flowTasks;
        synchronized (map) {
            tasks = this.flowTasks.remove(bucket);
        }
        if (tasks != null) {
            this.log.debug("Completing enqueued operations for device {}", (Object)this.deviceId);
            tasks.forEach(task -> task.run());
        }
    }

    private void onLifecycleEvent(LifecycleEvent event) {
        this.log.debug("Received lifecycle event for device {}: {}", (Object)this.deviceId, (Object)event);
        switch ((LifecycleEvent.Type)event.type()) {
            case TERM_START: {
                this.startTerm((DeviceReplicaInfo)event.subject());
                break;
            }
            case TERM_ACTIVE: {
                this.activateTerm((DeviceReplicaInfo)event.subject());
                break;
            }
            case TERM_UPDATE: {
                this.updateTerm((DeviceReplicaInfo)event.subject());
                break;
            }
        }
    }

    private void startTerm(DeviceReplicaInfo replicaInfo) {
        DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
        this.replicaInfo = replicaInfo;
        if (replicaInfo.isMaster(this.localNodeId)) {
            this.log.info("Synchronizing device {} flows for term {}", (Object)this.deviceId, (Object)replicaInfo.term());
            this.syncFlows(oldReplicaInfo, replicaInfo);
        }
    }

    private void activateTerm(DeviceReplicaInfo replicaInfo) {
        if (replicaInfo.term() < this.replicaInfo.term()) {
            return;
        }
        if (replicaInfo.term() > this.replicaInfo.term()) {
            this.replicaInfo = replicaInfo;
        }
        if (!replicaInfo.isMaster(this.localNodeId) && !replicaInfo.isBackup(this.localNodeId)) {
            this.flowBuckets.values().forEach(bucket -> bucket.clear());
        }
        this.activeTerm = replicaInfo.term();
    }

    private void updateTerm(DeviceReplicaInfo replicaInfo) {
        if (replicaInfo.term() == this.replicaInfo.term()) {
            this.replicaInfo = replicaInfo;
            if (this.activeTerm == replicaInfo.term() && !replicaInfo.isMaster(this.localNodeId) && !replicaInfo.isBackup(this.localNodeId)) {
                this.flowBuckets.values().forEach(bucket -> bucket.clear());
            }
        }
    }

    private <M, R> CompletableFuture<R> sendWithTimestamp(M message, MessageSubject subject, NodeId toNodeId) {
        return this.clusterCommunicator.sendAndReceive(this.clock.timestamp(message), subject, arg_0 -> ((Serializer)SERIALIZER).encode(arg_0), arg_0 -> ((Serializer)SERIALIZER).decode(arg_0), toNodeId).thenApply(response -> {
            this.clock.tick(response.timestamp());
            return response.value();
        });
    }

    private <M, R> void receiveWithTimestamp(MessageSubject subject, Function<M, R> function) {
        this.clusterCommunicator.addSubscriber(subject, arg_0 -> ((Serializer)SERIALIZER).decode(arg_0), request -> {
            this.clock.tick(request.timestamp());
            return this.clock.timestamp(function.apply(request.value()));
        }, arg_0 -> ((Serializer)SERIALIZER).encode(arg_0), (Executor)this.executorService);
    }

    private void registerSubscribers() {
        this.receiveWithTimestamp(this.getDigestsSubject, v -> this.getDigests());
        this.receiveWithTimestamp(this.getBucketSubject, this::onGetBucket);
        this.receiveWithTimestamp(this.backupSubject, this::onBackup);
    }

    private void unregisterSubscribers() {
        this.clusterCommunicator.removeSubscriber(this.getDigestsSubject);
        this.clusterCommunicator.removeSubscriber(this.getBucketSubject);
        this.clusterCommunicator.removeSubscriber(this.backupSubject);
    }

    private void addListeners() {
        this.lifecycleManager.addListener(this.lifecycleEventListener);
    }

    private void removeListeners() {
        this.lifecycleManager.removeListener(this.lifecycleEventListener);
    }

    private synchronized void cancelFutures() {
        ScheduledFuture<?> antiEntropyFuture;
        ScheduledFuture<?> backupFuture = this.backupFuture;
        if (backupFuture != null) {
            backupFuture.cancel(false);
        }
        if ((antiEntropyFuture = this.antiEntropyFuture) != null) {
            antiEntropyFuture.cancel(false);
        }
    }

    public void purge() {
        this.flowTasks.clear();
        this.flowBuckets.values().forEach(bucket -> bucket.purge());
        this.lastBackupTimes.clear();
        this.inFlightUpdates.clear();
    }

    public void close() {
        this.removeListeners();
        this.unregisterSubscribers();
        this.cancelFutures();
        this.lifecycleManager.close();
    }
}

