/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.event;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.Search;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller;
import org.infinispan.client.hotrod.query.testdomain.protobuf.UserPB;
import org.infinispan.client.hotrod.query.testdomain.protobuf.marshallers.MarshallerRegistration;
import org.infinispan.client.hotrod.test.MultiHotRodServersTest;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.util.Util;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Index;
import org.infinispan.manager.CacheContainer;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverterFactory;
import org.infinispan.query.api.continuous.ContinuousQuery;
import org.infinispan.query.api.continuous.ContinuousQueryListener;
import org.infinispan.query.dsl.Expression;
import org.infinispan.query.dsl.Query;
import org.infinispan.query.dsl.QueryFactory;
import org.infinispan.query.dsl.embedded.testdomain.User;
import org.infinispan.query.remote.impl.filter.JPAContinuousQueryProtobufCacheEventFilterConverterFactory;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.ControlledTimeService;
import org.infinispan.util.KeyValuePair;
import org.infinispan.util.TimeService;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="client.hotrod.event.RemoteContinuousQueryTest")
public class RemoteContinuousQueryTest
extends MultiHotRodServersTest {
    private final int NUM_NODES = 5;
    private RemoteCache<String, User> remoteCache;
    private ControlledTimeService timeService = new ControlledTimeService(0L);

    protected void createCacheManagers() throws Throwable {
        org.infinispan.configuration.cache.ConfigurationBuilder cfgBuilder = this.getConfigurationBuilder();
        this.createHotRodServers(5, cfgBuilder);
        this.waitForClusterToForm();
        JPAContinuousQueryProtobufCacheEventFilterConverterFactory factory = new JPAContinuousQueryProtobufCacheEventFilterConverterFactory();
        for (int i = 0; i < 5; ++i) {
            this.server(i).addCacheEventFilterConverterFactory("continuous-query-filter-converter-factory", (CacheEventFilterConverterFactory)factory);
            TestingUtil.replaceComponent((CacheContainer)this.server(i).getCacheManager(), TimeService.class, (Object)this.timeService, (boolean)true);
        }
        this.remoteCache = this.client(0).getCache();
        RemoteCache metadataCache = this.client(0).getCache("___protobuf_metadata");
        metadataCache.put((Object)"sample_bank_account/bank.proto", (Object)Util.read((InputStream)Util.getResourceAsStream((String)"/sample_bank_account/bank.proto", (ClassLoader)((Object)((Object)this)).getClass().getClassLoader())));
        AssertJUnit.assertFalse((boolean)metadataCache.containsKey((Object)".errors"));
        MarshallerRegistration.registerMarshallers(ProtoStreamMarshaller.getSerializationContext((RemoteCacheManager)this.client(0)));
    }

    protected org.infinispan.configuration.cache.ConfigurationBuilder getConfigurationBuilder() {
        org.infinispan.configuration.cache.ConfigurationBuilder cfgBuilder = HotRodTestingUtil.hotRodCacheConfiguration((org.infinispan.configuration.cache.ConfigurationBuilder)RemoteContinuousQueryTest.getDefaultClusteredCacheConfig((CacheMode)CacheMode.DIST_SYNC, (boolean)false));
        cfgBuilder.indexing().index(Index.ALL).addProperty("default.directory_provider", "ram").addProperty("lucene_version", "LUCENE_CURRENT");
        cfgBuilder.expiration().disableReaper();
        return cfgBuilder;
    }

    @Override
    protected ConfigurationBuilder createHotRodClientConfigurationBuilder(int serverPort) {
        return super.createHotRodClientConfigurationBuilder(serverPort).marshaller((Marshaller)new ProtoStreamMarshaller());
    }

    @Test(expectedExceptions={HotRodClientException.class}, expectedExceptionsMessageRegExp=".*ISPN028509:.*")
    public void testDisallowGroupingAndAggregation() {
        Query query = Search.getQueryFactory(this.remoteCache).from(UserPB.class).select(new Expression[]{Expression.max((String)"age")}).having("age").gte((Object)20).build();
        ContinuousQuery continuousQuery = Search.getContinuousQuery(this.remoteCache);
        ContinuousQueryListener<String, Object[]> listener = new ContinuousQueryListener<String, Object[]>(){};
        continuousQuery.addContinuousQueryListener(query, (ContinuousQueryListener)listener);
    }

    public void testContinuousQuery() {
        UserPB user1 = new UserPB();
        user1.setId(1);
        user1.setName("John");
        user1.setSurname("Doe");
        user1.setGender(User.Gender.MALE);
        user1.setAge(22);
        user1.setAccountIds(new HashSet<Integer>(Arrays.asList(1, 2)));
        user1.setNotes("Lorem ipsum dolor sit amet");
        UserPB user2 = new UserPB();
        user2.setId(2);
        user2.setName("Spider");
        user2.setSurname("Man");
        user2.setGender(User.Gender.MALE);
        user2.setAge(32);
        user2.setAccountIds(Collections.singleton(3));
        UserPB user3 = new UserPB();
        user3.setId(3);
        user3.setName("Spider");
        user3.setSurname("Woman");
        user3.setGender(User.Gender.FEMALE);
        user3.setAge(40);
        this.remoteCache.clear();
        this.remoteCache.put((Object)("user" + user1.getId()), (Object)user1);
        this.remoteCache.put((Object)("user" + user2.getId()), (Object)user2);
        this.remoteCache.put((Object)("user" + user3.getId()), (Object)user3);
        AssertJUnit.assertEquals((int)3, (int)this.remoteCache.size());
        QueryFactory qf = Search.getQueryFactory(this.remoteCache);
        Query query = (Query)qf.from(UserPB.class).having("age").lte((Object)Expression.param((String)"ageParam")).build().setParameter("ageParam", (Object)32);
        final LinkedBlockingQueue joined = new LinkedBlockingQueue();
        final LinkedBlockingQueue updated = new LinkedBlockingQueue();
        final LinkedBlockingQueue left = new LinkedBlockingQueue();
        ContinuousQueryListener<String, User> listener = new ContinuousQueryListener<String, User>(){

            public void resultJoining(String key, User value) {
                joined.add(new KeyValuePair((Object)key, (Object)value));
            }

            public void resultUpdated(String key, User value) {
                updated.add(new KeyValuePair((Object)key, (Object)value));
            }

            public void resultLeaving(String key) {
                left.add(key);
            }
        };
        ContinuousQuery continuousQuery = Search.getContinuousQuery(this.remoteCache);
        continuousQuery.addContinuousQueryListener(query, (ContinuousQueryListener)listener);
        this.expectElementsInQueue(joined, 2, kv -> ((User)kv.getValue()).getAge(), 32, 22);
        this.expectElementsInQueue(updated, 0);
        this.expectElementsInQueue(left, 0);
        user3.setAge(30);
        this.remoteCache.put((Object)("user" + user3.getId()), (Object)user3);
        this.expectElementsInQueue(joined, 1, kv -> ((User)kv.getValue()).getAge(), 30);
        this.expectElementsInQueue(updated, 0);
        this.expectElementsInQueue(left, 0);
        user1.setAge(23);
        this.remoteCache.put((Object)("user" + user1.getId()), (Object)user1);
        this.expectElementsInQueue(joined, 0);
        this.expectElementsInQueue(updated, 1, kv -> ((User)kv.getValue()).getAge(), 23);
        this.expectElementsInQueue(left, 0);
        user1.setAge(40);
        user2.setAge(40);
        user3.setAge(40);
        this.remoteCache.put((Object)("user" + user1.getId()), (Object)user1);
        this.remoteCache.put((Object)("user" + user2.getId()), (Object)user2);
        this.remoteCache.put((Object)("user" + user3.getId()), (Object)user3);
        this.expectElementsInQueue(joined, 0);
        this.expectElementsInQueue(updated, 0);
        this.expectElementsInQueue(left, 3);
        this.remoteCache.clear();
        user1.setAge(21);
        user2.setAge(22);
        this.remoteCache.put((Object)"expiredUser1", (Object)user1, 5L, TimeUnit.MILLISECONDS);
        this.remoteCache.put((Object)"expiredUser2", (Object)user2, 5L, TimeUnit.MILLISECONDS);
        this.expectElementsInQueue(joined, 2);
        this.expectElementsInQueue(left, 0);
        this.timeService.advance(6L);
        AssertJUnit.assertNull((Object)this.remoteCache.get((Object)"expiredUser1"));
        AssertJUnit.assertNull((Object)this.remoteCache.get((Object)"expiredUser2"));
        this.expectElementsInQueue(joined, 0);
        this.expectElementsInQueue(left, 2);
        continuousQuery.removeContinuousQueryListener((ContinuousQueryListener)listener);
        user2.setAge(22);
        this.remoteCache.put((Object)("user" + user2.getId()), (Object)user2);
        this.expectElementsInQueue(joined, 0);
        this.expectElementsInQueue(left, 0);
    }

    public void testContinuousQueryWithProjections() {
        UserPB user1 = new UserPB();
        user1.setId(1);
        user1.setName("John");
        user1.setSurname("Doe");
        user1.setGender(User.Gender.MALE);
        user1.setAge(22);
        user1.setAccountIds(new HashSet<Integer>(Arrays.asList(1, 2)));
        user1.setNotes("Lorem ipsum dolor sit amet");
        UserPB user2 = new UserPB();
        user2.setId(2);
        user2.setName("Spider");
        user2.setSurname("Man");
        user2.setGender(User.Gender.MALE);
        user2.setAge(32);
        user2.setAccountIds(Collections.singleton(3));
        UserPB user3 = new UserPB();
        user3.setId(3);
        user3.setName("Spider");
        user3.setSurname("Woman");
        user3.setGender(User.Gender.FEMALE);
        user3.setAge(40);
        this.remoteCache.clear();
        this.remoteCache.put((Object)("user" + user1.getId()), (Object)user1);
        this.remoteCache.put((Object)("user" + user2.getId()), (Object)user2);
        this.remoteCache.put((Object)("user" + user3.getId()), (Object)user3);
        AssertJUnit.assertEquals((int)3, (int)this.remoteCache.size());
        QueryFactory qf = Search.getQueryFactory(this.remoteCache);
        Query query = (Query)qf.from(UserPB.class).select(new String[]{"age"}).having("age").lte((Object)Expression.param((String)"ageParam")).build().setParameter("ageParam", (Object)32);
        final LinkedBlockingQueue joined = new LinkedBlockingQueue();
        final LinkedBlockingQueue updated = new LinkedBlockingQueue();
        final LinkedBlockingQueue left = new LinkedBlockingQueue();
        ContinuousQueryListener<String, Object[]> listener = new ContinuousQueryListener<String, Object[]>(){

            public void resultJoining(String key, Object[] value) {
                joined.add(new KeyValuePair((Object)key, (Object)value));
            }

            public void resultUpdated(String key, Object[] value) {
                updated.add(new KeyValuePair((Object)key, (Object)value));
            }

            public void resultLeaving(String key) {
                left.add(key);
            }
        };
        ContinuousQuery continuousQuery = Search.getContinuousQuery(this.remoteCache);
        continuousQuery.addContinuousQueryListener(query, (ContinuousQueryListener)listener);
        this.expectElementsInQueue(joined, 2, kv -> ((Object[])kv.getValue())[0], 32, 22);
        this.expectElementsInQueue(updated, 0);
        this.expectElementsInQueue(left, 0);
        user3.setAge(30);
        this.remoteCache.put((Object)("user" + user3.getId()), (Object)user3);
        this.expectElementsInQueue(joined, 1, kv -> ((Object[])kv.getValue())[0], 30);
        this.expectElementsInQueue(updated, 0);
        this.expectElementsInQueue(left, 0);
        user1.setAge(23);
        this.remoteCache.put((Object)("user" + user1.getId()), (Object)user1);
        this.expectElementsInQueue(joined, 0);
        this.expectElementsInQueue(updated, 1, kv -> ((Object[])kv.getValue())[0], 23);
        this.expectElementsInQueue(left, 0);
        user1.setAge(40);
        user2.setAge(40);
        user3.setAge(40);
        this.remoteCache.put((Object)("user" + user1.getId()), (Object)user1);
        this.remoteCache.put((Object)("user" + user2.getId()), (Object)user2);
        this.remoteCache.put((Object)("user" + user3.getId()), (Object)user3);
        this.expectElementsInQueue(joined, 0);
        this.expectElementsInQueue(updated, 0);
        this.expectElementsInQueue(left, 3);
        this.remoteCache.clear();
        user1.setAge(21);
        user2.setAge(22);
        this.remoteCache.put((Object)"expiredUser1", (Object)user1, 5L, TimeUnit.MILLISECONDS);
        this.remoteCache.put((Object)"expiredUser2", (Object)user2, 5L, TimeUnit.MILLISECONDS);
        this.expectElementsInQueue(joined, 2);
        this.expectElementsInQueue(left, 0);
        this.timeService.advance(6L);
        AssertJUnit.assertNull((Object)this.remoteCache.get((Object)"expiredUser1"));
        AssertJUnit.assertNull((Object)this.remoteCache.get((Object)"expiredUser2"));
        this.expectElementsInQueue(joined, 0);
        this.expectElementsInQueue(left, 2);
        continuousQuery.removeContinuousQueryListener((ContinuousQueryListener)listener);
        user2.setAge(22);
        this.remoteCache.put((Object)("user" + user2.getId()), (Object)user2);
        this.expectElementsInQueue(joined, 0);
        this.expectElementsInQueue(left, 0);
    }

    public void testContinuousQueryChangingParameter() {
        UserPB user1 = new UserPB();
        user1.setId(1);
        user1.setName("John");
        user1.setSurname("Doe");
        user1.setGender(User.Gender.MALE);
        user1.setAge(22);
        user1.setAccountIds(new HashSet<Integer>(Arrays.asList(1, 2)));
        user1.setNotes("Lorem ipsum dolor sit amet");
        UserPB user2 = new UserPB();
        user2.setId(2);
        user2.setName("Spider");
        user2.setSurname("Man");
        user2.setGender(User.Gender.MALE);
        user2.setAge(32);
        user2.setAccountIds(Collections.singleton(3));
        UserPB user3 = new UserPB();
        user3.setId(3);
        user3.setName("Spider");
        user3.setSurname("Woman");
        user3.setGender(User.Gender.FEMALE);
        user3.setAge(40);
        this.remoteCache.clear();
        this.remoteCache.put((Object)("user" + user1.getId()), (Object)user1);
        this.remoteCache.put((Object)("user" + user2.getId()), (Object)user2);
        this.remoteCache.put((Object)("user" + user3.getId()), (Object)user3);
        AssertJUnit.assertEquals((int)3, (int)this.remoteCache.size());
        QueryFactory qf = Search.getQueryFactory(this.remoteCache);
        Query query = (Query)qf.from(UserPB.class).select(new String[]{"age"}).having("age").lte((Object)Expression.param((String)"ageParam")).build().setParameter("ageParam", (Object)32);
        final LinkedBlockingQueue joined = new LinkedBlockingQueue();
        final LinkedBlockingQueue updated = new LinkedBlockingQueue();
        final LinkedBlockingQueue left = new LinkedBlockingQueue();
        Object listener = new ContinuousQueryListener<String, Object[]>(){

            public void resultJoining(String key, Object[] value) {
                joined.add(new KeyValuePair((Object)key, (Object)value));
            }

            public void resultUpdated(String key, Object[] value) {
                updated.add(new KeyValuePair((Object)key, (Object)value));
            }

            public void resultLeaving(String key) {
                left.add(key);
            }
        };
        ContinuousQuery continuousQuery = Search.getContinuousQuery(this.remoteCache);
        continuousQuery.addContinuousQueryListener(query, (ContinuousQueryListener)listener);
        this.expectElementsInQueue(joined, 2, kv -> ((Object[])kv.getValue())[0], 32, 22);
        this.expectElementsInQueue(updated, 0);
        this.expectElementsInQueue(left, 0);
        continuousQuery.removeContinuousQueryListener((ContinuousQueryListener)listener);
        query.setParameter("ageParam", (Object)40);
        listener = new ContinuousQueryListener<String, Object[]>(){

            public void resultJoining(String key, Object[] value) {
                joined.add(new KeyValuePair((Object)key, (Object)value));
            }

            public void resultUpdated(String key, Object[] value) {
                updated.add(new KeyValuePair((Object)key, (Object)value));
            }

            public void resultLeaving(String key) {
                left.add(key);
            }
        };
        continuousQuery.addContinuousQueryListener(query, (ContinuousQueryListener)listener);
        this.expectElementsInQueue(joined, 3);
        this.expectElementsInQueue(updated, 0);
        this.expectElementsInQueue(left, 0);
        continuousQuery.removeContinuousQueryListener((ContinuousQueryListener)listener);
    }

    private <T> void expectElementsInQueue(BlockingQueue<T> queue, int numElements) {
        this.expectElementsInQueue(queue, numElements, null, new Object[0]);
    }

    private <T, R> void expectElementsInQueue(BlockingQueue<T> queue, int numElements, Function<T, R> valueTransformer, Object ... expectedValue) {
        ArrayList expectedValues;
        if (expectedValue.length != 0) {
            if (expectedValue.length != numElements) {
                throw new IllegalArgumentException("The number of expected values must either match the number of expected elements or no expected values should be specified.");
            }
            expectedValues = new ArrayList(expectedValue.length);
            Collections.addAll(expectedValues, expectedValue);
        } else {
            expectedValues = null;
        }
        for (int i = 0; i < numElements; ++i) {
            T o;
            try {
                o = queue.poll(5L, TimeUnit.SECONDS);
                AssertJUnit.assertNotNull((String)("Queue was empty after reading " + i + " elements!"), o);
            }
            catch (InterruptedException e) {
                throw new AssertionError("Interrupted while waiting for condition", e);
            }
            if (expectedValues == null) continue;
            T v = valueTransformer != null ? valueTransformer.apply(o) : o;
            boolean found = expectedValues.remove(v);
            AssertJUnit.assertTrue((String)("Expectation failed on element number " + i + ", unexpected value: " + v), (boolean)found);
        }
        try {
            T o = queue.poll(5L, TimeUnit.SECONDS);
            AssertJUnit.assertNull((String)"No more elements expected in queue!", o);
        }
        catch (InterruptedException e) {
            throw new AssertionError("Interrupted while waiting for condition", e);
        }
    }
}

