/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols;

import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.protocols.UNICAST;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@Test(groups={"stack-independent"})
public class UNICAST_ContentionTest {
    JChannel c1;
    JChannel c2;
    static final String props = "SHARED_LOOPBACK(thread_pool.queue_max_size=5000;thread_pool.rejection_policy=discard;thread_pool.min_threads=20;thread_pool.max_threads=20;oob_thread_pool.rejection_policy=discard;enable_bundling=true):UNICAST(timeout=300,600,1200)";
    static final int NUM_THREADS = 200;
    static final int NUM_MSGS = 100;
    static final int SIZE = 1000;

    @AfterMethod
    protected void tearDown() throws Exception {
        Util.close(this.c2, this.c1);
    }

    public static void testSimpleMessageReception() throws Exception {
        int i;
        JChannel c1 = new JChannel(props);
        JChannel c2 = new JChannel(props);
        MyReceiver r1 = new MyReceiver("c1");
        MyReceiver r2 = new MyReceiver("c2");
        c1.setReceiver(r1);
        c2.setReceiver(r2);
        c1.connect("testSimpleMessageReception");
        c2.connect("testSimpleMessageReception");
        int NUM = 100;
        Address c1_addr = c1.getLocalAddress();
        Address c2_addr = c2.getLocalAddress();
        for (i = 1; i <= NUM; ++i) {
            c1.send(c1_addr, null, (Serializable)((Object)"bla"));
            c1.send(c2_addr, null, (Serializable)((Object)"bla"));
            c2.send(c2_addr, null, (Serializable)((Object)"bla"));
            c2.send(c1_addr, null, (Serializable)((Object)"bla"));
        }
        for (i = 0; i < 10 && (r1.getNum() != NUM * 2 || r2.getNum() != NUM * 2); ++i) {
            Util.sleep(500L);
        }
        System.out.println("c1 received " + r1.getNum() + " msgs, " + UNICAST_ContentionTest.getNumberOfRetransmissions(c1) + " retransmissions");
        System.out.println("c2 received " + r2.getNum() + " msgs, " + UNICAST_ContentionTest.getNumberOfRetransmissions(c2) + " retransmissions");
        assert (r1.getNum() == NUM * 2) : "expected " + NUM * 2 + ", but got " + r1.getNum();
        assert (r2.getNum() == NUM * 2) : "expected " + NUM * 2 + ", but got " + r2.getNum();
    }

    public static void testMessageReceptionUnderHighLoad() throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        JChannel c1 = new JChannel(props);
        JChannel c2 = new JChannel(props);
        MyReceiver r1 = new MyReceiver("c1");
        MyReceiver r2 = new MyReceiver("c2");
        c1.setReceiver(r1);
        c2.setReceiver(r2);
        c1.connect("testSimpleMessageReception");
        c2.connect("testSimpleMessageReception");
        Address c1_addr = c1.getLocalAddress();
        Address c2_addr = c2.getLocalAddress();
        MySender[] c1_senders = new MySender[200];
        for (int i = 0; i < c1_senders.length; ++i) {
            c1_senders[i] = new MySender(c1, c2_addr, latch);
            c1_senders[i].start();
        }
        MySender[] c2_senders = new MySender[200];
        for (int i = 0; i < c2_senders.length; ++i) {
            c2_senders[i] = new MySender(c2, c1_addr, latch);
            c2_senders[i].start();
        }
        Util.sleep(500L);
        latch.countDown();
        long NUM_EXPECTED_MSGS = 20000L;
        for (int i = 0; i < 100 && ((long)r1.getNum() != NUM_EXPECTED_MSGS || (long)r2.getNum() != NUM_EXPECTED_MSGS); ++i) {
            Util.sleep(500L);
        }
        System.out.println("c1 received " + r1.getNum() + " msgs, " + UNICAST_ContentionTest.getNumberOfRetransmissions(c1) + " retransmissions");
        System.out.println("c2 received " + r2.getNum() + " msgs, " + UNICAST_ContentionTest.getNumberOfRetransmissions(c2) + " retransmissions");
        assert ((long)r1.getNum() == NUM_EXPECTED_MSGS) : "expected " + NUM_EXPECTED_MSGS + ", but got " + r1.getNum();
        assert ((long)r2.getNum() == NUM_EXPECTED_MSGS) : "expected " + NUM_EXPECTED_MSGS + ", but got " + r2.getNum();
    }

    private static long getNumberOfRetransmissions(JChannel ch) {
        UNICAST unicast = (UNICAST)ch.getProtocolStack().findProtocol(UNICAST.class);
        return unicast.getNumberOfRetransmissions();
    }

    private static class MyReceiver
    extends ReceiverAdapter {
        final String name;
        final AtomicInteger num = new AtomicInteger(0);
        static final long MOD = 2000L;

        public MyReceiver(String name) {
            this.name = name;
        }

        public void receive(Message msg) {
            if ((long)this.num.incrementAndGet() % 2000L == 0L) {
                System.out.println("[" + this.name + "] received " + this.getNum() + " msgs");
            }
        }

        public int getNum() {
            return this.num.get();
        }
    }

    private static class MySender
    extends Thread {
        private final JChannel ch;
        private final Address dest;
        private final CountDownLatch latch;
        private final byte[] buf = new byte[1000];

        public MySender(JChannel ch, Address dest, CountDownLatch latch) {
            this.ch = ch;
            this.dest = dest;
            this.latch = latch;
        }

        public void run() {
            try {
                this.latch.await();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (int i = 0; i < 100; ++i) {
                try {
                    Message msg = new Message(this.dest, null, this.buf);
                    this.ch.send(msg);
                    continue;
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

