/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.tests;

import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Vector;
import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.protocols.TP;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.tests.ChannelTestBase;
import org.jgroups.util.Promise;
import org.jgroups.util.Util;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@Test(groups={"stack-dependent"}, sequential=true)
public class MessageBundlingTest
extends ChannelTestBase {
    private JChannel ch1;
    private JChannel ch2;
    private MyReceiver r2;
    private static final long LATENCY = 1500L;
    private static final long SLEEP = 5000L;
    private static final boolean BUNDLING = true;
    private static final int MAX_BYTES = 64000;

    @AfterMethod
    void tearDown() throws Exception {
        MessageBundlingTest.closeChannel(this.ch2);
        MessageBundlingTest.closeChannel(this.ch1);
    }

    protected boolean useBlocking() {
        return false;
    }

    public void testLatencyWithoutMessageBundling() throws Exception {
        this.createChannels("testLatencyWithoutMessageBundling");
        Message tmp = new Message();
        MessageBundlingTest.setBundling(this.ch1, false, 64000, 30L);
        this.r2.setNumExpectedMesssages(1);
        Promise<Integer> promise = new Promise<Integer>();
        this.r2.setPromise(promise);
        long time = System.currentTimeMillis();
        this.ch1.send(tmp);
        System.out.println(">>> sent message at " + new Date());
        promise.getResult(5000L);
        List<Long> list = this.r2.getTimes();
        Assert.assertEquals((int)1, (int)list.size());
        Long time2 = list.get(0);
        long diff = time2 - time;
        System.out.println("latency: " + diff + " ms");
        MessageBundlingTest.assertTrue("latency (" + diff + "ms) should be less than " + 1500L + " ms", diff <= 1500L);
    }

    public void testLatencyWithMessageBundling() throws Exception {
        this.createChannels("testLatencyWithMessageBundling");
        Message tmp = new Message();
        this.r2.setNumExpectedMesssages(1);
        Promise<Integer> promise = new Promise<Integer>();
        this.r2.setPromise(promise);
        long time = System.currentTimeMillis();
        this.ch1.send(tmp);
        System.out.println(">>> sent message at " + new Date());
        promise.getResult(5000L);
        List<Long> list = this.r2.getTimes();
        Assert.assertEquals((int)1, (int)list.size());
        Long time2 = list.get(0);
        long diff = time2 - time;
        System.out.println("latency: " + diff + " ms");
        MessageBundlingTest.assertTrue("latency (" + diff + "ms) should be more than the bundling timeout (" + 1500L + "ms), but less than 2 times the LATENCY (" + 3000L + ")", diff >= 1500L && diff <= 3000L);
    }

    public void testLatencyWithMessageBundlingAndLoopback() throws Exception {
        this.createChannels("testLatencyWithMessageBundlingAndLoopback");
        Message tmp = new Message();
        MessageBundlingTest.setLoopback(this.ch1, true);
        MessageBundlingTest.setLoopback(this.ch2, true);
        this.r2.setNumExpectedMesssages(1);
        Promise<Integer> promise = new Promise<Integer>();
        this.r2.setPromise(promise);
        long time = System.currentTimeMillis();
        System.out.println(">>> sending message at " + new Date());
        this.ch1.send(tmp);
        promise.getResult(5000L);
        List<Long> list = this.r2.getTimes();
        Assert.assertEquals((int)1, (int)list.size());
        Long time2 = list.get(0);
        long diff = time2 - time;
        System.out.println("latency: " + diff + " ms");
        MessageBundlingTest.assertTrue("latency (" + diff + "ms) should be more than the bundling timeout (" + 1500L + "ms), but less than 2 times the LATENCY (" + 3000L + ")", diff >= 1500L && diff <= 3000L);
    }

    public void testLatencyWithMessageBundlingAndMaxBytes() throws Exception {
        this.createChannels("testLatencyWithMessageBundlingAndMaxBytes");
        MessageBundlingTest.setLoopback(this.ch1, true);
        MessageBundlingTest.setLoopback(this.ch2, true);
        this.r2.setNumExpectedMesssages(10);
        Promise<Integer> promise = new Promise<Integer>();
        this.r2.setPromise(promise);
        Util.sleep(3000L);
        System.out.println(">>> sending 10 messages at " + new Date());
        for (int i = 0; i < 10; ++i) {
            this.ch1.send(new Message(null, null, new byte[2000]));
        }
        promise.getResult(5000L);
        List<Long> list = this.r2.getTimes();
        Assert.assertEquals((int)10, (int)list.size());
        for (Long val : list) {
            System.out.println(val);
        }
    }

    public void testSimple() throws Exception {
        this.createChannels("testSimple");
        Message tmp = new Message();
        this.ch2.setReceiver(new SimpleReceiver());
        this.ch1.send(tmp);
        System.out.println(">>> sent message at " + new Date());
        Util.sleep(5000L);
    }

    private void createChannels(String cluster) throws Exception {
        this.ch1 = this.createChannel(true, 2);
        MessageBundlingTest.setBundling(this.ch1, true, 64000, 1500L);
        MessageBundlingTest.setLoopback(this.ch1, false);
        this.ch1.setReceiver(new NullReceiver());
        this.ch1.connect("MessageBundlingTest-" + cluster);
        this.ch2 = this.createChannel(this.ch1);
        this.r2 = new MyReceiver();
        this.ch2.setReceiver(this.r2);
        this.ch2.connect("MessageBundlingTest-" + cluster);
        View view = this.ch2.getView();
        assert (view.size() == 2) : " view=" + view;
    }

    private static void setLoopback(JChannel ch, boolean b) {
        ProtocolStack stack = ch.getProtocolStack();
        Vector<Protocol> prots = stack.getProtocols();
        TP transport = (TP)prots.lastElement();
        transport.setLoopback(b);
    }

    private static void setBundling(JChannel ch, boolean enabled, int max_bytes, long timeout) {
        ProtocolStack stack = ch.getProtocolStack();
        Vector<Protocol> prots = stack.getProtocols();
        TP transport = (TP)prots.lastElement();
        transport.setEnableBundling(enabled);
        if (enabled) {
            transport.setMaxBundleSize(max_bytes);
            transport.setMaxBundleTimeout(timeout);
        }
        transport.setEnableUnicastBundling(false);
        if (enabled) {
            GMS gms = (GMS)stack.findProtocol("GMS");
            gms.setViewAckCollectionTimeout(3000L);
            gms.setJoinTimeout(3000L);
        }
    }

    private static void closeChannel(Channel c) {
        if (c != null && (c.isOpen() || c.isConnected())) {
            c.close();
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private static class MyReceiver
    extends ReceiverAdapter {
        private final List<Long> times = new LinkedList<Long>();
        private int num_expected_msgs;
        private Promise<Integer> promise;

        private MyReceiver() {
        }

        public List<Long> getTimes() {
            return this.times;
        }

        public void setNumExpectedMesssages(int num_expected_msgs) {
            this.num_expected_msgs = num_expected_msgs;
        }

        public void setPromise(Promise<Integer> promise) {
            this.promise = promise;
        }

        public int size() {
            return this.times.size();
        }

        @Override
        public void receive(Message msg) {
            this.times.add(new Long(System.currentTimeMillis()));
            System.out.println("<<< received message from " + msg.getSrc() + " at " + new Date());
            if (this.times.size() >= this.num_expected_msgs && this.promise != null) {
                this.promise.setResult(this.times.size());
            }
        }
    }

    private static class SimpleReceiver
    extends ReceiverAdapter {
        long start = System.currentTimeMillis();

        private SimpleReceiver() {
        }

        public void receive(Message msg) {
            System.out.println("<<< received message from " + msg.getSrc() + " at " + new Date() + ", latency=" + (System.currentTimeMillis() - this.start) + " ms");
        }
    }

    private static class NullReceiver
    extends ReceiverAdapter {
        private NullReceiver() {
        }
    }
}

