/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.counter;

import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.infinispan.client.hotrod.counter.AbstractCounterTest;
import org.infinispan.client.hotrod.counter.impl.NotificationManager;
import org.infinispan.client.hotrod.counter.impl.RemoteCounterManager;
import org.infinispan.commons.test.ExceptionRunnable;
import org.infinispan.counter.api.CounterEvent;
import org.infinispan.counter.api.CounterListener;
import org.infinispan.counter.api.Handle;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.server.hotrod.counter.impl.BaseCounterImplTest;
import org.infinispan.test.TestingUtil;
import org.testng.annotations.Test;

@Test(groups={"functional"})
public abstract class BaseCounterAPITest<T>
extends AbstractCounterTest {
    private static final CounterListener EXCEPTION_LISTENER = entry -> {
        throw new RuntimeException("induced");
    };

    public void testExceptionInListener(Method method) throws InterruptedException {
        String counterName = method.getName();
        T counter = this.defineAndCreateCounter(counterName, 0L);
        Handle<BaseCounterImplTest.EventLogger> handle1 = this.addListenerTo(counter, new BaseCounterImplTest.EventLogger());
        Handle<CounterListener> handleEx = this.addListenerTo(counter, EXCEPTION_LISTENER);
        Handle<BaseCounterImplTest.EventLogger> handle2 = this.addListenerTo(counter, new BaseCounterImplTest.EventLogger());
        this.add(counter, 1L, 1L);
        this.add(counter, -1L, 0L);
        this.add(counter, 10L, 10L);
        this.add(counter, 1L, 11L);
        this.add(counter, 2L, 13L);
        BaseCounterImplTest.assertNextValidEvent(handle1, (long)0L, (long)1L);
        BaseCounterImplTest.assertNextValidEvent(handle1, (long)1L, (long)0L);
        BaseCounterImplTest.assertNextValidEvent(handle1, (long)0L, (long)10L);
        BaseCounterImplTest.assertNextValidEvent(handle1, (long)10L, (long)11L);
        BaseCounterImplTest.assertNextValidEvent(handle1, (long)11L, (long)13L);
        BaseCounterImplTest.assertNextValidEvent(handle2, (long)0L, (long)1L);
        BaseCounterImplTest.assertNextValidEvent(handle2, (long)1L, (long)0L);
        BaseCounterImplTest.assertNextValidEvent(handle2, (long)0L, (long)10L);
        BaseCounterImplTest.assertNextValidEvent(handle2, (long)10L, (long)11L);
        BaseCounterImplTest.assertNextValidEvent(handle2, (long)11L, (long)13L);
        BaseCounterImplTest.assertNoEvents(handle1);
        BaseCounterImplTest.assertNoEvents(handle2);
        handle1.remove();
        handle2.remove();
        handleEx.remove();
    }

    public void testConcurrentListenerAddAndRemove(Method method) throws InterruptedException {
        String counterName = method.getName();
        this.defineAndCreateCounter(counterName, 1L);
        List<T> counters = this.getCounters(counterName);
        List<IncrementTask> taskList = counters.stream().map(x$0 -> new IncrementTask(x$0)).collect(Collectors.toList());
        List<Future> futureTaskList = taskList.stream().map(arg_0 -> ((BaseCounterAPITest)this).fork(arg_0)).collect(Collectors.toList());
        T counter = counters.get(0);
        Handle<BaseCounterImplTest.EventLogger> handle = this.addListenerTo(counter, new BaseCounterImplTest.EventLogger());
        this.eventually(() -> ((BaseCounterImplTest.EventLogger)handle.getCounterListener()).size() > 5);
        handle.remove();
        taskList.forEach(IncrementTask::stop);
        futureTaskList.forEach(this::awaitFuture);
        this.drainAndCheckEvents(handle);
        BaseCounterImplTest.assertNoEvents(handle);
        this.increment(counter);
        BaseCounterImplTest.assertNoEvents(handle);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testListenerFailover(Method method) throws Exception {
        String counterName = method.getName();
        T counter = this.defineAndCreateCounter(counterName, 2L);
        Handle<BaseCounterImplTest.EventLogger> handle = this.addListenerTo(counter, new BaseCounterImplTest.EventLogger());
        this.add(counter, 1L, 3L);
        BaseCounterImplTest.assertNextValidEvent(handle, (long)2L, (long)3L);
        InetSocketAddress eventAddress = this.findEventServer();
        int killIndex = -1;
        for (int i = 0; i < this.servers.size(); ++i) {
            if (((HotRodServer)this.servers.get(i)).getAddress().getPort() != eventAddress.getPort()) continue;
            killIndex = i;
            break;
        }
        assert (killIndex != -1);
        try {
            this.killServer(killIndex);
            this.add(counter, 1L, 4L);
            this.add(counter, 1L, 5L);
            CounterEvent event = ((BaseCounterImplTest.EventLogger)handle.getCounterListener()).waitingPoll();
            if (event.getOldValue() == 3L) {
                BaseCounterImplTest.assertValidEvent((CounterEvent)event, (long)3L, (long)4L);
                BaseCounterImplTest.assertNextValidEvent(handle, (long)4L, (long)5L);
            } else {
                BaseCounterImplTest.assertValidEvent((CounterEvent)event, (long)4L, (long)5L);
            }
            handle.remove();
        }
        finally {
            TestingUtil.waitForNoRebalance((Collection)this.caches("org.infinispan.COUNTER"));
        }
    }

    abstract void increment(T var1);

    abstract void add(T var1, long var2, long var4);

    abstract T defineAndCreateCounter(String var1, long var2);

    abstract <L extends CounterListener> Handle<L> addListenerTo(T var1, L var2);

    abstract List<T> getCounters(String var1);

    private InetSocketAddress findEventServer() {
        Object notificationManager = TestingUtil.extractField(RemoteCounterManager.class, (Object)this.counterManager(), (String)"notificationManager");
        Object dispatcher = TestingUtil.extractField(NotificationManager.class, (Object)notificationManager, (String)"dispatcher");
        SocketAddress address = (SocketAddress)TestingUtil.extractField((Object)dispatcher, (String)"address");
        return (InetSocketAddress)address;
    }

    private void awaitFuture(Future<?> future) {
        try {
            future.get();
        }
        catch (InterruptedException interruptedException) {
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    private void drainAndCheckEvents(Handle<BaseCounterImplTest.EventLogger> handle) throws InterruptedException {
        CounterEvent event = ((BaseCounterImplTest.EventLogger)handle.getCounterListener()).waitingPoll();
        this.log.tracef("First Event=%s", (Object)event);
        long preValue = event.getOldValue();
        BaseCounterImplTest.assertValidEvent((CounterEvent)event, (long)preValue++, (long)preValue);
        while ((event = ((BaseCounterImplTest.EventLogger)handle.getCounterListener()).poll()) != null) {
            this.log.tracef("Next Event=%s", (Object)event);
            BaseCounterImplTest.assertValidEvent((CounterEvent)event, (long)preValue++, (long)preValue);
        }
    }

    private class IncrementTask
    implements ExceptionRunnable {
        private final T counter;
        private volatile boolean run;

        private IncrementTask(T counter) {
            this.counter = counter;
            this.run = true;
        }

        public void run() {
            while (this.run) {
                BaseCounterAPITest.this.increment(this.counter);
            }
        }

        void stop() {
            this.run = false;
        }
    }
}

