/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.impl.transport.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.util.concurrent.EventExecutor;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.infinispan.client.hotrod.DataFormat;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.impl.ClientTopology;
import org.infinispan.client.hotrod.impl.operations.RetryOnFailureOperation;
import org.infinispan.client.hotrod.impl.protocol.CodecHolder;
import org.infinispan.client.hotrod.impl.transport.netty.AcquireChannelOperation;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelFactory;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelInitializer;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelOperation;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelPool;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelRecord;
import org.infinispan.client.hotrod.impl.transport.netty.HeaderDecoder;
import org.infinispan.client.hotrod.retry.AbstractRetryTest;
import org.infinispan.client.hotrod.test.HotRodClientTestingUtil;
import org.infinispan.client.hotrod.test.InternalRemoteCacheManager;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(testName="client.hotrod.impl.transport.netty.CloseBeforeEnqueuingTest", groups={"functional"})
public class CloseBeforeEnqueuingTest
extends AbstractRetryTest {
    @Override
    protected org.infinispan.configuration.cache.ConfigurationBuilder getCacheConfig() {
        org.infinispan.configuration.cache.ConfigurationBuilder builder = HotRodTestingUtil.hotRodCacheConfiguration((org.infinispan.configuration.cache.ConfigurationBuilder)CloseBeforeEnqueuingTest.getDefaultClusteredCacheConfig((CacheMode)CacheMode.DIST_SYNC, (boolean)false));
        builder.clustering().hash().numOwners(1);
        return builder;
    }

    @Override
    protected RemoteCacheManager createRemoteCacheManager(int port) {
        ConfigurationBuilder builder = HotRodClientTestingUtil.newRemoteConfigurationBuilder();
        this.amendRemoteCacheManagerConfiguration(builder);
        builder.forceReturnValues(true).connectionTimeout(5).connectionPool().maxActive(1).addServer().host("127.0.0.1").port(port);
        Configuration configuration = builder.build();
        InternalRemoteCacheManager remoteCacheManager = new InternalRemoteCacheManager(configuration, new CustomChannelFactory(configuration));
        remoteCacheManager.start();
        return remoteCacheManager;
    }

    public void testClosingAndEnqueueing() throws Exception {
        ChannelFactory channelFactory = this.remoteCacheManager.getChannelFactory();
        InetSocketAddress address = InetSocketAddress.createUnresolved(this.hotRodServer1.getHost(), this.hotRodServer1.getPort());
        CountDownLatch operationLatch = new CountDownLatch(1);
        AtomicReference<Channel> channelRef = new AtomicReference<Channel>();
        NoopRetryingOperation firstOperation = new NoopRetryingOperation(0, channelFactory, this.remoteCacheManager.getConfiguration(), channelRef, operationLatch);
        this.fork(() -> (NoopRetryingOperation)channelFactory.fetchChannelAndInvoke((SocketAddress)address, (ChannelOperation)firstOperation));
        CloseBeforeEnqueuingTest.eventually(() -> channelRef.get() != null);
        Channel channel = channelRef.get();
        AssertJUnit.assertTrue((boolean)(channelFactory instanceof CustomChannelFactory));
        AtomicBoolean closedServer = new AtomicBoolean(false);
        ((CustomChannelFactory)channelFactory).setExecuteInstead(() -> {
            HotRodClientTestingUtil.killServers(this.hotRodServer1);
            CloseBeforeEnqueuingTest.eventually(() -> !channel.isActive());
            CloseBeforeEnqueuingTest.eventually(() -> channelFactory.getNumActive((SocketAddress)address) == 0);
            return !closedServer.compareAndSet(false, true);
        });
        operationLatch.countDown();
        NoopRetryingOperation secondOperation = new NoopRetryingOperation(0, channelFactory, this.remoteCacheManager.getConfiguration(), channelRef, null);
        this.fork(() -> (NoopRetryingOperation)channelFactory.fetchChannelAndInvoke((SocketAddress)address, (ChannelOperation)secondOperation));
        secondOperation.get(10L, TimeUnit.SECONDS);
    }

    public void testEnqueueAndReleasing() throws Exception {
        ChannelFactory channelFactory = this.remoteCacheManager.getChannelFactory();
        InetSocketAddress address = InetSocketAddress.createUnresolved(this.hotRodServer1.getHost(), this.hotRodServer1.getPort());
        CompletableFuture firstOp = new CompletableFuture();
        this.fork(() -> (AcquireChannelOperation)channelFactory.fetchChannelAndInvoke((SocketAddress)address, (ChannelOperation)new AcquireChannelOperation(firstOp)));
        Channel firstChannel = (Channel)firstOp.get(10L, TimeUnit.SECONDS);
        CheckPoint checkPoint = new CheckPoint();
        ControlledChannelOperation operation = new ControlledChannelOperation(channelFactory, this.remoteCacheManager.getConfiguration(), checkPoint);
        AtomicBoolean onlyOnce = new AtomicBoolean(true);
        ((CustomChannelFactory)channelFactory).setExecuteInstead(() -> {
            if (!onlyOnce.get()) {
                this.fork(() -> ChannelRecord.of((Channel)firstChannel).release(firstChannel));
                try {
                    checkPoint.trigger("before_execute_operation");
                    checkPoint.awaitStrict("invoke_execute_operation", 10L, TimeUnit.SECONDS);
                }
                catch (Exception exception) {
                    // empty catch block
                }
                return true;
            }
            return !onlyOnce.getAndSet(false);
        });
        Future invoking = this.fork(() -> (ControlledChannelOperation)channelFactory.fetchChannelAndInvoke((SocketAddress)address, (ChannelOperation)operation));
        checkPoint.awaitStrict("before-schedule-read-0", 10L, TimeUnit.SECONDS);
        checkPoint.awaitStrict("before_execute_operation", 10L, TimeUnit.SECONDS);
        checkPoint.trigger("proceed-schedule-read-0");
        CloseBeforeEnqueuingTest.eventually(() -> operation.isDone());
        operation.assertThatExecutedOnlyOnce();
        checkPoint.trigger("invoke_execute_operation");
        operation.assertThatExecutedOnlyOnce();
        CloseBeforeEnqueuingTest.eventually(invoking::isDone);
        Assertions.assertThat((boolean)operation.isDone()).isTrue();
        operation.assertThatExecutedOnlyOnce();
        Assertions.assertThat((int)channelFactory.getNumIdle((SocketAddress)address)).isOne();
    }

    private static class CustomChannelFactory
    extends ChannelFactory {
        private final Configuration configuration;
        private Supplier<Boolean> executeInstead;

        public CustomChannelFactory(Configuration cfg) {
            super(new CodecHolder(cfg.version().getCodec()));
            this.configuration = cfg;
            this.executeInstead = null;
        }

        public void setExecuteInstead(Supplier<Boolean> supplier) {
            this.executeInstead = supplier;
        }

        protected ChannelPool createChannelPool(Bootstrap bootstrap, ChannelInitializer channelInitializer, SocketAddress address) {
            int maxConnections = this.configuration.connectionPool().maxActive();
            if (maxConnections < 0) {
                maxConnections = Integer.MAX_VALUE;
            }
            return new ChannelPool((EventExecutor)bootstrap.config().group().next(), address, channelInitializer, this.configuration.connectionPool().exhaustedAction(), (arg_0, arg_1) -> ((CustomChannelFactory)this).onConnectionEvent(arg_0, arg_1), this.configuration.connectionPool().maxWait(), maxConnections, this.configuration.connectionPool().maxPendingRequests()){

                boolean executeDirectlyIfPossible(ChannelOperation callback, boolean checkCallback) {
                    if (executeInstead != null && !executeInstead.get().booleanValue()) {
                        return false;
                    }
                    return super.executeDirectlyIfPossible(callback, checkCallback);
                }
            };
        }
    }

    private static class NoopRetryingOperation
    extends RetryOnFailureOperation<Void> {
        private final AtomicReference<Channel> channelRef;
        private final CountDownLatch firstOp;
        private final int id;

        protected NoopRetryingOperation(int nbr, ChannelFactory channelFactory, Configuration cfg, AtomicReference<Channel> channelRef, CountDownLatch firstOp) {
            super((short)0, (short)0, null, channelFactory, null, new AtomicReference<ClientTopology>(new ClientTopology(-1, cfg.clientIntelligence())), 0, cfg, DataFormat.builder().build(), null);
            this.channelRef = channelRef;
            this.firstOp = firstOp;
            this.id = nbr;
        }

        public void acceptResponse(ByteBuf buf, short status, HeaderDecoder decoder) {
            this.complete(null);
        }

        protected void executeOperation(Channel channel) {
            if (this.channelRef.compareAndSet(null, channel)) {
                try {
                    this.scheduleRead(channel);
                    this.firstOp.await();
                }
                catch (InterruptedException e) {
                    this.completeExceptionally(e);
                }
                assert (this.isDone()) : "Should be done";
                return;
            }
            this.complete(null);
        }

        public String toString() {
            return "id = " + this.id;
        }
    }

    private static class ControlledChannelOperation
    extends RetryOnFailureOperation<Void> {
        private static final String BEFORE_SCHEDULE_READ = "before-schedule-read-";
        private static final String PROCEED_SCHEDULE_READ = "proceed-schedule-read-";
        private final AtomicInteger counter = new AtomicInteger(0);
        private final CheckPoint checkPoint;

        protected ControlledChannelOperation(ChannelFactory channelFactory, Configuration cfg, CheckPoint checkPoint) {
            super((short)0, (short)0, null, channelFactory, null, new AtomicReference<ClientTopology>(new ClientTopology(-1, cfg.clientIntelligence())), 0, cfg, DataFormat.builder().build(), null);
            this.checkPoint = checkPoint;
        }

        public void acceptResponse(ByteBuf buf, short status, HeaderDecoder decoder) {
            this.complete(null);
        }

        protected void executeOperation(Channel channel) {
            int execution = this.counter.getAndIncrement();
            this.checkPoint.trigger(BEFORE_SCHEDULE_READ + execution);
            try {
                this.checkPoint.awaitStrict(PROCEED_SCHEDULE_READ + execution, 10L, TimeUnit.SECONDS);
                this.scheduleRead(channel);
            }
            catch (Exception e) {
                this.completeExceptionally(e);
            }
            this.complete(null);
        }

        public void assertThatExecutedOnlyOnce() {
            ((AbstractIntegerAssert)Assertions.assertThat((int)this.counter.get()).withFailMessage("Operation executed more than once!", new Object[0])).isOne();
        }
    }
}

