/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.primitives.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.onlab.util.SharedExecutors;
import org.onosproject.store.primitives.impl.Database;
import org.onosproject.store.primitives.impl.StateMachineUpdate;
import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.Serializer;
import org.onosproject.utils.MeteringAgent;

public class DefaultDistributedQueue<E>
implements DistributedQueue<E> {
    private final String name;
    private final Database database;
    private final Serializer serializer;
    private final Set<CompletableFuture<E>> pendingFutures = Sets.newIdentityHashSet();
    private static final String PRIMITIVE_NAME = "distributedQueue";
    private static final String SIZE = "size";
    private static final String PUSH = "push";
    private static final String POP = "pop";
    private static final String PEEK = "peek";
    private static final String ERROR_NULL_ENTRY = "Null entries are not allowed";
    private final MeteringAgent monitor;

    public DefaultDistributedQueue(String name, Database database, Serializer serializer, boolean meteringEnabled) {
        this.name = (String)Preconditions.checkNotNull((Object)name, (Object)"queue name cannot be null");
        this.database = (Database)Preconditions.checkNotNull((Object)database, (Object)"database cannot be null");
        this.serializer = (Serializer)Preconditions.checkNotNull((Object)serializer, (Object)"serializer cannot be null");
        this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
        this.database.registerConsumer(update -> SharedExecutors.getSingleThreadExecutor().execute(() -> {
            List input;
            String queueName;
            if (update.target() == StateMachineUpdate.Target.QUEUE_PUSH && (queueName = (String)(input = (List)update.input()).get(0)).equals(name)) {
                this.tryPoll();
            }
        }));
    }

    public long size() {
        MeteringAgent.Context timer = this.monitor.startTimer(SIZE);
        return (Long)Futures.getUnchecked((Future)((Object)this.database.queueSize(this.name).whenComplete((r, e) -> timer.stop(e))));
    }

    public void push(E entry) {
        Preconditions.checkNotNull(entry, (Object)ERROR_NULL_ENTRY);
        MeteringAgent.Context timer = this.monitor.startTimer(PUSH);
        Futures.getUnchecked((Future)((Object)this.database.queuePush(this.name, this.serializer.encode(entry)).whenComplete((r, e) -> timer.stop(e))));
    }

    public CompletableFuture<E> pop() {
        MeteringAgent.Context timer = this.monitor.startTimer(POP);
        return ((CompletableFuture)this.database.queuePop(this.name).whenComplete((r, e) -> timer.stop(e))).thenCompose(v -> {
            if (v != null) {
                return CompletableFuture.completedFuture(this.serializer.decode(v));
            }
            CompletableFuture newPendingFuture = new CompletableFuture();
            this.pendingFutures.add(newPendingFuture);
            return newPendingFuture;
        });
    }

    public E peek() {
        MeteringAgent.Context timer = this.monitor.startTimer(PEEK);
        return (E)Futures.getUnchecked((Future)((Object)((CompletableFuture)this.database.queuePeek(this.name).thenApply(v -> v != null ? this.serializer.decode(v) : null)).whenComplete((r, e) -> timer.stop(e))));
    }

    public String name() {
        return this.name;
    }

    public DistributedPrimitive.Type primitiveType() {
        return DistributedPrimitive.Type.QUEUE;
    }

    protected void tryPoll() {
        HashSet completedFutures = Sets.newHashSet();
        for (CompletableFuture<E> future : this.pendingFutures) {
            Object entry = Futures.getUnchecked((Future)((Object)this.database.queuePop(this.name).thenApply(v -> v != null ? this.serializer.decode(v) : null)));
            if (entry == null) break;
            future.complete(entry);
            completedFutures.add(future);
        }
        this.pendingFutures.removeAll(completedFutures);
    }
}

