/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.hotrod;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import java.lang.annotation.Annotation;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.dataconversion.Transcoder;
import org.infinispan.commons.dataconversion.TranscoderMarshallerAdapter;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.commons.util.BloomFilter;
import org.infinispan.commons.util.Util;
import org.infinispan.container.versioning.NumericVersion;
import org.infinispan.filter.KeyValueFilterConverter;
import org.infinispan.marshall.core.EncoderRegistry;
import org.infinispan.metadata.Metadata;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
import org.infinispan.notifications.cachelistener.event.Event;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilter;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverter;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilterFactory;
import org.infinispan.notifications.cachelistener.filter.KeyValueFilterConverterAsCacheEventFilterConverter;
import org.infinispan.server.hotrod.ClientEventType;
import org.infinispan.server.hotrod.Events;
import org.infinispan.server.hotrod.HotRodHeader;
import org.infinispan.server.hotrod.HotRodOperation;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.server.hotrod.VersionedEncoder;
import org.infinispan.server.hotrod.logging.Log;
import org.infinispan.util.KeyValuePair;
import org.infinispan.util.concurrent.CompletableFutures;

class ClientListenerRegistry {
    /** Registry used to check for, register, and apply media-type conversions. */
    private final EncoderRegistry encoderRegistry;
    /** Executor on which the futures of delayed events are completed once the event has been written. */
    private final Executor nonBlockingExecutor;
    private static final Log log = (Log)LogFactory.getLog(ClientListenerRegistry.class, Log.class);
    /** Event senders keyed by wrapped listener id; populated by {@code addClientListener}. */
    private final ConcurrentMap<WrappedByteArray, Object> eventSenders = new ConcurrentHashMap<WrappedByteArray, Object>();
    /** Named factories producing event filters only. */
    private final ConcurrentMap<String, CacheEventFilterFactory> cacheEventFilterFactories = new ConcurrentHashMap<String, CacheEventFilterFactory>(4, 0.9f, 16);
    /** Named factories producing event converters only. */
    private final ConcurrentMap<String, CacheEventConverterFactory> cacheEventConverterFactories = new ConcurrentHashMap<String, CacheEventConverterFactory>(4, 0.9f, 16);
    /** Named factories producing combined filter-converters. */
    private final ConcurrentMap<String, CacheEventFilterConverterFactory> cacheEventFilterConverterFactories = new ConcurrentHashMap<String, CacheEventFilterConverterFactory>(4, 0.9f, 16);

    /**
     * Creates a registry with no listeners and no factories.
     *
     * @param encoderRegistry     registry used for media-type conversion checks and conversions
     * @param nonBlockingExecutor executor used to complete the futures of delayed events
     */
    ClientListenerRegistry(EncoderRegistry encoderRegistry, Executor nonBlockingExecutor) {
        this.nonBlockingExecutor = nonBlockingExecutor;
        this.encoderRegistry = encoderRegistry;
    }

    /**
     * Registers a transcoder wrapping the given marshaller, unless the encoder registry already
     * supports converting between {@code application/x-java-object} and the marshaller's media type.
     *
     * @param eventMarshaller optional marshaller; nothing happens when it is empty
     */
    void setEventMarshaller(Optional<Marshaller> eventMarshaller) {
        if (!eventMarshaller.isPresent()) {
            return;
        }
        Marshaller marshaller = eventMarshaller.get();
        TranscoderMarshallerAdapter wrapped = new TranscoderMarshallerAdapter(marshaller);
        boolean alreadySupported = encoderRegistry.isConversionSupported(MediaType.APPLICATION_OBJECT, marshaller.mediaType());
        if (!alreadySupported) {
            encoderRegistry.registerTranscoder((Transcoder)wrapped);
            return;
        }
        log.skippingMarshallerWrapping(marshaller.mediaType().toString());
    }

    /**
     * Registers a filter factory under {@code name}, replacing any previous mapping.
     * A factory that is also a converter factory is rejected with the exception produced by
     * {@code log.illegalFilterConverterEventFactory}.
     *
     * @param name    name the factory is looked up by
     * @param factory the filter factory
     */
    void addCacheEventFilterFactory(String name, CacheEventFilterFactory factory) {
        boolean alsoConverterFactory = factory instanceof CacheEventConverterFactory;
        if (alsoConverterFactory) {
            throw log.illegalFilterConverterEventFactory(name);
        }
        cacheEventFilterFactories.put(name, factory);
    }

    /**
     * Unregisters the filter factory stored under {@code name}, if any.
     *
     * @param name name of the factory to drop
     */
    void removeCacheEventFilterFactory(String name) {
        cacheEventFilterFactories.remove(name);
    }

    /**
     * Registers a converter factory under {@code name}, replacing any previous mapping.
     * A factory that is also a filter factory is rejected with the exception produced by
     * {@code log.illegalFilterConverterEventFactory}.
     *
     * @param name    name the factory is looked up by
     * @param factory the converter factory
     */
    void addCacheEventConverterFactory(String name, CacheEventConverterFactory factory) {
        boolean alsoFilterFactory = factory instanceof CacheEventFilterFactory;
        if (alsoFilterFactory) {
            throw log.illegalFilterConverterEventFactory(name);
        }
        cacheEventConverterFactories.put(name, factory);
    }

    /**
     * Unregisters the converter factory stored under {@code name}, if any.
     *
     * @param name name of the factory to drop
     */
    void removeCacheEventConverterFactory(String name) {
        cacheEventConverterFactories.remove(name);
    }

    /**
     * Registers a combined filter-converter factory under {@code name}, replacing any previous mapping.
     *
     * @param name    name the factory is looked up by
     * @param factory the filter-converter factory
     */
    void addCacheEventFilterConverterFactory(String name, CacheEventFilterConverterFactory factory) {
        cacheEventFilterConverterFactories.put(name, factory);
    }

    /**
     * Unregisters the combined filter-converter factory stored under {@code name}, if any.
     *
     * @param name name of the factory to drop
     */
    void removeCacheEventFilterConverterFactory(String name) {
        cacheEventFilterConverterFactories.remove(name);
    }

    /**
     * Creates an event sender for a remote client listener, records it under {@code listenerId}
     * and registers it as a listener on {@code cache}.
     * <p>
     * When a bloom filter is supplied, no named factories are consulted: no filter is installed and
     * a converter built from {@code HotRodServer.ToEmptyBytesKeyValueFilterConverter.INSTANCE} is used.
     * Otherwise the filter and converter are resolved from the named factories; when both names are
     * equal, a single combined filter-converter serves both roles and is given the filter parameters,
     * or the converter parameters if the filter parameters are empty.
     *
     * @param ch                    channel events are written to
     * @param h                     request header supplying version, encoder, message id and value media type
     * @param listenerId            client-assigned listener id
     * @param cache                 cache to listen on
     * @param includeState          whether a state-including sender is created
     * @param filterFactory         name of the filter factory, or {@code null}/empty for none
     * @param binaryFilterParams    serialized filter factory parameters
     * @param converterFactory      name of the converter factory, or {@code null}/empty for none
     * @param binaryConverterParams serialized converter factory parameters
     * @param useRawData            whether factory parameters and events are handled in raw binary form
     * @param listenerInterests     bit mask of event kinds of interest; {@code 0} means all
     * @param bloomFilter           optional bloom filter over keys, or {@code null}
     * @return stage completing once the cache listener has been registered
     */
    CompletionStage<Void> addClientListener(Channel ch, HotRodHeader h, byte[] listenerId, AdvancedCache<byte[], byte[]> cache, boolean includeState, String filterFactory, List<byte[]> binaryFilterParams, String converterFactory, List<byte[]> binaryConverterParams, boolean useRawData, int listenerInterests, BloomFilter<byte[]> bloomFilter) {
        CacheEventConverter<byte[], byte[], byte[]> converter;
        CacheEventFilter<byte[], byte[]> filter;
        ClientEventType eventType;
        if (bloomFilter != null) {
            assert (filterFactory == null || filterFactory.isEmpty());
            assert (converterFactory == null || converterFactory.isEmpty());
            assert (!includeState);
            eventType = ClientEventType.createType(false, useRawData, h.version);
            filter = null;
            // CacheEventConverter is an interface and cannot be instantiated; wrap the shared
            // key/value filter-converter in its cache-event adapter, as addCacheListener does.
            converter = new KeyValueFilterConverterAsCacheEventFilterConverter<>(HotRodServer.ToEmptyBytesKeyValueFilterConverter.INSTANCE);
        } else {
            boolean hasFilter = filterFactory != null && !filterFactory.isEmpty();
            boolean hasConverter = converterFactory != null && !converterFactory.isEmpty();
            eventType = ClientEventType.createType(hasConverter, useRawData, h.version);
            if (hasFilter) {
                if (hasConverter) {
                    if (filterFactory.equals(converterFactory)) {
                        // Same factory name for both roles: resolve one combined filter-converter.
                        List<byte[]> binaryParams = binaryFilterParams.isEmpty() ? binaryConverterParams : binaryFilterParams;
                        CacheEventFilterConverter<byte[], byte[], byte[]> filterConverter = this.getFilterConverter(h.getValueMediaType(), filterFactory, useRawData, binaryParams);
                        filter = filterConverter;
                        converter = filterConverter;
                    } else {
                        filter = this.getFilter(h.getValueMediaType(), filterFactory, useRawData, binaryFilterParams);
                        converter = this.getConverter(h.getValueMediaType(), converterFactory, useRawData, binaryConverterParams);
                    }
                } else {
                    filter = this.getFilter(h.getValueMediaType(), filterFactory, useRawData, binaryFilterParams);
                    converter = null;
                }
            } else if (hasConverter) {
                filter = null;
                converter = this.getConverter(h.getValueMediaType(), converterFactory, useRawData, binaryConverterParams);
            } else {
                filter = null;
                converter = null;
            }
        }
        Object clientEventSender = this.getClientEventSender(includeState, ch, h.encoder(), h.version, (Cache)cache, listenerId, eventType, h.messageId, bloomFilter);
        // Record the sender before registering it with the cache.
        this.eventSenders.put(new WrappedByteArray(listenerId), clientEventSender);
        return this.addCacheListener(cache, clientEventSender, filter, converter, listenerInterests, useRawData);
    }

    /**
     * Registers {@code clientEventSender} on {@code cache}, restricted to the event annotations
     * selected by {@code listenerInterests}.
     * <p>
     * Interest bits: {@code 1} created, {@code 2} modified, {@code 4} removed, {@code 8} expired;
     * {@code 0} selects all four. When neither a filter nor a converter is given, a converter built
     * from {@code HotRodServer.ToEmptyBytesKeyValueFilterConverter.INSTANCE} is installed and the
     * listener is registered in storage format. Otherwise storage format is used only for raw data.
     *
     * @param cache             cache to register the listener on
     * @param clientEventSender listener object to register
     * @param filter            event filter, or {@code null}
     * @param converter         event converter, or {@code null}
     * @param listenerInterests bit mask of event kinds of interest
     * @param useRawData        whether the listener operates on the storage format
     * @return stage completing once the listener has been registered
     */
    private CompletionStage<Void> addCacheListener(AdvancedCache<byte[], byte[]> cache, Object clientEventSender, CacheEventFilter<byte[], byte[]> filter, CacheEventConverter<byte[], byte[], byte[]> converter, int listenerInterests, boolean useRawData) {
        Set<Class<? extends Annotation>> filterAnnotations;
        if (listenerInterests == 0) {
            filterAnnotations = new HashSet<>(Arrays.asList(CacheEntryCreated.class, CacheEntryModified.class, CacheEntryRemoved.class, CacheEntryExpired.class));
        } else {
            filterAnnotations = new HashSet<>();
            if ((listenerInterests & 1) == 1) {
                filterAnnotations.add(CacheEntryCreated.class);
            }
            if ((listenerInterests & 2) == 2) {
                filterAnnotations.add(CacheEntryModified.class);
            }
            if ((listenerInterests & 4) == 4) {
                filterAnnotations.add(CacheEntryRemoved.class);
            }
            if ((listenerInterests & 8) == 8) {
                filterAnnotations.add(CacheEntryExpired.class);
            }
        }
        if (converter == null && filter == null) {
            // No user-supplied filter or converter: install the empty-bytes converter.
            CacheEventConverter<byte[], byte[], byte[]> emptyValueConverter = new KeyValueFilterConverterAsCacheEventFilterConverter<>(HotRodServer.ToEmptyBytesKeyValueFilterConverter.INSTANCE);
            return cache.addStorageFormatFilteredListenerAsync(clientEventSender, filter, emptyValueConverter, filterAnnotations);
        }
        if (useRawData) {
            return cache.addStorageFormatFilteredListenerAsync(clientEventSender, filter, converter, filterAnnotations);
        }
        return cache.addFilteredListenerAsync(clientEventSender, filter, converter, filterAnnotations);
    }

    /**
     * Builds an event filter from the named filter factory.
     *
     * @param requestMedia media type the binary parameters are encoded in
     * @param name         name of the registered filter factory
     * @param useRawData   when {@code true} the parameters are handed to the factory unconverted
     * @param binaryParams serialized factory parameters
     * @return the filter produced by the factory
     * @throws RuntimeException as produced by {@code log.missingCacheEventFactory} when no factory
     *                          is registered under {@code name}
     */
    private CacheEventFilter<byte[], byte[]> getFilter(MediaType requestMedia, String name, boolean useRawData, List<byte[]> binaryParams) {
        // The factory lookup happens first so that a missing factory is reported before any parameter conversion.
        CacheEventFilterFactory factory = this.findFactory(name, this.cacheEventFilterFactories, "key/value filter");
        List<?> params = this.unmarshallParams(requestMedia, binaryParams, useRawData);
        return factory.getFilter(params.toArray());
    }

    /**
     * Builds an event converter from the named converter factory.
     *
     * @param requestMedia media type the binary parameters are encoded in
     * @param name         name of the registered converter factory
     * @param useRawData   when {@code true} the parameters are handed to the factory unconverted
     * @param binaryParams serialized factory parameters
     * @return the converter produced by the factory
     * @throws RuntimeException as produced by {@code log.missingCacheEventFactory} when no factory
     *                          is registered under {@code name}
     */
    private CacheEventConverter<byte[], byte[], byte[]> getConverter(MediaType requestMedia, String name, boolean useRawData, List<byte[]> binaryParams) {
        // The factory lookup happens first so that a missing factory is reported before any parameter conversion.
        CacheEventConverterFactory factory = this.findFactory(name, this.cacheEventConverterFactories, "converter");
        List<?> params = this.unmarshallParams(requestMedia, binaryParams, useRawData);
        return factory.getConverter(params.toArray());
    }

    /**
     * Builds a combined filter-converter from the named filter-converter factory.
     *
     * @param requestMedia media type the binary parameters are encoded in
     * @param name         name of the registered filter-converter factory
     * @param useRawData   when {@code true} the parameters are handed to the factory unconverted
     * @param binaryParams serialized factory parameters
     * @return the filter-converter produced by the factory
     */
    private CacheEventFilterConverter<byte[], byte[], byte[]> getFilterConverter(MediaType requestMedia, String name, boolean useRawData, List<byte[]> binaryParams) {
        CacheEventFilterConverterFactory namedFactory = findFactory(name, cacheEventFilterConverterFactories, "converter");
        Object[] factoryArgs = unmarshallParams(requestMedia, binaryParams, useRawData).toArray();
        return namedFactory.getFilterConverter(factoryArgs);
    }

    /**
     * Looks up a factory by name.
     *
     * @param name        factory name
     * @param factories   map to search
     * @param factoryType label passed to the log message when the factory is missing
     * @return the registered factory
     * @throws RuntimeException as produced by {@code log.missingCacheEventFactory} when absent
     */
    private <T> T findFactory(String name, ConcurrentMap<String, T> factories, String factoryType) {
        T found = factories.get(name);
        if (found != null) {
            return found;
        }
        throw log.missingCacheEventFactory(factoryType, name);
    }

    /**
     * Prepares factory parameters: raw mode returns the binary list itself, otherwise each element
     * is converted from {@code requestMedia} to {@code application/x-java-object}.
     *
     * @param requestMedia media type of the binary parameters
     * @param binaryParams serialized parameters
     * @param useRawData   whether to skip conversion
     * @return the parameters to hand to a factory
     */
    private List<?> unmarshallParams(MediaType requestMedia, List<byte[]> binaryParams, boolean useRawData) {
        if (!useRawData) {
            return binaryParams.stream()
                    .map(raw -> encoderRegistry.convert(raw, requestMedia, MediaType.APPLICATION_OBJECT))
                    .collect(Collectors.toList());
        }
        return binaryParams;
    }

    /**
     * Removes the client listener registered under {@code listenerId} from {@code cache}.
     * <p>
     * Once the cache has de-registered the sender, its entry is dropped from {@code eventSenders}
     * so that removed listeners do not accumulate and are no longer visited by
     * {@code findAndWriteEvents}. Operations still waiting on events queued for that sender are then
     * released, since nothing would write those events after the entry is gone.
     *
     * @param listenerId client-assigned listener id
     * @param cache      cache the listener was registered on
     * @return stage yielding {@code true} when a sender was found and removed, {@code false} when no
     *         sender is known for the id
     */
    CompletionStage<Boolean> removeClientListener(byte[] listenerId, Cache cache) {
        WrappedByteArray key = new WrappedByteArray(listenerId);
        Object sender = this.eventSenders.get(key);
        if (sender == null) {
            return CompletableFutures.completedFalse();
        }
        return cache.removeListenerAsync(sender).thenCompose(ignore -> {
            // Conditional remove: leave the mapping alone if the id was re-registered meanwhile.
            this.eventSenders.remove(key, sender);
            if (sender instanceof BaseClientEventSender) {
                // Complete delayed futures off the calling thread, mirroring the write path.
                this.nonBlockingExecutor.execute(((BaseClientEventSender)sender)::unblockCommands);
            }
            return CompletableFutures.completedTrue();
        });
    }

    /**
     * Clears the sender map and all three factory maps. Listeners are not removed from their caches here.
     */
    public void stop() {
        this.eventSenders.clear();
        this.cacheEventFilterFactories.clear();
        this.cacheEventConverterFactories.clear();
        // Previously left populated, unlike the two sibling factory maps.
        this.cacheEventFilterConverterFactories.clear();
    }

    /**
     * Schedules, on the channel's event loop, a write attempt for every registered sender that is
     * bound to {@code channel}.
     *
     * @param channel channel whose senders should try to write their queued events
     */
    void findAndWriteEvents(Channel channel) {
        channel.eventLoop().execute(() -> {
            for (Object candidate : eventSenders.values()) {
                if (!(candidate instanceof BaseClientEventSender)) {
                    continue;
                }
                BaseClientEventSender sender = (BaseClientEventSender)candidate;
                if (sender.hasChannel(channel)) {
                    sender.writeEventsIfPossible();
                }
            }
        });
    }

    /**
     * Creates and initializes the event sender matching the request: a stateful sender when
     * {@code includeState} is set, otherwise a bloom-filter-aware stateless sender when a bloom
     * filter is given, otherwise a plain stateless sender.
     *
     * @return the initialized sender
     */
    private Object getClientEventSender(boolean includeState, Channel ch, VersionedEncoder encoder, byte version, Cache cache, byte[] listenerId, ClientEventType eventType, long messageId, BloomFilter<byte[]> bloomFilter) {
        BaseClientEventSender sender;
        if (includeState) {
            sender = new StatefulClientEventSender(cache, ch, encoder, listenerId, version, eventType, messageId);
        } else if (bloomFilter == null) {
            sender = new StatelessClientEventSender(cache, ch, encoder, listenerId, version, eventType);
        } else {
            sender = new BloomAwareStatelessClientEventSender(cache, ch, encoder, listenerId, version, eventType, bloomFilter);
        }
        sender.init();
        return sender;
    }

    private abstract class BaseClientEventSender {
        /** Channel the events are written to. */
        protected final Channel ch;
        /** Encoder used to serialize events into buffers. */
        protected final VersionedEncoder encoder;
        /** Client-assigned id of the listener this sender serves. */
        protected final byte[] listenerId;
        /** Version byte passed into every created event. */
        protected final byte version;
        /** Kind of remote event this sender produces (see {@code createRemoteEvent}). */
        protected final ClientEventType targetEventType;
        /** Cache this sender is registered on; used to de-register when the channel closes. */
        protected final Cache cache;
        // NOTE(review): presumably the pending-event threshold applied in sendEvent — confirm.
        final int maxQueueSize = 100;
        /** Count of pending events: incremented in sendEvent, decremented in writeEventsIfPossible. */
        final AtomicInteger eventSize = new AtomicInteger();
        /** Events waiting to be written to the channel. */
        final Queue<Events.Event> eventQueue = new ConcurrentLinkedQueue<Events.Event>();
        /** Cached runnable submitted to the channel's event loop by sendEvent. */
        private final Runnable writeEventsIfPossible = this::writeEventsIfPossible;

        /**
         * Stores the collaborators of this sender; no listener is attached until {@code init()} runs.
         *
         * @param cache           cache the sender will be registered on
         * @param ch              channel events are written to
         * @param encoder         encoder used to serialize events
         * @param listenerId      client-assigned listener id
         * @param version         version byte carried by created events
         * @param targetEventType kind of remote event to produce
         */
        BaseClientEventSender(Cache cache, Channel ch, VersionedEncoder encoder, byte[] listenerId, byte version, ClientEventType targetEventType) {
            this.ch = ch;
            this.cache = cache;
            this.listenerId = listenerId;
            this.encoder = encoder;
            this.targetEventType = targetEventType;
            this.version = version;
        }

        /**
         * Hooks channel closure: when the channel closes, this sender is dropped from the registry's
         * sender map, de-registered from the cache, and any operations waiting on queued events are
         * released.
         */
        void init() {
            this.ch.closeFuture().addListener(f -> {
                log.debug("Channel disconnected, removing event sender listener for id: " + Util.printArray((byte[])this.listenerId));
                // Without this, senders of closed channels stay in the map until stop(). The conditional
                // remove leaves a newer sender registered under the same id untouched.
                ClientListenerRegistry.this.eventSenders.remove(new WrappedByteArray(this.listenerId), this);
                this.cache.removeListenerAsync(this).whenComplete((ignore, t) -> this.unblockCommands());
            });
        }

        /** Completes the future of every event still in the queue; the events themselves stay queued. */
        private void unblockCommands() {
            eventQueue.forEach(pending -> pending.eventFuture.complete(null));
        }

        /**
         * Tells whether this sender writes to exactly the given channel instance.
         *
         * @param channel channel to compare by reference
         * @return {@code true} when it is this sender's channel
         */
        boolean hasChannel(Channel channel) {
            return channel == ch;
        }

        /**
         * Drains queued events to the channel for as long as the queue is non-empty and the channel is
         * writable, then flushes once if anything was written.
         * <p>
         * For each event taken off the queue, a future other than the shared completed-null instance
         * is completed on the non-blocking executor. If encoding fails, the freshly allocated buffer
         * is released before the exception propagates.
         */
        void writeEventsIfPossible() {
            boolean written = false;
            while (!this.eventQueue.isEmpty() && this.ch.isWritable()) {
                this.eventSize.decrementAndGet();
                Events.Event event = this.eventQueue.remove();
                if (log.isTraceEnabled()) {
                    log.tracef("Write event: %s to channel %s", event, this.ch);
                }
                CompletableFuture<Void> cf = event.eventFuture;
                // Reference comparison on purpose: sendEvent uses the shared completed-null future
                // for events that did not delay their operation.
                if (cf != CompletableFutures.<Void>completedNull()) {
                    ClientListenerRegistry.this.nonBlockingExecutor.execute(() -> cf.complete(null));
                }
                ByteBuf buf = this.ch.alloc().ioBuffer();
                try {
                    this.encoder.writeEvent(event, buf);
                } catch (RuntimeException | Error e) {
                    // The buffer was never handed to the channel, so it must be released here.
                    buf.release();
                    throw e;
                }
                this.ch.write(buf);
                written = true;
            }
            if (written) {
                this.ch.flush();
            }
        }

        /**
         * Listener callback for created, modified, removed and expired entries. Events rejected by
         * {@code isSendEvent} are skipped; accepted ones are queued via {@code sendEvent} together with
         * the entry's numeric version (or {@code 0} when no metadata version is present).
         *
         * @param event the cache event
         * @return the stage returned by {@code sendEvent}, or {@code null} when the event is skipped
         */
        @CacheEntryCreated
        @CacheEntryModified
        @CacheEntryRemoved
        @CacheEntryExpired
        public CompletionStage<Void> onCacheEvent(CacheEntryEvent<byte[], byte[]> event) {
            if (!isSendEvent(event)) {
                return null;
            }
            long dataVersion = 0L;
            Metadata metadata = event.getMetadata();
            if (metadata != null && metadata.version() != null) {
                dataVersion = ((NumericVersion)metadata.version()).getVersion();
            }
            return sendEvent(event.getKey(), event.getValue(), dataVersion, event);
        }

        /**
         * Decides whether an event should be forwarded to the client.
         * <p>
         * Nothing is sent once the channel is closed. Created and modified events are sent only in
         * their post phase; removed events only in their post phase and when an old value exists;
         * expired events are always sent.
         *
         * @param event the cache event
         * @return {@code true} when the event should be sent
         */
        boolean isSendEvent(CacheEntryEvent<byte[], byte[]> event) {
            if (isChannelDisconnected()) {
                log.debug("Channel disconnected, ignoring event");
                return false;
            }
            switch (event.getType()) {
                case CACHE_ENTRY_EXPIRED:
                    return true;
                case CACHE_ENTRY_CREATED:
                case CACHE_ENTRY_MODIFIED:
                    return !event.isPre();
                case CACHE_ENTRY_REMOVED: {
                    CacheEntryRemovedEvent<?, ?> removal = (CacheEntryRemovedEvent<?, ?>)event;
                    return !(event.isPre() || removal.getOldValue() == null);
                }
                default:
                    throw log.unexpectedEvent(event);
            }
        }

        /**
         * Reports whether the channel is no longer open.
         *
         * @return {@code true} when {@code ch.isOpen()} is {@code false}
         */
        boolean isChannelDisconnected() {
            boolean open = ch.isOpen();
            return !open;
        }

        /**
         * Queues a remote event for the client and, if the channel is currently writable, schedules a
         * write on the channel's event loop.
         * <p>
         * When the number of pending events reaches {@code maxQueueSize}, the returned stage is a new,
         * incomplete future carried by the event; otherwise the shared completed-null future is
         * returned.
         *
         * @param key         entry key
         * @param value       entry (or converted) value
         * @param dataVersion entry version
         * @param event       originating cache event
         * @return stage the triggering operation may wait on
         */
        CompletionStage<Void> sendEvent(byte[] key, byte[] value, long dataVersion, CacheEntryEvent event) {
            EventLoop loop = this.ch.eventLoop();
            int size = this.eventSize.incrementAndGet();
            // Use the declared limit rather than repeating the literal.
            boolean forceWait = size >= this.maxQueueSize;
            CompletableFuture<Void> cf;
            if (forceWait) {
                if (log.isTraceEnabled()) {
                    log.tracef("Pending event size is %s which is forcing %s to delay operation until it is sent", size, event);
                }
                cf = new CompletableFuture<Void>();
            } else {
                cf = CompletableFutures.completedNull();
            }
            Events.Event remoteEvent = this.createRemoteEvent(key, value, dataVersion, event, cf);
            if (log.isTraceEnabled()) {
                log.tracef("Queue event %s, before queuing event queue size is %d", remoteEvent, size - 1);
            }
            this.eventQueue.add(remoteEvent);
            if (this.ch.isWritable()) {
                loop.submit(this.writeEventsIfPossible);
            }
            return cf;
        }

        /**
         * Builds the remote event matching this sender's target event type: key-based events for
         * {@code PLAIN}, value-carrying custom events for {@code CUSTOM_PLAIN} and {@code CUSTOM_RAW}.
         *
         * @param key         entry key (used by plain events)
         * @param value       entry value (used by custom events)
         * @param dataVersion entry version (used by created/modified plain events)
         * @param event       originating cache event
         * @param eventFuture future attached to the created event
         * @return the remote event
         * @throws IllegalArgumentException when the target event type is not one of the three above
         */
        private Events.Event createRemoteEvent(byte[] key, byte[] value, long dataVersion, CacheEntryEvent event, CompletableFuture<Void> eventFuture) {
            switch (this.targetEventType) {
                case PLAIN:
                    return createPlainEvent(key, dataVersion, event, eventFuture);
                case CUSTOM_PLAIN: {
                    KeyValuePair<HotRodOperation, Boolean> opAndRetry = getEventResponseType(event);
                    return new Events.CustomEvent(version, getEventId(event), opAndRetry.getKey(), listenerId, opAndRetry.getValue(), value, eventFuture);
                }
                case CUSTOM_RAW: {
                    KeyValuePair<HotRodOperation, Boolean> opAndRetry = getEventResponseType(event);
                    return new Events.CustomRawEvent(version, getEventId(event), opAndRetry.getKey(), listenerId, opAndRetry.getValue(), value, eventFuture);
                }
                default:
                    throw new IllegalArgumentException("Event type not supported: " + this.targetEventType);
            }
        }

        /** Builds the key-based event for the {@code PLAIN} target type; created/modified events also carry the version. */
        private Events.Event createPlainEvent(byte[] key, long dataVersion, CacheEntryEvent event, CompletableFuture<Void> eventFuture) {
            switch (event.getType()) {
                case CACHE_ENTRY_CREATED:
                case CACHE_ENTRY_MODIFIED: {
                    KeyValuePair<HotRodOperation, Boolean> opAndRetry = getEventResponseType(event);
                    return new Events.KeyWithVersionEvent(version, getEventId(event), opAndRetry.getKey(), listenerId, opAndRetry.getValue(), key, dataVersion, eventFuture);
                }
                case CACHE_ENTRY_REMOVED:
                case CACHE_ENTRY_EXPIRED: {
                    KeyValuePair<HotRodOperation, Boolean> opAndRetry = getEventResponseType(event);
                    return new Events.KeyEvent(version, getEventId(event), opAndRetry.getKey(), listenerId, opAndRetry.getValue(), key, eventFuture);
                }
                default:
                    throw log.unexpectedEvent(event);
            }
        }

        /**
         * Supplies the id placed into created remote events; this base implementation always yields zero.
         *
         * @param event originating cache event (unused here)
         * @return {@code 0}
         */
        protected long getEventId(CacheEntryEvent event) {
            final long noEventId = 0L;
            return noEventId;
        }

        /**
         * Maps a cache event to the Hot Rod event operation and its command-retried flag.
         * Expired events always report {@code false} for the flag.
         *
         * @param event originating cache event
         * @return pair of operation and retried flag
         */
        private KeyValuePair<HotRodOperation, Boolean> getEventResponseType(CacheEntryEvent event) {
            HotRodOperation operation;
            boolean retried;
            switch (event.getType()) {
                case CACHE_ENTRY_CREATED:
                    operation = HotRodOperation.CACHE_ENTRY_CREATED_EVENT;
                    retried = ((CacheEntryCreatedEvent<?, ?>)event).isCommandRetried();
                    break;
                case CACHE_ENTRY_MODIFIED:
                    operation = HotRodOperation.CACHE_ENTRY_MODIFIED_EVENT;
                    retried = ((CacheEntryModifiedEvent<?, ?>)event).isCommandRetried();
                    break;
                case CACHE_ENTRY_REMOVED:
                    operation = HotRodOperation.CACHE_ENTRY_REMOVED_EVENT;
                    retried = ((CacheEntryRemovedEvent<?, ?>)event).isCommandRetried();
                    break;
                case CACHE_ENTRY_EXPIRED:
                    operation = HotRodOperation.CACHE_ENTRY_EXPIRED_EVENT;
                    retried = false;
                    break;
                default:
                    throw log.unexpectedEvent(event);
            }
            return new KeyValuePair<>(operation, retried);
        }
    }

    /**
     * Stateless sender that additionally drops events whose key the client-supplied bloom filter
     * reports as not present.
     */
    @Listener(clustered=true)
    private class BloomAwareStatelessClientEventSender
    extends StatelessClientEventSender {
        private final BloomFilter<byte[]> bloomFilter;

        BloomAwareStatelessClientEventSender(Cache cache, Channel ch, VersionedEncoder encoder, byte[] listenerId, byte version, ClientEventType targetEventType, BloomFilter<byte[]> bloomFilter) {
            super(cache, ch, encoder, listenerId, version, targetEventType);
            this.bloomFilter = bloomFilter;
        }

        /**
         * Applies the base checks first, then requires the event key to be possibly present in the
         * bloom filter.
         */
        @Override
        boolean isSendEvent(CacheEntryEvent<byte[], byte[]> event) {
            if (!super.isSendEvent(event)) {
                return false;
            }
            boolean maybePresent = bloomFilter.possiblyPresent(event.getKey());
            if (log.isTraceEnabled()) {
                if (maybePresent) {
                    log.tracef("Event %s passed bloom filter", event);
                } else {
                    log.tracef("Event %s didn't pass bloom filter", event);
                }
            }
            return maybePresent;
        }
    }

    /** Clustered sender that adds nothing to the base behaviour and does not request current state. */
    @Listener(clustered=true)
    private class StatelessClientEventSender
    extends BaseClientEventSender {
        /** Passes every argument through to the base sender unchanged. */
        StatelessClientEventSender(Cache cache, Channel ch, VersionedEncoder encoder, byte[] listenerId, byte version, ClientEventType targetEventType) {
            super(cache, ch, encoder, listenerId, version, targetEventType);
        }
    }

    /**
     * Clustered sender that also receives the current cache state; current-state events are tagged
     * with the id of the request that registered the listener.
     */
    @Listener(clustered=true, includeCurrentState=true)
    private class StatefulClientEventSender
    extends BaseClientEventSender {
        private final long messageId;

        StatefulClientEventSender(Cache cache, Channel ch, VersionedEncoder encoder, byte[] listenerId, byte version, ClientEventType targetEventType, long messageId) {
            super(cache, ch, encoder, listenerId, version, targetEventType);
            this.messageId = messageId;
        }

        /** Yields the registering request's message id for current-state events and zero otherwise. */
        @Override
        protected long getEventId(CacheEntryEvent event) {
            if (event.isCurrentState()) {
                return messageId;
            }
            return 0L;
        }
    }
}

