/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.tests;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.tests.ChannelTestBase;
import org.jgroups.util.Util;
import org.testng.annotations.Test;

@Test(groups={"flush"}, sequential=true)
public class FlushCloseOpenTest
extends ChannelTestBase {
    @Test
    public void testAndLoop() throws Exception {
        for (int i = 1; i <= 4; ++i) {
            JChannel channel = this.createChannel(true, 2);
            ReceiverImpl receiver = new ReceiverImpl();
            channel.setReceiver(receiver);
            ((Channel)channel).setName("A");
            ((Channel)channel).connect("testClust");
            JChannel channel2 = this.createChannel(channel);
            ReceiverImpl receiver2 = new ReceiverImpl();
            channel2.setReceiver(receiver2);
            ((Channel)channel).setName("B");
            ((Channel)channel2).connect("testClust");
            this.sendMessage(channel, (Serializable)((Object)"msg1"));
            this.sendMessage(channel2, (Serializable)((Object)"msg2"));
            assert (Util.startFlush(channel));
            this.assertCount(receiver, 2L, receiver2, 2L);
            ((Channel)channel).stopFlush();
            ((Channel)channel).close();
            channel = this.createChannel(channel2);
            channel.setReceiver(receiver);
            ((Channel)channel).setName("A");
            ((Channel)channel).connect("testClust");
            this.sendMessage(channel2, (Serializable)((Object)"msg3"));
            assert (Util.startFlush(channel2));
            this.assertCount(receiver, 3L, receiver2, 3L);
            ((Channel)channel).stopFlush();
            ((Channel)channel2).close();
            channel2 = this.createChannel(channel);
            channel2.setReceiver(receiver2);
            ((Channel)channel).setName("B");
            ((Channel)channel2).connect("testClust");
            this.sendMessage(channel2, (Serializable)((Object)"msg4"));
            assert (Util.startFlush(channel2));
            this.assertCount(receiver, 4L, receiver2, 4L);
            ((Channel)channel2).stopFlush();
            ((Channel)channel).close();
            ((Channel)channel2).close();
            receiver.receiveCount.set(0L);
            receiver2.receiveCount.set(0L);
            System.out.println("***** Round " + i + " done *****");
        }
    }

    private void sendMessage(Channel channel, Serializable obj) throws Exception {
        if (!channel.isConnected()) {
            this.log.warn("Channel disconnected in send, discarding msg");
            return;
        }
        Message msg = new Message(null, null, obj);
        this.log.debug("Sending message: " + msg);
        channel.send(msg);
        this.log.debug("Sent message: " + msg);
    }

    private void assertCount(ReceiverImpl srv1, long srv1Count, ReceiverImpl srv2, long srv2Count) throws InterruptedException {
        long start = System.currentTimeMillis();
        for (int i = 0; i < 1000 && (srv1.receiveCount.get() != srv1Count || srv2.receiveCount.get() != srv2Count); ++i) {
            Thread.sleep(10L);
        }
        assert (srv1Count == srv1.receiveCount.get()) : "expected " + srv1Count + " but got " + srv1.receiveCount;
        assert (srv2Count == srv2.receiveCount.get()) : "expected " + srv2Count + " but got " + srv2.receiveCount;
        this.log.info("assert OK in " + (System.currentTimeMillis() - start) + "ms");
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private class ReceiverImpl
    extends ReceiverAdapter {
        final List<Object> msgs = new ArrayList<Object>();
        public final AtomicLong receiveCount = new AtomicLong();

        private ReceiverImpl() {
        }

        public List<Object> getMsgs() {
            return this.msgs;
        }

        @Override
        public void receive(Message msg) {
            try {
                Object data = msg.getObject();
                this.msgs.add(data);
                this.receiveCount.incrementAndGet();
                FlushCloseOpenTest.this.log.debug("Received msg: " + data);
            }
            catch (Exception e) {
                FlushCloseOpenTest.this.log.error("Receive failed", e);
            }
        }

        @Override
        public void viewAccepted(View new_view) {
        }
    }
}

