/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.TimeUnit;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.StreamingRemoteCache;
import org.infinispan.client.hotrod.VersionedMetadata;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.test.HotRodClientTestingUtil;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

/**
 * Functional test of the streaming API exposed by a Hot Rod {@link RemoteCache}
 * ({@link RemoteCache#streaming()}).
 *
 * <p>Every test writes a deterministic byte pattern (the byte at offset {@code i}
 * has the value {@code i % 256}) through an {@link OutputStream} obtained from the
 * {@link StreamingRemoteCache}, then reads it back through an {@link InputStream}
 * and checks both the content and, where relevant, the attached
 * {@link VersionedMetadata}.
 */
@Test(testName="client.hotrod.StreamingOpsTest", groups={"functional"})
public class StreamingOpsTest extends SingleCacheManagerTest {
    private static final Log log = LogFactory.getLog(StreamingOpsTest.class);
    private static final String CACHE_NAME = "theCache";
    /** Number of bytes written for the first value stored under a key. */
    private static final int V1_SIZE = 1000000;
    /** Number of bytes written for a value that attempts to overwrite the first one. */
    private static final int V2_SIZE = 500000;
    RemoteCache<String, byte[]> remoteCache;
    StreamingRemoteCache<String> streamingRemoteCache;
    private RemoteCacheManager remoteCacheManager;
    protected HotRodServer hotrodServer;

    /**
     * Creates the embedded cache manager and defines and starts the cache named
     * {@value #CACHE_NAME} that the remote client talks to.
     *
     * @return the cache manager with {@value #CACHE_NAME} defined and obtained once
     * @throws Exception if the cache manager or the cache cannot be created
     */
    protected EmbeddedCacheManager createCacheManager() throws Exception {
        org.infinispan.configuration.cache.ConfigurationBuilder builder =
                HotRodTestingUtil.hotRodCacheConfiguration(getDefaultStandaloneCacheConfig(false));
        EmbeddedCacheManager cm =
                TestCacheManagerFactory.createCacheManager(HotRodTestingUtil.hotRodCacheConfiguration());
        cm.defineConfiguration(CACHE_NAME, builder.build());
        // Obtain the cache once up front so it exists before the remote client asks for it.
        cm.getCache(CACHE_NAME);
        return cm;
    }

    /**
     * Runs the inherited setup, starts a Hot Rod server for the cache manager and
     * connects a remote client to it, keeping both the regular and the streaming
     * view of the {@value #CACHE_NAME} cache.
     *
     * @throws Exception if the server or the client cannot be started
     */
    protected void setup() throws Exception {
        super.setup();
        hotrodServer = HotRodClientTestingUtil.startHotRodServer(cacheManager);
        log.info("Started server on port: " + hotrodServer.getPort());
        remoteCacheManager = getRemoteCacheManager();
        remoteCache = remoteCacheManager.getCache(CACHE_NAME);
        streamingRemoteCache = remoteCache.streaming();
    }

    /**
     * Builds a remote cache manager pointing at {@code localhost} on the port of
     * the server started by {@link #setup()}.
     *
     * @return a new remote cache manager; the caller is responsible for stopping it
     */
    protected RemoteCacheManager getRemoteCacheManager() {
        ConfigurationBuilder clientBuilder = new ConfigurationBuilder();
        clientBuilder.addServer().host("localhost").port(hotrodServer.getPort());
        return new RemoteCacheManager(clientBuilder.build());
    }

    /**
     * Stops the remote client and the Hot Rod server.
     *
     * <p>{@code alwaysRun = true} makes TestNG invoke this method even when an
     * earlier method failed or was skipped, so the server and the client are not
     * left running. Because setup may then not have completed, each resource is
     * only stopped if it was actually created.
     */
    @AfterClass(alwaysRun = true)
    public void testDestroyRemoteCacheFactory() {
        if (remoteCacheManager != null) {
            HotRodClientTestingUtil.killRemoteCacheManager(remoteCacheManager);
        }
        if (hotrodServer != null) {
            HotRodClientTestingUtil.killServers(hotrodServer);
        }
    }

    /**
     * Reads the given stream to its end, discarding the data, and closes it.
     *
     * @param is the stream to drain; {@code null} is accepted and ignored
     * @throws Exception if reading or closing fails
     */
    private void consumeAndCloseStream(InputStream is) throws Exception {
        if (is == null) {
            return;
        }
        try (InputStream in = is) {
            while (in.read() >= 0) {
                // discard
            }
        }
    }

    /**
     * Writes {@code length} bytes to the stream; the byte at offset {@code i} is
     * {@code i % 256}. The stream is left open.
     *
     * @param os     the stream to write to
     * @param length the number of bytes to write
     * @throws Exception if writing fails
     */
    private void writeDataToStream(OutputStream os, int length) throws Exception {
        for (int i = 0; i < length; ++i) {
            os.write(i % 256);
        }
    }

    /**
     * Writes the test pattern to the stream and closes it, closing it even when
     * writing fails.
     *
     * @param os     the stream to fill and close
     * @param length the number of bytes to write
     * @throws Exception if writing or closing fails
     */
    private void writeAndClose(OutputStream os, int length) throws Exception {
        try (OutputStream out = os) {
            writeDataToStream(out, length);
        }
    }

    /**
     * Stores {@link #V1_SIZE} bytes under a key and checks that reading the key
     * back yields the same pattern and the same number of bytes.
     */
    public void testPutGetStream() throws Exception {
        writeAndClose(streamingRemoteCache.put("k1"), V1_SIZE);
        InputStream k1is = streamingRemoteCache.get("k1");
        int count = readAndCheckDataFromStream(k1is);
        AssertJUnit.assertEquals(V1_SIZE, count);
    }

    /**
     * Reads the stream to its end, asserting that the byte at offset {@code i}
     * equals {@code i % 256}, and closes the stream.
     *
     * <p>The stream is managed by try-with-resources, so a failure while closing
     * is attached as a suppressed exception instead of replacing a failed
     * assertion.
     *
     * @param k1is the stream to verify
     * @return the number of bytes read
     * @throws IOException if reading or closing fails
     */
    private int readAndCheckDataFromStream(InputStream k1is) throws IOException {
        int count = 0;
        try (InputStream in = k1is) {
            for (int b = in.read(); b >= 0; b = in.read(), ++count) {
                AssertJUnit.assertEquals(count % 256, b);
            }
        }
        return count;
    }

    /**
     * Fetches the stream stored under {@code key}, asserts that it exists and that
     * its metadata reports the expected lifespan and max-idle values, then drains
     * and closes it.
     *
     * @param key              the key to look up
     * @param expectedLifespan the lifespan the metadata must report
     * @param expectedMaxIdle  the max-idle value the metadata must report
     * @throws Exception if the lookup, reading or closing fails
     */
    private void assertStoredMetadata(String key, long expectedLifespan, long expectedMaxIdle) throws Exception {
        InputStream is = streamingRemoteCache.get(key);
        try {
            AssertJUnit.assertNotNull("expected a stream but received null", is);
            VersionedMetadata metadata = (VersionedMetadata) is;
            AssertJUnit.assertEquals(expectedLifespan, metadata.getLifespan());
            AssertJUnit.assertEquals(expectedMaxIdle, metadata.getMaxIdle());
        } finally {
            // Drain and close even when an assertion fails so the stream is never left open.
            consumeAndCloseStream(is);
        }
    }

    /**
     * Checks the metadata attached to a retrieved stream: a missing key yields
     * {@code null}; a plain put reports -1 for lifespan and max-idle; a put with a
     * lifespan, and one with lifespan and max-idle, report those values in seconds.
     */
    public void testGetStreamWithMetadata() throws Exception {
        InputStream k1is = streamingRemoteCache.get("k1");
        AssertJUnit.assertNull("expected null but received a stream", k1is);
        consumeAndCloseStream(k1is);

        // No expiration given.
        writeAndClose(streamingRemoteCache.put("k1"), V1_SIZE);
        assertStoredMetadata("k1", -1, -1);

        // Lifespan only.
        writeAndClose(streamingRemoteCache.put("k1", 5L, TimeUnit.MINUTES), V1_SIZE);
        assertStoredMetadata("k1", TimeUnit.MINUTES.toSeconds(5L), -1);

        // Lifespan and max idle.
        writeAndClose(streamingRemoteCache.put("k1", 5L, TimeUnit.MINUTES, 3L, TimeUnit.MINUTES), V1_SIZE);
        assertStoredMetadata("k1", TimeUnit.MINUTES.toSeconds(5L), TimeUnit.MINUTES.toSeconds(3L));
    }

    /**
     * Checks that {@code putIfAbsent} stores a value for a missing key and that a
     * second {@code putIfAbsent} with a different size leaves the first value in
     * place.
     */
    public void testPutIfAbsentStream() throws Exception {
        InputStream k1is = streamingRemoteCache.get("k1");
        AssertJUnit.assertNull("expected null but received a stream", k1is);
        consumeAndCloseStream(k1is);

        writeAndClose(streamingRemoteCache.putIfAbsent("k1"), V1_SIZE);
        k1is = streamingRemoteCache.get("k1");
        AssertJUnit.assertEquals(V1_SIZE, readAndCheckDataFromStream(k1is));

        // The key now exists, so the shorter value must not replace it.
        writeAndClose(streamingRemoteCache.putIfAbsent("k1"), V2_SIZE);
        k1is = streamingRemoteCache.get("k1");
        AssertJUnit.assertEquals(V1_SIZE, readAndCheckDataFromStream(k1is));
    }

    /**
     * Checks that {@code replaceWithVersion} leaves the value untouched when given
     * a version other than the stored one, and replaces it when given the stored
     * version.
     */
    public void testReplaceStream() throws Exception {
        writeAndClose(streamingRemoteCache.putIfAbsent("k1"), V1_SIZE);
        InputStream k1is = streamingRemoteCache.get("k1");
        AssertJUnit.assertEquals(V1_SIZE, readAndCheckDataFromStream(k1is));
        // The version is read from the stream object after it has been read and closed.
        long version = ((VersionedMetadata) k1is).getVersion();
        AssertJUnit.assertTrue("Expected a non-zero version: " + version, version > 0L);

        // Mismatching version: the original value must survive.
        writeAndClose(streamingRemoteCache.replaceWithVersion("k1", version + 1L), V2_SIZE);
        k1is = streamingRemoteCache.get("k1");
        AssertJUnit.assertEquals(V1_SIZE, readAndCheckDataFromStream(k1is));

        // Matching version: the value must be replaced.
        writeAndClose(streamingRemoteCache.replaceWithVersion("k1", version), V2_SIZE);
        k1is = streamingRemoteCache.get("k1");
        AssertJUnit.assertEquals(V2_SIZE, readAndCheckDataFromStream(k1is));
    }
}

