/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.event;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.Search;
import org.infinispan.client.hotrod.query.testdomain.protobuf.UserPB;
import org.infinispan.client.hotrod.query.testdomain.protobuf.marshallers.TestDomainSCI;
import org.infinispan.client.hotrod.test.InternalRemoteCacheManager;
import org.infinispan.client.hotrod.test.MultiHotRodServersTest;
import org.infinispan.commons.time.TimeService;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.IndexStorage;
import org.infinispan.manager.CacheContainer;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverterFactory;
import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.query.api.continuous.ContinuousQuery;
import org.infinispan.query.api.continuous.ContinuousQueryListener;
import org.infinispan.query.dsl.Query;
import org.infinispan.query.dsl.QueryFactory;
import org.infinispan.query.dsl.embedded.testdomain.User;
import org.infinispan.query.remote.impl.filter.IckleContinuousQueryProtobufCacheEventFilterConverterFactory;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.ControlledTimeService;
import org.infinispan.util.KeyValuePair;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="client.hotrod.event.RemoteContinuousQueryLeavingRemoteCacheManagerTest")
public class RemoteContinuousQueryLeavingRemoteCacheManagerTest
extends MultiHotRodServersTest {
    private final int NUM_NODES = 1;
    private RemoteCache<String, User> remoteCache;
    private final ControlledTimeService timeService = new ControlledTimeService();

    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder cfgBuilder = this.getConfigurationBuilder();
        this.createHotRodServers(1, cfgBuilder);
        this.waitForClusterToForm();
        IckleContinuousQueryProtobufCacheEventFilterConverterFactory factory = new IckleContinuousQueryProtobufCacheEventFilterConverterFactory();
        for (int i = 0; i < 1; ++i) {
            this.server(i).addCacheEventFilterConverterFactory("continuous-query-filter-converter-factory", (CacheEventFilterConverterFactory)factory);
            TestingUtil.replaceComponent((CacheContainer)this.server(i).getCacheManager(), TimeService.class, (Object)this.timeService, (boolean)true);
        }
        this.remoteCache = this.client(0).getCache();
    }

    protected ConfigurationBuilder getConfigurationBuilder() {
        ConfigurationBuilder cfgBuilder = HotRodTestingUtil.hotRodCacheConfiguration((ConfigurationBuilder)RemoteContinuousQueryLeavingRemoteCacheManagerTest.getDefaultClusteredCacheConfig((CacheMode)CacheMode.DIST_SYNC, (boolean)false));
        cfgBuilder.indexing().enable().storage(IndexStorage.LOCAL_HEAP).addIndexedEntity("sample_bank_account.User");
        cfgBuilder.expiration().disableReaper();
        return cfgBuilder;
    }

    @Override
    protected SerializationContextInitializer contextInitializer() {
        return TestDomainSCI.INSTANCE;
    }

    private Listener applyContinuousQuery(RemoteCache<String, User> cacheToUse) {
        QueryFactory qf = Search.getQueryFactory(cacheToUse);
        Query query = qf.create("FROM sample_bank_account.User WHERE age <= :ageParam").setParameter("ageParam", (Object)32);
        ContinuousQuery continuousQuery = Search.getContinuousQuery(cacheToUse);
        Listener listener = new Listener();
        continuousQuery.addContinuousQueryListener(query, (ContinuousQueryListener)listener);
        return listener;
    }

    public void testContinuousQueryRemoveRCM() {
        InternalRemoteCacheManager extraRemoteCacheManager = new InternalRemoteCacheManager(this.createHotRodClientConfigurationBuilder(this.server(0)).build());
        RemoteCache extraRemoteCache = extraRemoteCacheManager.getCache();
        UserPB user1 = new UserPB();
        user1.setId(1);
        user1.setName("John");
        user1.setSurname("Doe");
        user1.setGender(User.Gender.MALE);
        user1.setAge(22);
        user1.setAccountIds(new HashSet<Integer>(Arrays.asList(1, 2)));
        user1.setNotes("Lorem ipsum dolor sit amet");
        this.remoteCache.put((Object)("user" + user1.getId()), (Object)user1);
        Listener listener = this.applyContinuousQuery(this.remoteCache);
        Listener extraListener = this.applyContinuousQuery((RemoteCache<String, User>)extraRemoteCache);
        this.expectElementsInQueue(listener.joined, 1, kv -> ((User)kv.getValue()).getAge(), 22);
        this.expectElementsInQueue(extraListener.joined, 1, kv -> ((User)kv.getValue()).getAge(), 22);
        this.expectElementsInQueue(listener.updated, 0);
        this.expectElementsInQueue(extraListener.updated, 0);
        this.expectElementsInQueue(listener.left, 0);
        this.expectElementsInQueue(extraListener.left, 0);
        extraRemoteCacheManager.stop();
        user1.setAge(23);
        this.remoteCache.put((Object)("user" + user1.getId()), (Object)user1);
        this.expectElementsInQueue(listener.joined, 0);
        this.expectElementsInQueue(listener.updated, 1, kv -> ((User)kv.getValue()).getAge(), 23);
        this.expectElementsInQueue(listener.left, 0);
    }

    private <T> void expectElementsInQueue(BlockingQueue<T> queue, int numElements) {
        this.expectElementsInQueue(queue, numElements, null, new Object[0]);
    }

    private <T, R> void expectElementsInQueue(BlockingQueue<T> queue, int numElements, Function<T, R> valueTransformer, Object ... expectedValue) {
        ArrayList expectedValues;
        if (expectedValue.length != 0) {
            if (expectedValue.length != numElements) {
                throw new IllegalArgumentException("The number of expected values must either match the number of expected elements or no expected values should be specified.");
            }
            expectedValues = new ArrayList(expectedValue.length);
            Collections.addAll(expectedValues, expectedValue);
        } else {
            expectedValues = null;
        }
        for (int i = 0; i < numElements; ++i) {
            T o;
            try {
                o = queue.poll(5L, TimeUnit.SECONDS);
                AssertJUnit.assertNotNull((String)("Queue was empty after reading " + i + " elements!"), o);
            }
            catch (InterruptedException e) {
                throw new AssertionError("Interrupted while waiting for condition", e);
            }
            if (expectedValues == null) continue;
            T v = valueTransformer != null ? valueTransformer.apply(o) : o;
            boolean found = expectedValues.remove(v);
            AssertJUnit.assertTrue((String)("Expectation failed on element number " + i + ", unexpected value: " + v), (boolean)found);
        }
        try {
            T o = queue.poll(100L, TimeUnit.MILLISECONDS);
            AssertJUnit.assertNull((String)"No more elements expected in queue!", o);
        }
        catch (InterruptedException e) {
            throw new AssertionError("Interrupted while waiting for condition", e);
        }
    }

    static class Listener
    implements ContinuousQueryListener<String, User> {
        final BlockingQueue<KeyValuePair<String, User>> joined = new LinkedBlockingQueue<KeyValuePair<String, User>>();
        final BlockingQueue<KeyValuePair<String, User>> updated = new LinkedBlockingQueue<KeyValuePair<String, User>>();
        final BlockingQueue<String> left = new LinkedBlockingQueue<String>();

        Listener() {
        }

        public void resultJoining(String key, User value) {
            this.joined.add((KeyValuePair<String, User>)new KeyValuePair((Object)key, (Object)value));
        }

        public void resultUpdated(String key, User value) {
            this.updated.add((KeyValuePair<String, User>)new KeyValuePair((Object)key, (Object)value));
        }

        public void resultLeaving(String key) {
            this.left.add(key);
        }
    }
}

