/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.tests;

import java.util.ArrayList;
import java.util.LinkedList;
import org.jgroups.TimeoutException;
import org.jgroups.util.Queue;
import org.jgroups.util.QueueClosedException;
import org.jgroups.util.Util;
import org.testng.annotations.Test;

@Test(groups={"functional"}, sequential=false)
public class QueueTest {
    /**
     * Exercises the basic FIFO behaviour expected from the queue: {@code peek()} and
     * {@code remove()} hand out elements in insertion order, {@code add()} is expected to fail
     * with {@link QueueClosedException} after {@code close(true)}, and the elements still queued
     * at that point can be inspected and removed afterwards.
     *
     * <p>Calls that mutate the queue are kept outside of {@code assert} statements so the queue
     * goes through the same sequence of states whether or not assertions are enabled.
     *
     * @throws QueueClosedException if a queue operation other than the guarded {@code add("Q6")}
     *                              reports a closed queue
     */
    public static void testQueue() throws QueueClosedException {
        Queue queue = new Queue();
        queue.add("Q1");
        queue.add("Q2");
        queue.add("Q3");

        assert queue.peek().equals("Q1");
        Object removed = queue.remove();
        assert removed.equals("Q1");

        assert queue.peek().equals("Q2");
        removed = queue.remove();
        assert removed.equals("Q2");

        queue.add("Q5");
        queue.close(true);
        try {
            queue.add("Q6");
            assert false : "should not get here";
        }
        catch (QueueClosedException expected) {
            // expected: this test requires add() to be rejected once the queue was closed
        }

        int size = queue.size();
        queue.removeElement("Q5");
        assert queue.size() == size - 1;

        assert queue.peek().equals("Q3");
        removed = queue.remove();
        assert removed.equals("Q3");
        assert queue.closed();
    }

    /**
     * Closes an empty queue with {@code close(false)} and then calls {@code remove()}.
     * The test passes only if that call throws {@link QueueClosedException}.
     */
    @Test(expectedExceptions={QueueClosedException.class})
    public static void testCloseWithoutFlush() throws QueueClosedException {
        final Queue closedQueue = new Queue();
        closedQueue.close(false);
        // The exception raised here is what the annotation above is waiting for.
        closedQueue.remove();
    }

    /**
     * Closes an empty queue with {@code close(true)} and then calls {@code remove()}.
     * The test passes only if that call throws {@link QueueClosedException}.
     */
    @Test(expectedExceptions={QueueClosedException.class})
    public static void testCloseWithFlush() throws QueueClosedException {
        final Queue flushedQueue = new Queue();
        flushedQueue.close(true);
        // Nothing was ever added, so this remove() is the call expected to throw.
        flushedQueue.remove();
    }

    /**
     * Adds the integers 1..3, closes the queue with {@code close(true)} and checks that the three
     * elements can still be removed in insertion order. The fourth {@code remove()} is the call
     * expected to throw {@link QueueClosedException}, which is what makes the test pass.
     *
     * <p>Boxed values are obtained through {@link Integer#valueOf(int)} rather than the
     * deprecated {@code Integer} constructor.
     */
    @Test(expectedExceptions={QueueClosedException.class})
    public static void testCloseWithFlush2() throws QueueClosedException {
        Queue queue = new Queue();
        queue.add(Integer.valueOf(1));
        queue.add(Integer.valueOf(2));
        queue.add(Integer.valueOf(3));
        queue.close(true);
        for (int i = 1; i <= 3; ++i) {
            Object obj = queue.remove();
            assert obj != null;
            assert Integer.valueOf(i).equals(obj);
        }
        // All queued elements have been removed; this call is expected to throw.
        queue.remove();
    }

    /**
     * Adds four integers and checks that {@code size()} and the list returned by
     * {@code values()} both report four elements.
     *
     * <p>Boxed values are obtained through {@link Integer#valueOf(int)} rather than the
     * deprecated {@code Integer} constructor, and the result of {@code values()} is held as
     * {@code LinkedList<?>} instead of a raw type because only its size is inspected.
     *
     * @throws QueueClosedException if {@code add()} reports a closed queue
     */
    public static void testValues() throws QueueClosedException {
        Queue queue = new Queue();
        queue.add(Integer.valueOf(1));
        queue.add(Integer.valueOf(3));
        queue.add(Integer.valueOf(99));
        queue.add(Integer.valueOf(8));
        System.out.println("queue: " + Util.dumpQueue(queue));
        int size = queue.size();
        assert size == 4;
        LinkedList<?> values = queue.values();
        assert values.size() == size;
    }

    /**
     * Adds the same element 100000 times, then removes elements until {@code size()} reports
     * zero, printing the elapsed wall-clock time of each phase.
     *
     * <p>The element count lives in a single local constant that is also used to build the
     * progress messages, and the size is asserted after each phase so the test verifies
     * something besides not throwing. The assertions sit outside the timed sections.
     *
     * @throws QueueClosedException if {@code add()} or {@code remove()} reports a closed queue
     */
    public static void testLargeInsertion() throws QueueClosedException {
        final int numElements = 100000;
        String element = "MyElement";
        Queue queue = new Queue();

        System.out.println("Inserting " + numElements + " elements");
        long start = System.currentTimeMillis();
        for (int i = 0; i < numElements; ++i) {
            queue.add(element);
        }
        long stop = System.currentTimeMillis();
        System.out.println("Took " + (stop - start) + " msecs");
        assert queue.size() == numElements
            : "expected " + numElements + " elements but size was " + queue.size();

        System.out.println("Removing " + numElements + " elements");
        start = System.currentTimeMillis();
        while (queue.size() > 0) {
            queue.remove();
        }
        stop = System.currentTimeMillis();
        System.out.println("Took " + (stop - start) + " msecs");
        assert queue.size() == 0 : "queue should be empty but size was " + queue.size();
    }

    /**
     * Checks that a freshly created queue reports {@code null} for both
     * {@code getFirst()} and {@code getLast()}.
     */
    public static void testEmptyQueue() {
        final Queue empty = new Queue();
        // Head first, then tail: neither may reference an element yet.
        assert empty.getFirst() == null;
        assert empty.getLast() == null;
    }

    /**
     * Adds three strings in one {@code addAll()} call and checks that they are handed back by
     * {@code remove()} in list order, with {@code size()} dropping by one after each removal.
     *
     * <p>{@code remove()} is called outside of the {@code assert} statements so the elements are
     * actually removed even when assertions are disabled.
     *
     * @throws QueueClosedException if {@code addAll()} or {@code remove()} reports a closed queue
     */
    public static void testAddAll() throws QueueClosedException {
        Queue queue = new Queue();
        ArrayList<String> l = new ArrayList<String>();
        l.add("one");
        l.add("two");
        l.add("three");
        queue.addAll(l);
        System.out.println("queue is " + queue);
        assert queue.size() == 3;

        Object removed = queue.remove();
        assert removed.equals("one");
        assert queue.size() == 2;

        removed = queue.remove();
        assert removed.equals("two");
        assert queue.size() == 1;

        removed = queue.remove();
        assert removed.equals("three");
        assert queue.size() == 0;
    }

    /**
     * Follows {@code getFirst()} and {@code getLast()} while two elements are added and removed
     * again: with one element both must agree, with two they must differ, and once the queue
     * has been emptied both must be {@code null}.
     *
     * @throws Exception if any queue operation fails
     */
    public static void testInsertionAndRemoval() throws Exception {
        final Queue q = new Queue();
        final String first = "Q1";
        final String second = "Q2";

        // Single element: head and tail exist and are equal.
        q.add(first);
        assert q.getFirst() != null;
        assert q.getLast() != null;
        assert q.getLast().equals(q.getFirst());

        // Two elements: head and tail are different objects.
        q.add(second);
        assert q.getFirst() != q.getLast();

        final Object peeked = q.peek();
        final Object head = q.getFirst();
        System.out.println("o1=" + peeked + ", o2=" + head + ", o1.equals(o2)=" + peeked.equals(head));
        assert q.getFirst().equals(q.peek());

        // Back to a single element.
        q.remove();
        assert q.size() == 1;
        assert q.getLast().equals(q.getFirst());

        // Empty again: neither head nor tail is left.
        q.remove();
        assert q.size() == 0;
        assert q.getFirst() == null;
        assert q.getLast() == null;
    }

    /**
     * Closes an empty queue with {@code close(true)}, calls {@code waitUntilClosed(0L)} and
     * checks that the queue is still empty afterwards.
     */
    public static void testWaitUntilClosed() {
        final Queue q = new Queue();
        q.close(true);
        // The queue holds no elements, so the test relies on this call returning.
        q.waitUntilClosed(0L);
        assert q.size() == 0;
    }

    /**
     * Closes an empty queue with {@code close(true)} and expects {@code peek()} to throw
     * {@link QueueClosedException}; afterwards the queue must still be empty.
     *
     * <p>The decompiler output tested the synthetic {@code $assertionsDisabled} field inside a
     * labeled block. That field is not declared in source, so it is written here as the plain
     * {@code assert} statement it was compiled from.
     */
    public static void testWaitUntilClosed2() {
        Queue queue = new Queue();
        queue.close(true);
        try {
            queue.peek();
            assert false : "peek() should throw a QueueClosedException";
        }
        catch (QueueClosedException e) {
            assert e != null;
        }
        assert queue.size() == 0;
    }

    /**
     * Adds one element, closes the queue with {@code close(true)} and checks that the element
     * can still be peeked and removed. After the removal, {@code peek()} is expected to throw
     * {@link QueueClosedException} and the queue must be empty.
     *
     * <p>The decompiler output tested the synthetic {@code $assertionsDisabled} field inside a
     * labeled block. That field is not declared in source, so it is written here as the plain
     * {@code assert} statement it was compiled from.
     *
     * @throws QueueClosedException if {@code add()}, the first {@code peek()} or {@code remove()}
     *                              reports a closed queue
     */
    public static void testWaitUntilClosed3() throws QueueClosedException {
        Queue queue = new Queue();
        queue.add("one");
        queue.close(true);
        Object obj = queue.peek();
        assert obj.equals("one");
        assert queue.size() == 1;
        queue.remove();
        try {
            queue.peek();
            assert false : "peek() should throw a QueueClosedException";
        }
        catch (QueueClosedException e) {
            assert e != null;
        }
        assert queue.size() == 0;
    }

    /**
     * Fills the queue with the integers 0..9 and starts a consumer thread that removes one
     * element per iteration, calling {@code Util.sleep(200L)} between removals. The test thread
     * then closes the queue with {@code close(true)}, calls {@code waitUntilClosed(0L)} and
     * expects the queue to be empty when that call returns.
     *
     * <p>Boxed values are obtained through {@link Integer#valueOf(int)} rather than the
     * deprecated {@code Integer} constructor.
     *
     * @throws QueueClosedException if {@code add()} reports a closed queue
     */
    public static void testWaitUntilClosed4() throws QueueClosedException {
        final Queue queue = new Queue();
        for (int i = 0; i < 10; ++i) {
            queue.add(Integer.valueOf(i));
        }
        new Thread(){

            @Override
            public void run() {
                while (!queue.closed()) {
                    try {
                        System.out.println("-- removed " + queue.remove());
                        Util.sleep(200L);
                    }
                    catch (QueueClosedException e) {
                        // the queue was closed while this thread was removing: stop consuming
                        break;
                    }
                }
            }
        }.start();
        queue.close(true);
        queue.waitUntilClosed(0L);
        assert queue.size() == 0;
    }

    /**
     * Fills the queue with the integers 0..9 and starts a consumer thread that removes one
     * element per iteration, calling {@code Util.sleep(200L)} between removals. After
     * {@code Util.sleep(600L)} the test thread closes the queue with {@code close(false)},
     * calls {@code waitUntilClosed(0L)} and expects elements to be left in the queue.
     *
     * <p>Boxed values are obtained through {@link Integer#valueOf(int)} rather than the
     * deprecated {@code Integer} constructor.
     *
     * @throws QueueClosedException if {@code add()} reports a closed queue
     */
    public static void testWaitUntilClosed5() throws QueueClosedException {
        final Queue queue = new Queue();
        for (int i = 0; i < 10; ++i) {
            queue.add(Integer.valueOf(i));
        }
        new Thread(){

            @Override
            public void run() {
                while (!queue.closed()) {
                    try {
                        System.out.println("-- removed " + queue.remove());
                        Util.sleep(200L);
                    }
                    catch (QueueClosedException e) {
                        System.out.println("-- queue is closed, cannot remove element");
                        break;
                    }
                }
            }
        }.start();
        Util.sleep(600L);
        queue.close(false);
        queue.waitUntilClosed(0L);
        assert queue.size() > 0;
    }

    /**
     * Calls {@code removeElement()} on an empty queue and checks that the queue is neither
     * closed nor non-empty afterwards. A {@link QueueClosedException} fails the test.
     *
     * <p>The decompiler output tested the synthetic {@code $assertionsDisabled} field inside a
     * labeled block. That field is not declared in source, so it is written here as the
     * {@code assert false} statement it was compiled from.
     */
    public static void testRemoveElementNoElement() {
        Queue queue = new Queue();
        String s1 = "Q1";
        try {
            queue.removeElement(s1);
            assert !queue.closed();
            assert queue.size() == 0;
        }
        catch (QueueClosedException ex) {
            assert false : ex.toString();
        }
    }

    /**
     * Adds one element, removes it with {@code removeElement()} and checks that the queue is
     * empty with {@code null} head and tail. A {@link QueueClosedException} fails the test.
     *
     * <p>The decompiler output tested the synthetic {@code $assertionsDisabled} field inside a
     * labeled block. That field is not declared in source, so it is written here as the
     * {@code assert false} statement it was compiled from.
     */
    public static void testRemoveElementOneElement() {
        Queue queue = new Queue();
        String s1 = "Q1";
        try {
            queue.add(s1);
            queue.removeElement(s1);
            assert queue.size() == 0;
            assert queue.getFirst() == null;
            assert queue.getLast() == null;
        }
        catch (QueueClosedException ex) {
            assert false : ex.toString();
        }
    }

    /**
     * Adds two elements, removes the first one with {@code removeElement()} and checks that the
     * second element is now both head and tail. A {@link QueueClosedException} fails the test.
     *
     * <p>The decompiler output tested the synthetic {@code $assertionsDisabled} field inside a
     * labeled block. That field is not declared in source, so it is written here as the
     * {@code assert false} statement it was compiled from.
     */
    public static void testRemoveElementTwoElementsFirstFound() {
        String s1 = "Q1";
        String s2 = "Q2";
        Queue queue = new Queue();
        try {
            queue.add(s1);
            queue.add(s2);
            queue.removeElement(s1);
            assert queue.size() == 1;
            assert queue.getFirst().equals(s2);
            assert queue.getLast().equals(s2);
            assert queue.getFirst().equals(queue.getLast());
        }
        catch (QueueClosedException ex) {
            assert false : ex.toString();
        }
    }

    /**
     * Adds two elements, removes the second one with {@code removeElement()} and checks that
     * the first element is now both head and tail. A {@link QueueClosedException} fails the
     * test.
     *
     * <p>The decompiler output tested the synthetic {@code $assertionsDisabled} field inside a
     * labeled block. That field is not declared in source, so it is written here as the
     * {@code assert false} statement it was compiled from.
     */
    public static void testRemoveElementTwoElementsSecondFound() {
        String s1 = "Q1";
        String s2 = "Q2";
        Queue queue = new Queue();
        try {
            queue.add(s1);
            queue.add(s2);
            queue.removeElement(s2);
            assert queue.size() == 1;
            assert queue.getFirst().equals(s1);
            assert queue.getLast().equals(s1);
            assert queue.getFirst().equals(queue.getLast());
        }
        catch (QueueClosedException ex) {
            assert false : ex.toString();
        }
    }

    /**
     * Adds three elements, removes the first one with {@code removeElement()} and checks that
     * the second element became the head while the third is still the tail. A
     * {@link QueueClosedException} fails the test.
     *
     * <p>The decompiler output tested the synthetic {@code $assertionsDisabled} field inside a
     * labeled block. That field is not declared in source, so it is written here as the
     * {@code assert false} statement it was compiled from.
     */
    public static void testRemoveElementThreeElementsFirstFound() {
        String s1 = "Q1";
        String s2 = "Q2";
        String s3 = "Q3";
        Queue queue = new Queue();
        try {
            queue.add(s1);
            queue.add(s2);
            queue.add(s3);
            queue.removeElement(s1);
            assert queue.size() == 2;
            assert queue.getFirst().equals(s2);
            assert queue.getLast().equals(s3);
        }
        catch (QueueClosedException ex) {
            assert false : ex.toString();
        }
    }

    /**
     * Adds three elements, removes the middle one with {@code removeElement()} and checks that
     * head and tail are still the first and third element. A {@link QueueClosedException}
     * fails the test.
     *
     * <p>The decompiler output tested the synthetic {@code $assertionsDisabled} field inside a
     * labeled block. That field is not declared in source, so it is written here as the
     * {@code assert false} statement it was compiled from.
     */
    public static void testRemoveElementThreeElementsSecondFound() {
        String s1 = "Q1";
        String s2 = "Q2";
        String s3 = "Q3";
        Queue queue = new Queue();
        try {
            queue.add(s1);
            queue.add(s2);
            queue.add(s3);
            queue.removeElement(s2);
            assert queue.size() == 2;
            assert queue.getFirst().equals(s1);
            assert queue.getLast().equals(s3);
        }
        catch (QueueClosedException ex) {
            assert false : ex.toString();
        }
    }

    /**
     * Adds three elements, removes the last one with {@code removeElement()} and checks that
     * the first element is still the head while the second became the tail. A
     * {@link QueueClosedException} fails the test.
     *
     * <p>The decompiler output tested the synthetic {@code $assertionsDisabled} field inside a
     * labeled block. That field is not declared in source, so it is written here as the
     * {@code assert false} statement it was compiled from.
     */
    public static void testRemoveElementThreeElementsThirdFound() {
        String s1 = "Q1";
        String s2 = "Q2";
        String s3 = "Q3";
        Queue queue = new Queue();
        try {
            queue.add(s1);
            queue.add(s2);
            queue.add(s3);
            queue.removeElement(s3);
            assert queue.size() == 2;
            assert queue.getFirst().equals(s1);
            assert queue.getLast().equals(s2);
        }
        catch (QueueClosedException ex) {
            assert false : ex.toString();
        }
    }

    /**
     * Starts a helper thread that calls {@code Util.sleep(1000L)} and then closes the queue
     * with {@code close(true)}, while the test thread calls {@code remove()} on the empty
     * queue. The test passes only if that {@code remove()} throws
     * {@link QueueClosedException}.
     */
    @Test(expectedExceptions={QueueClosedException.class})
    public static void testRemoveAndClose() throws QueueClosedException {
        final Queue q = new Queue();
        Thread closer = new Thread(){

            public void run() {
                Util.sleep(1000L);
                q.close(true);
            }
        };
        closer.start();
        // No element is ever added, so only the close above can end this call.
        q.remove();
    }

    /**
     * Same scenario as a plain blocking remove, but using {@code remove(5000L)}: a helper
     * thread calls {@code Util.sleep(1000L)} and then closes the queue with
     * {@code close(true)}. The test passes only if the timed {@code remove} throws
     * {@link QueueClosedException}; a {@link TimeoutException} would fail it.
     */
    @Test(expectedExceptions={QueueClosedException.class})
    public static void testRemoveAndCloseWithTimeout() throws QueueClosedException, TimeoutException {
        final Queue q = new Queue();
        Thread closer = new Thread(){

            public void run() {
                Util.sleep(1000L);
                q.close(true);
            }
        };
        closer.start();
        // The timeout argument is larger than the closer's sleep argument.
        q.remove(5000L);
    }

    /**
     * Sets the interrupt status of the current thread and then calls {@code remove(2000L)} on
     * an empty queue. The test passes only if that call throws {@link TimeoutException}.
     *
     * <p>The interrupt status is cleared in a {@code finally} block. Whether {@code remove}
     * itself consumes the interrupt is not visible from this test, and a flag left set on the
     * test thread could disturb whatever that thread runs next.
     */
    @Test(expectedExceptions={TimeoutException.class})
    public static void testInterruptAndRemove() throws QueueClosedException, TimeoutException {
        Queue queue = new Queue();
        Thread.currentThread().interrupt();
        try {
            queue.remove(2000L);
        }
        finally {
            // Thread.interrupted() reads AND clears the flag; the result is irrelevant here.
            Thread.interrupted();
        }
    }

    /**
     * Starts a helper thread that calls {@code Util.sleep(1000L)} and then closes the queue
     * with {@code close(false)}, while the test thread calls {@code remove()} on the empty
     * queue. The test passes only if that {@code remove()} throws
     * {@link QueueClosedException}.
     */
    @Test(expectedExceptions={QueueClosedException.class})
    public static void testRemoveAndInterrupt() throws QueueClosedException {
        final Queue q = new Queue();
        new Thread(){

            public void run() {
                Util.sleep(1000L);
                System.out.println("-- closing queue");
                q.close(false);
            }
        }.start();
        System.out.println("-- removing element");
        // No element is ever added, so only the close above can end this call.
        q.remove();
    }

    /**
     * Checks {@code clear()} in two situations: on a queue that was closed with
     * {@code close(true)} while still holding elements, and on an open queue that is refilled
     * and cleared a second time.
     *
     * @throws QueueClosedException if {@code add()} reports a closed queue
     */
    public static void testClear() throws QueueClosedException {
        // Part 1: closing with close(true) keeps the elements, clear() drops them.
        final Queue closedQueue = new Queue();
        closedQueue.add("one");
        closedQueue.add("two");
        assert closedQueue.size() == 2;
        closedQueue.close(true);
        assert closedQueue.size() == 2;
        closedQueue.clear();
        assert closedQueue.size() == 0;

        // Part 2: an open queue can be cleared, refilled and cleared again.
        final Queue openQueue = new Queue();
        openQueue.add("one");
        openQueue.add("two");
        openQueue.clear();
        assert openQueue.size() == 0;
        openQueue.add("one");
        openQueue.add("two");
        assert openQueue.size() == 2;
        openQueue.clear();
        assert openQueue.size() == 0;
    }

    /**
     * Starts ten {@code RemoveOneItem} threads, each of which calls {@code remove()} once, then
     * adds two elements and expects exactly two of the threads to have terminated. The state of
     * the threads is polled for up to ten seconds before the final count is taken, and the
     * queue is closed with {@code close(false)} at the end.
     *
     * <p>Boxed values are obtained through {@link Long#valueOf(long)} rather than the
     * deprecated {@code Long} constructor.
     *
     * @throws QueueClosedException if {@code add()} reports a closed queue
     */
    public static void testBarrier() throws QueueClosedException {
        RemoveOneItem[] removers = new RemoveOneItem[10];
        Queue queue = new Queue();
        for (int i = 0; i < removers.length; ++i) {
            removers[i] = new RemoveOneItem(i, queue);
            removers[i].start();
        }
        Util.sleep(200L);
        System.out.println("-- adding element 99");
        queue.add(Long.valueOf(99L));
        System.out.println("-- adding element 100");
        queue.add(Long.valueOf(100L));

        // Poll until exactly two removers have terminated or the deadline has passed.
        long target_time = System.currentTimeMillis() + 10000L;
        do {
            int num = 0;
            for (RemoveOneItem remover : removers) {
                if (!remover.isAlive()) {
                    ++num;
                }
            }
            if (num == 2) {
                break;
            }
            Util.sleep(500L);
        } while (target_time > System.currentTimeMillis());

        int num_dead = 0;
        for (int i = 0; i < removers.length; ++i) {
            boolean alive = removers[i].isAlive();
            System.out.println("remover #" + i + " is " + (alive ? "alive" : "terminated"));
            if (!alive) {
                ++num_dead;
            }
        }
        assert num_dead == 2 : "num_dead was " + num_dead + ", but expected 2";
        queue.close(false);
    }

    /**
     * Starts ten {@code RemoveOneItemWithTimeout} threads (timeout argument {@code 15000L}),
     * adds two elements and expects exactly two threads to have terminated once two of them
     * report a return value. The queue is then closed with {@code close(false)}, after which
     * all ten threads are expected to have terminated.
     *
     * <p>Boxed values are obtained through {@link Long#valueOf(long)} rather than the
     * deprecated {@code Long} constructor, and the decompiler's split loop counters are
     * replaced by ordinary loop-scoped variables.
     *
     * @throws QueueClosedException if {@code add()} reports a closed queue
     */
    public static void testBarrierWithTimeOut() throws QueueClosedException {
        Queue queue = new Queue();
        RemoveOneItemWithTimeout[] removers = new RemoveOneItemWithTimeout[10];
        for (int i = 0; i < removers.length; ++i) {
            removers[i] = new RemoveOneItemWithTimeout(i, 15000L, queue);
            removers[i].start();
        }
        System.out.println("-- adding element 99");
        queue.add(Long.valueOf(99L));
        System.out.println("-- adding element 100");
        queue.add(Long.valueOf(100L));

        // Poll until exactly two removers hold a return value or the deadline has passed.
        long target_time = System.currentTimeMillis() + 10000L;
        do {
            int num_rsps = 0;
            for (RemoveOneItemWithTimeout remover : removers) {
                if (remover.getRetval() != null) {
                    ++num_rsps;
                }
            }
            if (num_rsps == 2) {
                break;
            }
            Util.sleep(500L);
        } while (target_time > System.currentTimeMillis());

        Util.sleep(3000L);
        int num_dead = 0;
        for (int i = 0; i < removers.length; ++i) {
            boolean alive = removers[i].isAlive();
            System.out.println("remover #" + i + " is " + (alive ? "alive" : "terminated"));
            if (!alive) {
                ++num_dead;
            }
        }
        assert num_dead == 2 : "num_dead should have been 2 but was " + num_dead;

        System.out.println("closing queue - causing all remaining threads to terminate");
        queue.close(false);
        Util.sleep(500L);
        num_dead = 0;
        for (int i = 0; i < removers.length; ++i) {
            boolean alive = removers[i].isAlive();
            System.out.println("remover #" + i + " is " + (alive ? "alive" : "terminated"));
            if (!alive) {
                ++num_dead;
            }
        }
        assert num_dead == 10 : "num_dead should have been 10 but was " + num_dead;
    }

    /**
     * Starts ten {@code AddOneItem} threads that each add 1000 elements, removes
     * 10 * 1000 elements on the test thread and then expects all ten adder threads to have
     * terminated. The queue is closed with {@code close(false)} at the end.
     *
     * @throws QueueClosedException if {@code remove()} reports a closed queue
     */
    public static void testMultipleWriterOneReader() throws QueueClosedException {
        final Queue q = new Queue();
        final int perAdder = 1000;
        final AddOneItem[] adders = new AddOneItem[10];
        for (int idx = 0; idx < adders.length; ++idx) {
            adders[idx] = new AddOneItem(idx, perAdder, q);
            adders[idx].start();
        }
        Util.sleep(500L);

        // Consume exactly as many elements as the adders produce in total.
        final int expectedTotal = adders.length * perAdder;
        for (int consumed = 0; consumed < expectedTotal; ++consumed) {
            q.remove();
        }
        Util.sleep(1000L);

        int terminated = 0;
        for (int idx = 0; idx < adders.length; ++idx) {
            System.out.println("adder #" + idx + " is " + (adders[idx].isAlive() ? "alive" : "terminated"));
            if (!adders[idx].isAlive()) {
                ++terminated;
            }
        }
        assert terminated == 10 : "num_dead should have been 10 but was " + terminated;
        q.close(false);
    }

    /**
     * Runs one producer thread that adds 1000000 objects while the test thread removes
     * elements until it has received the same number of non-null results, then prints the
     * elapsed wall-clock time.
     *
     * <p>The element count is held in the single constant {@code NUM}, which the decompiled
     * code declared but never used. The producer stops on {@link QueueClosedException} instead
     * of ignoring it and retrying for every remaining iteration.
     *
     * @throws QueueClosedException if {@code remove()} reports a closed queue
     */
    public static void testConcurrentAddRemove() throws QueueClosedException {
        final Queue queue = new Queue();
        final long NUM = 1000000L;
        long num_received = 0L;
        long start = System.currentTimeMillis();
        new Thread(){

            @Override
            public void run() {
                for (long i = 0L; i < NUM; ++i) {
                    try {
                        queue.add(new Object());
                    }
                    catch (QueueClosedException closed) {
                        // add() has just reported the queue as closed: stop producing
                        System.err.println("-- queue was closed, producer stops after " + i + " elements");
                        return;
                    }
                }
            }
        }.start();
        while (num_received < NUM) {
            Object ret = queue.remove();
            if (ret == null) {
                continue;
            }
            ++num_received;
        }
        assert num_received == NUM;
        long stop = System.currentTimeMillis();
        System.out.println("time to add/remove " + NUM + " elements: " + (stop - start));
    }

    /**
     * Runs ten {@code Reader} and ten {@code Writer} threads against one queue, lets them work
     * for the duration passed to {@code Util.sleep(INTERVAL)}, asks all of them to stop, closes
     * the queue with {@code close(false)} and prints the per-thread and total read/write
     * counts. The test makes no assertions; it only reports the numbers.
     *
     * <p>The thread count and interval are taken from {@code NUM_THREADS} and {@code INTERVAL},
     * which the decompiled code declared but never used. The join loop catches only
     * {@link InterruptedException}, restores the interrupt status and stops waiting.
     */
    public static void testConcurrentAccess() {
        final int NUM_THREADS = 10;
        final long INTERVAL = 5000L;
        Queue queue = new Queue();
        Writer[] writers = new Writer[NUM_THREADS];
        Reader[] readers = new Reader[NUM_THREADS];
        int[] writes = new int[NUM_THREADS];
        int[] reads = new int[NUM_THREADS];
        long total_reads = 0L;
        long total_writes = 0L;

        for (int i = 0; i < writers.length; ++i) {
            readers[i] = new Reader(i, reads, queue);
            readers[i].start();
            writers[i] = new Writer(i, writes, queue);
            writers[i].start();
        }
        Util.sleep(INTERVAL);
        System.out.println("current queue size=" + queue.size());

        for (Writer writer : writers) {
            writer.stopThread();
        }
        for (Reader reader : readers) {
            reader.stopThread();
        }
        queue.close(false);
        System.out.println("current queue size=" + queue.size());

        // Give every thread a short, bounded time to publish its counter.
        for (int i = 0; i < writers.length; ++i) {
            try {
                writers[i].join(300L);
                readers[i].join(300L);
            }
            catch (InterruptedException ex) {
                System.err.println(ex);
                // keep the interrupt visible to the caller and stop waiting for the rest
                Thread.currentThread().interrupt();
                break;
            }
        }

        for (int i = 0; i < writes.length; ++i) {
            System.out.println("Thread #" + i + ": " + writes[i] + " writes, " + reads[i] + " reads");
            total_writes += (long)writes[i];
            total_reads += (long)reads[i];
        }
        System.out.println("total writes=" + total_writes + ", total_reads=" + total_reads + ", diff=" + Math.abs(total_writes - total_reads));
    }

    /**
     * Daemon thread that keeps removing elements from the queue until it is asked to stop or
     * the queue reports that it is closed. When the loop ends, the number of successful
     * removals is stored in {@code reads[rank]}.
     */
    static class Reader
    extends Thread {
        int rank;
        int num_reads = 0;
        int[] reads = null;
        /** Written by {@link #stopThread()} from another thread, hence volatile. */
        volatile boolean running = true;
        Queue queue;

        /**
         * @param i     index into {@code reads} where this thread stores its counter
         * @param reads shared result array, one slot per reader
         * @param queue queue to remove elements from
         */
        Reader(int i, int[] reads, Queue queue) {
            super("ReaderThread");
            this.rank = i;
            this.reads = reads;
            this.queue = queue;
            this.setDaemon(true);
        }

        @Override
        public void run() {
            while (this.running) {
                try {
                    Long el = (Long)this.queue.remove();
                    if (el == null) {
                        System.out.println("QueueTest.Reader.run(): remove() returned null element. queue.size()=" + this.queue.size() + ", queue.closed()=" + this.queue.closed());
                    }
                    assert (el != null);
                    ++this.num_reads;
                }
                catch (QueueClosedException closed) {
                    this.running = false;
                }
                catch (Throwable t) {
                    // anything else (including a failed assert above) is reported, then the loop goes on
                    System.err.println("QueueTest.Reader.run(): exception=" + t);
                }
            }
            this.reads[this.rank] = this.num_reads;
        }

        /** Asks the loop in {@link #run()} to end; it is re-checked before the next removal. */
        void stopThread() {
            this.running = false;
        }
    }

    /**
     * Daemon thread that keeps adding the current time (as a {@code Long}) to the queue until
     * it is asked to stop or the queue reports that it is closed. When the loop ends, the
     * number of successful additions is stored in {@code writes[rank]}.
     */
    static class Writer
    extends Thread {
        int rank = 0;
        int num_writes = 0;
        /** Written by {@link #stopThread()} from another thread, hence volatile. */
        volatile boolean running = true;
        int[] writes = null;
        Queue queue;

        /**
         * @param i      index into {@code writes} where this thread stores its counter
         * @param writes shared result array, one slot per writer
         * @param queue  queue to add elements to
         */
        Writer(int i, int[] writes, Queue queue) {
            super("WriterThread");
            this.rank = i;
            this.writes = writes;
            this.queue = queue;
            this.setDaemon(true);
        }

        @Override
        public void run() {
            while (this.running) {
                try {
                    // Long.valueOf replaces the deprecated Long constructor
                    this.queue.add(Long.valueOf(System.currentTimeMillis()));
                    ++this.num_writes;
                }
                catch (QueueClosedException closed) {
                    this.running = false;
                }
                catch (Throwable t) {
                    System.err.println("QueueTest.Writer.run(): exception=" + t);
                }
            }
            this.writes[this.rank] = this.num_writes;
        }

        /** Asks the loop in {@link #run()} to end; it is re-checked before the next addition. */
        void stopThread() {
            this.running = false;
        }
    }

    /**
     * Daemon thread that makes a single {@code removeWait(timeout)} call on the queue and
     * keeps the result. A closed queue or a timeout is reported on standard output and leaves
     * the result {@code null}.
     */
    static class RemoveOneItemWithTimeout
    extends Thread {
        /** Set by this thread and polled by the test thread, hence volatile. */
        volatile Long retval = null;
        final int rank;
        final long timeout;
        final Queue queue;

        /**
         * @param rank    number used in the thread name and in log output
         * @param timeout value passed unchanged to {@code removeWait}
         * @param queue   queue to remove from
         */
        RemoveOneItemWithTimeout(int rank, long timeout, Queue queue) {
            super("RemoveOneItem thread #" + rank);
            this.rank = rank;
            this.timeout = timeout;
            this.queue = queue;
            this.setDaemon(true);
        }

        @Override
        public void run() {
            try {
                this.retval = (Long)this.queue.removeWait(this.timeout);
                System.out.println("Thread #" + this.rank + ": retrieved " + this.retval);
            }
            catch (QueueClosedException closed) {
                System.out.println("Thread #" + this.rank + ": queue was closed");
            }
            catch (TimeoutException e) {
                System.out.println("Thread #" + this.rank + ": timeout occurred");
            }
        }

        /** @return the removed element, or {@code null} if none has been retrieved */
        Long getRetval() {
            return this.retval;
        }
    }

    /**
     * Daemon thread that makes a single {@code remove()} call on the queue and keeps the
     * result. If the queue reports that it is closed, a message is written to standard error
     * and the result stays {@code null}.
     */
    static class RemoveOneItem
    extends Thread {
        Long retval;
        int rank;
        Queue queue;

        /**
         * @param rank  number used in the thread name and in log output
         * @param queue queue to remove from
         */
        RemoveOneItem(int rank, Queue queue) {
            super("RemoveOneItem thread #" + rank);
            setDaemon(true);
            this.queue = queue;
            this.rank = rank;
        }

        public void run() {
            try {
                Object removed = queue.remove();
                retval = (Long) removed;
            }
            catch (QueueClosedException closed) {
                System.err.println("Thread #" + rank + ": queue was closed");
            }
        }

        /** @return the removed element, or {@code null} if none has been retrieved */
        Long getRetval() {
            return retval;
        }
    }

    /**
     * Daemon thread that adds its rank (as a {@code Long}) to the queue {@code iteration}
     * times. It stops early, with a message on standard error, if the queue reports that it is
     * closed.
     */
    static class AddOneItem
    extends Thread {
        Long retval = null;
        int rank = 0;
        int iteration = 0;
        Queue queue;

        /**
         * @param rank      value added to the queue; also used in the thread name and log output
         * @param iteration number of {@code add()} calls to make
         * @param queue     queue to add to
         */
        AddOneItem(int rank, int iteration, Queue queue) {
            super("AddOneItem thread #" + rank);
            this.rank = rank;
            this.iteration = iteration;
            this.setDaemon(true);
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < this.iteration; ++i) {
                    // Long.valueOf replaces the deprecated Long constructor
                    this.queue.add(Long.valueOf((long)this.rank));
                }
            }
            catch (QueueClosedException closed) {
                System.err.println("Thread #" + this.rank + ": queue was closed");
            }
        }
    }
}

