/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.tests;

import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

/**
 * Exercises message reception on two channels that are created from the same
 * {@code SHARED_LOOPBACK} configuration with {@code enable_bundling=true}.
 * Both tests send multicast messages (null destination) from both channels and
 * verify that each receiver counts the messages sent by both channels.
 */
@Test(groups={"stack-independent"})
public class BundlerTest {
    // NOTE(review): nothing in this class assigns these fields; the tests work on
    // local channels and close them themselves. The fields are kept so that
    // tearDown() and any external user of them keep compiling.
    JChannel c1;
    JChannel c2;
    static final String props = "SHARED_LOOPBACK(thread_pool.queue_max_size=5000;thread_pool.rejection_policy=discard;thread_pool.min_threads=20;thread_pool.max_threads=20;oob_thread_pool.rejection_policy=discard;enable_bundling=true)";
    /** Number of sender threads started per channel in the high-load test. */
    static final int NUM_THREADS = 200;
    /** Number of messages every sender thread sends. */
    static final int NUM_MSGS = 1000;
    /** Payload size in bytes of every message sent by a {@link MySender}. */
    static final int SIZE = 1000;

    /**
     * Closes the channel fields after every test method.
     * NOTE(review): the fields are never assigned in this class, so this relies on
     * {@code Util.close} tolerating null channels - confirm against the JGroups version in use.
     */
    @AfterMethod
    protected void tearDown() throws Exception {
        Util.close(this.c2, this.c1);
    }

    /**
     * Sends 100 messages from each of two connected channels and asserts that both
     * receivers have counted all 200 messages, polling for at most 10 x 500 ms.
     *
     * @throws Exception if a channel cannot be created, connected or used for sending
     */
    public static void testSimpleMessageReception() throws Exception {
        final int NUM = 100;
        final int expected = NUM * 2;
        JChannel c1 = null;
        JChannel c2 = null;
        try {
            c1 = new JChannel(props);
            c2 = new JChannel(props);
            MyReceiver r1 = new MyReceiver("c1");
            MyReceiver r2 = new MyReceiver("c2");
            c1.setReceiver(r1);
            c2.setReceiver(r2);
            c1.connect("testSimpleMessageReception");
            c2.connect("testSimpleMessageReception");
            for (int i = 1; i <= NUM; ++i) {
                c1.send(null, null, (Serializable)"bla");
                c2.send(null, null, (Serializable)"bla");
            }
            // Delivery is asynchronous: poll until both receivers have seen everything.
            for (int i = 0; i < 10 && (r1.getNum() != expected || r2.getNum() != expected); ++i) {
                Util.sleep(500L);
            }
            System.out.println("c1 received " + r1.getNum() + " msgs");
            System.out.println("c2 received " + r2.getNum() + " msgs");
            assert (r1.getNum() == expected) : "expected " + expected + ", but got " + r1.getNum();
            assert (r2.getNum() == expected) : "expected " + expected + ", but got " + r2.getNum();
        }
        finally {
            // The channels are locals, so tearDown() cannot reach them; close them
            // here so they are not leaked when an assertion or a send fails.
            Util.close(c2, c1);
        }
    }

    /**
     * Starts {@link #NUM_THREADS} sender threads on each of two connected channels,
     * releases them simultaneously and asserts that both receivers have counted
     * every message ({@code NUM_THREADS * NUM_MSGS * 2}). Polls for at most
     * 1000 x 2000 ms before asserting.
     *
     * @throws Exception if a channel cannot be created or connected
     */
    public static void testMessageReceptionUnderHighLoad() throws Exception {
        // Both channels send NUM_THREADS * NUM_MSGS messages, and each receiver sees both streams.
        final long NUM_EXPECTED_MSGS = (long)NUM_THREADS * NUM_MSGS * 2L;
        CountDownLatch latch = new CountDownLatch(1);
        JChannel c1 = null;
        JChannel c2 = null;
        try {
            c1 = new JChannel(props);
            c2 = new JChannel(props);
            MyReceiver r1 = new MyReceiver("c1");
            MyReceiver r2 = new MyReceiver("c2");
            c1.setReceiver(r1);
            c2.setReceiver(r2);
            // Use a cluster name of its own so this test never shares a cluster with
            // testSimpleMessageReception (the name was previously copy-pasted from it).
            c1.connect("testMessageReceptionUnderHighLoad");
            c2.connect("testMessageReceptionUnderHighLoad");
            System.out.println("starting to send messages");
            startSenders(c1, latch);
            startSenders(c2, latch);
            // Give the sender threads time to reach the latch before releasing them.
            Util.sleep(500L);
            long start = System.currentTimeMillis();
            latch.countDown();
            for (int i = 0; i < 1000 && ((long)r1.getNum() < NUM_EXPECTED_MSGS || (long)r2.getNum() < NUM_EXPECTED_MSGS); ++i) {
                Util.sleep(2000L);
            }
            System.out.println("c1 received " + r1.getNum() + " msgs");
            System.out.println("c2 received " + r2.getNum() + " msgs");
            assert ((long)r1.getNum() == NUM_EXPECTED_MSGS) : "expected " + NUM_EXPECTED_MSGS + ", but got " + r1.getNum();
            assert ((long)r2.getNum() == NUM_EXPECTED_MSGS) : "expected " + NUM_EXPECTED_MSGS + ", but got " + r2.getNum();
            long diff = System.currentTimeMillis() - start;
            System.out.println("sending and receiving of " + NUM_EXPECTED_MSGS + " took " + diff + " ms");
        }
        finally {
            // The channels are locals, so tearDown() cannot reach them; close them here.
            Util.close(c2, c1);
        }
    }

    /** Creates and starts NUM_THREADS senders on the given channel; they block on the latch until it is released. */
    private static void startSenders(JChannel ch, CountDownLatch latch) {
        for (int i = 0; i < NUM_THREADS; ++i) {
            new MySender(ch, latch).start();
        }
    }

    /** Receiver that counts delivered messages and logs progress every {@link #MOD} messages. */
    private static class MyReceiver
    extends ReceiverAdapter {
        final String name;
        final AtomicInteger num = new AtomicInteger(0);
        /** Progress is printed each time the message count reaches a multiple of this value. */
        static final long MOD = 20000L;

        public MyReceiver(String name) {
            this.name = name;
        }

        /**
         * Counts the message; its content is ignored.
         *
         * @param msg the delivered message (unused)
         */
        public void receive(Message msg) {
            // Use the post-increment value so the logged number is the number of
            // messages actually received so far (it is always >= 1 here).
            int count = this.num.incrementAndGet();
            if ((long)count % MOD == 0L) {
                System.out.println("[" + this.name + "] received " + count + " msgs");
            }
        }

        /** @return the number of messages received so far */
        public int getNum() {
            return this.num.get();
        }
    }

    /**
     * Thread that waits on a shared latch and then sends {@link #NUM_MSGS} messages,
     * each carrying a {@link #SIZE}-byte buffer, on its channel.
     */
    private static class MySender
    extends Thread {
        private final JChannel ch;
        private final CountDownLatch latch;
        private final byte[] buf = new byte[SIZE];

        public MySender(JChannel ch, CountDownLatch latch) {
            this.ch = ch;
            this.latch = latch;
        }

        public void run() {
            try {
                this.latch.await();
            }
            catch (InterruptedException e) {
                // Interrupted before the start signal: restore the flag and stop
                // instead of sending ahead of the other senders.
                Thread.currentThread().interrupt();
                return;
            }
            for (int i = 0; i < NUM_MSGS; ++i) {
                try {
                    this.ch.send(new Message(null, null, this.buf));
                }
                catch (Exception e) {
                    // Best effort: report the failure and keep sending the remaining messages.
                    e.printStackTrace();
                }
            }
        }
    }
}

