/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.flowext.impl;

import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleBatchEntry;
import org.onosproject.net.flow.FlowRuleBatchEvent;
import org.onosproject.net.flow.FlowRuleBatchRequest;
import org.onosproject.net.flowext.DefaultFlowRuleExt;
import org.onosproject.net.flowext.DownStreamFlowEntry;
import org.onosproject.net.flowext.FlowExtCompletedOperation;
import org.onosproject.net.flowext.FlowRuleExtRouter;
import org.onosproject.net.flowext.FlowRuleExtRouterListener;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.flowext.impl.FlowExtRouterMessageSubjects;
import org.onosproject.store.serializers.DecodeTo;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(immediate=true, enabled=false)
@Service
public class DefaultFlowRuleExtRouter
implements FlowRuleExtRouter {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ReplicaInfoService replicaInfoManager;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected DeviceService deviceService;
    private int pendingFutureTimeoutMinutes = 5;
    protected Set<FlowRuleExtRouterListener> routerListener = new HashSet<FlowRuleExtRouterListener>();
    private Cache<Long, SettableFuture<FlowExtCompletedOperation>> pendingExtendFutures = CacheBuilder.newBuilder().expireAfterWrite((long)this.pendingFutureTimeoutMinutes, TimeUnit.MINUTES).build();
    private final ExecutorService futureListeners = Executors.newCachedThreadPool(Tools.groupedThreads((String)"onos/flow", (String)"store-peer-responders"));
    private ExecutorService messageHandlingExecutor;
    protected static final StoreSerializer SERIALIZER = new KryoSerializer(){

        protected void setupKryoPool() {
            this.serializerPool = KryoNamespace.newBuilder().register(DistributedStoreSerializers.STORE_COMMON).nextId(310).register(new Class[]{FlowExtCompletedOperation.class}).register(new Class[]{FlowRuleBatchRequest.class}).register(new Class[]{DownStreamFlowEntry.class}).register(new Class[]{DefaultFlowRuleExt.class}).build();
        }
    };
    private ReplicaInfoEventListener replicaInfoEventListener;

    @Activate
    public void activate() {
        this.messageHandlingExecutor = Executors.newFixedThreadPool(4, Tools.groupedThreads((String)"onos/flow", (String)"message-handlers"));
        this.clusterCommunicator.addSubscriber(FlowExtRouterMessageSubjects.APPLY_EXTEND_FLOWS, new ClusterMessageHandler(){

            public void handle(final ClusterMessage message) {
                FlowRuleBatchRequest operation = (FlowRuleBatchRequest)SERIALIZER.decode(message.payload());
                DefaultFlowRuleExtRouter.this.log.info("received batch request {}", (Object)operation);
                final ListenableFuture f = DefaultFlowRuleExtRouter.this.applyBatchInternal(operation);
                f.addListener(new Runnable(){

                    @Override
                    public void run() {
                        FlowExtCompletedOperation result = (FlowExtCompletedOperation)Futures.getUnchecked((Future)f);
                        try {
                            message.respond(SERIALIZER.encode((Object)result));
                        }
                        catch (IOException e) {
                            DefaultFlowRuleExtRouter.this.log.error("Failed to respond back", (Throwable)e);
                        }
                    }
                }, (Executor)DefaultFlowRuleExtRouter.this.futureListeners);
            }
        }, this.messageHandlingExecutor);
        this.replicaInfoManager.addListener(this.replicaInfoEventListener);
        this.log.info("Started");
    }

    @Deactivate
    public void deactivate() {
        this.clusterCommunicator.removeSubscriber(FlowExtRouterMessageSubjects.APPLY_EXTEND_FLOWS);
        this.messageHandlingExecutor.shutdown();
        this.replicaInfoManager.removeListener(this.replicaInfoEventListener);
        this.log.info("Stopped");
    }

    public Future<FlowExtCompletedOperation> applySubBatch(FlowRuleBatchRequest batchOperation) {
        if (batchOperation.ops().isEmpty()) {
            return Futures.immediateFuture((Object)new FlowExtCompletedOperation(batchOperation.batchId(), true, Collections.emptySet()));
        }
        DeviceId deviceId = this.getBatchDeviceId(batchOperation.ops());
        if (deviceId == null) {
            this.log.error("This Batch exists more than two deviceId");
            return null;
        }
        ReplicaInfo replicaInfo = this.replicaInfoManager.getReplicaInfoFor(deviceId);
        if (((NodeId)replicaInfo.master().get()).equals((Object)this.clusterService.getLocalNode().id())) {
            return this.applyBatchInternal(batchOperation);
        }
        this.log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}", replicaInfo.master().orNull(), (Object)deviceId);
        ClusterMessage message = new ClusterMessage(this.clusterService.getLocalNode().id(), FlowExtRouterMessageSubjects.APPLY_EXTEND_FLOWS, SERIALIZER.encode((Object)batchOperation));
        try {
            ListenableFuture responseFuture = this.clusterCommunicator.sendAndReceive(message, (NodeId)replicaInfo.master().get());
            return Futures.transform((ListenableFuture)responseFuture, (Function)new DecodeTo(SERIALIZER));
        }
        catch (IOException e) {
            return Futures.immediateFailedFuture((Throwable)e);
        }
    }

    private ListenableFuture<FlowExtCompletedOperation> applyBatchInternal(FlowRuleBatchRequest batchOperation) {
        SettableFuture r = SettableFuture.create();
        this.pendingExtendFutures.put((Object)batchOperation.batchId(), (Object)r);
        this.notify(batchOperation);
        return r;
    }

    private DeviceId getBatchDeviceId(Collection<FlowRuleBatchEntry> batchOperation) {
        Iterator<FlowRuleBatchEntry> head = batchOperation.iterator();
        FlowRuleBatchEntry headOp = head.next();
        boolean sameId = true;
        for (FlowRuleBatchEntry operation : batchOperation) {
            if (((FlowRule)operation.target()).deviceId() == ((FlowRule)headOp.target()).deviceId()) continue;
            this.log.warn("this batch does not apply on one device Id ");
            sameId = false;
            break;
        }
        return sameId ? ((FlowRule)headOp.target()).deviceId() : null;
    }

    public void notify(FlowRuleBatchRequest request) {
        for (FlowRuleExtRouterListener listener : this.routerListener) {
            listener.notify(FlowRuleBatchEvent.requested((FlowRuleBatchRequest)request, null));
        }
    }

    public void batchOperationComplete(FlowRuleBatchEvent event) {
        Long batchId = ((FlowRuleBatchRequest)event.subject()).batchId();
        SettableFuture future = (SettableFuture)this.pendingExtendFutures.getIfPresent((Object)batchId);
        if (future != null) {
            FlowRuleBatchRequest request = (FlowRuleBatchRequest)event.subject();
            CompletedBatchOperation result = event.result();
            FlowExtCompletedOperation completed = new FlowExtCompletedOperation(request.batchId(), result.isSuccess(), result.failedItems());
            future.set((Object)completed);
            this.pendingExtendFutures.invalidate((Object)batchId);
        }
    }

    public void addListener(FlowRuleExtRouterListener listener) {
        this.routerListener.add(listener);
    }

    public void removeListener(FlowRuleExtRouterListener listener) {
        this.routerListener.remove(listener);
    }

    protected void bindReplicaInfoManager(ReplicaInfoService replicaInfoService) {
        this.replicaInfoManager = replicaInfoService;
    }

    protected void unbindReplicaInfoManager(ReplicaInfoService replicaInfoService) {
        if (this.replicaInfoManager == replicaInfoService) {
            this.replicaInfoManager = null;
        }
    }

    protected void bindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        this.clusterCommunicator = clusterCommunicationService;
    }

    protected void unbindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        if (this.clusterCommunicator == clusterCommunicationService) {
            this.clusterCommunicator = null;
        }
    }

    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService == clusterService) {
            this.clusterService = null;
        }
    }

    protected void bindDeviceService(DeviceService deviceService) {
        this.deviceService = deviceService;
    }

    protected void unbindDeviceService(DeviceService deviceService) {
        if (this.deviceService == deviceService) {
            this.deviceService = null;
        }
    }
}

