/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.impl.transport.netty;

import io.netty.channel.Channel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelFactory;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelOperation;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelPool;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelRecord;
import org.infinispan.client.hotrod.impl.transport.netty.CrashMidOperationTest;
import org.infinispan.client.hotrod.impl.transport.netty.HeaderDecoder;
import org.infinispan.client.hotrod.retry.AbstractRetryTest;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.mockito.Mockito;
import org.testng.annotations.Test;

@Test(testName="client.hotrod.impl.transport.netty.ChannelCloseAndInactiveTest", groups={"functional"})
public class ChannelCloseAndInactiveTest
extends AbstractRetryTest {
    public ChannelCloseAndInactiveTest() {
        this.nbrOfServers = 1;
    }

    @Override
    protected org.infinispan.configuration.cache.ConfigurationBuilder getCacheConfig() {
        org.infinispan.configuration.cache.ConfigurationBuilder builder = HotRodTestingUtil.hotRodCacheConfiguration((org.infinispan.configuration.cache.ConfigurationBuilder)ChannelCloseAndInactiveTest.getDefaultClusteredCacheConfig((CacheMode)CacheMode.DIST_SYNC, (boolean)false));
        builder.clustering().hash().numOwners(1);
        return builder;
    }

    @Override
    protected void amendRemoteCacheManagerConfiguration(ConfigurationBuilder builder) {
        builder.maxRetries(1);
        builder.connectionPool().maxActive(2);
    }

    public void testKillAndInactiveDifferentChannelsConcurrently() throws Exception {
        ChannelFactory channelFactory = this.remoteCacheManager.getChannelFactory();
        InetSocketAddress address = InetSocketAddress.createUnresolved(this.hotRodServer1.getHost(), this.hotRodServer1.getPort());
        CountDownLatch operationLatch = new CountDownLatch(1);
        AtomicReference<Channel> firstChannelRef = new AtomicReference<Channel>();
        AtomicReference<Channel> secondChannelRef = new AtomicReference<Channel>();
        CrashMidOperationTest.NoopRetryingOperation firstOperation = new CrashMidOperationTest.NoopRetryingOperation(0, channelFactory, this.remoteCacheManager.getConfiguration(), firstChannelRef, operationLatch);
        this.fork(() -> (CrashMidOperationTest.NoopRetryingOperation)channelFactory.fetchChannelAndInvoke((SocketAddress)address, (ChannelOperation)firstOperation));
        ChannelCloseAndInactiveTest.eventually(() -> firstChannelRef.get() != null);
        Channel firstChannel = firstChannelRef.get();
        HeaderDecoder firstDecoder = (HeaderDecoder)firstChannel.pipeline().get("header-decoder");
        Assertions.assertThat((int)firstDecoder.registeredOperations()).isOne();
        CrashMidOperationTest.NoopRetryingOperation secondOperation = new CrashMidOperationTest.NoopRetryingOperation(1, channelFactory, this.remoteCacheManager.getConfiguration(), secondChannelRef, operationLatch);
        this.fork(() -> (CrashMidOperationTest.NoopRetryingOperation)channelFactory.fetchChannelAndInvoke((SocketAddress)address, (ChannelOperation)secondOperation));
        ChannelCloseAndInactiveTest.eventually(() -> secondChannelRef.get() != null);
        Channel secondChannel = secondChannelRef.get();
        Assertions.assertThat((int)firstDecoder.registeredOperations()).isOne();
        Channel spyChannel = (Channel)Mockito.spy((Object)secondChannel);
        CountDownLatch closeSecondLatch = new CountDownLatch(1);
        ((Channel)Mockito.doAnswer(ivk -> {
            closeSecondLatch.await(10L, TimeUnit.SECONDS);
            return ivk.callRealMethod();
        }).when((Object)spyChannel)).close();
        ChannelPool pool = ChannelRecord.of((Channel)firstChannel).channelPool();
        Assertions.assertThat((int)pool.getIdle()).isZero();
        Assertions.assertThat((int)pool.getConnected()).isEqualTo(2);
        ChannelRecord.of((Channel)secondChannel).release(spyChannel);
        Assertions.assertThat((int)pool.getIdle()).isOne();
        Assertions.assertThat((boolean)secondOperation.isDone()).isFalse();
        Future exceptionCaught = this.fork(() -> secondOperation.exceptionCaught(secondChannel, (Throwable)new TransportException("oops", (SocketAddress)address)));
        firstChannel.close().awaitUninterruptibly();
        Assertions.assertThat((boolean)firstChannel.isActive()).isFalse();
        closeSecondLatch.countDown();
        ChannelCloseAndInactiveTest.eventually(() -> pool.getConnected() > 0);
        ChannelCloseAndInactiveTest.eventually(exceptionCaught::isDone);
        exceptionCaught.get(10L, TimeUnit.SECONDS);
        ChannelCloseAndInactiveTest.eventually(() -> firstOperation.isDone());
        ChannelCloseAndInactiveTest.eventually(() -> secondOperation.isDone());
        operationLatch.countDown();
        secondOperation.get(10L, TimeUnit.SECONDS);
        firstOperation.get(10L, TimeUnit.SECONDS);
    }
}

