/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.statistic.impl;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.ElementId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.net.statistic.FlowStatisticStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.StoreSerializer;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flow statistic store that keeps, per connect point, the most recent
 * ("current") and the one-before ("previous") flow entries.
 *
 * <p>Queries are answered locally when this node is the master of the device
 * behind the connect point; otherwise they are forwarded to the master node
 * through the cluster communication service and bounded by
 * {@link #STATISTIC_STORE_TIMEOUT_MILLIS}.
 *
 * <p>All methods that touch the two statistic maps are {@code synchronized}
 * on this instance.
 */
@Component(immediate = true)
@Service
public class DistributedFlowStatisticStore
        implements FlowStatisticStore {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    // Flow entries keyed by (device, output port); "previous" holds the entry
    // that was replaced by the latest update of an equal rule.
    private final Map<ConnectPoint, Set<FlowEntry>> previous = new ConcurrentHashMap<>();
    private final Map<ConnectPoint, Set<FlowEntry>> current = new ConcurrentHashMap<>();

    public static final MessageSubject GET_CURRENT = new MessageSubject("peer-return-current");
    public static final MessageSubject GET_PREVIOUS = new MessageSubject("peer-return-previous");
    protected static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespaces.API);

    private NodeId local;
    private ExecutorService messageHandlingExecutor;

    private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
    @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
            label = "Size of thread pool to assign message handler")
    private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;

    private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000L;

    /**
     * Records the local node id, creates the message handling pool and
     * registers the handlers that answer remote statistic queries.
     */
    @Activate
    public void activate() {
        local = clusterService.getLocalNode().id();
        messageHandlingExecutor = newMessageHandlingExecutor();
        registerMessageHandlers(messageHandlingExecutor);
        log.info("Started");
    }

    /**
     * Unregisters both query handlers and shuts the handler pool down.
     */
    @Deactivate
    public void deactivate() {
        clusterCommunicator.removeSubscriber(GET_PREVIOUS);
        clusterCommunicator.removeSubscriber(GET_CURRENT);
        messageHandlingExecutor.shutdown();
        log.info("Stopped");
    }

    /**
     * Re-reads the {@code messageHandlerThreadPoolSize} property and, when it
     * changed to a valid value, rebuilds the handler pool with the new size.
     *
     * <p>Missing, empty, non-numeric or non-positive values are ignored and
     * the current size is kept.
     *
     * @param context component context carrying the properties; may be null
     */
    @Modified
    public void modified(ComponentContext context) {
        Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
        int requestedSize = messageHandlerThreadPoolSize;

        String configured = Tools.get(properties, "messageHandlerThreadPoolSize");
        if (!Strings.isNullOrEmpty(configured)) {
            try {
                requestedSize = Integer.parseInt(configured.trim());
            } catch (NumberFormatException e) {
                log.warn("Ignoring non-numeric messageHandlerThreadPoolSize '{}'", configured, e);
            }
        }

        // A fixed thread pool cannot be created with fewer than one thread,
        // so reject such values here instead of failing half-way through.
        if (requestedSize <= 0) {
            log.warn("Ignoring non-positive messageHandlerThreadPoolSize {}", requestedSize);
            requestedSize = messageHandlerThreadPoolSize;
        }

        if (requestedSize != messageHandlerThreadPoolSize) {
            setMessageHandlerThreadPoolSize(requestedSize);
            restartMessageHandlerThreadPool();
        }
        log.info(FORMAT, messageHandlerThreadPoolSize);
    }

    /**
     * Removes the given rule from both the current and the previous
     * statistics of its connect point. Rules without an output instruction
     * are ignored.
     *
     * @param rule flow rule whose statistics are dropped
     */
    public synchronized void removeFlowStatistic(FlowRule rule) {
        ConnectPoint cp = buildConnectPoint(rule);
        if (cp == null) {
            return;
        }
        discard(current, cp, rule);
        discard(previous, cp, rule);
    }

    /**
     * Adds the given entry to the current statistics of its connect point
     * (creating the set on first use) and removes any equal entry from the
     * previous statistics. Entries without an output instruction are ignored.
     *
     * <p>An equal entry already present in the current set is left in place,
     * because {@link Set#add} does not replace existing elements.
     *
     * @param rule flow entry to add
     */
    public synchronized void addFlowStatistic(FlowEntry rule) {
        ConnectPoint cp = buildConnectPoint(rule);
        if (cp == null) {
            return;
        }
        current.computeIfAbsent(cp, key -> new HashSet<>()).add(rule);
        discard(previous, cp, rule);
    }

    /**
     * Replaces the stored entry that equals {@code rule} with {@code rule}
     * itself and moves the replaced entry into the previous statistics.
     *
     * <p>If the connect point has no current statistics yet, this behaves
     * like {@link #addFlowStatistic(FlowEntry)}. If the new byte counter is
     * smaller than the stored one the update is considered invalid and the
     * rule is removed from both sets.
     *
     * @param rule flow entry carrying the updated counters
     */
    public synchronized void updateFlowStatistic(FlowEntry rule) {
        ConnectPoint cp = buildConnectPoint(rule);
        if (cp == null) {
            return;
        }

        Set<FlowEntry> curr = current.get(cp);
        if (curr == null) {
            addFlowStatistic(rule);
            return;
        }

        // Fetch the stored instance (equal to rule, but with older counters).
        FlowEntry stored = curr.stream().filter(rule::equals).findAny().orElse(null);

        if (stored != null && rule.bytes() < stored.bytes()) {
            log.debug("DistributedFlowStatisticStore:updateFlowStatistic(): Invalid Flow Update! Will be removed!!"
                            + " curr flowId={}, prev flowId={}, curr bytes={}, prev bytes={},"
                            + " curr life={}, prev life={}, curr lastSeen={}, prev lastSeen={}",
                    Long.toHexString(rule.id().value()), Long.toHexString(stored.id().value()),
                    rule.bytes(), stored.bytes(),
                    rule.life(), stored.life(),
                    rule.lastSeen(), stored.lastSeen());
            removeFlowStatistic(rule);
            return;
        }

        Set<FlowEntry> prev = previous.computeIfAbsent(cp, key -> new HashSet<>());
        if (stored != null) {
            // Remove first: add() alone would keep an older equal element.
            prev.remove(rule);
            if (!prev.add(stored)) {
                log.debug("DistributedFlowStatisticStore:updateFlowStatistic(): flowId={}, add failed into previous.",
                        Long.toHexString(rule.id().value()));
            }
        }

        curr.remove(rule);
        if (!curr.add(rule)) {
            log.debug("DistributedFlowStatisticStore:updateFlowStatistic(): flowId={}, add failed into current.",
                    Long.toHexString(rule.id().value()));
        }
    }

    /**
     * Returns the current flow statistics of the given connect point, asking
     * the device master when this node is not the master.
     *
     * @param connectPoint connect point to query
     * @return an empty set when the device has no master or the remote query
     *         yields no result in time; otherwise whatever the master holds,
     *         which for a local lookup is {@code null} if nothing is stored
     */
    public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {
        NodeId master = masterOf(connectPoint);
        if (master == null) {
            return Collections.emptySet();
        }
        if (Objects.equal(local, master)) {
            return getCurrentStatisticInternal(connectPoint);
        }
        return fetchFromMaster(connectPoint, GET_CURRENT, master);
    }

    /**
     * Local lookup of the current statistics; also serves remote
     * {@link #GET_CURRENT} requests.
     *
     * @param connectPoint connect point to look up
     * @return the stored set itself (not a copy), or {@code null} if absent
     */
    private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
        return current.get(connectPoint);
    }

    /**
     * Returns the previous flow statistics of the given connect point, asking
     * the device master when this node is not the master.
     *
     * @param connectPoint connect point to query
     * @return an empty set when the device has no master or the remote query
     *         yields no result in time; otherwise whatever the master holds,
     *         which for a local lookup is {@code null} if nothing is stored
     */
    public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {
        NodeId master = masterOf(connectPoint);
        if (master == null) {
            return Collections.emptySet();
        }
        if (Objects.equal(local, master)) {
            return getPreviousStatisticInternal(connectPoint);
        }
        return fetchFromMaster(connectPoint, GET_PREVIOUS, master);
    }

    /**
     * Local lookup of the previous statistics; also serves remote
     * {@link #GET_PREVIOUS} requests.
     *
     * @param connectPoint connect point to look up
     * @return the stored set itself (not a copy), or {@code null} if absent
     */
    private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
        return previous.get(connectPoint);
    }

    /**
     * Resolves the master of the device behind the connect point, logging a
     * warning when there is none.
     */
    private NodeId masterOf(ConnectPoint connectPoint) {
        DeviceId deviceId = connectPoint.deviceId();
        NodeId master = mastershipService.getMasterFor(deviceId);
        if (master == null) {
            log.warn("No master for {}", deviceId);
        }
        return master;
    }

    /**
     * Sends the query to the master node and waits up to
     * {@link #STATISTIC_STORE_TIMEOUT_MILLIS} for the answer, falling back to
     * an empty set.
     */
    private Set<FlowEntry> fetchFromMaster(ConnectPoint connectPoint, MessageSubject subject, NodeId master) {
        return Tools.futureGetOrElse(
                clusterCommunicator.sendAndReceive(connectPoint, subject,
                        SERIALIZER::encode, SERIALIZER::decode, master),
                STATISTIC_STORE_TIMEOUT_MILLIS,
                TimeUnit.MILLISECONDS,
                Collections.<FlowEntry>emptySet());
    }

    /** Removes the rule from the set stored under the connect point, if any. */
    private static void discard(Map<ConnectPoint, Set<FlowEntry>> store, ConnectPoint cp, FlowRule rule) {
        Set<FlowEntry> entries = store.get(cp);
        if (entries != null) {
            entries.remove(rule);
        }
    }

    /**
     * Builds the connect point (device, output port) a rule is accounted to.
     *
     * @return the connect point, or {@code null} if the rule has no output
     *         instruction
     */
    private ConnectPoint buildConnectPoint(FlowRule rule) {
        PortNumber port = getOutput(rule);
        return port == null ? null : new ConnectPoint(rule.deviceId(), port);
    }

    /**
     * Returns the port of the first OUTPUT instruction in the rule's
     * treatment, or {@code null} if there is none.
     */
    private PortNumber getOutput(FlowRule rule) {
        for (Instruction instruction : rule.treatment().allInstructions()) {
            if (instruction.type() == Instruction.Type.OUTPUT) {
                return ((Instructions.OutputInstruction) instruction).port();
            }
        }
        return null;
    }

    /** Creates a handler pool of the configured size with named threads. */
    private ExecutorService newMessageHandlingExecutor() {
        return Executors.newFixedThreadPool(
                getMessageHandlerThreadPoolSize(),
                Tools.groupedThreads("onos/store/statistic", "message-handlers", log));
    }

    /**
     * Registers the handlers for {@link #GET_CURRENT} and
     * {@link #GET_PREVIOUS} so that they run on the given executor.
     */
    private void registerMessageHandlers(Executor executor) {
        clusterCommunicator.addSubscriber(GET_CURRENT, SERIALIZER::decode,
                this::getCurrentStatisticInternal, SERIALIZER::encode, executor);
        clusterCommunicator.addSubscriber(GET_PREVIOUS, SERIALIZER::decode,
                this::getPreviousStatisticInternal, SERIALIZER::encode, executor);
    }

    /**
     * Stores the new pool size.
     *
     * @throws IllegalArgumentException if {@code poolSize} is not positive
     */
    private void setMessageHandlerThreadPoolSize(int poolSize) {
        Preconditions.checkArgument(poolSize > 0, "Message handler pool size must be greater than 0");
        messageHandlerThreadPoolSize = poolSize;
    }

    /**
     * Replaces the handler pool with one of the currently configured size.
     *
     * <p>The handlers are bound to the executor given at registration time,
     * so they are re-registered on the new pool before the old one is shut
     * down; otherwise incoming requests would be handed to a terminated pool.
     */
    private void restartMessageHandlerThreadPool() {
        ExecutorService prevExecutor = messageHandlingExecutor;
        messageHandlingExecutor = newMessageHandlingExecutor();

        // Remove first so the result does not depend on whether a second
        // registration for the same subject replaces the first one.
        clusterCommunicator.removeSubscriber(GET_CURRENT);
        clusterCommunicator.removeSubscriber(GET_PREVIOUS);
        registerMessageHandlers(messageHandlingExecutor);

        if (prevExecutor != null) {
            prevExecutor.shutdown();
        }
    }

    private int getMessageHandlerThreadPoolSize() {
        return messageHandlerThreadPoolSize;
    }

    protected void bindMastershipService(MastershipService mastershipService) {
        this.mastershipService = mastershipService;
    }

    protected void unbindMastershipService(MastershipService mastershipService) {
        if (this.mastershipService == mastershipService) {
            this.mastershipService = null;
        }
    }

    protected void bindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        this.clusterCommunicator = clusterCommunicationService;
    }

    protected void unbindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        if (this.clusterCommunicator == clusterCommunicationService) {
            this.clusterCommunicator = null;
        }
    }

    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService == clusterService) {
            this.clusterService = null;
        }
    }
}

