/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod;

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.client.hotrod.Flag;
import org.infinispan.client.hotrod.MetadataValue;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.test.MultiHotRodServersTest;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
 * Stress test that keeps a shared counter under a single key and increments it
 * concurrently from several worker threads using optimistic, versioned updates
 * ({@code getWithMetadata} followed by {@code replaceWithVersion}).
 *
 * <p>The test passes when the value finally stored under {@link #KEY} equals the
 * total number of increments the workers report as successful, i.e. no update
 * was lost and none was applied twice.
 */
@Test(groups={"stress"}, testName="client.hotrod.ReplaceWithVersionConcurrencyTest", timeOut=900000L)
public class ReplaceWithVersionConcurrencyTest
extends MultiHotRodServersTest {
    /** Counts successful increments across all workers; only used to label log lines. */
    static final AtomicInteger globalCounter = new AtomicInteger();
    /** The single cache key every worker contends on. */
    static final String KEY = "A";
    /** Number of concurrent workers (also the size of the thread pool). */
    static final int NUM_THREADS = 20;
    /** Number of increments each worker performs. */
    static final int OPS_PER_THREAD = 200;
    /** Upper bound, in minutes, on how long the test waits for the workers. */
    static final int TIMEOUT_MINUTES = 5;

    /**
     * Starts two Hot Rod servers that share a {@code REPL_SYNC} clustered cache configuration.
     *
     * @throws Throwable if the servers cannot be created
     */
    protected void createCacheManagers() throws Throwable {
        // NOTE(review): the 'false' argument is presumably the "transactional" switch - confirm.
        ConfigurationBuilder builder = HotRodTestingUtil.hotRodCacheConfiguration(getDefaultClusteredCacheConfig(CacheMode.REPL_SYNC, false));
        this.createHotRodServers(2, builder);
    }

    /**
     * Runs {@link #NUM_THREADS} workers, each performing {@link #OPS_PER_THREAD} increments,
     * and verifies that the stored counter equals the sum of the increments they report.
     *
     * @throws Exception if a worker fails or the calling thread is interrupted while waiting
     */
    public void testKeepingCounterWithReplaceWithVersion() throws Exception {
        RemoteCache<String, Integer> cache = this.client(0).getCache();
        AssertJUnit.assertNull(cache.get(KEY));

        long startMillis = System.currentTimeMillis();
        ArrayList<Future<Integer>> results = new ArrayList<Future<Integer>>(NUM_THREADS);
        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS, this.getTestThreadFactory("Worker"));
        boolean terminated;
        try {
            for (int i = 0; i < NUM_THREADS; ++i) {
                results.add(executor.submit(new CounterUpdater(cache, KEY, OPS_PER_THREAD)));
            }
            executor.shutdown();
            terminated = executor.awaitTermination(TIMEOUT_MINUTES, TimeUnit.MINUTES);
        } finally {
            // No-op after a clean termination; on timeout or failure it stops the
            // workers so their threads do not leak into subsequent tests.
            executor.shutdownNow();
        }
        long timeSpent = System.currentTimeMillis() - startMillis;
        log.info((Object)("Time spent: " + (double)timeSpent / 1000.0 + " secs."));

        // Fail with a clear message instead of comparing against a counter that
        // is still being modified by unfinished workers.
        AssertJUnit.assertTrue("Workers did not finish within " + TIMEOUT_MINUTES + " minutes", terminated);

        // Inspect the futures first so that a worker failure surfaces as its own
        // exception rather than as a confusing value mismatch.
        int expected = 0;
        for (Future<Integer> future : results) {
            expected += future.get();
        }

        Integer actual = cache.get(KEY);
        AssertJUnit.assertNotNull("No counter value stored under key " + KEY, actual);
        AssertJUnit.assertEquals(expected, actual.intValue());
    }

    /**
     * Worker that increments the counter stored under {@code key} exactly
     * {@code limit} times, retrying each increment until it succeeds.
     */
    static class CounterUpdater
    implements Callable<Integer> {
        static final Log log = LogFactory.getLog(CounterUpdater.class);
        final RemoteCache<String, Integer> cache;
        final String key;
        final int limit;

        /**
         * @param cache remote cache holding the counter
         * @param key   key under which the counter is stored
         * @param limit number of increments to perform
         */
        CounterUpdater(RemoteCache<String, Integer> cache, String key, int limit) {
            this.cache = cache;
            this.key = key;
            this.limit = limit;
        }

        /**
         * Performs {@code limit} increments.
         *
         * @return the number of increments performed
         */
        @Override
        public Integer call() throws Exception {
            int counter = 0;
            log.info((Object)"Start to count.");
            for (int i = 0; i < this.limit; ++i) {
                this.incrementCounter();
                ++counter;
            }
            log.info((Object)("Counted " + counter));
            return counter;
        }

        /**
         * Increments the counter once, looping until either the initial
         * {@code putIfAbsent} or a {@code replaceWithVersion} succeeds.
         */
        private void incrementCounter() {
            while (true) {
                MetadataValue<Integer> versioned = this.cache.getWithMetadata(this.key);
                if (versioned == null) {
                    // No counter yet: try to create it. FORCE_RETURN_VALUE is requested so the
                    // previous value is returned; a non-null result means another worker created
                    // the entry first, so start over with a fresh read.
                    Integer previous = this.cache.withFlags(Flag.FORCE_RETURN_VALUE).putIfAbsent(this.key, 1);
                    if (previous != null) {
                        continue;
                    }
                    log.info((Object)("count=" + globalCounter.getAndIncrement() + ",prev=0,new=1 (first-put)"));
                    return;
                }
                int val = versioned.getValue() + 1;
                long version = versioned.getVersion();
                // The replace only succeeds if the entry still carries the version that was
                // read above; otherwise another worker got in first and we retry.
                if (this.cache.replaceWithVersion(this.key, val, version)) {
                    int count = globalCounter.getAndIncrement();
                    log.info((Object)("count=" + count + ",prev=" + versioned.getValue() + ",new=" + val + ",prev-version=" + version));
                    return;
                }
            }
        }
    }
}

