/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.stress;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.event.ClientEvent;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.client.hotrod.test.HotRodClientTestingUtil;
import org.infinispan.client.hotrod.test.InternalRemoteCacheManager;
import org.infinispan.client.hotrod.test.MultiHotRodServersTest;
import org.infinispan.commons.util.concurrent.ConcurrentHashSet;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.distribution.DistributionTestHelper;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.testng.annotations.Test;

@Test(groups={"stress"}, testName="client.hotrod.event.ClusterClientEventStressTest")
public class ClusterClientEventStressTest
extends MultiHotRodServersTest {
    private static final Log log = LogFactory.getLog(ClusterClientEventStressTest.class);
    static final int NUM_SERVERS = 3;
    static final int NUM_OWNERS = 2;
    static final int NUM_CLIENTS = 1;
    static final int NUM_THREADS_PER_CLIENT = 6;
    static final int NUM_OPERATIONS = 1000;
    static final int NUM_EVENTS = 6000;
    static Set<String> ALL_KEYS = new ConcurrentHashSet();
    static ExecutorService EXEC = Executors.newCachedThreadPool();
    static ClientEntryListener listener;

    protected void createCacheManagers() throws Throwable {
        this.createHotRodServers(3, this.getCacheConfiguration());
    }

    private org.infinispan.configuration.cache.ConfigurationBuilder getCacheConfiguration() {
        org.infinispan.configuration.cache.ConfigurationBuilder builder = ClusterClientEventStressTest.getDefaultClusteredCacheConfig((CacheMode)CacheMode.DIST_SYNC, (boolean)false);
        builder.clustering().hash().numOwners(2).expiration().maxIdle(1000L).wakeUpInterval(5000L).jmxStatistics().enable();
        return HotRodTestingUtil.hotRodCacheConfiguration((org.infinispan.configuration.cache.ConfigurationBuilder)builder);
    }

    RemoteCacheManager getRemoteCacheManager(int port) {
        ConfigurationBuilder builder = new ConfigurationBuilder();
        builder.addServer().host("127.0.0.1").port(port);
        InternalRemoteCacheManager rcm = new InternalRemoteCacheManager(builder.build());
        rcm.getCache();
        return rcm;
    }

    Map<String, RemoteCacheManager> createClients() {
        HashMap<String, RemoteCacheManager> remotecms = new HashMap<String, RemoteCacheManager>(1);
        for (int i = 0; i < 1; ++i) {
            remotecms.put("c" + i, this.getRemoteCacheManager(this.server(0).getPort()));
        }
        return remotecms;
    }

    public void testAddClientListenerDuringOperations() {
        CyclicBarrier barrier = new CyclicBarrier(7);
        ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>(6);
        ArrayList listeners = new ArrayList(1);
        Map<String, RemoteCacheManager> remotecms = this.createClients();
        for (Map.Entry<String, RemoteCacheManager> entry : remotecms.entrySet()) {
            RemoteCache remote = entry.getValue().getCache();
            for (int i = 0; i < 6; ++i) {
                String prefix = String.format("%s-t%d-", entry.getKey(), i);
                Put call = new Put(prefix, barrier, (RemoteCache<String, String>)remote, this.servers);
                futures.add(EXEC.submit(call));
            }
        }
        ClusterClientEventStressTest.barrierAwait(barrier);
        ClusterClientEventStressTest.barrierAwait(barrier);
        for (Future future : futures) {
            ClusterClientEventStressTest.futureGet(future);
        }
    }

    int countEvents(List<ClientEntryListener> listeners) {
        Integer count = listeners.stream().reduce(0, (acc, l) -> acc + l.count.get(), (x, y) -> x + y);
        log.infof("Event count is %d, target %d%n", (Object)count, (Object)6000);
        return count;
    }

    static List<String> generateKeys(String prefix, List<HotRodServer> servers, ThreadLocalRandom r) {
        ArrayList<String> keys = new ArrayList<String>();
        List combos = Arrays.asList({servers.get(0), servers.get(1)}, {servers.get(1), servers.get(0)}, {servers.get(1), servers.get(2)}, {servers.get(2), servers.get(1)}, {servers.get(2), servers.get(0)}, {servers.get(0), servers.get(2)});
        for (int i = 0; i < 1000; ++i) {
            HotRodServer[] owners = (HotRodServer[])combos.get(i % combos.size());
            String key = ClusterClientEventStressTest.getStringKey(prefix, owners, r);
            if (ALL_KEYS.contains(key)) {
                throw new AssertionError((Object)("Key already in use: " + key));
            }
            keys.add(key);
            ALL_KEYS.add(key);
        }
        return keys;
    }

    static String getStringKey(String prefix, HotRodServer[] owners, ThreadLocalRandom r) {
        Integer dummyInt;
        String dummyKey;
        byte[] dummy;
        Cache firstOwnerCache = owners[0].getCacheManager().getCache();
        Cache otherOwnerCache = owners[1].getCacheManager().getCache();
        int attemptsLeft = 1000;
        do {
            dummyInt = r.nextInt();
        } while ((!DistributionTestHelper.isFirstOwner((Cache)firstOwnerCache, (Object)(dummy = HotRodClientTestingUtil.toBytes(dummyKey = prefix + dummyInt))) || !DistributionTestHelper.isOwner((Cache)otherOwnerCache, (Object)dummy)) && --attemptsLeft >= 0);
        if (attemptsLeft < 0) {
            throw new IllegalStateException("Could not find any key owned by " + firstOwnerCache + " as primary owner and " + otherOwnerCache + " as secondary owner");
        }
        log.infof("Integer key %s hashes to primary [cluster=%s,hotrod=%s] and secondary [cluster=%s,hotrod=%s]", new Object[]{dummyKey, firstOwnerCache.getCacheManager().getAddress(), owners[0].getAddress(), otherOwnerCache.getCacheManager().getAddress(), owners[1].getAddress()});
        return dummyKey;
    }

    static int barrierAwait(CyclicBarrier barrier) {
        try {
            return barrier.await();
        }
        catch (InterruptedException | BrokenBarrierException e) {
            throw new AssertionError((Object)e);
        }
    }

    static <T> T futureGet(Future<T> future) {
        try {
            return future.get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new AssertionError((Object)e);
        }
    }

    @ClientListener
    static class ClientEntryListener {
        final AtomicInteger count = new AtomicInteger();

        ClientEntryListener() {
        }

        @ClientCacheEntryCreated
        @ClientCacheEntryModified
        public void handleClientEvent(ClientEvent event) {
            int countSoFar = this.count.incrementAndGet();
            if (countSoFar % 100 == 0) {
                log.debugf("Reached %s", countSoFar);
            }
        }
    }

    static class Put
    implements Callable<Void> {
        static final ThreadLocalRandom R = ThreadLocalRandom.current();
        final CyclicBarrier barrier;
        final RemoteCache<String, String> remote;
        final List<HotRodServer> servers;
        final List<String> keys;

        public Put(String prefix, CyclicBarrier barrier, RemoteCache<String, String> remote, List<HotRodServer> servers) {
            this.barrier = barrier;
            this.remote = remote;
            this.servers = servers;
            this.keys = ClusterClientEventStressTest.generateKeys(prefix, servers, R);
            Collections.shuffle(this.keys);
        }

        @Override
        public Void call() throws Exception {
            ClusterClientEventStressTest.barrierAwait(this.barrier);
            try {
                for (int i = 0; i < 1000; ++i) {
                    String value = this.keys.get(i);
                    this.remote.put((Object)value, (Object)value);
                    if (!value.startsWith("c0-t0") || i != 500) continue;
                    listener = new ClientEntryListener();
                    this.remote.addClientListener((Object)listener);
                }
                Void void_ = null;
                return void_;
            }
            finally {
                ClusterClientEventStressTest.barrierAwait(this.barrier);
            }
        }
    }
}

