/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.tests;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.Event;
import org.jgroups.ExtendedReceiverAdapter;
import org.jgroups.GetStateEvent;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.protocols.FD;
import org.jgroups.protocols.FD_ALL;
import org.jgroups.protocols.pbcast.FLUSH;
import org.jgroups.tests.ChannelTestBase;
import org.jgroups.util.Util;
import org.testng.annotations.Test;

@Test(groups={"flush"}, sequential=false)
public class FlushTest
extends ChannelTestBase {
    /**
     * Starts a single {@link FlushTestReceiver} in connect-only mode, waits until its channel
     * has received its view and then verifies the event sequence the receiver recorded.
     *
     * <p>The receiver is always cleaned up, even when waiting for the view or for the
     * semaphore fails, so a failing run does not leave a connected channel behind.
     *
     * @throws Exception if the receiver cannot be created, or if waiting for the view or the
     *         semaphore fails or is interrupted
     */
    @Test
    public void testSingleChannel() throws Exception {
        Semaphore s = new Semaphore(1);
        FlushTestReceiver receiver = new FlushTestReceiver("c1", s, 0, FlushTestReceiver.CONNECT_ONLY);
        try {
            receiver.start();
            s.release(1);
            Util.blockUntilViewsReceived(60000L, 1000L, receiver.getChannel());
            // Bounded wait for a permit; as before, the outcome is deliberately not asserted.
            // NOTE(review): how the receiver thread uses the semaphore is defined in
            // ChannelTestBase, which is not visible here.
            s.tryAcquire(1, 60L, TimeUnit.SECONDS);
        }
        finally {
            receiver.cleanup();
        }
        Util.sleep(1000L);
        FlushTest.checkEventStateTransferSequence(receiver);
    }

    /**
     * Connects two channels to the same cluster and lets the second one send a unicast message
     * to the first one right after joining. Both channels are closed when the test ends,
     * whether it succeeded or not.
     *
     * @throws Exception if creating, connecting or sending on a channel fails
     */
    @Test
    public void testJoinFollowedByUnicast() throws Exception {
        final String cluster = "testJoinFollowedByUnicast";
        JChannel first = null;
        JChannel second = null;
        try {
            first = createChannel(true, 2);
            first.setReceiver(new SimpleReplier(first, true));
            first.connect(cluster);
            // The unicast is addressed to the first member and prepared before the second joins.
            Message unicast_msg = new Message(first.getAddress());
            second = createChannel(first);
            second.setReceiver(new SimpleReplier(second, false));
            second.connect(cluster);
            second.send(unicast_msg);
        }
        finally {
            Util.close(second, first);
        }
    }

    /**
     * Connects two channels to the same cluster, has the second one request the state (with a
     * timeout argument of 10000) and then send a unicast message to the first one. Both
     * channels are closed when the test ends, whether it succeeded or not.
     *
     * @throws Exception if creating, connecting, state retrieval or sending fails
     */
    @Test
    public void testStateTransferFollowedByUnicast() throws Exception {
        final String cluster = "testStateTransferFollowedByUnicast";
        JChannel first = null;
        JChannel second = null;
        try {
            first = createChannel(true, 2);
            first.setReceiver(new SimpleReplier(first, true));
            first.connect(cluster);
            // The unicast is addressed to the first member and prepared before the second joins.
            Message unicast_msg = new Message(first.getAddress());
            second = createChannel(first);
            second.setReceiver(new SimpleReplier(second, false));
            second.connect(cluster);
            log.info("\n** Getting the state **");
            // The result of the state request is not checked by this test.
            second.getState(null, 10000L);
            second.send(unicast_msg);
        }
        finally {
            Util.close(second, first);
        }
    }

    /**
     * Forms a three-member cluster (channels named "A", "B" and "C") and runs 100 consecutive
     * start/stop flush rounds from the first member, asserting that every {@code startFlush}
     * call reports success. All channels are closed when the test ends.
     *
     * @throws Exception if creating or connecting a channel, or a flush call, fails
     */
    @Test
    public void testSequentialFlushInvocation() throws Exception {
        JChannel a = null;
        JChannel b = null;
        JChannel c = null;
        try {
            a = createChannel(true, 3);
            a.setName("A");
            b = createChannel(a);
            b.setName("B");
            c = createChannel(a);
            c.setName("C");
            a.connect("x");
            b.connect("x");
            c.connect("x");
            Util.sleep(500L);
            int round = 0;
            while (round < 100) {
                System.out.print("flush #" + round + ": ");
                long begin = System.currentTimeMillis();
                boolean ok = a.startFlush(false);
                // The flush is stopped before the result is checked, so a failed round is
                // still stopped before the assertion fires.
                a.stopFlush();
                long elapsed = System.currentTimeMillis() - begin;
                if (ok) {
                    System.out.println(" OK (in " + elapsed + " ms)");
                } else {
                    System.out.println(" FAIL");
                }
                assert ok;
                ++round;
            }
        }
        finally {
            Util.close(a, b, c);
        }
    }

    /**
     * Forms a three-member cluster (C1, C2, C3), shuts C2 down after sending it a special
     * event, and checks that C1 and C3 end up in a view of size 2. FLUSH logging on the two
     * survivors is switched to "trace" while waiting and back to "warn" afterwards.
     *
     * @throws Exception if creating or connecting a channel, or waiting for the views, fails
     */
    @Test
    public void testFlushWithCrashedFlushCoordinator() throws Exception {
        final String cluster = "testFlushWithCrashedFlushCoordinator";
        JChannel c1 = null;
        JChannel c2 = null;
        JChannel c3 = null;
        try {
            c1 = createChannel(true, 3, "C1");
            changeProps(c1);
            c1.connect(cluster);
            c2 = createChannel(c1, "C2");
            changeProps(c2);
            c2.connect(cluster);
            c3 = createChannel(c1, "C3");
            changeProps(c3);
            c3.connect(cluster);
            System.out.println("shutting down flush coordinator C2");
            // NOTE(review): 94 is a raw event type left by the decompiler; presumably a named
            // constant of org.jgroups.Event (SUSPEND_BUT_FAIL?) - confirm before replacing.
            c2.down(new Event(94));
            Util.shutdown(c2);
            JChannel[] survivors = new JChannel[]{c1, c3};
            for (JChannel survivor : survivors) {
                survivor.getProtocolStack().findProtocol(FLUSH.class).setLevel("trace");
            }
            Util.blockUntilViewsReceived(10000L, 500L, c1, c3);
            for (JChannel survivor : survivors) {
                assertTrue("correct view size", survivor.getView().size() == 2);
            }
            for (JChannel survivor : survivors) {
                survivor.getProtocolStack().findProtocol(FLUSH.class).setLevel("warn");
            }
        }
        finally {
            Util.close(c3, c2, c1);
        }
    }

    /**
     * Forms a three-member cluster (C1, C2, C3), shuts C3 down, then starts and stops a flush
     * from C2. The flush must succeed, and C1 and C2 must end up in a view of size 2.
     *
     * @throws Exception if creating or connecting a channel, flushing, or waiting for the
     *         views fails
     */
    @Test
    public void testFlushWithCrashedParticipant() throws Exception {
        final String cluster = "testFlushWithCrashedParticipant";
        JChannel c1 = null;
        JChannel c2 = null;
        JChannel c3 = null;
        try {
            c1 = createChannel(true, 3, "C1");
            changeProps(c1);
            c1.connect(cluster);
            c2 = createChannel(c1, "C2");
            changeProps(c2);
            c2.connect(cluster);
            c3 = createChannel(c1, "C3");
            changeProps(c3);
            c3.connect(cluster);

            System.out.println("shutting down C3");
            Util.shutdown(c3);

            System.out.println("C2: starting flush");
            boolean flushed = Util.startFlush(c2);
            String outcome = flushed ? " was successful" : "failed";
            System.out.println("flush " + outcome);
            assert flushed;

            System.out.println("stopping flush");
            c2.stopFlush();

            System.out.println("waiting for view to contain C1 and C2");
            Util.blockUntilViewsReceived(10000L, 500L, c1, c2);
            System.out.println("C1: view=" + c1.getView() + "\nC2: view=" + c2.getView());
            assertTrue("correct view size", c1.getView().size() == 2);
            assertTrue("correct view size", c2.getView().size() == 2);
        }
        finally {
            Util.close(c3, c2, c1);
        }
    }

    /**
     * Forms a three-member cluster (C1, C2, C3), shuts down both C3 and C1, then starts and
     * stops a flush from the remaining member C2 and checks that C2 ends up alone in a view of
     * size 1.
     *
     * <p>The channels connect to a cluster named after this test method, so the test does not
     * share a cluster name with {@code testFlushWithCrashedFlushCoordinator}; the class is
     * declared with {@code sequential=false}.
     *
     * @throws Exception if creating or connecting a channel, flushing, or waiting for the view
     *         fails
     */
    @Test
    public void testFlushWithCrashedParticipants() throws Exception {
        final String cluster = "testFlushWithCrashedParticipants";
        JChannel c1 = null;
        JChannel c2 = null;
        JChannel c3 = null;
        try {
            c1 = this.createChannel(true, 3, "C1");
            FlushTest.changeProps(c1);
            c1.connect(cluster);
            c2 = this.createChannel(c1, "C2");
            FlushTest.changeProps(c2);
            c2.connect(cluster);
            c3 = this.createChannel(c1, "C3");
            FlushTest.changeProps(c3);
            c3.connect(cluster);
            Util.shutdown(c3);
            Util.shutdown(c1);
            // The flush result is not asserted here; only the resulting view size is checked.
            Util.startFlush(c2);
            c2.stopFlush();
            Util.blockUntilViewsReceived(10000L, 500L, c2);
            FlushTest.assertTrue("correct view size", c2.getView().size() == 1);
        }
        finally {
            Util.close(c3, c2, c1);
        }
    }

    /**
     * Connects two channels to the same cluster and runs a flush from the second one that is
     * restricted to a member list containing only the second channel's own address. The flush
     * must report success and is then stopped for the same member list.
     *
     * @throws Exception if creating or connecting a channel, or flushing, fails
     */
    @Test
    public void testPartialFlush() throws Exception {
        final String cluster = "testPartialFlush";
        JChannel first = null;
        JChannel second = null;
        try {
            first = createChannel(true, 2);
            first.setReceiver(new SimpleReplier(first, true));
            first.connect(cluster);
            second = createChannel(first);
            second.setReceiver(new SimpleReplier(second, false));
            second.connect(cluster);

            List<Address> participants = new ArrayList<Address>();
            participants.add(second.getLocalAddress());
            boolean flushedOk = Util.startFlush(second, participants);
            assertTrue("Partial flush worked", flushedOk);
            second.stopFlush(participants);
        }
        finally {
            Util.close(second, first);
        }
    }

    /**
     * Runs the shared multi-channel scenario with receivers "A" to "D" in connect-only mode.
     */
    @Test
    public void testBlockingNoStateTransfer() {
        _testChannels(new String[]{"A", "B", "C", "D"}, FlushTestReceiver.CONNECT_ONLY);
    }

    /**
     * Runs the shared multi-channel scenario with receivers "A" to "D", each connecting first
     * and requesting the state in a separate step.
     */
    @Test
    public void testBlockingWithStateTransfer() {
        _testChannels(new String[]{"A", "B", "C", "D"}, FlushTestReceiver.CONNECT_AND_SEPARATE_GET_STATE);
    }

    /**
     * Runs the shared multi-channel scenario with receivers "A" to "D" using the
     * {@code CONNECT_AND_GET_STATE} connect method.
     */
    @Test
    public void testBlockingWithConnectAndStateTransfer() {
        _testChannels(new String[]{"A", "B", "C", "D"}, FlushTestReceiver.CONNECT_AND_GET_STATE);
    }

    /**
     * Shared scenario for the blocking tests: creates one {@link FlushTestReceiver} per name,
     * starts them one after another, waits for all channels to receive their views and finally
     * verifies the event sequence recorded by every receiver.
     *
     * <p>The first receiver creates its own channel; every further receiver is created from
     * the first receiver's channel. Clean-up and verification run exactly once in a
     * {@code finally} block, regardless of whether the scenario completed, threw an exception,
     * or tripped the assertion in the exception handler.
     *
     * @param names       names of the receivers to create, in start order
     * @param connectType connect method passed to every receiver (one of the
     *                    {@code FlushTestReceiver.CONNECT_*} constants)
     */
    private void _testChannels(String[] names, int connectType) {
        int count = names.length;
        List<FlushTestReceiver> channels = new ArrayList<FlushTestReceiver>(count);
        try {
            // Take every permit up front; one permit is handed back per started receiver.
            Semaphore semaphore = new Semaphore(count);
            semaphore.acquire(count);
            boolean first = true;
            for (String channelName : names) {
                FlushTestReceiver channel = first
                    ? new FlushTestReceiver(channelName, semaphore, 0, connectType)
                    : new FlushTestReceiver((JChannel)channels.get(0).getChannel(), channelName, semaphore, 0, connectType);
                channels.add(channel);
                channel.start();
                semaphore.release(1);
                if (first) {
                    // Extra pause after the first member only, before any other member starts.
                    Util.sleep(3000L);
                }
                first = false;
            }
            Channel[] tmp = new Channel[channels.size()];
            int cnt = 0;
            for (FlushTestReceiver receiver : channels) {
                tmp[cnt++] = receiver.getChannel();
            }
            Util.blockUntilViewsReceived(10000L, 1000L, tmp);
            // Bounded wait to get all permits back; as before, the outcome is not asserted.
            semaphore.tryAcquire(count, 40L, TimeUnit.SECONDS);
        }
        catch (Exception ex) {
            this.log.warn("Exception encountered during test", ex);
            assert (false) : "Exception encountered during test execution: " + ex;
        }
        finally {
            Util.sleep(1000L);
            // Detach all receivers first, then clean up, then verify what each one recorded.
            for (FlushTestReceiver app : channels) {
                app.getChannel().setReceiver(null);
            }
            for (FlushTestReceiver app : channels) {
                app.cleanup();
            }
            for (FlushTestReceiver receiver : channels) {
                FlushTest.checkEventStateTransferSequence(receiver);
                System.out.println("event sequence for " + receiver.getChannel().getAddress() + " is OK");
            }
        }
    }

    /**
     * Applies fixed failure-detection settings to every given channel: an {@code FD} protocol,
     * if present in the stack, gets timeout 1000 and max tries 2; an {@code FD_ALL} protocol,
     * if present, gets timeout 2000 and interval 800. Protocols that are absent are skipped.
     *
     * @param channels channels whose protocol stacks are adjusted
     */
    private static void changeProps(JChannel ... channels) {
        for (JChannel ch : channels) {
            FD fd = (FD)ch.getProtocolStack().findProtocol(FD.class);
            if (fd != null) {
                fd.setTimeout(1000L);
                fd.setMaxTries(2);
            }
            FD_ALL fdAll = (FD_ALL)ch.getProtocolStack().findProtocol(FD_ALL.class);
            if (fdAll != null) {
                fdAll.setTimeout(2000L);
                fdAll.setInterval(800L);
            }
        }
    }

    /**
     * Receiver that logs every callback it gets and, when configured to handle requests,
     * answers each received message with an empty message addressed to the sender.
     */
    private class SimpleReplier
    extends ExtendedReceiverAdapter {
        /** Channel used for the local address in log output and for sending replies. */
        Channel channel;
        /** Whether {@link #receive(Message)} sends a reply to the sender. */
        boolean handle_requests;

        /**
         * @param channel         channel this receiver logs for and replies on
         * @param handle_requests {@code true} to reply to every received message
         */
        public SimpleReplier(Channel channel, boolean handle_requests) {
            this.channel = channel;
            this.handle_requests = handle_requests;
        }

        /**
         * Logs the received message and, if enabled, sends an empty reply to its sender.
         * Exceptions raised while logging or sending are printed and not propagated.
         */
        public void receive(Message msg) {
            // Created outside the try block, so a failure here propagates to the caller.
            Message reply = new Message(msg.getSrc());
            try {
                FlushTest.this.log.info(this.logPrefix() + "received message from " + msg.getSrc());
                if (!this.handle_requests) {
                    System.out.println("\n");
                    return;
                }
                FlushTest.this.log.info(", sending reply");
                this.channel.send(reply);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }

        /** Logs the newly delivered view. */
        public void viewAccepted(View new_view) {
            FlushTest.this.log.info(this.logPrefix() + "viewAccepted(" + new_view + ")");
        }

        /** Logs the block callback. */
        public void block() {
            FlushTest.this.log.info(this.logPrefix() + "block()");
        }

        /** Logs the unblock callback. */
        public void unblock() {
            FlushTest.this.log.info(this.logPrefix() + "unblock()");
        }

        /** Builds the common log prefix containing the channel's current address. */
        private String logPrefix() {
            return "-- MySimpleReplier[" + this.channel.getAddress() + "]: ";
        }
    }

    /**
     * Test application that connects its channel to the cluster "FlushTestReceiver" when it is
     * constructed, using one of the {@code CONNECT_*} methods, and serves a fixed four-byte
     * state.
     */
    private class FlushTestReceiver
    extends ChannelTestBase.PushChannelApplicationWithSemaphore {
        /** One of the {@code CONNECT_*} constants; selects how the channel is connected. */
        private int connectMethod;
        /** Connect only; no state is requested. */
        public static final int CONNECT_ONLY = 1;
        /** Connect in the constructor, request the state later in {@link #useChannel()}. */
        public static final int CONNECT_AND_SEPARATE_GET_STATE = 2;
        /** Connect through the four-argument {@code connect} call with a 25000 timeout. */
        public static final int CONNECT_AND_GET_STATE = 3;
        /** Number of messages {@link #useChannel()} sends. */
        int msgCount;

        /**
         * Creates a receiver with the given name and connects it.
         *
         * @param name          name passed to the superclass
         * @param semaphore     semaphore passed to the superclass
         * @param msgCount      number of messages to send in {@link #useChannel()}
         * @param connectMethod one of the {@code CONNECT_*} constants
         * @throws Exception if the superclass constructor or connecting fails
         */
        protected FlushTestReceiver(String name, Semaphore semaphore, int msgCount, int connectMethod) throws Exception {
            super(name, semaphore);
            this.initAndConnect(msgCount, connectMethod);
        }

        /**
         * Creates a receiver from an existing channel (passed on to the superclass) and
         * connects it.
         *
         * @param ch            channel passed to the superclass
         * @param name          name passed to the superclass
         * @param semaphore     semaphore passed to the superclass
         * @param msgCount      number of messages to send in {@link #useChannel()}
         * @param connectMethod one of the {@code CONNECT_*} constants
         * @throws Exception if the superclass constructor or connecting fails
         */
        protected FlushTestReceiver(JChannel ch, String name, Semaphore semaphore, int msgCount, int connectMethod) throws Exception {
            super(ch, name, semaphore);
            this.initAndConnect(msgCount, connectMethod);
        }

        /** Stores the settings, installs a fresh synchronized event list and connects. */
        private void initAndConnect(int msgCount, int connectMethod) throws Exception {
            this.connectMethod = connectMethod;
            this.msgCount = msgCount;
            this.events = Collections.synchronizedList(new LinkedList<Object>());
            switch (connectMethod) {
                case CONNECT_ONLY:
                case CONNECT_AND_SEPARATE_GET_STATE:
                    this.channel.connect("FlushTestReceiver");
                    break;
                case CONNECT_AND_GET_STATE:
                    this.channel.connect("FlushTestReceiver", null, null, 25000L);
                    break;
                default:
                    // Any other value leaves the channel unconnected.
                    break;
            }
        }

        /** Returns a copy of the events recorded so far. */
        @Override
        public List<Object> getEvents() {
            return new LinkedList<Object>(this.events);
        }

        /** Records a {@link GetStateEvent} and returns the fixed state payload. */
        @Override
        public byte[] getState() {
            this.events.add(new GetStateEvent(null, null));
            return this.statePayload();
        }

        /**
         * Delegates to the superclass, then writes the fixed state payload to the stream.
         * An {@link IOException} is printed and not propagated; the stream is always closed.
         */
        @Override
        public void getState(OutputStream ostream) {
            super.getState(ostream);
            try {
                ostream.write(this.statePayload());
            }
            catch (IOException e) {
                e.printStackTrace();
            }
            finally {
                Util.close(ostream);
            }
        }

        /**
         * Delegates to the superclass, then reads up to four bytes from the stream and discards
         * them. An {@link IOException} is printed and not propagated; the stream is always
         * closed.
         */
        @Override
        public void setState(InputStream istream) {
            super.setState(istream);
            try {
                istream.read(new byte[4]);
            }
            catch (IOException e) {
                e.printStackTrace();
            }
            finally {
                Util.close(istream);
            }
        }

        /**
         * Requests the state (timeout argument 25000) when the connect method asks for a
         * separate state request, then sends {@code msgCount} empty messages, sleeping 100
         * after each one.
         */
        @Override
        protected void useChannel() throws Exception {
            if (this.connectMethod == CONNECT_AND_SEPARATE_GET_STATE) {
                this.channel.getState(null, 25000L);
            }
            int sent = 0;
            while (sent < this.msgCount) {
                this.channel.send(new Message());
                Util.sleep(100L);
                ++sent;
            }
        }

        /** Returns a new array with the fixed state bytes 98, 101, 108, 97 ("bela" in ASCII). */
        private byte[] statePayload() {
            return new byte[]{'b', 'e', 'l', 'a'};
        }
    }
}

