/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.tests;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.MergeView;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.tests.ChannelTestBase;
import org.jgroups.util.Util;
import org.testng.annotations.Test;

@Test(groups={"flush"}, sequential=true)
public class ConcurrentStartupTest
extends ChannelTestBase {
    private AtomicInteger mod = new AtomicInteger(0);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    public void testConcurrentStartupWithState() {
        String[] names = new String[]{"A", "B", "C", "D"};
        int count = names.length;
        ConcurrentStartupChannel[] channels = new ConcurrentStartupChannel[count];
        try {
            Semaphore semaphore = new Semaphore(count);
            semaphore.acquire(count);
            for (int i = 0; i < count; ++i) {
                channels[i] = i == 0 ? new ConcurrentStartupChannel(names[i], semaphore) : new ConcurrentStartupChannel((JChannel)channels[0].getChannel(), names[i], semaphore);
                channels[i].start();
                semaphore.release(1);
                if (i != 0) continue;
                Util.sleep(1500L);
            }
            Channel[] tmp = new Channel[channels.length];
            for (int i = 0; i < channels.length; ++i) {
                tmp[i] = channels[i].getChannel();
            }
            Util.blockUntilViewsReceived(30000L, 500L, tmp);
            System.out.println(">>>> all nodes have the same view <<<<");
            boolean acquired = semaphore.tryAcquire(count, 20L, TimeUnit.SECONDS);
            if (!acquired) {
                this.log.warn("Most likely a bug, analyse the stack below:");
                this.log.warn(Util.dumpThreads());
            }
            System.out.println("Waiting for all channels to have received the " + count + " messages:");
            long end_time = System.currentTimeMillis() + 10000L;
            while (System.currentTimeMillis() < end_time) {
                boolean terminate = true;
                for (ConcurrentStartupChannel ch : channels) {
                    if (ch.getList().size() == count) continue;
                    terminate = false;
                    break;
                }
                if (terminate) break;
                Util.sleep(500L);
            }
            System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++");
            for (ConcurrentStartupChannel channel : channels) {
                System.out.println(channel.getName() + ": state=" + channel.getList());
            }
            System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++");
            for (ConcurrentStartupChannel ch : channels) {
                Set<Address> list = ch.getList();
                if (!ch.merged) assert (list.size() == count) : ": list is " + list + ", should have " + count + " elements";
            }
            System.out.println(">>>> done, all messages received by all channels <<<<");
            for (ConcurrentStartupChannel channel : channels) {
                if (channel.merged) continue;
                ConcurrentStartupTest.checkEventStateTransferSequence(channel);
            }
        }
        catch (Exception ex) {
            for (ConcurrentStartupChannel channel : channels) {
                channel.getChannel().setReceiver(null);
            }
            for (ConcurrentStartupChannel channel : channels) {
                channel.cleanup();
            }
            catch (Throwable throwable) {
                for (ConcurrentStartupChannel channel : channels) {
                    channel.getChannel().setReceiver(null);
                }
                for (ConcurrentStartupChannel channel : channels) {
                    channel.cleanup();
                }
                throw throwable;
            }
        }
        for (ConcurrentStartupChannel channel : channels) {
            channel.getChannel().setReceiver(null);
        }
        for (ConcurrentStartupChannel channel : channels) {
            channel.cleanup();
        }
    }

    protected int getMod() {
        return this.mod.incrementAndGet();
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    protected class ConcurrentStartupChannel
    extends ChannelTestBase.PushChannelApplicationWithSemaphore {
        private final Set<Address> state;
        boolean merged;

        public ConcurrentStartupChannel(String name, Semaphore semaphore) throws Exception {
            super(name, semaphore);
            this.state = new HashSet<Address>();
            this.merged = false;
        }

        public ConcurrentStartupChannel(JChannel ch, String name, Semaphore semaphore) throws Exception {
            super(ch, name, semaphore);
            this.state = new HashSet<Address>();
            this.merged = false;
        }

        @Override
        public void useChannel() throws Exception {
            this.channel.connect("test", null, null, 25000L);
            this.channel.send(null, null, this.channel.getAddress());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        Set<Address> getList() {
            Set<Address> set = this.state;
            synchronized (set) {
                return this.state;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void receive(Message msg) {
            if (msg.getBuffer() == null) {
                return;
            }
            Address obj = (Address)msg.getObject();
            ConcurrentStartupTest.this.log.info(this.channel.getAddress() + ": received " + obj);
            Set<Address> set = this.state;
            synchronized (set) {
                this.state.add(obj);
            }
        }

        @Override
        public void viewAccepted(View new_view) {
            super.viewAccepted(new_view);
            if (new_view instanceof MergeView) {
                this.merged = true;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void setState(byte[] state) {
            super.setState(state);
            try {
                List tmp = (List)Util.objectFromByteBuffer(state);
                Set<Address> set = this.state;
                synchronized (set) {
                    this.state.addAll(tmp);
                    ConcurrentStartupTest.this.log.info(this.channel.getAddress() + ": state is " + this.state);
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public byte[] getState() {
            super.getState();
            LinkedList<Address> tmp = null;
            Set<Address> set = this.state;
            synchronized (set) {
                tmp = new LinkedList<Address>(this.state);
                try {
                    return Util.objectToByteBuffer(tmp);
                }
                catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void getState(OutputStream ostream) {
            super.getState(ostream);
            ObjectOutputStream oos = null;
            try {
                oos = new ObjectOutputStream(ostream);
                LinkedList<Address> tmp = null;
                Set<Address> set = this.state;
                synchronized (set) {
                    tmp = new LinkedList<Address>(this.state);
                }
                oos.writeObject(tmp);
                oos.flush();
            }
            catch (IOException e) {
                try {
                    e.printStackTrace();
                }
                catch (Throwable throwable) {
                    Util.close(oos);
                    throw throwable;
                }
                Util.close(oos);
            }
            Util.close(oos);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void setState(InputStream istream) {
            super.setState(istream);
            ObjectInputStream ois = null;
            try {
                ois = new ObjectInputStream(istream);
                List tmp = (List)ois.readObject();
                Set<Address> set = this.state;
                synchronized (set) {
                    this.state.addAll(tmp);
                    ConcurrentStartupTest.this.log.info(this.channel.getAddress() + ": state is " + this.state);
                }
            }
            catch (Exception e) {
                try {
                    e.printStackTrace();
                }
                catch (Throwable throwable) {
                    Util.close(ois);
                    throw throwable;
                }
                Util.close(ois);
            }
            Util.close(ois);
        }
    }
}

