/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.resp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.server.resp.Resp3Handler;
import org.infinispan.server.resp.RespRequestHandler;
import org.infinispan.server.resp.RespServer;
import org.infinispan.server.resp.logging.Log;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.CompletionStages;

public class SubscriberHandler
extends RespRequestHandler {
    private static final Log log = (Log)LogFactory.getLog(MethodHandles.lookup().lookupClass(), Log.class);
    public static final byte[] PREFIX_CHANNEL_BYTES = new byte[]{-114, 16, 78, -3, 127};
    private final Resp3Handler handler;
    private final RespServer respServer;
    Map<WrappedByteArray, PubSubListener> specificChannelSubscribers = new HashMap<WrappedByteArray, PubSubListener>();

    public SubscriberHandler(RespServer respServer, Resp3Handler prevHandler) {
        this.respServer = respServer;
        this.handler = prevHandler;
    }

    public static byte[] keyToChannel(byte[] keyBytes) {
        byte[] result = new byte[keyBytes.length + PREFIX_CHANNEL_BYTES.length];
        System.arraycopy(PREFIX_CHANNEL_BYTES, 0, result, 0, PREFIX_CHANNEL_BYTES.length);
        System.arraycopy(keyBytes, 0, result, PREFIX_CHANNEL_BYTES.length, keyBytes.length);
        return result;
    }

    public static byte[] channelToKey(byte[] channelBytes) {
        return Arrays.copyOfRange(channelBytes, PREFIX_CHANNEL_BYTES.length, channelBytes.length);
    }

    @Override
    public void handleChannelDisconnect(ChannelHandlerContext ctx) {
        this.removeAllListeners();
    }

    @Override
    public CompletionStage<RespRequestHandler> handleRequest(ChannelHandlerContext ctx, String type, List<byte[]> arguments) {
        switch (type) {
            case "SUBSCRIBE": {
                AggregateCompletionStage aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
                for (byte[] keyChannel : arguments) {
                    WrappedByteArray wrappedByteArray;
                    if (log.isTraceEnabled()) {
                        log.tracef("Subscriber for channel: " + CharsetUtil.UTF_8.decode(ByteBuffer.wrap(keyChannel)), new Object[0]);
                    }
                    if (this.specificChannelSubscribers.get(wrappedByteArray = new WrappedByteArray(keyChannel)) != null) continue;
                    PubSubListener pubSubListener = new PubSubListener(ctx.channel());
                    this.specificChannelSubscribers.put(wrappedByteArray, pubSubListener);
                    byte[] channel = SubscriberHandler.keyToChannel(keyChannel);
                    CompletionStage stage = this.respServer.getCache().addListenerAsync((Object)pubSubListener, (key, prevValue, prevMetadata, value, metadata, eventType) -> Arrays.equals(key, channel), null);
                    aggregateCompletionStage.dependsOn(this.handleStageListenerError(stage, keyChannel, true));
                }
                return this.sendSubscriptions(ctx, aggregateCompletionStage.freeze(), arguments, true);
            }
            case "UNSUBSCRIBE": {
                AggregateCompletionStage aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
                if (arguments.size() == 0) {
                    return this.unsubscribeAll(ctx);
                }
                for (byte[] keyChannel : arguments) {
                    WrappedByteArray wrappedByteArray = new WrappedByteArray(keyChannel);
                    PubSubListener listener = this.specificChannelSubscribers.remove(wrappedByteArray);
                    if (listener == null) continue;
                    aggregateCompletionStage.dependsOn(this.handleStageListenerError(this.respServer.getCache().removeListenerAsync((Object)listener), keyChannel, false));
                }
                return this.sendSubscriptions(ctx, aggregateCompletionStage.freeze(), arguments, false);
            }
            case "PING": {
                this.handler.handleRequest(ctx, type, arguments);
                break;
            }
            case "RESET": 
            case "QUIT": {
                this.removeAllListeners();
                return this.handler.handleRequest(ctx, type, arguments);
            }
            case "PSUBSCRIBE": 
            case "PUNSUBSCRIBE": {
                ctx.writeAndFlush((Object)RespRequestHandler.stringToByteBuf("-ERR not implemented yet\r\n", ctx.alloc()));
                break;
            }
            default: {
                return super.handleRequest(ctx, type, arguments);
            }
        }
        return this.myStage;
    }

    private CompletionStage<Void> handleStageListenerError(CompletionStage<Void> stage, byte[] keyChannel, boolean subscribeOrUnsubscribe) {
        return stage.whenComplete((__, t) -> {
            if (t != null) {
                if (subscribeOrUnsubscribe) {
                    log.exceptionWhileRegisteringListener((Throwable)t, CharsetUtil.UTF_8.decode(ByteBuffer.wrap(keyChannel)));
                } else {
                    log.exceptionWhileRemovingListener((Throwable)t, CharsetUtil.UTF_8.decode(ByteBuffer.wrap(keyChannel)));
                }
            }
        });
    }

    private void removeAllListeners() {
        Iterator<Map.Entry<WrappedByteArray, PubSubListener>> iterator = this.specificChannelSubscribers.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<WrappedByteArray, PubSubListener> entry = iterator.next();
            PubSubListener listener = entry.getValue();
            this.respServer.getCache().removeListenerAsync((Object)listener);
            iterator.remove();
        }
    }

    private CompletionStage<RespRequestHandler> unsubscribeAll(ChannelHandlerContext ctx) {
        AggregateCompletionStage aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
        ArrayList<byte[]> channels = new ArrayList<byte[]>(this.specificChannelSubscribers.size());
        Iterator<Map.Entry<WrappedByteArray, PubSubListener>> iterator = this.specificChannelSubscribers.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<WrappedByteArray, PubSubListener> entry = iterator.next();
            PubSubListener listener = entry.getValue();
            CompletionStage stage = this.respServer.getCache().removeListenerAsync((Object)listener);
            byte[] keyChannel = entry.getKey().getBytes();
            channels.add(keyChannel);
            aggregateCompletionStage.dependsOn(this.handleStageListenerError(stage, keyChannel, false));
            iterator.remove();
        }
        return this.sendSubscriptions(ctx, aggregateCompletionStage.freeze(), channels, false);
    }

    private CompletionStage<RespRequestHandler> sendSubscriptions(ChannelHandlerContext ctx, CompletionStage<Void> stageToWaitFor, Collection<byte[]> keyChannels, boolean subscribeOrUnsubscribe) {
        return this.stageToReturn(stageToWaitFor, ctx, (? super E __, Throwable t) -> {
            if (t != null) {
                if (subscribeOrUnsubscribe) {
                    ctx.writeAndFlush((Object)"-ERR Failure adding client listener");
                } else {
                    ctx.writeAndFlush((Object)"-ERR Failure unsubscribing client listener");
                }
                return;
            }
            for (byte[] keyChannel : keyChannels) {
                int bufferCap = subscribeOrUnsubscribe ? 20 : 22;
                String initialCharSeq = subscribeOrUnsubscribe ? "*2\r\n$9\r\nsubscribe\r\n$" : "*2\r\n$11\r\nunsubscribe\r\n$";
                ByteBuf subscribeBuffer = ctx.alloc().buffer(bufferCap + (int)Math.log10(keyChannel.length) + 1 + keyChannel.length + 2);
                subscribeBuffer.writeCharSequence((CharSequence)(initialCharSeq + keyChannel.length + "\r\n"), CharsetUtil.UTF_8);
                subscribeBuffer.writeBytes(keyChannel);
                subscribeBuffer.writeCharSequence((CharSequence)"\r\n", CharsetUtil.UTF_8);
                ctx.writeAndFlush((Object)subscribeBuffer);
            }
        });
    }

    @Listener(clustered=true)
    static class PubSubListener {
        private final Channel channel;

        PubSubListener(Channel channel) {
            this.channel = channel;
        }

        @CacheEntryCreated
        @CacheEntryModified
        public CompletionStage<Void> onEvent(CacheEntryEvent<byte[], byte[]> entryEvent) {
            byte[] key = SubscriberHandler.channelToKey((byte[])entryEvent.getKey());
            byte[] value = (byte[])entryEvent.getValue();
            if (key.length > 0 && value != null && value.length > 0) {
                ByteBuf byteBuf = this.channel.alloc().buffer(14 + (int)Math.log10(key.length) + 1 + 2 + key.length + 2 + 1 + (int)Math.log10(value.length) + 1 + 2 + value.length + 2);
                byteBuf.writeCharSequence((CharSequence)("*3\r\n$7\r\nmessage\r\n$" + key.length + "\r\n"), CharsetUtil.UTF_8);
                byteBuf.writeBytes(key);
                byteBuf.writeCharSequence((CharSequence)("\r\n$" + value.length + "\r\n"), CharsetUtil.UTF_8);
                byteBuf.writeBytes(value);
                byteBuf.writeCharSequence((CharSequence)"\r\n", CharsetUtil.UTF_8);
                this.channel.writeAndFlush((Object)byteBuf);
            }
            return CompletableFutures.completedNull();
        }
    }
}

