/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.retry;

import io.netty.channel.Channel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.infinispan.AdvancedCache;
import org.infinispan.client.hotrod.ProtocolVersion;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientCacheFailover;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.client.hotrod.event.ClientCacheFailoverEvent;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.protocol.Codec25;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelOperation;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelRecord;
import org.infinispan.client.hotrod.retry.AbstractRetryTest;
import org.infinispan.client.hotrod.test.NoopChannelOperation;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.CompletionStages;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(groups={"functional"}, testName="client.hotrod.retry.ClientListenerFailoverBusyTest")
public class ClientListenerFailoverBusyTest
extends AbstractRetryTest {
    private static final int MAX_PENDING_REQUESTS = 10;

    public ClientListenerFailoverBusyTest() {
        this.nbrOfServers = 1;
    }

    @Override
    protected org.infinispan.configuration.cache.ConfigurationBuilder getCacheConfig() {
        org.infinispan.configuration.cache.ConfigurationBuilder builder = ClientListenerFailoverBusyTest.getDefaultClusteredCacheConfig((CacheMode)CacheMode.DIST_SYNC, (boolean)false);
        return HotRodTestingUtil.hotRodCacheConfiguration((org.infinispan.configuration.cache.ConfigurationBuilder)builder);
    }

    @Override
    protected RemoteCacheManager createClient() {
        RemoteCacheManager rcm = super.createClient();
        rcm.getChannelFactory().setNegotiatedCodec((Codec)new Codec25());
        return rcm;
    }

    @Override
    protected void amendRemoteCacheManagerConfiguration(ConfigurationBuilder builder) {
        builder.version(ProtocolVersion.PROTOCOL_VERSION_25).connectionPool().maxActive(2).maxPendingRequests(11);
    }

    public void testWithASingleOperation() throws Exception {
        this.testListenerWithSlowServer(1);
    }

    public void testWithMultipleOperations() throws Exception {
        this.testListenerWithSlowServer(10);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testListenerWithSlowServer(int numberOfOperations) throws Exception {
        AdvancedCache<?, ?> cache = this.cacheToHit(1);
        InetSocketAddress address = InetSocketAddress.createUnresolved(this.hotRodServer1.getHost(), this.hotRodServer1.getPort());
        Channel channel = (Channel)((NoopChannelOperation)this.channelFactory.fetchChannelAndInvoke((SocketAddress)address, (ChannelOperation)new NoopChannelOperation())).get(10L, TimeUnit.SECONDS);
        ChannelRecord.of((Channel)channel).release(channel);
        Listener listener = new Listener();
        this.remoteCache.addClientListener((Object)listener);
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(numberOfOperations);
        CyclicBarrier barrier = new CyclicBarrier(numberOfOperations + 1);
        DelayedInterceptor interceptor = new DelayedInterceptor(barrier, executor);
        TestingUtil.extractInterceptorChain(cache).addInterceptor((AsyncInterceptor)interceptor, 1);
        AggregateCompletionStage operations = CompletionStages.aggregateCompletionStage();
        for (int i = 0; i < numberOfOperations; ++i) {
            operations.dependsOn((CompletionStage)this.remoteCache.putAsync((Object)1, (Object)("v" + i)));
        }
        int eventsBeforeFailover = listener.getReceived();
        Assertions.assertThat((int)eventsBeforeFailover).isZero();
        Assertions.assertThat((boolean)listener.didFailover()).isFalse();
        channel.close().awaitUninterruptibly();
        ClientListenerFailoverBusyTest.eventually(() -> this.channelFactory.getNumActive() == 1);
        ClientListenerFailoverBusyTest.eventually(listener::didFailover);
        try {
            barrier.await(10L, TimeUnit.SECONDS);
            operations.freeze().toCompletableFuture().get(10L, TimeUnit.SECONDS);
            if (numberOfOperations > 1) {
                ClientListenerFailoverBusyTest.eventually(() -> String.format("Never got more events: %d of %d", eventsBeforeFailover, listener.getReceived()), () -> listener.getReceived() > eventsBeforeFailover);
            }
        }
        finally {
            TestingUtil.extractInterceptorChain(cache).removeInterceptor(DelayedInterceptor.class);
            executor.shutdown();
        }
    }

    @ClientListener
    private static class Listener {
        private final AtomicInteger count = new AtomicInteger(0);
        private final AtomicBoolean failover = new AtomicBoolean(false);

        private Listener() {
        }

        @ClientCacheEntryModified
        public void handleModifiedEvent(ClientCacheEntryModifiedEvent<?> ignore) {
            this.count.incrementAndGet();
        }

        @ClientCacheFailover
        public void handleFailoverEvent(ClientCacheFailoverEvent ignore) {
            this.failover.set(true);
        }

        int getReceived() {
            return this.count.intValue();
        }

        boolean didFailover() {
            return this.failover.get();
        }
    }

    public static class DelayedInterceptor
    extends DDAsyncInterceptor {
        private final CyclicBarrier barrier;
        private final ExecutorService executor;

        public DelayedInterceptor(CyclicBarrier barrier, ExecutorService executor) {
            this.barrier = barrier;
            this.executor = executor;
        }

        public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
            CompletableFuture cf = new CompletableFuture();
            this.executor.submit(() -> {
                try {
                    this.barrier.await();
                    cf.complete(super.visitPutKeyValueCommand(ctx, command));
                }
                catch (Throwable e) {
                    cf.completeExceptionally(e);
                }
            });
            return DelayedInterceptor.asyncValue(cf);
        }
    }
}

