/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.resp.commands.list.blocking;

import io.netty.channel.ChannelHandlerContext;
import java.lang.invoke.MethodHandles;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.infinispan.AdvancedCache;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.encoding.DataConversion;
import org.infinispan.multimap.impl.EmbeddedMultimapListCache;
import org.infinispan.multimap.impl.ListBucket;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilter;
import org.infinispan.server.resp.Resp3Handler;
import org.infinispan.server.resp.RespCommand;
import org.infinispan.server.resp.RespRequestHandler;
import org.infinispan.server.resp.RespUtil;
import org.infinispan.server.resp.commands.Resp3Command;
import org.infinispan.server.resp.commands.list.blocking.PopConfiguration;
import org.infinispan.server.resp.filter.EventListenerConverter;
import org.infinispan.server.resp.filter.EventListenerKeysFilter;
import org.infinispan.server.resp.logging.Log;
import org.infinispan.server.resp.meta.ClientMetadata;
import org.infinispan.server.resp.serialization.ResponseWriter;
import org.infinispan.server.resp.tx.TransactionContext;

public abstract class AbstractBlockingPop
extends RespCommand
implements Resp3Command {
    private static final Log log = (Log)LogFactory.getLog(MethodHandles.lookup().lookupClass(), Log.class);

    /**
     * Creates the command descriptor. All four values are forwarded unchanged to the
     * {@link RespCommand} constructor.
     *
     * @param arity       forwarded to {@code RespCommand}
     * @param firstKeyPos forwarded to {@code RespCommand}
     * @param lastKeyPos  forwarded to {@code RespCommand}
     * @param steps       forwarded to {@code RespCommand}
     */
    public AbstractBlockingPop(int arity, int firstKeyPos, int lastKeyPos, int steps) {
        super(arity, firstKeyPos, lastKeyPos, steps);
    }

    /**
     * Turns the raw command arguments into a {@link PopConfiguration}.
     *
     * @param handler   the handler the command is executing on
     * @param arguments the raw command arguments
     * @return the parsed configuration, or {@code null} to make {@code perform} return
     *         {@code handler.myStage()} without polling any key (presumably because the
     *         implementation has already reported the problem to the client; confirm in subclasses)
     */
    abstract PopConfiguration parseArguments(Resp3Handler handler, List<byte[]> arguments);

    /**
     * Polls every configured key once and, when nothing could be popped, registers a subscriber
     * that waits for one of the keys to receive data.
     *
     * <p>Inside a transaction context the result of the first poll is returned as is and no
     * subscriber is installed.
     *
     * @param handler   the handler the command is executing on
     * @param ctx       the channel context of the client
     * @param arguments the raw command arguments
     * @return the stage produced by {@code handler.stageToReturn}, or {@code handler.myStage()}
     *         when the arguments could not be parsed
     */
    @Override
    public CompletionStage<RespRequestHandler> perform(Resp3Handler handler, ChannelHandlerContext ctx, List<byte[]> arguments) {
        EmbeddedMultimapListCache<byte[], byte[]> lists = handler.getListMultimap();
        PopConfiguration popConfig = this.parseArguments(handler, arguments);
        if (popConfig == null) {
            return handler.myStage();
        }

        CompletionStage<Collection<byte[]>> firstAttempt = AbstractBlockingPop.pollAllKeys(lists, popConfig);
        CompletionStage<Collection<byte[]>> reply;
        if (TransactionContext.isInTransactionContext(ctx)) {
            // Within a transaction the first poll is the final answer.
            reply = firstAttempt;
        } else {
            reply = firstAttempt.thenCompose(polled -> {
                if (polled == null || polled.isEmpty()) {
                    // Nothing available right now: wait for one of the keys to change.
                    return this.addSubscriber(popConfig, handler);
                }
                return CompletableFuture.completedFuture(polled);
            });
        }
        return handler.stageToReturn(reply, ctx, ResponseWriter.ARRAY_BULK_STRING);
    }

    /**
     * Installs a cache listener restricted to the configured keys and returns the future that
     * the listener completes.
     *
     * <p>Once the listener registration finishes, the timeout timer is started and the
     * synchronizer is told the listener is in place; a failed registration fails the returned
     * future. The blocked-client and blocked-key counters of the client metadata are raised
     * here and lowered again when the returned future completes.
     *
     * @param configuration the parsed pop configuration
     * @param handler       the handler the command is executing on
     * @return the future completed by the listener
     */
    private CompletableFuture<Collection<byte[]>> addSubscriber(PopConfiguration configuration, Resp3Handler handler) {
        if (log.isTraceEnabled()) {
            log.tracef("Subscriber for keys: " + String.valueOf(configuration.keys()), new Object[0]);
        }

        AdvancedCache cache = handler.typedCache(null);
        DataConversion valueConversion = cache.getValueDataConversion();
        PubSubListener listener = new PubSubListener(handler, cache, configuration);
        byte[][] watchedKeys = (byte[][])configuration.keys().toArray(size -> new byte[size][]);
        EventListenerKeysFilter keysFilter = new EventListenerKeysFilter(watchedKeys);
        EventListenerConverter converter = new EventListenerConverter(valueConversion);

        CompletionStage registration = cache.addListenerAsync((Object)listener, (CacheEventFilter)keysFilter, converter);
        registration.whenComplete((unused, failure) -> {
            if (failure == null) {
                listener.startTimer(configuration.timeout());
                listener.synchronizer.onListenerAdded();
            } else {
                listener.synchronizer.resultFuture.completeExceptionally((Throwable)failure);
            }
        });

        ClientMetadata clientMetadata = handler.respServer().metadataRepository().client();
        clientMetadata.incrementBlockedClients();
        clientMetadata.recordBlockedKeys(configuration.keys().size());
        // Undo the bookkeeping above however the wait ends.
        listener.getFuture().whenComplete((unusedValue, unusedError) -> {
            clientMetadata.decrementBlockedClients();
            clientMetadata.recordBlockedKeys(-configuration.keys().size());
        });
        return listener.getFuture();
    }

    /**
     * Polls the configured keys one after another, in configuration order, and stops at the
     * first key that yields a non-empty result.
     *
     * <p>A key is only polled after the previous key's poll completed with a {@code null} or
     * empty result, so at most one key is popped from.
     *
     * @param listMultimap  the list multimap to pop from
     * @param configuration the parsed pop configuration; expected to hold at least one key
     * @return a stage with the first non-empty poll result, or with the (null or empty) result
     *         of the last key when no key had data
     */
    private static CompletionStage<Collection<byte[]>> pollAllKeys(EmbeddedMultimapListCache<byte[], byte[]> listMultimap, PopConfiguration configuration) {
        // Typed as Collection<byte[]>: generics are invariant, so Collection<Object> would not
        // accept the stage returned by pollKeyValue nor match this method's return type.
        CompletionStage<Collection<byte[]>> pollStage = AbstractBlockingPop.pollKeyValue(listMultimap, configuration.key(0), configuration);
        for (int i = 1; i < configuration.keys().size(); ++i) {
            byte[] keyChannel = configuration.key(i);
            pollStage = pollStage.thenCompose(v -> {
                if (v == null || v.isEmpty()) {
                    return AbstractBlockingPop.pollKeyValue(listMultimap, keyChannel, configuration);
                }
                return CompletableFuture.completedFuture(v);
            });
        }
        return pollStage;
    }

    /**
     * Pops up to {@code configuration.count()} elements from one list, from the head or the
     * tail depending on {@code configuration.isHead()}.
     *
     * @param mmList        the list multimap to pop from
     * @param key           the list key to poll
     * @param configuration the parsed pop configuration
     * @return a stage holding {@code null} when nothing was popped, otherwise a collection
     *         whose first element is {@code key} followed by the popped elements
     */
    static CompletionStage<Collection<byte[]>> pollKeyValue(EmbeddedMultimapListCache<byte[], byte[]> mmList, byte[] key, PopConfiguration configuration) {
        // Keep the stage parameterized: with a raw CompletionStage the lambda parameter below
        // would be an Object and the isEmpty()/size() calls would not compile.
        CompletionStage<Collection<byte[]>> polled = configuration.isHead()
                ? mmList.pollFirst(key, configuration.count())
                : mmList.pollLast(key, configuration.count());
        return polled.thenApply(values -> {
            if (values == null || values.isEmpty()) {
                return null;
            }
            List<byte[]> reply = new ArrayList<>(1 + values.size());
            reply.add(key);
            reply.addAll(values);
            return reply;
        });
    }

    /**
     * Cache listener backing one blocked pop request.
     *
     * <p>Created/modified events whose value is a {@link ListBucket} are handed to the
     * {@link PollListenerSynchronizer}, which owns the result future. When that future
     * completes, for whatever reason, the timeout timer is cancelled and this listener is
     * removed from the cache.
     */
    @Listener(clustered=true)
    public static class PubSubListener {
        private final AdvancedCache<byte[], Object> cache;
        // Pending timeout task, or null when there is none. Written by startTimer and the
        // timeout task, read by the completion callback; those may run on different threads.
        private volatile ScheduledFuture<?> scheduledTimer;
        private final Resp3Handler handler;
        private final PollListenerSynchronizer synchronizer;

        private PubSubListener(Resp3Handler handler, AdvancedCache<byte[], Object> cache, PopConfiguration configuration) {
            this.cache = cache;
            this.handler = handler;
            this.synchronizer = new PollListenerSynchronizer(handler.getListMultimap(), configuration);
            this.synchronizer.resultFuture.whenComplete((ignoredValue, ignoredError) -> {
                this.deleteTimer();
                cache.removeListenerAsync(this);
            });
        }

        /**
         * @return the future completed with the popped elements, with {@code null} on timeout,
         *         or exceptionally on failure
         */
        public CompletableFuture<Collection<byte[]>> getFuture() {
            return this.synchronizer.resultFuture;
        }

        /**
         * Schedules the timeout that completes the result future with {@code null}.
         *
         * @param timeout delay in milliseconds; no timer is scheduled when it is not positive
         */
        private void startTimer(long timeout) {
            this.deleteTimer();
            if (timeout <= 0L || this.synchronizer.resultFuture.isDone()) {
                // No timeout requested, or the request already finished: nothing to schedule.
                return;
            }
            this.scheduledTimer = this.handler.getScheduler().schedule(() -> {
                // Forget the timer first so that the completion callback does not call
                // cancel(true) on the task that is currently running it, which may interrupt
                // this very thread right before the listener removal.
                this.scheduledTimer = null;
                this.cache.removeListenerAsync(this);
                this.synchronizer.resultFuture.complete(null);
            }, timeout, TimeUnit.MILLISECONDS);
            if (this.synchronizer.resultFuture.isDone()) {
                // The future completed while the timer was being scheduled, so its completion
                // callback may have run before the field was set; cancel here so the task
                // does not linger until the timeout elapses.
                this.deleteTimer();
            }
        }

        /** Cancels the pending timeout task, if any, and clears the field. */
        private void deleteTimer() {
            // Single read of the volatile field: it may be cleared concurrently.
            ScheduledFuture<?> timer = this.scheduledTimer;
            if (timer != null) {
                timer.cancel(true);
            }
            this.scheduledTimer = null;
        }

        /**
         * Forwards list events to the synchronizer; events whose value is not a
         * {@link ListBucket} are ignored. Any exception fails the result future.
         *
         * @param entryEvent the created/modified cache event
         */
        @CacheEntryCreated
        @CacheEntryModified
        public void onEvent(CacheEntryEvent<Object, Object> entryEvent) {
            try {
                if (entryEvent.getValue() instanceof ListBucket) {
                    byte[] key = this.unwrapKey(entryEvent.getKey());
                    this.synchronizer.onEvent(key);
                }
            }
            catch (Exception ex) {
                this.synchronizer.resultFuture.completeExceptionally(ex);
            }
        }

        /** Returns the raw bytes of a key that is either a {@link WrappedByteArray} or a {@code byte[]}. */
        private byte[] unwrapKey(Object key) {
            return key instanceof WrappedByteArray ? ((WrappedByteArray)key).getBytes() : (byte[])key;
        }
    }

    /**
     * Serializes the polls triggered by listener events so that only one poll is in flight at
     * a time.
     *
     * <p>Every trigger is appended to {@code keyQueue}: an event contributes its key, the
     * "listener installed" notification contributes this instance as a marker. Whoever finds
     * the queue empty starts a poll; when a poll ends without data, the head is removed and
     * the next queued trigger, if any, is polled. Until the marker has been consumed every
     * trigger polls all configured keys; afterwards an event only polls its own key.
     */
    public static class PollListenerSynchronizer {
        // Pending triggers: byte[] event keys, or this instance as the "listener added" marker.
        // Guarded by synchronized (this).
        private final ArrayDeque<Object> keyQueue = new ArrayDeque<>();
        private final CompletableFuture<Collection<byte[]>> resultFuture = new CompletableFuture<>();
        private final EmbeddedMultimapListCache<byte[], byte[]> multimapList;
        private final BiConsumer<? super Collection<byte[]>, ? super Throwable> whenCompleteConsumer;
        // Set once the "listener added" marker has been consumed from the queue.
        private volatile boolean canPollJustEventKey;
        private final PopConfiguration configuration;

        private PollListenerSynchronizer(EmbeddedMultimapListCache<byte[], byte[]> multimapList, PopConfiguration configuration) {
            this.multimapList = multimapList;
            this.configuration = configuration;
            this.whenCompleteConsumer = this::onPollCompleted;
        }

        /**
         * Handles the outcome of one poll.
         *
         * <p>A failure other than a wrong-type error fails the result future, and a non-empty
         * result completes it. Anything else (no data, or a wrong-type error) consumes the
         * head of the queue and polls the next queued trigger. A wrong-type error must take
         * that path as well: leaving the head in place would keep the queue non-empty forever,
         * so no later event could ever start a poll.
         */
        private void onPollCompleted(Collection<byte[]> polled, Throwable failure) {
            if (failure != null && !RespUtil.isWrongTypeError(failure)) {
                this.resultFuture.completeExceptionally(failure);
                return;
            }
            if (failure == null && polled != null && !polled.isEmpty()) {
                this.resultFuture.complete(polled);
                return;
            }
            Object next;
            synchronized (this) {
                if (this.keyQueue.poll() == this) {
                    this.canPollJustEventKey = true;
                }
                next = this.keyQueue.peek();
            }
            if (next != null) {
                this.runPoll(next);
            }
        }

        /**
         * Polls on behalf of the given trigger: only the event key once the listener marker
         * has been consumed, all configured keys otherwise.
         */
        private void runPoll(Object key) {
            if (this.resultFuture.isDone()) {
                // The request already finished (data, timeout or failure). Polling again could
                // pop elements nobody would receive. This narrows that window; it does not
                // close it for a poll that is already in flight.
                return;
            }
            if (this.canPollJustEventKey && key != this) {
                AbstractBlockingPop.pollKeyValue(this.multimapList, (byte[])key, this.configuration).whenComplete(this.whenCompleteConsumer);
            } else {
                AbstractBlockingPop.pollAllKeys(this.multimapList, this.configuration).whenComplete(this.whenCompleteConsumer);
            }
        }

        /** Enqueues the "listener added" marker and starts a poll of all keys if none is running. */
        private void onListenerAdded() {
            boolean emptyQueue;
            synchronized (this) {
                emptyQueue = this.keyQueue.isEmpty();
                this.keyQueue.offer(this);
            }
            if (emptyQueue) {
                // The marker makes runPoll poll every configured key.
                this.runPoll(this);
            }
        }

        /** Enqueues an event key and starts a poll for it if none is running. */
        private void onEvent(byte[] key) {
            boolean emptyQueue;
            synchronized (this) {
                emptyQueue = this.keyQueue.isEmpty();
                this.keyQueue.offer(key);
            }
            if (emptyQueue) {
                this.runPoll(key);
            }
        }
    }
}

