/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.statistic.impl;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.ElementId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.net.statistic.StatisticStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(immediate=true)
@Service
public class DistributedStatisticStore
implements StatisticStore {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ReplicaInfoService replicaInfoManager;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;
    private Map<ConnectPoint, InternalStatisticRepresentation> representations = new ConcurrentHashMap<ConnectPoint, InternalStatisticRepresentation>();
    private Map<ConnectPoint, Set<FlowEntry>> previous = new ConcurrentHashMap<ConnectPoint, Set<FlowEntry>>();
    private Map<ConnectPoint, Set<FlowEntry>> current = new ConcurrentHashMap<ConnectPoint, Set<FlowEntry>>();
    protected static final KryoSerializer SERIALIZER = new KryoSerializer(){

        protected void setupKryoPool() {
            this.serializerPool = KryoNamespace.newBuilder().register(KryoNamespaces.API).nextId(300).build();
        }
    };
    private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000L;

    @Activate
    public void activate() {
        this.clusterCommunicator.addSubscriber(StatisticStoreMessageSubjects.GET_CURRENT, new ClusterMessageHandler(){

            public void handle(ClusterMessage message) {
                ConnectPoint cp = (ConnectPoint)SERIALIZER.decode(message.payload());
                try {
                    message.respond(SERIALIZER.encode((Object)DistributedStatisticStore.this.getCurrentStatisticInternal(cp)));
                }
                catch (IOException e) {
                    DistributedStatisticStore.this.log.error("Failed to respond back", (Throwable)e);
                }
            }
        });
        this.clusterCommunicator.addSubscriber(StatisticStoreMessageSubjects.GET_PREVIOUS, new ClusterMessageHandler(){

            public void handle(ClusterMessage message) {
                ConnectPoint cp = (ConnectPoint)SERIALIZER.decode(message.payload());
                try {
                    message.respond(SERIALIZER.encode((Object)DistributedStatisticStore.this.getPreviousStatisticInternal(cp)));
                }
                catch (IOException e) {
                    DistributedStatisticStore.this.log.error("Failed to respond back", (Throwable)e);
                }
            }
        });
        this.log.info("Started");
    }

    @Deactivate
    public void deactivate() {
        this.log.info("Stopped");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void prepareForStatistics(FlowRule rule) {
        InternalStatisticRepresentation rep;
        ConnectPoint cp = this.buildConnectPoint(rule);
        if (cp == null) {
            return;
        }
        Map<ConnectPoint, InternalStatisticRepresentation> map = this.representations;
        synchronized (map) {
            rep = this.getOrCreateRepresentation(cp);
        }
        rep.prepare();
    }

    public synchronized void removeFromStatistics(FlowRule rule) {
        Set<FlowEntry> values;
        ConnectPoint cp = this.buildConnectPoint(rule);
        if (cp == null) {
            return;
        }
        InternalStatisticRepresentation rep = this.representations.get(cp);
        if (rep != null && rep.remove(rule)) {
            this.updatePublishedStats(cp, Collections.emptySet());
        }
        if ((values = this.current.get(cp)) != null) {
            values.remove(rule);
        }
        if ((values = this.previous.get(cp)) != null) {
            values.remove(rule);
        }
    }

    public void addOrUpdateStatistic(FlowEntry rule) {
        ConnectPoint cp = this.buildConnectPoint((FlowRule)rule);
        if (cp == null) {
            return;
        }
        InternalStatisticRepresentation rep = this.representations.get(cp);
        if (rep != null && rep.submit(rule)) {
            this.updatePublishedStats(cp, rep.get());
        }
    }

    private synchronized void updatePublishedStats(ConnectPoint cp, Set<FlowEntry> flowEntries) {
        Set<FlowEntry> curr = this.current.get(cp);
        if (curr == null) {
            curr = new HashSet<FlowEntry>();
        }
        this.previous.put(cp, curr);
        this.current.put(cp, flowEntries);
    }

    public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
        DeviceId deviceId = connectPoint.deviceId();
        ReplicaInfo replicaInfo = this.replicaInfoManager.getReplicaInfoFor(deviceId);
        if (!replicaInfo.master().isPresent()) {
            this.log.warn("No master for {}", (Object)deviceId);
            return Collections.emptySet();
        }
        if (((NodeId)replicaInfo.master().get()).equals((Object)this.clusterService.getLocalNode().id())) {
            return this.getCurrentStatisticInternal(connectPoint);
        }
        ClusterMessage message = new ClusterMessage(this.clusterService.getLocalNode().id(), StatisticStoreMessageSubjects.GET_CURRENT, SERIALIZER.encode((Object)connectPoint));
        try {
            ListenableFuture response = this.clusterCommunicator.sendAndReceive(message, (NodeId)replicaInfo.master().get());
            return (Set)SERIALIZER.decode((byte[])response.get(3000L, TimeUnit.MILLISECONDS));
        }
        catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
            this.log.warn("Unable to communicate with peer {}", replicaInfo.master().get());
            return Collections.emptySet();
        }
    }

    private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
        return this.current.get(connectPoint);
    }

    public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
        DeviceId deviceId = connectPoint.deviceId();
        ReplicaInfo replicaInfo = this.replicaInfoManager.getReplicaInfoFor(deviceId);
        if (!replicaInfo.master().isPresent()) {
            this.log.warn("No master for {}", (Object)deviceId);
            return Collections.emptySet();
        }
        if (((NodeId)replicaInfo.master().get()).equals((Object)this.clusterService.getLocalNode().id())) {
            return this.getPreviousStatisticInternal(connectPoint);
        }
        ClusterMessage message = new ClusterMessage(this.clusterService.getLocalNode().id(), StatisticStoreMessageSubjects.GET_PREVIOUS, SERIALIZER.encode((Object)connectPoint));
        try {
            ListenableFuture response = this.clusterCommunicator.sendAndReceive(message, (NodeId)replicaInfo.master().get());
            return (Set)SERIALIZER.decode((byte[])response.get(3000L, TimeUnit.MILLISECONDS));
        }
        catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
            this.log.warn("Unable to communicate with peer {}", replicaInfo.master().get());
            return Collections.emptySet();
        }
    }

    private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
        return this.previous.get(connectPoint);
    }

    private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
        if (this.representations.containsKey(cp)) {
            return this.representations.get(cp);
        }
        InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
        this.representations.put(cp, rep);
        return rep;
    }

    private ConnectPoint buildConnectPoint(FlowRule rule) {
        PortNumber port = this.getOutput(rule);
        if (port == null) {
            this.log.debug("Rule {} has no output.", (Object)rule);
            return null;
        }
        ConnectPoint cp = new ConnectPoint((ElementId)rule.deviceId(), port);
        return cp;
    }

    private PortNumber getOutput(FlowRule rule) {
        for (Instruction i : rule.treatment().instructions()) {
            if (i.type() == Instruction.Type.OUTPUT) {
                Instructions.OutputInstruction out = (Instructions.OutputInstruction)i;
                return out.port();
            }
            if (i.type() != Instruction.Type.DROP) continue;
            return PortNumber.P0;
        }
        return null;
    }

    protected void bindReplicaInfoManager(ReplicaInfoService replicaInfoService) {
        this.replicaInfoManager = replicaInfoService;
    }

    protected void unbindReplicaInfoManager(ReplicaInfoService replicaInfoService) {
        if (this.replicaInfoManager == replicaInfoService) {
            this.replicaInfoManager = null;
        }
    }

    protected void bindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        this.clusterCommunicator = clusterCommunicationService;
    }

    protected void unbindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        if (this.clusterCommunicator == clusterCommunicationService) {
            this.clusterCommunicator = null;
        }
    }

    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService == clusterService) {
            this.clusterService = null;
        }
    }

    private class InternalStatisticRepresentation {
        private final AtomicInteger counter = new AtomicInteger(0);
        private final Set<FlowEntry> rules = new HashSet<FlowEntry>();

        private InternalStatisticRepresentation() {
        }

        public void prepare() {
            this.counter.incrementAndGet();
        }

        public synchronized boolean remove(FlowRule rule) {
            this.rules.remove(rule);
            return this.counter.decrementAndGet() == 0;
        }

        public synchronized boolean submit(FlowEntry rule) {
            if (this.rules.contains(rule)) {
                this.rules.remove(rule);
            }
            this.rules.add(rule);
            if (this.counter.get() == 0) {
                return true;
            }
            return this.counter.decrementAndGet() == 0;
        }

        public synchronized Set<FlowEntry> get() {
            this.counter.set(this.rules.size());
            return Sets.newHashSet(this.rules);
        }
    }
}

