/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.tests;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.ChannelClosedException;
import org.jgroups.ChannelNotConnectedException;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.protocols.DISCARD;
import org.jgroups.protocols.TP;
import org.jgroups.protocols.UNICAST;
import org.jgroups.protocols.pbcast.NAKACK;
import org.jgroups.protocols.pbcast.STABLE;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.tests.ChannelTestBase;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
@Test(groups={"stack-dependent"}, sequential=true)
public class OOBTest
extends ChannelTestBase {
    /** First channel; created in {@link #init()} and given the name "C1". */
    private JChannel c1;
    /** Second channel; created from {@code c1} in {@link #init()} and given the name "C2". */
    private JChannel c2;

    /**
     * Sets up the two channels used by every test method.
     * <p>
     * Creates {@code c1} and {@code c2}, names them "C1" and "C2", adjusts their OOB thread
     * pool sizes and STABLE gossip setting, connects both under the name "OOBMcastTest" and
     * asserts that the view reported by {@code c2} has exactly two members.
     *
     * @throws Exception if creating or connecting a channel fails
     */
    @BeforeMethod
    public void init() throws Exception {
        final String connectName = "OOBMcastTest";

        c1 = createChannel(true, 2);
        c1.setName("C1");
        c2 = createChannel(c1);
        c2.setName("C2");

        // Tune both stacks before connecting.
        setOOBPoolSize(c1, c2);
        setStableGossip(c1, c2);

        c1.connect(connectName);
        c2.connect(connectName);

        final View currentView = c2.getView();
        log.info("view = " + currentView);
        assert currentView.size() == 2 : "view is " + currentView;
    }

    /**
     * Waits one second and then closes both channels.
     */
    @AfterMethod
    public void cleanup() {
        final long pauseBeforeCloseMs = 1000L;
        Util.sleep(pauseBeforeCloseMs);
        Util.close(c2, c1);
    }

    /**
     * Runs the blocked-receiver scenario of {@link #send(Address)} with the address of
     * {@code c2} as the destination.
     */
    public void testNonBlockingUnicastOOBMessage() throws ChannelNotConnectedException, ChannelClosedException {
        send(c2.getAddress());
    }

    /**
     * Runs the blocked-receiver scenario of {@link #send(Address)} with a {@code null}
     * destination (the multicast case, going by the test name).
     */
    public void testNonBlockingMulticastOOBMessage() throws ChannelNotConnectedException, ChannelClosedException {
        final Address noDestination = null;
        send(noDestination);
    }

    /**
     * Sends three unicasts (payloads 1, 2, 3) from {@code c1} to {@code c2} and asserts that
     * {@code c2} ends up with all three.
     * <p>
     * Message 2 carries flag 1 (presumably {@code Message.OOB}, inlined by the decompiler —
     * confirm). A {@link DISCARD} protocol is inserted into {@code c1}'s stack relative to
     * {@link UNICAST}, and {@code setDropDownUnicasts(1)} is called right before message 2
     * is sent.
     *
     * @throws Exception if inserting the protocol or sending fails
     */
    public void testRegularAndOOBUnicasts() throws Exception {
        final DISCARD dropper = new DISCARD();
        // Position argument 2 is kept verbatim; presumably ProtocolStack.BELOW — confirm.
        c1.getProtocolStack().insertProtocol(dropper, 2, UNICAST.class);

        final MyReceiver sink = new MyReceiver("C2");
        c2.setReceiver(sink);

        final Address target = c2.getAddress();
        final Message first = new Message(target, null, Integer.valueOf(1));
        final Message second = new Message(target, null, Integer.valueOf(2));
        second.setFlag((byte)1);
        final Message third = new Message(target, null, Integer.valueOf(3));

        c1.send(first);
        dropper.setDropDownUnicasts(1);
        c1.send(second);
        c1.send(third);

        sendStableMessages(c1, c2);
        Util.sleep(1000L);

        final Collection<Integer> received = sink.getMsgs();
        assert received.size() == 3 : "list is " + received;
        for (int expected = 1; expected <= 3; expected++) {
            assert received.contains(expected);
        }
    }

    /**
     * Sends four unicasts from {@code c1} to {@code c2} in the order 1, 3, 2, 4 and asserts
     * that {@code c2} ends up with all four payloads.
     * <p>
     * Messages 2 and 3 carry flag 1 (presumably {@code Message.OOB}, inlined by the
     * decompiler — confirm). {@code setDropDownUnicasts(1)} is called on the inserted
     * {@link DISCARD} protocol before message 3 and again before message 2. After an initial
     * one-second wait the test polls up to nine more times, 500 ms apart, triggering STABLE
     * garbage collection on both channels after each wait.
     *
     * @throws Exception if inserting the protocol or sending fails
     */
    public void testRegularAndOOBUnicasts2() throws Exception {
        final int expectedCount = 4;

        final DISCARD dropper = new DISCARD();
        // Position argument 2 is kept verbatim; presumably ProtocolStack.BELOW — confirm.
        c1.getProtocolStack().insertProtocol(dropper, 2, UNICAST.class);

        final MyReceiver sink = new MyReceiver("C2");
        c2.setReceiver(sink);

        final Address target = c2.getAddress();
        final Message regular1 = new Message(target, null, Integer.valueOf(1));
        final Message flagged2 = new Message(target, null, Integer.valueOf(2));
        flagged2.setFlag((byte)1);
        final Message flagged3 = new Message(target, null, Integer.valueOf(3));
        flagged3.setFlag((byte)1);
        final Message regular4 = new Message(target, null, Integer.valueOf(4));

        c1.send(regular1);
        dropper.setDropDownUnicasts(1);
        c1.send(flagged3);
        dropper.setDropDownUnicasts(1);
        c1.send(flagged2);
        c1.send(regular4);
        Util.sleep(1000L);

        final Collection<Integer> received = sink.getMsgs();
        for (int attemptsLeft = 9; attemptsLeft > 0 && received.size() < expectedCount; attemptsLeft--) {
            Util.sleep(500L);
            sendStableMessages(c1, c2);
        }

        log.info("list = " + received);
        assert received.size() == expectedCount : "list is " + received;
        for (int expected = 1; expected <= expectedCount; expected++) {
            assert received.contains(expected);
        }
    }

    /**
     * Sends three messages (payloads 1, 2, 3) from {@code c1} with a {@code null}
     * destination and asserts that {@code c2} ends up with all three.
     * <p>
     * Message 2 carries flag 1 (presumably {@code Message.OOB}, inlined by the decompiler —
     * confirm). A {@link DISCARD} protocol is inserted into {@code c1}'s stack relative to
     * {@link NAKACK}, and {@code setDropDownMulticasts(1)} is called right before message 2
     * is sent. The test polls up to ten times, one second apart, triggering STABLE garbage
     * collection on both channels after each wait.
     *
     * @throws Exception if inserting the protocol or sending fails
     */
    public void testRegularAndOOBMulticasts() throws Exception {
        final int expectedCount = 3;

        final DISCARD dropper = new DISCARD();
        // Position argument 2 is kept verbatim; presumably ProtocolStack.BELOW — confirm.
        c1.getProtocolStack().insertProtocol(dropper, 2, NAKACK.class);
        // Option 3 is kept verbatim; presumably Channel.LOCAL — confirm.
        c1.setOpt(3, false);

        final MyReceiver sink = new MyReceiver("C2");
        c2.setReceiver(sink);

        final Address everyone = null;
        final Message first = new Message(everyone, null, Integer.valueOf(1));
        final Message second = new Message(everyone, null, Integer.valueOf(2));
        second.setFlag((byte)1);
        final Message third = new Message(everyone, null, Integer.valueOf(3));

        c1.send(first);
        dropper.setDropDownMulticasts(1);
        c1.send(second);
        c1.send(third);
        Util.sleep(500L);

        final Collection<Integer> received = sink.getMsgs();
        int round = 0;
        while (round < 10) {
            log.info("list = " + received);
            if (received.size() == expectedCount) {
                break;
            }
            Util.sleep(1000L);
            sendStableMessages(c1, c2);
            round++;
        }

        assert received.size() == expectedCount : "list is " + received;
        for (int expected = 1; expected <= expectedCount; expected++) {
            assert received.contains(expected);
        }
    }

    /**
     * Sends 20 messages from 10 threads with a {@code null} destination, each flagged with
     * probability 0.5, while a {@link DISCARD} protocol with a down discard rate of 0.5 sits
     * in {@code c1}'s stack, and verifies that both channels end up with 20 messages.
     * <p>
     * The test first polls up to ten times (one second apart) with DISCARD still installed,
     * then removes DISCARD and polls up to five more times (500 ms apart), triggering STABLE
     * garbage collection on both channels in every round.
     *
     * @throws Exception if inserting the protocol or sending fails
     */
    @Test(invocationCount=5)
    public void testRandomRegularAndOOBMulticasts() throws Exception {
        final int NUM_MSGS = 20;
        final int NUM_THREADS = 10;

        final DISCARD dropper = new DISCARD();
        dropper.setLocalAddress(c1.getAddress());
        dropper.setDownDiscardRate(0.5);
        final ProtocolStack c1Stack = c1.getProtocolStack();
        // Position argument 2 is kept verbatim; presumably ProtocolStack.BELOW — confirm.
        c1Stack.insertProtocol(dropper, 2, NAKACK.class);

        final MyReceiver receiver1 = new MyReceiver("C1");
        final MyReceiver receiver2 = new MyReceiver("C2");
        c1.setReceiver(receiver1);
        c2.setReceiver(receiver2);

        send(null, NUM_MSGS, NUM_THREADS, 0.5);

        final Collection<Integer> one = receiver1.getMsgs();
        final Collection<Integer> two = receiver2.getMsgs();

        // Phase 1: wait with DISCARD still in place.
        for (int round = 0; round < 10 && !(one.size() == NUM_MSGS && two.size() == NUM_MSGS); round++) {
            log.info("one size " + one.size() + ", two size " + two.size());
            Util.sleep(1000L);
            sendStableMessages(c1, c2);
        }
        log.info("one size " + one.size() + ", two size " + two.size());

        // Phase 2: remove DISCARD and give the remaining messages a chance to arrive.
        c1Stack.removeProtocol("DISCARD");
        for (int round = 0; round < 5 && !(one.size() == NUM_MSGS && two.size() == NUM_MSGS); round++) {
            sendStableMessages(c1, c2);
            Util.sleep(500L);
        }

        System.out.println("C1 received " + one.size() + " messages (" + NUM_MSGS + " expected)" + "\nC2 received " + two.size() + " messages (" + NUM_MSGS + " expected)");
        check(NUM_MSGS, one, two);
    }

    /**
     * Closes {@code c2}, sends ten flagged messages (payloads 1..10) with a {@code null}
     * destination on {@code c1}, and asserts that a receiver on {@code c1} which sleeps one
     * second per message still ends up with all ten.
     * <p>
     * The transport's OOB rejection policy is set to "discard". Flag 1 is presumably
     * {@code Message.OOB}, inlined by the decompiler — confirm. The test polls up to 20
     * times, one second apart, triggering STABLE garbage collection after each wait.
     */
    public void testOOBMessageLoss() throws ChannelNotConnectedException, ChannelClosedException {
        final int NUM = 10;

        Util.close((Channel)c2);

        final MySleepingReceiver slowReceiver = new MySleepingReceiver("C1", 1000L);
        c1.setReceiver(slowReceiver);
        c1.getProtocolStack().getTransport().setOOBRejectionPolicy("discard");

        int payload = 1;
        while (payload <= NUM) {
            final Message flagged = new Message(null, null, Integer.valueOf(payload));
            flagged.setFlag((byte)1);
            c1.send(flagged);
            payload++;
        }

        final STABLE stable = (STABLE)c1.getProtocolStack().findProtocol(STABLE.class);
        if (stable != null) {
            stable.runMessageGarbageCollection();
        }

        final Collection<Integer> msgs = slowReceiver.getMsgs();
        for (int round = 0; round < 20 && msgs.size() != NUM; round++) {
            Util.sleep(1000L);
            sendStableMessages(c1, c2);
        }

        System.out.println("msgs = " + Util.print(msgs));
        assert msgs.size() == NUM : "expected 10 messages but got " + msgs.size() + ", msgs=" + Util.print(msgs);
        for (int expected = 1; expected <= NUM; expected++) {
            assert msgs.contains(expected);
        }
    }

    /**
     * Sends ten flagged unicasts (payloads 1..10) from {@code c1} to {@code c2} and asserts
     * that a receiver on {@code c2} which sleeps one second per message still ends up with
     * all ten.
     * <p>
     * The OOB rejection policy of {@code c1}'s transport is set to "discard". Flag 1 is
     * presumably {@code Message.OOB}, inlined by the decompiler — confirm. The test polls up
     * to 20 times, one second apart.
     */
    public void testOOBUnicastMessageLoss() throws ChannelNotConnectedException, ChannelClosedException {
        final int NUM = 10;

        final MySleepingReceiver slowReceiver = new MySleepingReceiver("C2", 1000L);
        c2.setReceiver(slowReceiver);
        c1.getProtocolStack().getTransport().setOOBRejectionPolicy("discard");

        final Address target = c2.getAddress();
        int payload = 1;
        while (payload <= NUM) {
            final Message flagged = new Message(target, null, Integer.valueOf(payload));
            flagged.setFlag((byte)1);
            c1.send(flagged);
            payload++;
        }

        final Collection<Integer> msgs = slowReceiver.getMsgs();
        int round = 0;
        while (round < 20 && msgs.size() != NUM) {
            Util.sleep(1000L);
            round++;
        }

        assert msgs.size() == NUM : "expected 10 messages but got " + msgs.size() + ", msgs=" + Util.print(msgs);
        for (int expected = 1; expected <= NUM; expected++) {
            assert msgs.contains(expected);
        }
    }

    /**
     * Sends {@code num_msgs} messages whose payloads are the integers
     * {@code 0 .. num_msgs-1}, each from a randomly chosen channel ({@code c1} or
     * {@code c2}, 50/50) and each flagged with probability {@code oob_prob}.
     * <p>
     * With more than one thread, the work is split evenly across {@code num_threads} sender
     * threads and the method waits up to 20 seconds for each of them to finish. With exactly
     * one thread the messages are sent sequentially on the calling thread.
     * <p>
     * Both paths number messages from 0. The multi-threaded path previously numbered them
     * from 1, which disagreed with the single-threaded path and with the 0-based "missing"
     * diagnostic computed by {@code check}.
     *
     * @param dest        destination passed to every message; may be {@code null}
     * @param num_msgs    total number of messages; must be divisible by {@code num_threads}
     * @param num_threads number of sender threads; must be positive
     * @param oob_prob    probability with which a message gets flag 1 set (presumably
     *                    {@code Message.OOB}, inlined by the decompiler — confirm)
     * @throws IllegalArgumentException if {@code num_threads <= 0} or {@code num_msgs} is
     *                                  not divisible by {@code num_threads}
     * @throws Exception if a sequential send fails or the wait for a thread is interrupted
     */
    private void send(final Address dest, int num_msgs, int num_threads, final double oob_prob) throws Exception {
        if (num_threads <= 0) {
            throw new IllegalArgumentException("number of threads <= 0");
        }
        if (num_msgs % num_threads != 0) {
            throw new IllegalArgumentException("number of messages ( " + num_msgs + ") needs to be divisible by " + "the number o threads (" + num_threads + ")");
        }
        if (num_threads > 1) {
            final int msgs_per_thread = num_msgs / num_threads;
            final Thread[] threads = new Thread[num_threads];
            // Shared by all sender threads so that every payload is handed out exactly once.
            final AtomicInteger counter = new AtomicInteger(0);
            for (int i = 0; i < threads.length; ++i) {
                threads[i] = new Thread(){

                    @Override
                    public void run() {
                        for (int j = 0; j < msgs_per_thread; ++j) {
                            JChannel sender = Util.tossWeightedCoin(0.5) ? OOBTest.this.c1 : OOBTest.this.c2;
                            boolean oob = Util.tossWeightedCoin(oob_prob);
                            // getAndIncrement() yields 0..num_msgs-1, matching the sequential path.
                            int num = counter.getAndIncrement();
                            Message msg = new Message(dest, null, Integer.valueOf(num));
                            if (oob) {
                                msg.setFlag((byte)1);
                            }
                            try {
                                sender.send(msg);
                            }
                            catch (Exception e) {
                                // Best effort: a failed send is reported and the thread carries on.
                                e.printStackTrace();
                            }
                        }
                    }
                };
                threads[i].start();
            }
            for (int i = 0; i < threads.length; ++i) {
                threads[i].join(20000L);
            }
            return;
        }
        for (int i = 0; i < num_msgs; ++i) {
            JChannel sender = Util.tossWeightedCoin(0.5) ? this.c1 : this.c2;
            boolean oob = Util.tossWeightedCoin(oob_prob);
            Message msg = new Message(dest, null, Integer.valueOf(i));
            if (oob) {
                msg.setFlag((byte)1);
            }
            sender.send(msg);
        }
    }

    /**
     * Verifies that flagged messages are delivered while an unflagged message is blocked in
     * the receiver.
     * <p>
     * The calling thread takes a lock that {@link BlockingReceiver} acquires for every
     * unflagged message. It then sends message 1 (unflagged) followed by messages 2..10 with
     * flag 1 set (presumably {@code Message.OOB}, inlined by the decompiler — confirm) from
     * {@code c1}, and asserts that {@code c2}'s receiver collects the nine flagged messages
     * while the lock is held. After the lock is released, all ten messages must be present.
     * <p>
     * The lock is released in a {@code finally} block so that a failed assertion or send
     * does not leave the receiver's thread blocked on the lock. The last message is awaited
     * with a bounded poll (up to one second) instead of a single 10 ms sleep.
     *
     * @param dest destination for all messages; may be {@code null}
     */
    private void send(Address dest) throws ChannelNotConnectedException, ChannelClosedException {
        final int NUM = 10;
        final ReentrantLock lock = new ReentrantLock();
        final BlockingReceiver receiver = new BlockingReceiver(lock);
        this.c2.setReceiver(receiver);
        final List<Long> list = receiver.getMsgs();

        lock.lock();
        try {
            this.c1.send(new Message(dest, null, Long.valueOf(1L)));
            // Give the unflagged message time to reach the receiver before the flagged ones follow.
            Util.sleep(1000L);
            for (int i = 2; i <= NUM; ++i) {
                Message msg = new Message(dest, null, Long.valueOf(i));
                msg.setFlag((byte)1);
                this.c1.send(msg);
            }
            OOBTest.sendStableMessages(this.c1, this.c2);
            Util.sleep(500L);
            for (int i = 0; i < 10; ++i) {
                this.log.info("list = " + list);
                if (list.size() == NUM - 1) break;
                Util.sleep(1000L);
            }
            System.out.println("list = " + list);
            assert (list.size() == NUM - 1) : "list is " + list;
            assert (list.contains(2L));
        }
        finally {
            this.log.info("[" + Thread.currentThread().getName() + "]: unlocking lock");
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }

        // Wait (bounded) for the previously blocked message to be added after the unlock.
        for (int i = 0; i < 100 && list.size() < NUM; ++i) {
            Util.sleep(10L);
        }
        System.out.println("list = " + list);
        assert (list.size() == NUM) : "list is " + list;
        for (long i = 1L; i <= NUM; ++i) {
            assert (list.contains(i));
        }
    }

    /**
     * Logs every given collection and asserts that each one holds exactly
     * {@code num_expected_msgs} elements.
     * <p>
     * When a collection has the wrong size, the assertion message also lists which of the
     * values {@code 0 .. num_expected_msgs-1} are absent from it.
     *
     * @param num_expected_msgs the required size of every collection
     * @param lists             the collections to verify
     */
    private void check(int num_expected_msgs, Collection<Integer> ... lists) {
        for (Collection<Integer> received : lists) {
            log.info("list: " + received);
        }
        for (Collection<Integer> received : lists) {
            if (received.size() != num_expected_msgs) {
                // Build the diagnostic only for collections that look wrong.
                final TreeSet<Integer> missing = new TreeSet<Integer>();
                int candidate = 0;
                while (candidate < num_expected_msgs) {
                    missing.add(candidate);
                    candidate++;
                }
                missing.removeAll(received);
                assert received.size() == num_expected_msgs : "expected " + num_expected_msgs + " elements, but got " + received.size() + " (list=" + received + "), missing=" + missing;
            }
        }
    }

    /**
     * Sets the OOB thread pool of every given channel's transport to a minimum of 1 and a
     * maximum of 2 threads.
     *
     * @param channels the channels whose transports are adjusted
     */
    private static void setOOBPoolSize(JChannel ... channels) {
        for (int idx = 0; idx < channels.length; idx++) {
            final TP tp = channels[idx].getProtocolStack().getTransport();
            tp.setOOBThreadPoolMinThreads(1);
            tp.setOOBThreadPoolMaxThreads(2);
        }
    }

    /**
     * Sets the desired average gossip of the {@link STABLE} protocol to 2000 on every given
     * channel.
     * <p>
     * Channels whose stack contains no STABLE protocol are skipped, which is the same guard
     * {@code sendStableMessages} applies; previously such a stack caused a
     * {@link NullPointerException}.
     *
     * @param channels the channels whose STABLE protocol is adjusted
     */
    private static void setStableGossip(JChannel ... channels) {
        for (JChannel channel : channels) {
            ProtocolStack stack = channel.getProtocolStack();
            STABLE stable = (STABLE)stack.findProtocol(STABLE.class);
            if (stable == null) {
                continue;
            }
            stable.setDesiredAverageGossip(2000L);
        }
    }

    /**
     * Calls {@code runMessageGarbageCollection()} on the {@link STABLE} protocol of every
     * given channel, skipping channels whose stack has no STABLE protocol.
     *
     * @param channels the channels to trigger
     */
    private static void sendStableMessages(JChannel ... channels) {
        for (int idx = 0; idx < channels.length; idx++) {
            final STABLE stableProt = (STABLE)channels[idx].getProtocolStack().findProtocol(STABLE.class);
            if (stableProt != null) {
                stableProt.runMessageGarbageCollection();
            }
        }
    }

    private static class MySleepingReceiver
    extends MyReceiver {
        final long sleep_time;

        public MySleepingReceiver(String name, long sleep_time) {
            super(name);
            this.sleep_time = sleep_time;
        }

        public void receive(Message msg) {
            super.receive(msg);
            System.out.println("-- received " + msg.getObject());
            Util.sleep(this.sleep_time);
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    /**
     * Receiver that collects the {@link Integer} payload of every message it is given.
     * Payloads are kept in a {@link ConcurrentLinkedQueue}.
     */
    private static class MyReceiver
    extends ReceiverAdapter {
        /** Payloads in the order they were added. */
        private final Collection<Integer> msgs = new ConcurrentLinkedQueue<Integer>();
        /** Name given at construction. */
        final String name;

        /**
         * @param name a name for this receiver
         */
        public MyReceiver(String name) {
            this.name = name;
        }

        /**
         * @return the live collection of payloads (not a copy)
         */
        public Collection<Integer> getMsgs() {
            return msgs;
        }

        /** Casts the message's object to {@link Integer} and appends it. */
        @Override
        public void receive(Message msg) {
            msgs.add((Integer)msg.getObject());
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    /**
     * Receiver that makes messages without flag 1 wait on a lock before recording them,
     * while messages with flag 1 are recorded immediately. Flag 1 is presumably
     * {@code Message.OOB}, inlined by the decompiler — confirm.
     * <p>
     * Payloads are cast to {@link Long} and stored in a synchronized list.
     */
    private static class BlockingReceiver
    extends ReceiverAdapter {
        /** Lock that unflagged messages must acquire (and immediately release) before being recorded. */
        private final Lock lock;
        /** Recorded payloads; parameterized to avoid the raw-type unchecked conversion. */
        private final List<Long> msgs = Collections.synchronizedList(new LinkedList<Long>());

        /**
         * @param lock the lock unflagged messages wait on
         */
        public BlockingReceiver(Lock lock) {
            this.lock = lock;
        }

        /**
         * @return the live synchronized list of payloads (not a copy)
         */
        public List<Long> getMsgs() {
            return this.msgs;
        }

        @Override
        public void receive(Message msg) {
            if (!msg.isFlagSet((byte)1)) {
                // Used purely as a barrier: block until the lock is free, then let go at once.
                this.lock.lock();
                this.lock.unlock();
            }
            this.msgs.add((Long)msg.getObject());
        }
    }
}

