/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.tests;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.jgroups.BlockEvent;
import org.jgroups.Channel;
import org.jgroups.ExtendedReceiverAdapter;
import org.jgroups.JChannel;
import org.jgroups.UnblockEvent;
import org.jgroups.View;
import org.jgroups.tests.ChannelTestBase;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@Test(groups={"flush"}, sequential=true)
public class ConcurrentFlushTest
extends ChannelTestBase {
    JChannel c1;
    JChannel c2;
    JChannel c3;

    @AfterMethod
    public void tearDown() throws Exception {
        Util.close(this.c3, this.c2, this.c1);
    }

    public boolean useBlocking() {
        return true;
    }

    @Test
    public void testTwoStartFlushesOnSameMemberWithTotalFlush() throws Exception {
        this.c1 = this.createChannel(true, 3);
        this.c1.connect("testTwoStartFlushes");
        this.c2 = this.createChannel(this.c1);
        this.c2.connect("testTwoStartFlushes");
        ConcurrentFlushTest.assertViewsReceived(this.c1, this.c2);
        boolean rc = this.startFlush(this.c1, true);
        assert (rc);
        rc = this.startFlush(this.c1, false);
        assert (rc);
        rc = this.startFlush(this.c1, 1, 500L, false);
        assert (!rc);
        this.stopFlush(this.c1);
        rc = this.startFlush(this.c1, true);
        assert (rc);
    }

    public void testTwoStartFlushesOnDifferentMembersWithTotalFlush() throws Exception {
        this.c1 = this.createChannel(true, 3);
        this.c1.connect("testTwoStartFlushesOnDifferentMembersWithTotalFlush");
        this.c2 = this.createChannel(this.c1);
        this.c2.connect("testTwoStartFlushesOnDifferentMembersWithTotalFlush");
        ConcurrentFlushTest.assertViewsReceived(this.c1, this.c2);
        boolean rc = this.startFlush(this.c1, false);
        assert (rc);
        rc = this.startFlush(this.c2, 1, 500L, false);
        assert (!rc);
        this.stopFlush(this.c1);
        rc = this.startFlush(this.c2, false);
        assert (rc);
        this.stopFlush(this.c2);
        rc = this.startFlush(this.c1, false);
        assert (rc);
        this.stopFlush(this.c2);
        rc = this.startFlush(this.c2, true);
        assert (rc);
    }

    @Test
    public void testConcurrentFlush() throws Exception {
        this.c1 = this.createChannel(true, 2);
        this.c1.connect("testConcurrentFlush");
        this.c2 = this.createChannel(this.c1);
        this.c2.connect("testConcurrentFlush");
        ConcurrentFlushTest.assertViewsReceived(this.c1, this.c2);
        final CountDownLatch startFlushLatch = new CountDownLatch(1);
        final CountDownLatch stopFlushLatch = new CountDownLatch(1);
        CountDownLatch flushStartReceived = new CountDownLatch(2);
        CountDownLatch flushStopReceived = new CountDownLatch(2);
        Thread t1 = new Thread(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void run() {
                try {
                    startFlushLatch.await();
                    boolean rc = Util.startFlush(ConcurrentFlushTest.this.c1);
                    System.out.println("t1: rc=" + rc);
                }
                catch (InterruptedException e) {
                    this.interrupt();
                }
                try {
                    stopFlushLatch.await();
                }
                catch (InterruptedException e) {
                    this.interrupt();
                }
                finally {
                    ConcurrentFlushTest.this.c1.stopFlush();
                }
            }
        };
        Thread t2 = new Thread(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void run() {
                try {
                    startFlushLatch.await();
                    boolean rc = Util.startFlush(ConcurrentFlushTest.this.c2);
                    System.out.println("t2: rc=" + rc);
                }
                catch (InterruptedException e) {
                    this.interrupt();
                }
                try {
                    stopFlushLatch.await();
                }
                catch (InterruptedException e) {
                    this.interrupt();
                }
                finally {
                    ConcurrentFlushTest.this.c2.stopFlush();
                }
            }
        };
        Listener l1 = new Listener("c1", this.c1, flushStartReceived, flushStopReceived);
        Listener l2 = new Listener("c2", this.c2, flushStartReceived, flushStopReceived);
        t1.start();
        t2.start();
        startFlushLatch.countDown();
        ConcurrentFlushTest.assertTrue(flushStartReceived.await(60L, TimeUnit.SECONDS));
        stopFlushLatch.countDown();
        t1.join();
        t2.join();
        ConcurrentFlushTest.assertTrue(flushStopReceived.await(60L, TimeUnit.SECONDS));
        assert (l1.blockReceived);
        assert (l1.unblockReceived);
        assert (l2.blockReceived);
        assert (l2.unblockReceived);
    }

    @Test
    public void testConcurrentFlushAndPartialFlush() throws Exception {
        this.c1 = this.createChannel(true, 3);
        this.c1.connect("testConcurrentFlushAndPartialFlush");
        this.c2 = this.createChannel(this.c1);
        this.c2.connect("testConcurrentFlushAndPartialFlush");
        this.c3 = this.createChannel(this.c1);
        this.c3.connect("testConcurrentFlushAndPartialFlush");
        ConcurrentFlushTest.assertViewsReceived(this.c1, this.c2, this.c3);
        final CountDownLatch startFlushLatch = new CountDownLatch(1);
        final CountDownLatch stopFlushLatch = new CountDownLatch(1);
        CountDownLatch flushStartReceived = new CountDownLatch(2);
        CountDownLatch flushStopReceived = new CountDownLatch(5);
        Thread t1 = new Thread(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void run() {
                try {
                    startFlushLatch.await();
                    boolean rc = Util.startFlush(ConcurrentFlushTest.this.c1);
                    System.out.println("t1: rc=" + rc);
                }
                catch (InterruptedException e) {
                    this.interrupt();
                }
                try {
                    stopFlushLatch.await();
                }
                catch (InterruptedException e) {
                    this.interrupt();
                }
                finally {
                    ConcurrentFlushTest.this.c1.stopFlush();
                }
            }
        };
        Thread t2 = new Thread(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void run() {
                try {
                    startFlushLatch.await();
                    boolean rc = Util.startFlush(ConcurrentFlushTest.this.c2, Arrays.asList(ConcurrentFlushTest.this.c2.getLocalAddress(), ConcurrentFlushTest.this.c3.getLocalAddress()));
                    System.out.println("t2: partial flush rc=" + rc);
                }
                catch (InterruptedException e) {
                    this.interrupt();
                }
                try {
                    stopFlushLatch.await();
                }
                catch (InterruptedException e) {
                    try {
                        this.interrupt();
                    }
                    catch (Throwable throwable) {
                        ConcurrentFlushTest.this.c2.stopFlush(Arrays.asList(ConcurrentFlushTest.this.c2.getLocalAddress(), ConcurrentFlushTest.this.c3.getLocalAddress()));
                        throw throwable;
                    }
                    ConcurrentFlushTest.this.c2.stopFlush(Arrays.asList(ConcurrentFlushTest.this.c2.getLocalAddress(), ConcurrentFlushTest.this.c3.getLocalAddress()));
                }
                ConcurrentFlushTest.this.c2.stopFlush(Arrays.asList(ConcurrentFlushTest.this.c2.getLocalAddress(), ConcurrentFlushTest.this.c3.getLocalAddress()));
            }
        };
        Listener l1 = new Listener("c1", this.c1, flushStartReceived, flushStopReceived);
        Listener l2 = new Listener("c2", this.c2, flushStartReceived, flushStopReceived);
        Listener l3 = new Listener("c3", this.c3, flushStartReceived, flushStopReceived);
        t1.start();
        t2.start();
        startFlushLatch.countDown();
        ConcurrentFlushTest.assertTrue(flushStartReceived.await(60L, TimeUnit.SECONDS));
        stopFlushLatch.countDown();
        t1.join();
        t2.join();
        ConcurrentFlushTest.assertTrue(flushStopReceived.await(60L, TimeUnit.SECONDS));
        assert (l1.blockReceived);
        assert (l1.unblockReceived);
        assert (l2.blockReceived);
        assert (l2.unblockReceived);
        assert (l3.blockReceived);
        assert (l3.unblockReceived);
    }

    private boolean startFlush(Channel ch, boolean automatic_resume) {
        this.log.debug("starting flush on " + ch.getAddress() + " with automatic resume=" + automatic_resume);
        boolean result = Util.startFlush(ch);
        if (automatic_resume) {
            ch.stopFlush();
        }
        return result;
    }

    private boolean startFlush(Channel ch, int num_attempts, long timeout, boolean automatic_resume) {
        this.log.debug("starting flush on " + ch.getAddress() + " with automatic resume=" + automatic_resume);
        boolean result = Util.startFlush(ch, num_attempts, 10L, timeout);
        if (automatic_resume) {
            ch.stopFlush();
        }
        return result;
    }

    private void stopFlush(Channel ch) {
        this.log.debug("calling stopFlush()");
        ch.stopFlush();
    }

    private static void assertViewsReceived(JChannel ... channels) {
        for (JChannel c : channels) {
            ConcurrentFlushTest.assertEquals(c.getView().getMembers().size(), channels.length);
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private static class Listener
    extends ExtendedReceiverAdapter
    implements ChannelTestBase.EventSequence {
        final String name;
        boolean blockReceived;
        boolean unblockReceived;
        JChannel channel;
        CountDownLatch flushStartReceived;
        CountDownLatch flushStopReceived;
        final List<Object> events = new LinkedList<Object>();

        Listener(String name, JChannel channel, CountDownLatch flushStartReceived, CountDownLatch flushStopReceived) {
            this.name = name;
            this.channel = channel;
            this.flushStartReceived = flushStartReceived;
            this.flushStopReceived = flushStopReceived;
            this.channel.setReceiver(this);
        }

        @Override
        public void unblock() {
            this.unblockReceived = true;
            if (this.flushStopReceived != null) {
                this.flushStopReceived.countDown();
            }
            this.events.add(new UnblockEvent());
        }

        @Override
        public void block() {
            this.blockReceived = true;
            if (this.flushStartReceived != null) {
                this.flushStartReceived.countDown();
            }
            this.events.add(new BlockEvent());
        }

        @Override
        public List<Object> getEvents() {
            return this.events;
        }

        @Override
        public void viewAccepted(View new_view) {
            this.events.add(new_view);
        }

        @Override
        public String getName() {
            return this.name;
        }
    }
}

