/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.retry;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.infinispan.client.hotrod.DataFormat;
import org.infinispan.client.hotrod.ProtocolVersion;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.impl.AbstractClientEvent;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.protocol.Codec25;
import org.infinispan.client.hotrod.test.MultiHotRodServersTest;
import org.infinispan.commons.configuration.ClassAllowList;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.testng.annotations.Test;

/**
 * Functional test checking that a client listener still receives events after a period during
 * which reading cache events on the client failed with a {@link TransportException}.
 *
 * <p>Failures are injected through {@link FailureInducingCodec}, which is installed as the
 * negotiated codec on every client created by {@link #createCacheManagers()}.
 */
@Test(groups = {"functional"}, testName = "client.hotrod.retry.ClientListenerRetryTest")
public class ClientListenerRetryTest extends MultiHotRodServersTest {
    /** Source of unique keys, so each {@code put} in this test writes a key not used before. */
    private final AtomicInteger counter = new AtomicInteger(0);

    /** Codec shared by all clients; lets the test switch event-read failures on and off. */
    private final FailureInducingCodec failureInducingCodec = new FailureInducingCodec();

    /**
     * Starts the Hot Rod servers via {@code createHotRodServers(2, ...)} and replaces the
     * negotiated codec of every client with the failure-inducing one.
     *
     * @throws Throwable if server or client setup fails
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        createHotRodServers(2, getCacheConfiguration());
        clients.forEach(rcm -> rcm.getChannelFactory().setNegotiatedCodec(failureInducingCodec));
    }

    /**
     * Pins the client to protocol version 2.5, matching the {@link Codec25}-based codec installed
     * by this test, and sets the socket timeout to 60000 (presumably milliseconds).
     *
     * @param host server host the client connects to
     * @param serverPort server port the client connects to
     * @return the client configuration builder with version and socket timeout applied
     */
    @Override
    protected ConfigurationBuilder createHotRodClientConfigurationBuilder(String host, int serverPort) {
        return super.createHotRodClientConfigurationBuilder(host, serverPort)
                .version(ProtocolVersion.PROTOCOL_VERSION_25)
                .socketTimeout(60000);
    }

    /**
     * Builds the server-side cache configuration: a {@code DIST_SYNC} clustered cache with the
     * second argument of {@code getDefaultClusteredCacheConfig} set to {@code false}, passed
     * through {@link HotRodTestingUtil#hotRodCacheConfiguration}.
     *
     * @return the cache configuration builder used for every server
     */
    private org.infinispan.configuration.cache.ConfigurationBuilder getCacheConfiguration() {
        // Fully qualified: the simple name ConfigurationBuilder is taken by the client-side import.
        org.infinispan.configuration.cache.ConfigurationBuilder builder =
                getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false);
        return HotRodTestingUtil.hotRodCacheConfiguration(builder);
    }

    /**
     * Registers a listener, confirms it receives events, writes entries while event reading is
     * failing, then turns the failure off and confirms the listener receives events again.
     */
    @Test
    public void testConnectionDrop() {
        RemoteCache<Integer, String> remoteCache = client(0).getCache();
        Listener listener = new Listener();
        remoteCache.addClientListener(listener);
        assertListenerActive(remoteCache, listener);

        // Entries written while the codec throws on every cache event read.
        failureInducingCodec.induceFailure();
        addItems(remoteCache, 10);

        failureInducingCodec.resetFailure();
        assertListenerActive(remoteCache, listener);
    }

    /**
     * Puts {@code items} entries into {@code cache}, each under a fresh key from {@link #counter}.
     *
     * @param cache cache to write to
     * @param items number of entries to write
     */
    private void addItems(RemoteCache<Integer, String> cache, int items) {
        IntStream.range(0, items).forEach(i -> cache.put(counter.incrementAndGet(), "value"));
    }

    /**
     * Repeatedly writes a new entry until the listener's created-event count rises above the
     * value it had when this method was entered.
     *
     * @param cache cache to write to
     * @param listener listener whose event count is observed
     */
    private void assertListenerActive(RemoteCache<Integer, String> cache, Listener listener) {
        int received = listener.getReceived();
        eventually(() -> {
            // A new key on every attempt, so each put is expected to produce a created event.
            cache.put(counter.incrementAndGet(), "value");
            return listener.getReceived() > received;
        });
    }

    /**
     * @return the retry count used by this test, fixed at 10
     */
    @Override
    protected int maxRetries() {
        return 10;
    }

    /**
     * {@link Codec25} variant whose {@code readCacheEvent} throws a {@link TransportException}
     * while the failure flag is set and delegates to the parent implementation otherwise.
     */
    private static class FailureInducingCodec extends Codec25 {
        /** Volatile so a toggle from the test thread is visible to whichever thread reads events. */
        private volatile boolean failure;

        /** Cause attached to every induced {@link TransportException}. */
        private final IOException failWith = new IOException("Connection reset by peer");

        /**
         * Reads a cache event, or fails if failures are currently induced.
         *
         * @throws TransportException wrapping {@link #failWith} when the failure flag is set
         */
        public AbstractClientEvent readCacheEvent(ByteBuf buf, Function<byte[], DataFormat> listenerDataFormat,
                short eventTypeId, ClassAllowList allowList, SocketAddress serverAddress) {
            if (failure) {
                throw new TransportException(failWith, serverAddress);
            }
            return super.readCacheEvent(buf, listenerDataFormat, eventTypeId, allowList, serverAddress);
        }

        /** Makes subsequent {@code readCacheEvent} calls throw. */
        private void induceFailure() {
            failure = true;
        }

        /** Restores normal event reading. */
        private void resetFailure() {
            failure = false;
        }
    }

    /** Client listener that counts the entry-created events it is handed. */
    @ClientListener
    private static class Listener {
        private final AtomicInteger count = new AtomicInteger(0);

        /** Increments the counter once per created event; the event content is ignored. */
        @ClientCacheEntryCreated
        public void handleCreatedEvent(ClientCacheEntryCreatedEvent<?> e) {
            count.incrementAndGet();
        }

        /**
         * @return number of created events handled so far
         */
        int getReceived() {
            return count.intValue();
        }
    }
}

