/*
 * Decompiled with CFR 0.152.
 */
package org.modeshape.jcr.clustering;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jcr.RepositoryException;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.ChannelListener;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.conf.ProtocolStackConfigurator;
import org.jgroups.conf.XmlConfigurator;
import org.jgroups.fork.ForkChannel;
import org.jgroups.protocols.CENTRAL_LOCK;
import org.jgroups.protocols.FORK;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
import org.modeshape.common.SystemFailureException;
import org.modeshape.common.annotation.ThreadSafe;
import org.modeshape.common.i18n.I18nResource;
import org.modeshape.common.logging.Logger;
import org.modeshape.common.util.StringUtil;
import org.modeshape.jcr.clustering.ClusteringI18n;
import org.modeshape.jcr.clustering.MessageConsumer;

@ThreadSafe
public abstract class ClusteringService {
    protected static final Logger LOGGER = Logger.getLogger(ClusteringService.class);
    private static final long DEFAULT_MAX_CLOCK_DELAY_CLUSTER_MILLIS = TimeUnit.MINUTES.toMillis(10L);
    protected final Listener listener;
    protected final Receiver receiver;
    protected final String clusterName;
    protected Channel channel;
    private final long maxAllowedClockDelayMillis;
    private final AtomicInteger membersInCluster;
    private final AtomicBoolean isOpen;
    private final Set<MessageConsumer<Serializable>> consumers;

    protected ClusteringService(String clusterName) {
        assert (clusterName != null);
        this.clusterName = clusterName;
        this.listener = new Listener();
        this.receiver = new Receiver();
        this.isOpen = new AtomicBoolean(false);
        this.membersInCluster = new AtomicInteger(1);
        this.maxAllowedClockDelayMillis = DEFAULT_MAX_CLOCK_DELAY_CLUSTER_MILLIS;
        this.consumers = new CopyOnWriteArraySet<MessageConsumer<Serializable>>();
    }

    public void restart() throws Exception {
        this.shutdown();
        this.init();
    }

    public synchronized void addConsumer(MessageConsumer<? extends Serializable> consumer) {
        this.consumers.add(consumer);
    }

    public synchronized boolean shutdown() {
        if (this.channel == null) {
            return false;
        }
        Address address = this.channel.getAddress();
        LOGGER.debug("{0} shutting down clustering service...", new Object[]{address});
        this.consumers.clear();
        this.isOpen.set(false);
        try {
            this.channel.disconnect();
            this.channel.removeChannelListener((ChannelListener)this.listener);
            this.channel.setReceiver(null);
            this.channel.close();
            LOGGER.debug("{0} successfully closed main channel", new Object[]{address});
        }
        finally {
            this.channel = null;
        }
        this.membersInCluster.set(1);
        return true;
    }

    public boolean isOpen() {
        return this.isOpen.get();
    }

    public boolean multipleMembersInCluster() {
        return this.membersInCluster.get() > 1;
    }

    public int membersInCluster() {
        return this.membersInCluster.get();
    }

    public String clusterName() {
        return this.channel.getClusterName();
    }

    public long getMaxAllowedClockDelayMillis() {
        return this.maxAllowedClockDelayMillis;
    }

    public boolean sendMessage(Serializable payload) {
        if (!this.isOpen() || !this.multipleMembersInCluster()) {
            return false;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("{0} SENDING {1} ", new Object[]{this.toString(), payload});
        }
        try {
            byte[] messageData = this.toByteArray(payload);
            Message jgMessage = new Message(null, this.channel.getAddress(), messageData);
            this.channel.send(jgMessage);
            return true;
        }
        catch (Exception e) {
            throw new SystemFailureException(ClusteringI18n.errorSendingMessage.text(new Object[]{this.clusterName()}), (Throwable)e);
        }
    }

    public String toString() {
        StringBuilder sb = new StringBuilder("ClusteringService[cluster_name='");
        sb.append(this.clusterName()).append("', address=").append(this.getChannel().getAddress()).append("]");
        return sb.toString();
    }

    public static ClusteringService startStandalone(String clusterName, String jgroupsConfig) {
        StandaloneClusteringService clusteringService = new StandaloneClusteringService(clusterName, jgroupsConfig);
        ((ClusteringService)clusteringService).init();
        return clusteringService;
    }

    public static ClusteringService startStandalone(String clusterName, Channel channel) {
        StandaloneClusteringService clusteringService = new StandaloneClusteringService(clusterName, channel);
        ((ClusteringService)clusteringService).init();
        return clusteringService;
    }

    public static ClusteringService startForked(Channel mainChannel) {
        if (!mainChannel.isConnected()) {
            throw new IllegalStateException(ClusteringI18n.channelNotConnected.text(new Object[0]));
        }
        ForkedClusteringService clusteringService = new ForkedClusteringService(mainChannel);
        ((ClusteringService)clusteringService).init();
        return clusteringService;
    }

    private byte[] toByteArray(Object payload) throws IOException {
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        try (ObjectOutputStream stream = new ObjectOutputStream(output);){
            stream.writeObject(payload);
        }
        return output.toByteArray();
    }

    protected Serializable fromByteArray(byte[] data, ClassLoader classLoader) throws IOException, ClassNotFoundException {
        if (classLoader == null) {
            classLoader = ClusteringService.class.getClassLoader();
        }
        try (ObjectInputStreamWithClassLoader input = new ObjectInputStreamWithClassLoader(new ByteArrayInputStream(data), classLoader);){
            Serializable serializable = (Serializable)input.readObject();
            return serializable;
        }
    }

    public Channel getChannel() {
        return this.channel;
    }

    protected abstract void init();

    private static class ForkedClusteringService
    extends ClusteringService {
        private static final String FORK_CHANNEL_NAME = "modeshape-fork-channel";
        private static final Map<String, List<String>> FORK_STACKS_BY_CHANNEL_NAME = new HashMap<String, List<String>>();
        private final Channel mainChannel;

        protected ForkedClusteringService(Channel mainChannel) {
            super(mainChannel.getClusterName());
            this.mainChannel = mainChannel;
        }

        @Override
        protected void init() {
            try {
                boolean alreadyHasForkProtocol;
                ProtocolStack stack = this.mainChannel.getProtocolStack();
                Protocol topProtocol = stack.getTopProtocol();
                String forkStackId = this.clusterName;
                boolean bl = alreadyHasForkProtocol = stack.findProtocol(FORK.class) != null;
                if (!alreadyHasForkProtocol) {
                    FORK fork = new FORK();
                    fork.setProtocolStack(stack);
                    stack.insertProtocol((Protocol)fork, 1, topProtocol.getClass());
                }
                this.channel = new ForkChannel(this.mainChannel, forkStackId, FORK_CHANNEL_NAME, new Protocol[]{new CENTRAL_LOCK()});
                this.channel.addChannelListener((ChannelListener)this.listener);
                this.channel.setReceiver((org.jgroups.Receiver)this.receiver);
                this.channel.connect(FORK_CHANNEL_NAME);
                if (!alreadyHasForkProtocol) {
                    String mainChannelName = this.mainChannel.getName();
                    List<String> existingForksForChannel = FORK_STACKS_BY_CHANNEL_NAME.get(mainChannelName);
                    if (existingForksForChannel == null) {
                        existingForksForChannel = new ArrayList<String>();
                        FORK_STACKS_BY_CHANNEL_NAME.put(mainChannelName, existingForksForChannel);
                    }
                    existingForksForChannel.add(forkStackId);
                }
            }
            catch (RuntimeException rt) {
                throw rt;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public synchronized boolean shutdown() {
            if (super.shutdown()) {
                String mainChannelName = this.mainChannel.getName();
                List<String> forksForChannel = FORK_STACKS_BY_CHANNEL_NAME.get(mainChannelName);
                if (forksForChannel != null) {
                    forksForChannel.remove(this.clusterName);
                    if (forksForChannel.isEmpty()) {
                        FORK_STACKS_BY_CHANNEL_NAME.remove(mainChannelName);
                        Protocol removed = this.mainChannel.getProtocolStack().removeProtocol(FORK.class);
                        if (removed != null) {
                            LOGGER.debug("FORK protocol removed from original channel stack for channel {0}", new Object[]{mainChannelName});
                        } else {
                            LOGGER.debug("FORK protocol not found in original channel stack for channel {0}", new Object[]{mainChannelName});
                        }
                    }
                }
                return true;
            }
            return false;
        }
    }

    private static class StandaloneClusteringService
    extends ClusteringService {
        private final String jgroupsConfig;

        protected StandaloneClusteringService(String clusterName, String jgroupsConfig) {
            super(clusterName);
            this.jgroupsConfig = jgroupsConfig;
            this.channel = null;
        }

        protected StandaloneClusteringService(String clusterName, Channel channel) {
            super(clusterName);
            this.jgroupsConfig = null;
            this.channel = channel;
        }

        @Override
        protected void init() {
            try {
                ProtocolStack protocolStack;
                Protocol centralLock;
                if (this.channel == null) {
                    this.channel = this.newChannel(this.jgroupsConfig);
                }
                if ((centralLock = (protocolStack = this.channel.getProtocolStack()).findProtocol(CENTRAL_LOCK.class)) == null) {
                    CENTRAL_LOCK lockingProtocol = new CENTRAL_LOCK();
                    lockingProtocol.init();
                    protocolStack.addProtocol((Protocol)lockingProtocol);
                }
                this.channel.addChannelListener((ChannelListener)this.listener);
                this.channel.setReceiver((org.jgroups.Receiver)this.receiver);
                this.channel.connect(this.clusterName);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private JChannel newChannel(String jgroupsConfig) throws Exception {
            if (StringUtil.isBlank((String)jgroupsConfig)) {
                return new JChannel();
            }
            XmlConfigurator configurator = null;
            InputStream stream = ClusteringService.class.getClassLoader().getResourceAsStream(jgroupsConfig);
            if (stream == null) {
                LOGGER.debug("Unable to locate configuration file '{0}' using the clustering service class loader.", new Object[]{jgroupsConfig});
                try {
                    stream = new FileInputStream(jgroupsConfig);
                }
                catch (FileNotFoundException e) {
                    throw new RepositoryException(ClusteringI18n.missingConfigurationFile.text(new Object[]{jgroupsConfig}));
                }
            }
            try {
                configurator = XmlConfigurator.getInstance((InputStream)stream);
            }
            catch (IOException e) {
                LOGGER.debug((Throwable)e, "Channel configuration is not a classpath resource", new Object[0]);
                stream = new ByteArrayInputStream(jgroupsConfig.getBytes());
                try {
                    configurator = XmlConfigurator.getInstance((InputStream)stream);
                }
                catch (IOException e1) {
                    LOGGER.debug((Throwable)e, "Channel configuration is not valid XML content", new Object[0]);
                }
            }
            finally {
                if (stream != null) {
                    try {
                        stream.close();
                    }
                    catch (IOException iOException) {}
                }
            }
            if (configurator == null) {
                throw new RepositoryException(ClusteringI18n.channelConfigurationError.text(new Object[]{jgroupsConfig}));
            }
            return new JChannel((ProtocolStackConfigurator)configurator);
        }
    }

    private static class ObjectInputStreamWithClassLoader
    extends ObjectInputStream {
        private ClassLoader cl;

        public ObjectInputStreamWithClassLoader(InputStream in, ClassLoader cl) throws IOException {
            super(in);
            this.cl = cl;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            if (this.cl == null) {
                return super.resolveClass(desc);
            }
            try {
                return Class.forName(desc.getName(), false, this.cl);
            }
            catch (ClassNotFoundException ex) {
                return super.resolveClass(desc);
            }
        }

        @Override
        public void close() throws IOException {
            super.close();
            this.cl = null;
        }
    }

    protected class Listener
    implements ChannelListener {
        protected Listener() {
        }

        public void channelClosed(Channel channel) {
            ClusteringService.this.isOpen.set(false);
        }

        public void channelConnected(Channel channel) {
            ClusteringService.this.isOpen.set(true);
        }

        public void channelDisconnected(Channel channel) {
            ClusteringService.this.isOpen.set(false);
        }
    }

    protected final class Receiver
    extends ReceiverAdapter {
        protected Receiver() {
        }

        public void block() {
            ClusteringService.this.isOpen.set(false);
        }

        public void receive(Message message) {
            try {
                Serializable payload = ClusteringService.this.fromByteArray(message.getBuffer(), ((Object)((Object)this)).getClass().getClassLoader());
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("{0} RECEIVED {1}", new Object[]{ClusteringService.this.toString(), payload});
                }
                for (MessageConsumer consumer : ClusteringService.this.consumers) {
                    if (!consumer.getPayloadType().isAssignableFrom(payload.getClass())) continue;
                    consumer.consume(payload);
                }
            }
            catch (Exception e) {
                String msg = ClusteringI18n.errorReceivingMessage.text(new Object[]{ClusteringService.this.clusterName()});
                throw new SystemFailureException(msg, (Throwable)e);
            }
        }

        public void suspect(Address suspectedMbr) {
            LOGGER.error((I18nResource)ClusteringI18n.memberOfClusterIsSuspect, new Object[]{ClusteringService.this.clusterName(), suspectedMbr});
        }

        public void viewAccepted(View newView) {
            int membersCount = newView.getMembers().size();
            ClusteringService.this.membersInCluster.set(membersCount);
            if (LOGGER.isDebugEnabled()) {
                String clusterServiceInfo = ClusteringService.this.toString();
                LOGGER.debug("{0}: new cluster member joined: {1}, total count: {2}", new Object[]{clusterServiceInfo, newView, membersCount});
                if (ClusteringService.this.membersInCluster.get() > 1) {
                    LOGGER.debug("{0}: there are now multiple members in cluster; changes will be propagated throughout the cluster", new Object[]{clusterServiceInfo});
                } else if (ClusteringService.this.membersInCluster.get() == 1) {
                    LOGGER.debug("{0}: there is only one member in cluster; changes will be propagated only locally", new Object[]{clusterServiceInfo});
                }
            }
        }
    }
}

