/*
 * Decompiled with CFR 0.152.
 */
package org.atmosphere.cpr;

import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.atmosphere.cache.BroadcastMessage;
import org.atmosphere.cache.CacheMessage;
import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.AtmosphereRequest;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResourceEvent;
import org.atmosphere.cpr.AtmosphereResourceEventImpl;
import org.atmosphere.cpr.AtmosphereResourceEventListener;
import org.atmosphere.cpr.AtmosphereResourceImpl;
import org.atmosphere.cpr.BroadcastFilter;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.cpr.BroadcasterCache;
import org.atmosphere.cpr.BroadcasterConfig;
import org.atmosphere.cpr.BroadcasterFuture;
import org.atmosphere.cpr.BroadcasterLifeCyclePolicy;
import org.atmosphere.cpr.BroadcasterLifeCyclePolicyListener;
import org.atmosphere.cpr.BroadcasterListener;
import org.atmosphere.cpr.Deliver;
import org.atmosphere.cpr.FrameworkConfig;
import org.atmosphere.lifecycle.LifecycleHandler;
import org.atmosphere.pool.PoolableBroadcasterFactory;
import org.atmosphere.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultBroadcaster
implements Broadcaster {
    /** Default wait time, in milliseconds, used when polling the message and write queues. */
    public static final int POLLING_DEFAULT = 100;
    /** String key built from this class name with the suffix {@code .messagesCached}. */
    public static final String CACHED = DefaultBroadcaster.class.getName() + ".messagesCached";
    private static final Logger logger = LoggerFactory.getLogger(DefaultBroadcaster.class);
    /** Log format used when an operation is attempted on a destroyed broadcaster. */
    private static final String DESTROYED = "This Broadcaster has been destroyed and cannot be used {} by invoking {}";
    private static final List<AtmosphereResourceEventListener> EMPTY_LISTENERS = new ArrayList<AtmosphereResourceEventListener>();
    /** Resources currently attached to this broadcaster; also used as a monitor by several methods. */
    protected final ConcurrentLinkedQueue<AtmosphereResource> resources = new ConcurrentLinkedQueue<AtmosphereResource>();
    /** Per-broadcaster configuration, created in {@code initialize}. */
    protected BroadcasterConfig bc;
    /** Pending deliveries, polled by the broadcast handler. */
    protected final BlockingQueue<Deliver> messages = new LinkedBlockingQueue<Deliver>();
    /** Either the factory-wide shared listener list or a private queue, chosen in {@code initialize}. */
    protected Collection<BroadcasterListener> broadcasterListeners;
    protected final AtomicBoolean started = new AtomicBoolean(false);
    protected final AtomicBoolean initialized = new AtomicBoolean(false);
    protected final AtomicBoolean destroyed = new AtomicBoolean(false);
    protected Broadcaster.SCOPE scope = Broadcaster.SCOPE.APPLICATION;
    /** Broadcaster identifier, returned by {@code getID()}. */
    protected String name = DefaultBroadcaster.class.getSimpleName();
    /** Deliveries waiting to be merged into, or flushed by, the next {@code deliverPush}. */
    protected final ConcurrentLinkedQueue<Deliver> delayedBroadcast = new ConcurrentLinkedQueue<Deliver>();
    protected final ConcurrentLinkedQueue<Deliver> broadcastOnResume = new ConcurrentLinkedQueue<Deliver>();
    protected final ConcurrentLinkedQueue<BroadcasterLifeCyclePolicyListener> lifeCycleListeners = new ConcurrentLinkedQueue<BroadcasterLifeCyclePolicyListener>();
    /** Write queues keyed by resource uuid, used when out-of-order broadcast is disabled. */
    protected final ConcurrentHashMap<String, WriteQueue> writeQueues = new ConcurrentHashMap<String, WriteQueue>();
    /** Single write queue handed to the async write handlers spawned in out-of-order mode. */
    protected final WriteQueue uniqueWriteQueue = new WriteQueue("-1");
    /** Set to the number of spawned broadcast handlers; decremented when a handler goes idle. */
    protected final AtomicInteger dispatchThread = new AtomicInteger();
    protected Future<?>[] notifierFuture;
    protected Future<?>[] asyncWriteFuture;
    private Broadcaster.POLICY policy = Broadcaster.POLICY.FIFO;
    private final AtomicLong maxSuspendResource = new AtomicLong(-1L);
    private final AtomicBoolean requestScoped = new AtomicBoolean(false);
    /** Set to {@code true} each time {@code deliverPush} runs. */
    private final AtomicBoolean recentActivity = new AtomicBoolean(false);
    private BroadcasterLifeCyclePolicy lifeCyclePolicy = new BroadcasterLifeCyclePolicy.Builder().policy(BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.NEVER).build();
    protected URI uri;
    protected AtmosphereConfig config;
    /** Monitor on which {@code awaitAndBroadcast} waits. */
    private final Object[] awaitBarrier = new Object[0];
    /** When {@code true}, handlers resubmit themselves and ordering between messages is not preserved. */
    private final AtomicBoolean outOfOrderBroadcastSupported = new AtomicBoolean(false);
    /** Write timeout in seconds; {@code -1} unless configured in {@code initialize}. */
    protected int writeTimeoutInSecond = -1;
    /** Queue poll timeout in milliseconds. */
    protected int waitTime = POLLING_DEFAULT;
    private boolean backwardCompatible;
    private LifecycleHandler lifecycleHandler;
    private Future<?> currentLifecycleTask;
    private boolean cacheOnIOFlushException = true;
    /** {@code true} when {@link #broadcasterListeners} is the factory-wide shared list. */
    protected boolean sharedListeners;
    /** {@code true} when the configured factory is a {@link PoolableBroadcasterFactory}. */
    protected boolean candidateForPoolable;
    protected final String usingTokenIdForAttribute = UUID.randomUUID().toString();

    /**
     * Binds this broadcaster to a name, URI and framework configuration, then reads
     * the broadcaster-related init parameters.
     *
     * @param name   identifier later returned by {@code getID()}
     * @param uri    URI associated with this broadcaster
     * @param config framework configuration supplying init parameters and the factory
     * @return this broadcaster
     * @throws NumberFormatException if the wait-time or write-timeout parameter is not an integer
     */
    @Override
    public Broadcaster initialize(String name, URI uri, AtmosphereConfig config) {
        this.name = name;
        this.uri = uri;
        this.config = config;
        bc = createBroadcasterConfig(config);

        // Deprecated setting: only warn, the value itself is ignored.
        String cacheStrategy = config.getInitParameter("org.atmosphere.cpr.BroadcasterCache.strategy");
        if (cacheStrategy != null) {
            logger.warn("{} is no longer supported. Use BroadcastInterceptor instead. By default the original message will be cached.", "org.atmosphere.cpr.BroadcasterCache.strategy");
        }

        String outOfOrder = config.getInitParameter("org.atmosphere.cpr.Broadcaster.supportOutOfOrderBroadcast");
        if (outOfOrder != null) {
            outOfOrderBroadcastSupported.set(Boolean.parseBoolean(outOfOrder));
        }

        String threadWaitTime = config.getInitParameter("org.atmosphere.cpr.Broadcaster.threadWaitTime");
        if (threadWaitTime != null) {
            waitTime = Integer.parseInt(threadWaitTime);
        }

        String writeTimeout = config.getInitParameter("org.atmosphere.cpr.Broadcaster.writeTimeout");
        if (writeTimeout != null) {
            writeTimeoutInSecond = Integer.parseInt(writeTimeout);
        }

        if (outOfOrderBroadcastSupported.get()) {
            logger.trace("{} supports Out Of Order Broadcast: {}", name, outOfOrderBroadcastSupported.get());
        }
        initialized.set(true);

        backwardCompatible = Boolean.parseBoolean(config.getInitParameter("org.atmosphere.websocket.backwardCompatible.atmosphereResource"));
        cacheOnIOFlushException = config.getInitParameter("org.atmosphere.cpr.Broadcaster.cacheOnIOFlushException", true);
        sharedListeners = config.getInitParameter("org.atmosphere.cpr.Broadcaster.sharedListenersList", false);
        if (sharedListeners) {
            broadcasterListeners = config.getBroadcasterFactory().broadcasterListeners();
        } else {
            broadcasterListeners = new ConcurrentLinkedQueue<BroadcasterListener>();
        }
        candidateForPoolable = PoolableBroadcasterFactory.class.isAssignableFrom(config.getBroadcasterFactory().getClass());
        return this;
    }

    /**
     * Convenience overload that initializes this broadcaster with the URI
     * {@code http://localhost}.
     *
     * @param name   identifier for this broadcaster
     * @param config framework configuration
     * @return this broadcaster
     */
    public Broadcaster initialize(String name, AtmosphereConfig config) {
        URI defaultUri = URI.create("http://localhost");
        return initialize(name, defaultUri, config);
    }

    /**
     * Builds and initializes the {@link BroadcasterConfig} for this broadcaster from the
     * framework's broadcaster filters, the given configuration and the current id.
     *
     * @param config framework configuration
     * @return the initialized broadcaster configuration
     */
    protected BroadcasterConfig createBroadcasterConfig(AtmosphereConfig config) {
        BroadcasterConfig created = new BroadcasterConfig(config.framework().broadcasterFilters, config, getID());
        return created.init();
    }

    /**
     * Releases this broadcaster's state and removes it from its factory.
     *
     * <p>When the broadcaster is not a pooling candidate it is permanently destroyed,
     * unless {@code notifyOnPreDestroy()} returns {@code true} or it was already
     * destroyed; in both of those cases nothing else happens. Pooling candidates only
     * have their resource and message collections cleared. Any failure is logged and
     * not rethrown.
     */
    @Override
    public synchronized void destroy() {
        try {
            logger.trace("Broadcaster {} will be pooled: {}", getID(), candidateForPoolable);
            if (!candidateForPoolable) {
                // Short-circuit keeps the original order: the destroyed flag is only
                // flipped when the pre-destroy notification did not stop the destroy.
                if (notifyOnPreDestroy() || destroyed.getAndSet(true)) {
                    return;
                }
                logger.trace("Broadcaster {} is being destroyed and cannot be re-used. Policy was {}", getID(), policy);
                logger.trace("Broadcaster {} is being destroyed and cannot be re-used. Resources are {}", getID(), resources);
                started.set(false);
                releaseExternalResources();
                killReactiveThreads();
                if (bc != null) {
                    bc.destroy();
                }
                lifeCycleListeners.clear();
                delayedBroadcast.clear();
                if (!sharedListeners) {
                    broadcasterListeners.clear();
                }
            }

            resources.clear();
            broadcastOnResume.clear();
            messages.clear();
            writeQueues.clear();
            if (config.getBroadcasterFactory() != null) {
                config.getBroadcasterFactory().remove(this, getID());
            }
        } catch (Throwable failure) {
            logger.error("Unexpected exception during Broadcaster destroy {}", getID(), failure);
        }
    }

    /**
     * Returns a read-only view of the resources attached to this broadcaster.
     *
     * @return an unmodifiable view backed by the live resource queue
     */
    @Override
    public Collection<AtmosphereResource> getAtmosphereResources() {
        Collection<AtmosphereResource> readOnlyView = Collections.unmodifiableCollection(resources);
        return readOnlyView;
    }

    /**
     * Changes the scope of this broadcaster.
     *
     * <p>For any scope other than {@code REQUEST} only the field is updated. Switching to
     * {@code REQUEST} moves every attached resource to its own newly created broadcaster
     * (with its own {@link BroadcasterCache} instance of the same class as this one's)
     * and then destroys this broadcaster if it had any resources. Failures while
     * migrating are logged and not rethrown. The call is ignored, with a debug log,
     * when this broadcaster is already destroyed.
     *
     * @param scope the new scope
     */
    @Override
    public void setScope(Broadcaster.SCOPE scope) {
        if (destroyed.get()) {
            logger.debug(DESTROYED, getID(), "setScope");
            return;
        }
        this.scope = scope;
        if (scope != Broadcaster.SCOPE.REQUEST) {
            return;
        }
        logger.debug("Changing broadcaster scope for {}. This broadcaster will be destroyed.", getID());
        synchronized (resources) {
            try {
                for (AtmosphereResource resource : resources) {
                    // Typed as Broadcaster (not Object) so the calls below compile; the
                    // original already cast this value to Broadcaster before use.
                    Broadcaster b = (Broadcaster) config.getBroadcasterFactory().get(getClass(), getClass().getSimpleName() + "/" + config.uuidProvider().generateUuid());
                    if (DefaultBroadcaster.class.isAssignableFrom(getClass())) {
                        // Give the new broadcaster its own cache instance of the same class.
                        BroadcasterCache cache = config.framework().newClassInstance(BroadcasterCache.class, bc.getBroadcasterCache().getClass());
                        cache.configure(config);
                        b.getBroadcasterConfig().setBroadcasterCache(cache);
                    }
                    resource.setBroadcaster(b);
                    b.setScope(Broadcaster.SCOPE.REQUEST);
                    if (resource.getAtmosphereResourceEvent().isSuspended()) {
                        b.addAtmosphereResource(resource);
                    }
                    logger.debug("Resource {} not using broadcaster {}", resource, b.getID());
                }
                if (!resources.isEmpty()) {
                    destroy();
                }
            } catch (Exception e) {
                logger.error("Failed to set request scope for current resources", e);
            }
        }
    }

    /**
     * Returns the scope currently assigned to this broadcaster.
     *
     * @return the current scope
     */
    @Override
    public Broadcaster.SCOPE getScope() {
        return scope;
    }

    /**
     * Changes the identifier of this broadcaster and re-registers it with the factory
     * under the new id.
     *
     * <p>A {@code null} id is replaced by a generated one. Nothing happens when the
     * configuration has no broadcaster factory, or when a broadcaster is already
     * registered under the requested id.
     *
     * @param id the new identifier, or {@code null} to generate one
     * @throws IllegalStateException if the id is already used by a {@code REQUEST}-scoped broadcaster
     */
    @Override
    public synchronized void setID(String id) {
        if (id == null) {
            id = getClass().getSimpleName() + "/" + config.uuidProvider().generateUuid();
        }
        if (config.getBroadcasterFactory() == null) {
            return;
        }
        // Typed as Broadcaster (not Object) so that getScope() can be called on it.
        Broadcaster existing = (Broadcaster) config.getBroadcasterFactory().lookup(getClass(), id);
        if (existing != null) {
            if (existing.getScope() == Broadcaster.SCOPE.REQUEST) {
                throw new IllegalStateException("Broadcaster ID already assigned to SCOPE.REQUEST. Cannot change the id");
            }
            // The id is already taken by another broadcaster: keep the current id.
            return;
        }
        config.getBroadcasterFactory().remove(this, name);
        name = id;
        config.getBroadcasterFactory().add(this, name);
        bc.broadcasterID(name);
    }

    /**
     * Replaces this broadcaster's name without touching the factory registration
     * or the broadcaster configuration.
     *
     * @param id the new name
     * @return this broadcaster
     */
    public Broadcaster rename(String id) {
        name = id;
        return this;
    }

    /**
     * Returns the identifier of this broadcaster.
     *
     * @return the current name
     */
    @Override
    public String getID() {
        return name;
    }

    /**
     * Resumes every attached resource and removes it from this broadcaster.
     *
     * <p>A failure while resuming one resource is logged at trace level and does not
     * stop the others; the resource is removed in either case.
     */
    @Override
    public void resumeAll() {
        synchronized (resources) {
            Iterator<AtmosphereResource> attached = resources.iterator();
            while (attached.hasNext()) {
                AtmosphereResource current = attached.next();
                try {
                    current.resume();
                } catch (Throwable failure) {
                    logger.trace("resumeAll", failure);
                } finally {
                    removeAtmosphereResource(current);
                }
            }
        }
    }

    /**
     * Hook invoked from {@code destroy()} before the handler futures are cancelled.
     * This implementation does nothing.
     */
    @Override
    public void releaseExternalResources() {
    }

    /**
     * Stores the new life-cycle policy and, when a lifecycle handler is set,
     * calls its {@code on(this)} method.
     *
     * @param lifeCyclePolicy the policy to apply
     */
    @Override
    public void setBroadcasterLifeCyclePolicy(BroadcasterLifeCyclePolicy lifeCyclePolicy) {
        this.lifeCyclePolicy = lifeCyclePolicy;
        if (lifecycleHandler == null) {
            return;
        }
        lifecycleHandler.on(this);
    }

    /**
     * Returns the life-cycle policy currently in effect.
     *
     * @return the current policy
     */
    @Override
    public BroadcasterLifeCyclePolicy getBroadcasterLifeCyclePolicy() {
        return lifeCyclePolicy;
    }

    /**
     * Registers a life-cycle policy listener.
     *
     * @param b the listener to add
     */
    @Override
    public void addBroadcasterLifeCyclePolicyListener(BroadcasterLifeCyclePolicyListener b) {
        lifeCycleListeners.add(b);
    }

    /**
     * Unregisters a life-cycle policy listener.
     *
     * @param b the listener to remove
     */
    @Override
    public void removeBroadcasterLifeCyclePolicyListener(BroadcasterLifeCyclePolicyListener b) {
        lifeCycleListeners.remove(b);
    }

    /**
     * Reports whether this broadcaster has been destroyed.
     *
     * @return {@code true} once the destroyed flag has been set
     */
    @Override
    public boolean isDestroyed() {
        return destroyed.get();
    }

    /**
     * Broadcasts {@code t}, first waiting on the await barrier for at most the given
     * time when no resource is currently attached.
     *
     * <p>The broadcast is issued after the wait returns, whether or not a resource has
     * been attached in the meantime.
     *
     * @param t        the message to broadcast
     * @param time     maximum time to wait
     * @param timeUnit unit of {@code time}
     * @return the future returned by {@code broadcast(t)}, or {@code null} when the wait
     *         failed or was interrupted
     */
    @Override
    public Future<Object> awaitAndBroadcast(Object t, long time, TimeUnit timeUnit) {
        if (resources.isEmpty()) {
            synchronized (awaitBarrier) {
                try {
                    logger.trace("Awaiting for AtmosphereResource for {} {}", time, timeUnit);
                    awaitBarrier.wait(translateTimeUnit(time, timeUnit));
                } catch (InterruptedException e) {
                    // Keep the interruption visible to the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                    logger.warn("awaitAndBroadcast", e);
                    return null;
                } catch (Throwable e) {
                    logger.warn("awaitAndBroadcast", e);
                    return null;
                }
            }
        }
        // Broadcast outside the monitor so the barrier is not held while delivering.
        return broadcast(t);
    }

    /**
     * Adds a broadcaster listener unless the listener list is shared or the listener
     * is already registered.
     *
     * @param b the listener to add
     * @return this broadcaster
     */
    @Override
    public Broadcaster addBroadcasterListener(BroadcasterListener b) {
        if (sharedListeners || broadcasterListeners.contains(b)) {
            return this;
        }
        broadcasterListeners.add(b);
        return this;
    }

    /**
     * Removes a broadcaster listener unless the listener list is shared.
     *
     * @param b the listener to remove
     * @return this broadcaster
     */
    @Override
    public Broadcaster removeBroadcasterListener(BroadcasterListener b) {
        if (sharedListeners) {
            return this;
        }
        broadcasterListeners.remove(b);
        return this;
    }

    /**
     * Creates the task that drains {@link #messages} and pushes each delivery.
     *
     * <p>The task loops until the broadcaster is destroyed. It stops when a poll times
     * out (decrementing {@link #dispatchThread}), when it is interrupted, or when a push
     * fails while the broadcaster is not started or already destroyed. In out-of-order
     * mode the task resubmits itself to the executor after every poll and handles at
     * most one message per run.
     *
     * @return a new broadcast handler
     */
    protected Runnable getBroadcastHandler() {
        return new Runnable() {

            @Override
            public void run() {
                while (!DefaultBroadcaster.this.isDestroyed()) {
                    Deliver msg = null;
                    try {
                        msg = messages.poll(waitTime, TimeUnit.MILLISECONDS);
                        if (msg == null) {
                            // Nothing to do within the wait time: let this handler end.
                            dispatchThread.decrementAndGet();
                            return;
                        }
                    } catch (InterruptedException ex) {
                        logger.trace("{} got interrupted for Broadcaster {}", Thread.currentThread().getName(), getID());
                        logger.trace("", ex);
                        return;
                    } finally {
                        // Out-of-order mode: hand the next poll to another executor task.
                        if (outOfOrderBroadcastSupported.get()) {
                            bc.getExecutorService().submit(this);
                        }
                    }

                    try {
                        logger.trace("{} is about to broadcast {}", getID(), msg);
                        push(msg);
                    } catch (Throwable ex) {
                        if (!started.get() || destroyed.get()) {
                            logger.trace("Failed to submit broadcast handler runnable on shutdown for Broadcaster {}", getID(), ex);
                            // Must really stop here; a 'continue' in a finally block
                            // would silently discard this return.
                            return;
                        }
                        logger.warn("This message {} will be lost", msg);
                        logger.warn("Failed to submit broadcast handler runnable to for Broadcaster" + getID(), ex);
                    }

                    // Out-of-order mode: this run handled its single message; the
                    // resubmitted task takes over.
                    if (outOfOrderBroadcastSupported.get()) {
                        return;
                    }
                }
            }
        };
    }

    /**
     * Creates the task that drains the given write queue and writes each token to its
     * resource.
     *
     * <p>In ordered mode the task keeps looping; once a poll times out and the queue is
     * empty it marks the queue as unmonitored, removes it from {@link #writeQueues} and
     * stops. In out-of-order mode it resubmits itself after every poll (unless the async
     * write service is shut down) and handles at most one token per run. When a write
     * fails while the broadcaster is running, the message is handed to
     * {@code cacheLostMessage} and the resource is removed from this broadcaster.
     *
     * @param writeQueue the queue this handler consumes
     * @return a new async write handler
     */
    protected Runnable getAsyncWriteHandler(final WriteQueue writeQueue) {
        return new Runnable() {

            @Override
            public void run() {
                while (!DefaultBroadcaster.this.isDestroyed()) {
                    AsyncWriteToken token = null;
                    try {
                        token = writeQueue.queue.poll(waitTime, TimeUnit.MILLISECONDS);
                        // Only an idle poll may retire the queue; a token that was just
                        // taken must still be written even if the queue is now empty.
                        if (token == null && !outOfOrderBroadcastSupported.get()) {
                            synchronized (writeQueue) {
                                if (writeQueue.queue.isEmpty()) {
                                    writeQueue.monitored.set(false);
                                    writeQueues.remove(writeQueue.uuid);
                                    return;
                                }
                            }
                        }
                    } catch (InterruptedException ex) {
                        logger.trace("{} got interrupted for Broadcaster {}", Thread.currentThread().getName(), getID());
                        logger.trace("", ex);
                        return;
                    } finally {
                        // Out-of-order mode: hand the next poll to another executor task.
                        if (!bc.getAsyncWriteService().isShutdown() && outOfOrderBroadcastSupported.get()) {
                            bc.getAsyncWriteService().submit(this);
                        }
                    }

                    if (token == null) {
                        continue;
                    }

                    synchronized (token.resource) {
                        try {
                            logger.trace("About to write to {}", token.resource);
                            executeAsyncWrite(token);
                        } catch (Throwable ex) {
                            if (!started.get() || destroyed.get()) {
                                logger.trace("Failed to execute a write operation. Broadcaster is destroyed or not yet started for Broadcaster {}", getID(), ex);
                                return;
                            }
                            try {
                                logger.warn("This message {} will be lost for AtmosphereResource {}, adding it to the BroadcasterCache", token.originalMessage, token.resource != null ? token.resource.uuid() : "null");
                                cacheLostMessage(token.resource, token, true);
                            } finally {
                                // Drop the failing resource and report the write error
                                // whether or not caching the message succeeded.
                                removeAtmosphereResource(token.resource, false);
                                logger.warn("Failed to execute a write operation for Broadcaster " + getID(), ex);
                            }
                        } finally {
                            // Out-of-order mode: this run handled its single token. In
                            // ordered mode fall through so the loop keeps draining the
                            // queue, which is still flagged as monitored.
                            if (!bc.getAsyncWriteService().isShutdown() && outOfOrderBroadcastSupported.get()) {
                                return;
                            }
                        }
                    }
                }
            }
        };
    }

    /**
     * Starts this broadcaster once: starts the broadcaster cache and, when no handler
     * futures exist yet, spawns the handler tasks. A warning is logged when the
     * broadcaster has not been initialized; later calls are no-ops.
     */
    protected void start() {
        if (!initialized.get()) {
            logger.warn("Broadcaster {} not initialized", getID());
        }
        boolean alreadyStarted = started.getAndSet(true);
        if (alreadyStarted) {
            return;
        }
        bc.getBroadcasterCache().start();
        if (notifierFuture == null && asyncWriteFuture == null) {
            spawnReactor();
        }
    }

    /**
     * Cancels any existing handler tasks and submits new ones.
     *
     * <p>In out-of-order mode {@code reactiveThreadsCount()} broadcast handlers and as
     * many async write handlers (all on {@link #uniqueWriteQueue}) are submitted;
     * otherwise a single broadcast handler is submitted. {@link #dispatchThread} is set
     * to the number of handlers.
     */
    protected void spawnReactor() {
        killReactiveThreads();

        final int handlerCount;
        if (outOfOrderBroadcastSupported.get()) {
            handlerCount = reactiveThreadsCount();
        } else {
            handlerCount = 1;
        }
        notifierFuture = new Future<?>[handlerCount];

        if (!outOfOrderBroadcastSupported.get()) {
            notifierFuture[0] = bc.getExecutorService().submit(getBroadcastHandler());
        } else {
            asyncWriteFuture = new Future<?>[handlerCount];
            int slot = 0;
            while (slot < handlerCount) {
                notifierFuture[slot] = bc.getExecutorService().submit(getBroadcastHandler());
                asyncWriteFuture[slot] = bc.getAsyncWriteService().submit(getAsyncWriteHandler(uniqueWriteQueue));
                slot++;
            }
        }
        dispatchThread.set(handlerCount);
    }

    /**
     * Cancels every non-null broadcast and async-write handler future without
     * interrupting tasks that are already running.
     */
    protected void killReactiveThreads() {
        if (notifierFuture != null) {
            for (int i = 0; i < notifierFuture.length; i++) {
                Future<?> pending = notifierFuture[i];
                if (pending != null) {
                    pending.cancel(false);
                }
            }
        }
        if (asyncWriteFuture != null) {
            for (int i = 0; i < asyncWriteFuture.length; i++) {
                Future<?> pending = asyncWriteFuture[i];
                if (pending != null) {
                    pending.cancel(false);
                }
            }
        }
    }

    /**
     * Returns the number of handler tasks to spawn in out-of-order mode.
     *
     * @return twice the number of available processors
     */
    protected int reactiveThreadsCount() {
        int processors = Runtime.getRuntime().availableProcessors();
        return processors * 2;
    }

    /**
     * Delivers the given entry unless this broadcaster has been destroyed.
     *
     * @param deliver the delivery to push
     */
    protected void push(Deliver deliver) {
        if (!destroyed.get()) {
            deliverPush(deliver, true);
        }
    }

    /**
     * Processes one delivery: merges or flushes pending delayed broadcasts, runs the
     * message through {@code callable(...)}, adds it to the broadcaster cache and queues
     * a write for each target resource that passes the per-request filter.
     *
     * <p>Decompiler note: "Removed try catching itself - possible behaviour change."
     *
     * @param deliver the delivery to process; its {@code message}, {@code originalMessage}
     *                and {@code cache} fields are updated along the way
     * @param rec     when {@code true}, entries in {@code delayedBroadcast} are drained first;
     *                the recursive calls made for those entries pass {@code false}
     */
    protected void deliverPush(Deliver deliver, boolean rec) {
        block41: {
            Object finalMsg;
            // Decompiler-introduced local, reused both as a StringBuilder and as the
            // resources queue further down.
            Serializable b;
            this.recentActivity.set(true);
            // String form of the incoming message; written back to deliver.message after
            // the dispatch switch completes normally.
            String prevMessage = deliver.message.toString();
            if (rec && !this.delayedBroadcast.isEmpty()) {
                Iterator<Deliver> i = this.delayedBroadcast.iterator();
                b = new StringBuilder();
                while (i.hasNext()) {
                    Deliver e = i.next();
                    e.future.cancel(true);
                    try {
                        // String + String: prepend the delayed text to the current message.
                        if (e.message instanceof String && deliver.message instanceof String) {
                            ((StringBuilder)b).append(e.message);
                            continue;
                        }
                        // Otherwise deliver the delayed entry on its own, without recursing
                        // into the delayed queue again.
                        this.deliverPush(e, false);
                    }
                    finally {
                        // The entry is removed whether it was merged, delivered or failed.
                        i.remove();
                    }
                }
                if (((StringBuilder)b).length() > 0) {
                    deliver.message = ((StringBuilder)b).append(deliver.message).toString();
                }
            }
            // A null result from callable(...) aborts the delivery: the future is completed
            // and the targeted resources are resumed.
            if ((finalMsg = this.callable(deliver.message)) == null) {
                logger.error("Callable exception. Please catch all exceptions from your callable. Message {} will be lost and all AtmosphereResource associated with this Broadcaster resumed.", deliver.message);
                this.entryDone(deliver.future);
                switch (deliver.type) {
                    case ALL: {
                        b = this.resources;
                        synchronized (b) {
                            for (AtmosphereResource r : this.resources) {
                                // Only transports reported as resumable are resumed here.
                                if (!Utils.resumableTransport(r.transport())) continue;
                                try {
                                    r.resume();
                                }
                                catch (Throwable t) {
                                    logger.trace("resumeAll", t);
                                }
                            }
                            break;
                        }
                    }
                    case RESOURCE: {
                        deliver.resource.resume();
                        break;
                    }
                    case SET: {
                        for (AtmosphereResource r : deliver.resources) {
                            r.resume();
                        }
                        break;
                    }
                }
                return;
            }
            this.notifyOnMessage(deliver);
            Object prevM = deliver.originalMessage;
            // Re-run callable(...) on the original message only when it is a different
            // object from the (possibly merged) message; otherwise reuse finalMsg.
            Object object = deliver.originalMessage = deliver.originalMessage != deliver.message ? this.callable(deliver.originalMessage) : finalMsg;
            if (deliver.originalMessage == null) {
                logger.trace("Broadcasted message was null {}", prevM);
                this.entryDone(deliver.future);
                return;
            }
            deliver.message = finalMsg;
            // For SET deliveries each resource gets its own cache entry, keyed by uuid.
            HashMap<String, CacheMessage> cacheForSet = deliver.type == Deliver.TYPE.SET ? new HashMap<String, CacheMessage>() : null;
            switch (deliver.type) {
                case ALL: {
                    // The literal string "null" is passed as the resource uuid for ALL.
                    deliver.cache = this.bc.getBroadcasterCache().addToCache(this.getID(), "null", new BroadcastMessage(deliver.originalMessage));
                    break;
                }
                case RESOURCE: {
                    deliver.cache = this.bc.getBroadcasterCache().addToCache(this.getID(), deliver.resource.uuid(), new BroadcastMessage(deliver.originalMessage));
                    break;
                }
                case SET: {
                    for (AtmosphereResource r : deliver.resources) {
                        cacheForSet.put(r.uuid(), this.bc.getBroadcasterCache().addToCache(this.getID(), r.uuid(), new BroadcastMessage(deliver.originalMessage)));
                    }
                    break;
                }
            }
            // The message has been cached above; with no attached resource there is
            // nothing to write.
            if (this.resources.isEmpty()) {
                logger.trace("No resource available for {} and message {}", (Object)this.getID(), finalMsg);
                this.entryDone(deliver.future);
                if (cacheForSet != null) {
                    cacheForSet.clear();
                }
                return;
            }
            try {
                if (logger.isTraceEnabled()) {
                    for (AtmosphereResource r : this.resources) {
                        logger.trace("AtmosphereResource {} available for {}", (Object)r.uuid(), deliver.message);
                    }
                }
                boolean hasFilters = this.bc.hasPerRequestFilters();
                // Message as it was before per-request filtering; restored before each
                // resource is filtered.
                Object beforeProcessingMessage = deliver.message;
                switch (deliver.type) {
                    case ALL: {
                        AtomicInteger count = new AtomicInteger(this.resources.size());
                        for (AtmosphereResource r : this.resources) {
                            deliver.message = beforeProcessingMessage;
                            boolean deliverMessage = this.perRequestFilter(r, deliver);
                            if (this.endBroadcast(deliver, r, deliver.cache, deliverMessage) || !deliver.writeLocally) continue;
                            // With per-request filters a per-resource Deliver is created.
                            this.queueWriteIO(r, hasFilters ? new Deliver(r, deliver) : deliver, count);
                        }
                        break;
                    }
                    case RESOURCE: {
                        boolean deliverMessage = this.perRequestFilter(deliver.resource, deliver);
                        // Note: this return skips the deliver.message reset below.
                        if (this.endBroadcast(deliver, deliver.resource, deliver.cache, deliverMessage)) {
                            return;
                        }
                        if (!deliver.writeLocally) break;
                        this.queueWriteIO(deliver.resource, deliver, new AtomicInteger(1));
                        break;
                    }
                    case SET: {
                        AtomicInteger count = new AtomicInteger(deliver.resources.size());
                        for (AtmosphereResource r : deliver.resources) {
                            deliver.message = beforeProcessingMessage;
                            boolean deliverMessage = this.perRequestFilter(r, deliver);
                            CacheMessage cacheMsg = (CacheMessage)cacheForSet.remove(r.uuid());
                            if (this.endBroadcast(deliver, r, cacheMsg, deliverMessage) || !deliver.writeLocally) continue;
                            this.queueWriteIO(r, new Deliver(r, deliver, cacheMsg), count);
                        }
                        break;
                    }
                }
                deliver.message = prevMessage;
            }
            catch (InterruptedException ex) {
                // queueWriteIO declares InterruptedException; it is logged at debug level
                // and not rethrown.
                logger.debug(ex.getMessage(), ex);
                if (cacheForSet == null) break block41;
                cacheForSet.clear();
            }
        }
    }

    /**
     * Ends the delivery for one resource when the message was filtered out or is
     * {@code null}: clears the matching cache entry and completes the delivery future.
     *
     * @param deliver        the delivery being processed
     * @param r              the resource the message would have been written to
     * @param cacheMsg       the cache entry created for this resource and message
     * @param deliverMessage {@code false} when the message must not be delivered to {@code r}
     * @return {@code true} if the delivery to {@code r} was skipped, {@code false} if it should proceed
     */
    protected boolean endBroadcast(Deliver deliver, AtmosphereResource r, CacheMessage cacheMsg, boolean deliverMessage) {
        if (deliverMessage && deliver.message != null) {
            return false;
        }
        // Log the resource actually being skipped; the original null-guarded
        // deliver.resource, so it could print "null" instead of this resource.
        logger.debug("Skipping broadcast delivery {} for resource {} ", deliver.message, r.uuid());
        bc.getBroadcasterCache().clearCache(getID(), r.uuid(), cacheMsg);
        entryDone(deliver.future);
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Hands one message over to the write path of one resource.
     *
     * <p>Synchronous deliveries ({@code deliver.async == false}) are written immediately through
     * {@link #executeBlockingWrite}. Asynchronous deliveries are wrapped in an
     * {@code AsyncWriteToken} and queued: on the resource's own {@code WriteQueue} when
     * out-of-order broadcast is not supported, otherwise on the shared {@code uniqueWriteQueue}.
     *
     * <p>When a non-default {@code BroadcasterCache} is configured and the resource is already
     * resumed or cancelled, nothing is queued. If the resources factory knows a different,
     * suspended resource with the same uuid, that one is removed from this broadcaster and the
     * cached messages are pushed to it.
     *
     * @param r       the resource to write to
     * @param deliver the delivery carrying the message, future and cache entry
     * @param count   counter handed to the {@code AsyncWriteToken}
     * @throws InterruptedException if interrupted while putting the token on the write queue
     */
    protected void queueWriteIO(AtmosphereResource r, Deliver deliver, AtomicInteger count) throws InterruptedException {
        if (!deliver.async) {
            this.executeBlockingWrite(r, deliver, count);
            return;
        }
        // Compare Class with Class. Comparing against getName() (a String) can never be equal,
        // which made this guard apply to the default cache as well.
        boolean cacheConfigured = !this.bc.getBroadcasterCache().getClass().equals(BroadcasterCache.DEFAULT.getClass());
        if (cacheConfigured && (r.isResumed() || r.isCancelled())) {
            logger.trace("AtmosphereResource {} has been resumed or cancelled, unable to Broadcast message {}", (Object)r.uuid(), deliver.message);
            AtmosphereResource r2 = this.config.resourcesFactory().find(r.uuid());
            logger.trace("Found an AtmosphereResource {} in state {}", (Object)r2, (Object)r.isSuspended());
            if (r2 != null && r2.isSuspended() && r.hashCode() != r2.hashCode()) {
                this.removeAtmosphereResource(r2);
                this.checkCachedAndPush(r2, r2.getAtmosphereResourceEvent());
            }
            return;
        }
        AsyncWriteToken w = new AsyncWriteToken(r, deliver.message, deliver.future, deliver.originalMessage, deliver.cache, count);
        if (this.outOfOrderBroadcastSupported.get()) {
            this.uniqueWriteQueue.queue.offer(w);
            return;
        }
        WriteQueue writeQueue = this.writeQueues.get(r.uuid());
        if (writeQueue == null) {
            writeQueue = new WriteQueue(r.uuid());
            this.writeQueues.put(r.uuid(), writeQueue);
        }
        writeQueue.queue.put(w);
        synchronized (writeQueue) {
            // Submit a write handler only if none is flagged as monitoring this queue yet.
            if (!writeQueue.monitored.getAndSet(true)) {
                logger.trace("Broadcaster {} is about to queueWriteIO for AtmosphereResource {}", (Object)this.name, (Object)r.uuid());
                this.bc.getAsyncWriteService().submit(this.getAsyncWriteHandler(writeQueue));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Writes a message to a resource on the calling thread while holding the resource's monitor.
     *
     * @param r       the resource to write to; also used as the lock
     * @param deliver the delivery whose message, future, original message and cache entry are written
     * @param count   counter handed to the {@code AsyncWriteToken}
     * @throws InterruptedException declared for callers; see {@link #queueWriteIO}
     */
    protected void executeBlockingWrite(AtmosphereResource r, Deliver deliver, AtomicInteger count) throws InterruptedException {
        synchronized (r) {
            AsyncWriteToken token = new AsyncWriteToken(r, deliver.message, deliver.future, deliver.originalMessage, deliver.cache, count);
            executeAsyncWrite(token);
        }
    }

    /**
     * Applies the per-request filters, if any are configured, to a delivery for one resource.
     *
     * <p>On success {@code msg.message} is replaced with the filtered message.
     *
     * @param r   the target resource; {@code null} is rejected
     * @param msg the delivery to filter
     * @return {@code false} if {@code r} is {@code null} or a filter aborted the delivery,
     *         {@code true} otherwise
     */
    protected boolean perRequestFilter(AtmosphereResource r, Deliver msg) {
        if (r == null) {
            logger.trace("Null AtmosphereResource passed inside a Set");
            return false;
        }
        if (!bc.hasPerRequestFilters()) {
            return true;
        }
        BroadcastFilter.BroadcastAction outcome = bc.filter(r, msg.message, msg.originalMessage);
        if (outcome.action() == BroadcastFilter.BroadcastAction.ACTION.ABORT) {
            return false;
        }
        msg.message = outcome.message();
        return true;
    }

    /**
     * Resolves a message that may be wrapped in a {@link Callable}.
     *
     * @param msg the message, possibly a {@code Callable} producing the real message
     * @return the result of {@code call()} when {@code msg} is a {@code Callable}, {@code null}
     *         if that call throws, otherwise {@code msg} itself (including {@code null})
     */
    private Object callable(Object msg) {
        // instanceof is null-safe, unlike msg.getClass().
        if (!(msg instanceof Callable)) {
            return msg;
        }
        try {
            return ((Callable<?>)msg).call();
        }
        catch (Exception e) {
            logger.warn("Callable exception", e);
            return null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Writes the message carried by {@code token} to its resource and performs the
     * post-write bookkeeping.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>The message is set on the resource's event. If the resource is no longer valid it is
     *       removed from this broadcaster and nothing is written.</li>
     *   <li>The cache entry of the token is cleared, the future, a timestamp and the token are
     *       stored as request attributes, and the handler is invoked through
     *       {@link #prepareInvokeOnStateChange}.</li>
     *   <li>If that throws, the resource is removed from this and all other broadcasters, the
     *       event is flagged cancelled with the throwable, and the message becomes a candidate
     *       for re-caching.</li>
     * </ol>
     *
     * <p>The cleanup (listener notification, broadcast-listener notification, completing the
     * future, caching a lost message, removing request attributes, destroying the token) lives in
     * a single {@code finally} block so it runs exactly once on every exit path. It was previously
     * duplicated in the {@code catch} block, which made the failure path run it twice.
     *
     * @param token the write token; {@code token.resource} must not be {@code null}
     * @throws NullPointerException if {@code token.resource} is {@code null}
     */
    protected void executeAsyncWrite(AsyncWriteToken token) {
        boolean lostCandidate = false;
        if (token.resource == null) {
            throw new NullPointerException();
        }
        AtmosphereResourceEventImpl event = (AtmosphereResourceEventImpl)token.resource.getAtmosphereResourceEvent();
        AtmosphereResourceImpl r = (AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(token.resource);
        boolean willBeResumed = Utils.resumableTransport(r.transport());
        // For resumable transports the listeners are snapshotted before the write and notified
        // from that snapshot afterwards.
        List<AtmosphereResourceEventListener> listeners = willBeResumed ? new ArrayList<AtmosphereResourceEventListener>() : EMPTY_LISTENERS;
        AtmosphereRequest request = r.getRequest(false);
        try {
            event.setMessage(token.msg);
            if (!this.isAtmosphereResourceValid(r)) {
                logger.trace("AtmosphereResource {} state is invalid for Broadcaster {}. Message will be cached", (Object)r.uuid(), (Object)this.name);
                this.removeAtmosphereResource(r, false);
                return;
            }
            this.bc.getBroadcasterCache().clearCache(this.getID(), r.uuid(), token.cache);
            try {
                request.setAttribute(this.getID(), token.future);
                request.setAttribute("org.atmosphere.cpr.CometSupport.maxInactiveActivity", System.currentTimeMillis());
                request.setAttribute(this.usingTokenIdForAttribute, token);
                if (willBeResumed && !r.atmosphereResourceEventListener().isEmpty()) {
                    listeners.addAll(r.atmosphereResourceEventListener());
                }
                this.prepareInvokeOnStateChange(r, event);
            }
            catch (Throwable t) {
                logger.debug("Invalid AtmosphereResource state {}. The connection has been remotely closed and message {} will be added to the configured BroadcasterCache for later retrieval", (Object)r.uuid(), event.getMessage());
                logger.trace("If you are using Tomcat 7.0.22 and lower, you're most probably hitting http://is.gd/NqicFT");
                logger.trace("ApplicationConfig.CACHE_MESSAGE_ON_IO_FLUSH_EXCEPTION {}", (Object)this.cacheOnIOFlushException, (Object)t);
                lostCandidate = this.cacheOnIOFlushException ? this.cacheOnIOFlushException : this.cacheMessageOnIOException(t);
                this.removeAtmosphereResource(r, false);
                r.removeFromAllBroadcasters();
                event.setCancelled(true);
                event.setThrowable(t);
                r.setIsInScope(false);
                // Cleanup happens once, in the finally block below.
                return;
            }
            try {
                request.setAttribute(FrameworkConfig.MESSAGE_WRITTEN, "true");
            }
            catch (NullPointerException ex) {
                logger.trace("NPE after the message has been written for {}", (Object)r.uuid());
            }
        }
        finally {
            if (willBeResumed) {
                event.setMessage(token.msg);
                for (AtmosphereResourceEventListener e : listeners) {
                    e.onBroadcast(event);
                }
            } else if (!event.isResumedOnTimeout()) {
                r.notifyListeners();
            }
            if (token.lastBroadcasted()) {
                this.notifyBroadcastListener();
            }
            if (token.future != null) {
                token.future.done();
            }
            if (lostCandidate) {
                this.cacheLostMessage(r, token, true);
            }
            try {
                request.removeAttribute(this.getID());
                request.removeAttribute(this.usingTokenIdForAttribute);
            }
            catch (NullPointerException ex) {
                logger.trace("NPE after the message has been written for {}", (Object)r.uuid());
            }
            token.destroy();
        }
    }

    /**
     * Decides from a failure's stack trace whether the message should be cached again.
     *
     * @param cause the throwable raised while writing
     * @return {@code false} if any stack frame belongs to a method named {@code flush} or
     *         {@code flushBuffer}, {@code true} otherwise
     */
    protected boolean cacheMessageOnIOException(Throwable cause) {
        StackTraceElement[] frames = cause.getStackTrace();
        for (int i = 0; i < frames.length; i++) {
            String method = frames[i].getMethodName();
            if (method.equals("flush") || method.equals("flushBuffer")) {
                return false;
            }
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Retrieves the messages cached for a resource, filters them and writes them to it.
     *
     * <p>The caller {@code cacheAndSuspend} treats a {@code true} result as "was resumed" and
     * does not add the resource to this broadcaster in that case.
     *
     * @param r the resource that may have missed messages
     * @param e the resource's event; its message is overwritten with the (filtered) cached list
     * @return {@code false} when nothing was cached, when every cached message was filtered out,
     *         or when the messages were written on a transport other than UNDEFINED, JSONP, AJAX
     *         or LONG_POLLING; {@code true} when the write failed (messages are put back in the
     *         cache) or when the transport is one of those four
     */
    protected boolean checkCachedAndPush(AtmosphereResource r, AtmosphereResourceEvent e) {
        // On success retrieveTrackedBroadcast has stored the cached List as the event's message.
        boolean cache = this.retrieveTrackedBroadcast(r, e);
        if (!cache) {
            return false;
        }
        if (!((List)e.getMessage()).isEmpty()) {
            logger.debug("Sending cached message {} to {}", e.getMessage(), (Object)r.uuid());
            List cacheMessages = (List)e.getMessage();
            BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(e.getMessage(), 1);
            LinkedList<Object> filteredMessage = new LinkedList<Object>();
            LinkedList filteredMessageClone = null;
            // Run every cached message through the broadcaster filters, then the per-request
            // filters; keep only those that survive both with a non-null message.
            for (Object o : cacheMessages) {
                Deliver deliver;
                Object newMessage = this.filter(o);
                if (newMessage == null || !this.perRequestFilter(r, deliver = new Deliver(newMessage, r, f, o)) || deliver.message == null) continue;
                filteredMessage.addLast(deliver.message);
            }
            if (filteredMessage.isEmpty()) {
                return false;
            }
            e.setMessage(filteredMessage);
            boolean willBeResumed = Utils.resumableTransport(r.transport());
            // For resumable transports, keep a copy of the message list and a snapshot of the
            // listeners so they can be used after the handler has run.
            if (willBeResumed) {
                filteredMessageClone = (LinkedList)filteredMessage.clone();
            }
            ArrayList<AtmosphereResourceEventListener> listeners = willBeResumed ? new ArrayList<AtmosphereResourceEventListener>() : EMPTY_LISTENERS;
            AtmosphereResourceImpl rImpl = (AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(r);
            if (willBeResumed && !rImpl.atmosphereResourceEventListener().isEmpty()) {
                listeners.addAll(rImpl.atmosphereResourceEventListener());
            }
            AtmosphereResourceImpl atmosphereResourceImpl = rImpl;
            // The write and the listener notification run while holding the resource's monitor.
            synchronized (atmosphereResourceImpl) {
                try {
                    rImpl.getRequest().setAttribute(CACHED, "true");
                    this.prepareInvokeOnStateChange(r, e);
                }
                catch (Throwable t) {
                    logger.error("Unable to write cached message {} for {}", e.getMessage(), (Object)r.uuid());
                    logger.error("", t);
                    // Put the original (unfiltered) messages back in the cache.
                    for (Object o : cacheMessages) {
                        this.bc.getBroadcasterCache().addToCache(this.getID(), r != null ? r.uuid() : "null", new BroadcastMessage(o));
                    }
                    return true;
                }
                if (willBeResumed) {
                    e.setMessage(filteredMessageClone);
                }
                for (AtmosphereResourceEventListener l : willBeResumed ? listeners : rImpl.atmosphereResourceEventListener()) {
                    l.onBroadcast(e);
                }
                switch (r.transport()) {
                    case UNDEFINED: 
                    case JSONP: 
                    case AJAX: 
                    case LONG_POLLING: {
                        return true;
                    }
                    case SSE: {
                        break;
                    }
                    default: {
                        // Remaining transports: flush the response; a failing flush destroys the resource.
                        try {
                            r.getResponse().flushBuffer();
                            break;
                        }
                        catch (IOException ioe) {
                            logger.trace("", ioe);
                            ((AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(r))._destroy();
                        }
                    }
                }
            }
        }
        return false;
    }

    /**
     * Looks up the messages cached for a resource and, if there are any, stores them as the
     * message of the given event.
     *
     * @param r the resource whose uuid is used for the cache lookup
     * @param e the event that receives the list of cached messages
     * @return {@code true} if a non-empty list was found and set on {@code e}
     */
    protected boolean retrieveTrackedBroadcast(AtmosphereResource r, AtmosphereResourceEvent e) {
        logger.trace("Checking cached message for {}", (Object)r.uuid());
        List<Object> pending = bc.getBroadcasterCache().retrieveFromCache(getID(), r.uuid());
        if (pending == null || pending.isEmpty()) {
            return false;
        }
        e.setMessage(pending);
        return true;
    }

    /**
     * Invokes the resource's handler with the given event.
     *
     * <p>Any throwable other than an {@link InterruptedException} is forwarded to
     * {@link #onException(Throwable, AtmosphereResource)}; an {@code InterruptedException} is
     * ignored.
     *
     * @param r the resource whose handler is invoked
     * @param e the event passed to the handler
     */
    protected void invokeOnStateChange(AtmosphereResource r, AtmosphereResourceEvent e) {
        try {
            logger.trace("{} is broadcasting to {}", (Object)this.name, (Object)r.uuid());
            r.getAtmosphereHandler().onStateChange(e);
        }
        catch (Throwable failure) {
            if (!(failure instanceof InterruptedException)) {
                onException(failure, r);
            }
        }
    }

    /**
     * Invokes the handler for a resource, optionally guarded by a write timeout.
     *
     * <p>When {@code writeTimeoutInSecond} is {@code -1} the handler is invoked directly.
     * Otherwise a {@code WriteOperation} is scheduled on the scheduled executor and then also
     * called on the current thread; exceptions from that call are logged and not propagated.
     *
     * <p>NOTE(review): the field name says seconds but the delay is scheduled with
     * {@link TimeUnit#MILLISECONDS} — confirm which unit the configuration value uses.
     *
     * @param r the resource to write to
     * @param e the event passed to the handler
     */
    protected void prepareInvokeOnStateChange(AtmosphereResource r, AtmosphereResourceEvent e) {
        if (this.writeTimeoutInSecond == -1) {
            invokeOnStateChange(r, e);
            return;
        }
        logger.trace("Registering Write timeout {} for {}", (Object)this.writeTimeoutInSecond, (Object)r.uuid());
        WriteOperation operation = new WriteOperation(r, e, Thread.currentThread());
        bc.getScheduledExecutorService().schedule(operation, (long)this.writeTimeoutInSecond, TimeUnit.MILLISECONDS);
        try {
            operation.call();
        }
        catch (Exception ex) {
            logger.warn("", ex);
        }
    }

    /**
     * Handles a failure raised while writing to a resource, notifying listeners and caching
     * the lost message.
     *
     * @param t  the failure
     * @param ar the resource the write was for
     */
    public void onException(Throwable t, AtmosphereResource ar) {
        onException(t, ar, true);
    }

    /**
     * Handles a failure raised while writing to a resource.
     *
     * <p>The resource is removed from this broadcaster. When {@code notifyAndCache} is set, the
     * throwable is stored on the resource's event, its listeners are notified and removed, and
     * the token found in the request attributes is passed to {@code cacheLostMessage}. Finally
     * the resource is resumed and closed on the async write service, or resumed on the calling
     * thread when no such service is available.
     *
     * @param t              the failure
     * @param ar             the resource the write was for
     * @param notifyAndCache whether to notify listeners and cache the lost message
     */
    public void onException(Throwable t, final AtmosphereResource ar, boolean notifyAndCache) {
        final AtmosphereResourceImpl r = AtmosphereResourceImpl.class.cast(ar);
        logger.trace("I/O Exception (or related) during execution of the write operation for AtmosphereResource {} and Broadcaster {}. Message will be cached {}", ar.uuid(), this.getID(), String.valueOf(notifyAndCache));
        logger.trace("{}", t);
        removeAtmosphereResource(r);
        if (notifyAndCache) {
            AtmosphereResourceEventImpl event = r.getAtmosphereResourceEvent();
            event.setThrowable(t);
            r.notifyListeners(event);
            r.removeEventListeners();
            AsyncWriteToken lostToken = (AsyncWriteToken)r.getRequest(false).getAttribute(this.usingTokenIdForAttribute);
            cacheLostMessage(r, lostToken, notifyAndCache);
        }
        if (this.bc == null || this.bc.getAsyncWriteService() == null) {
            r.resume();
            return;
        }
        this.bc.getAsyncWriteService().execute(new Runnable(){

            @Override
            public void run() {
                try {
                    logger.trace("Forcing connection close {}", (Object)ar.uuid());
                    r.resume();
                    r.close();
                }
                catch (Throwable failure) {
                    logger.trace("Was unable to resume a corrupted AtmosphereResource {}", (Object)r);
                    logger.trace("Cause", failure);
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Caches the message of the write token stored in the resource's request attributes, then
     * removes that attribute whether or not caching succeeded.
     *
     * @param r     the resource whose request carries the token
     * @param force forwarded to {@link #cacheLostMessage(AtmosphereResource, AsyncWriteToken, boolean)}
     */
    public void cacheLostMessage(AtmosphereResource r, boolean force) {
        AtmosphereResourceImpl impl = AtmosphereResourceImpl.class.cast(r);
        AtmosphereRequest request = impl.getRequest(false);
        try {
            AsyncWriteToken token = (AsyncWriteToken)request.getAttribute(this.usingTokenIdForAttribute);
            cacheLostMessage(r, token, force);
        }
        finally {
            request.removeAttribute(this.usingTokenIdForAttribute);
        }
    }

    /**
     * Delegates to {@link #cacheLostMessage(AtmosphereResource, AsyncWriteToken, boolean)} with
     * {@code force == false}, which makes that method return without caching anything.
     *
     * @param r     the resource the message was meant for
     * @param token the write token carrying the message
     */
    public void cacheLostMessage(AtmosphereResource r, AsyncWriteToken token) {
        final boolean force = false;
        cacheLostMessage(r, token, force);
    }

    /**
     * Adds the original message of a failed write back to the broadcaster cache.
     *
     * <p>Nothing happens unless {@code force} is {@code true} and the token carries a non-null
     * original message. Failures while caching are logged and not propagated.
     *
     * <p>The cache entry is identified by the hash code of the token's future. A token without
     * a future used to raise a {@code NullPointerException} here that was swallowed by the
     * catch-all, dropping the message; it now falls back to the single-argument
     * {@code BroadcastMessage} constructor.
     *
     * @param r     the resource the message was meant for; {@code null} is cached under "null"
     * @param token the write token carrying the message; may be {@code null}
     * @param force must be {@code true} for anything to be cached
     */
    public void cacheLostMessage(AtmosphereResource r, AsyncWriteToken token, boolean force) {
        if (!force) {
            return;
        }
        if (token == null || token.originalMessage == null) {
            return;
        }
        try {
            BroadcastMessage lost = token.future != null
                    ? new BroadcastMessage(String.valueOf(token.future.hashCode()), token.originalMessage)
                    : new BroadcastMessage(token.originalMessage);
            this.bc.getBroadcasterCache().addToCache(this.getID(), r != null ? r.uuid() : "null", lost);
            logger.trace("Lost message cached {}", token.originalMessage);
        }
        catch (Throwable t2) {
            logger.error("Unable to cache message {} for AtmosphereResource {}", token.originalMessage, (Object)(r != null ? r.uuid() : ""));
            logger.error("Unable to cache message", t2);
        }
    }

    /**
     * Sets the limit on resources held by this broadcaster and the policy applied when
     * {@code addAtmosphereResource} finds that limit reached.
     *
     * @param maxSuspendResource the limit; {@code addAtmosphereResource} only enforces values
     *                           greater than zero
     * @param policy             {@code FIFO} resumes the oldest resource, {@code REJECT} makes
     *                           {@code addAtmosphereResource} throw
     */
    @Override
    public void setSuspendPolicy(long maxSuspendResource, Broadcaster.POLICY policy) {
        this.maxSuspendResource.set(maxSuspendResource);
        this.policy = policy;
    }

    /**
     * Broadcasts a message to every resource of this broadcaster.
     *
     * @param msg the message to broadcast
     * @return a future for the broadcast; already completed when this broadcaster is destroyed
     *         or the message was filtered out
     */
    @Override
    public Future<Object> broadcast(Object msg) {
        if (destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"broadcast(T msg)");
            return futureDone(msg);
        }
        start();
        Object filtered = filter(msg);
        if (filtered == null) {
            logger.debug("Broadcast Interrupted {}", msg);
            return futureDone(msg);
        }
        // The future expects at least one completion even when no resource is registered.
        int callee;
        if (this.resources.isEmpty()) {
            callee = 1;
        } else {
            callee = this.resources.size();
        }
        BroadcasterFuture<Object> future = new BroadcasterFuture<Object>(filtered, callee);
        dispatchMessages(new Deliver(filtered, future, msg));
        return future;
    }

    /**
     * Notifies the broadcaster listeners of completion and returns a future for {@code msg}
     * on which {@code done()} has already been called.
     *
     * @param msg the message the future is created for
     * @return the result of calling {@code done()} on a new future for {@code msg}
     */
    protected BroadcasterFuture<Object> futureDone(Object msg) {
        notifyBroadcastListener();
        BroadcasterFuture<Object> completed = new BroadcasterFuture<Object>(msg);
        return completed.done();
    }

    /**
     * Queues a delivery and makes sure a broadcast handler is submitted to process the queue.
     *
     * <p>A handler is submitted only by the caller that moves {@code dispatchThread} from 0 to 1.
     * The transition is a single atomic compare-and-set, so two concurrent callers can no longer
     * both observe 0 and both submit a handler, as the separate read and increment allowed.
     *
     * @param e the delivery to queue
     */
    protected void dispatchMessages(Deliver e) {
        this.messages.offer(e);
        if (this.dispatchThread.compareAndSet(0, 1)) {
            this.getBroadcasterConfig().getExecutorService().submit(this.getBroadcastHandler());
        }
    }

    /**
     * Runs a message through the broadcaster-level filters.
     *
     * @param msg the message to filter
     * @return the filtered message, or {@code null} if a filter aborted it or {@code msg} is
     *         {@code null}
     */
    protected Object filter(Object msg) {
        BroadcastFilter.BroadcastAction outcome = bc.filter(msg);
        boolean aborted = outcome.action() == BroadcastFilter.BroadcastAction.ACTION.ABORT;
        if (aborted || msg == null) {
            return null;
        }
        return outcome.message();
    }

    /**
     * Broadcasts a message to a single resource.
     *
     * @param msg the message to broadcast
     * @param r   the only resource that should receive the message
     * @return a future for the broadcast; already completed when this broadcaster is destroyed
     *         or the message was filtered out
     */
    @Override
    public Future<Object> broadcast(Object msg, AtmosphereResource r) {
        if (destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"broadcast(T msg, AtmosphereResource r");
            return futureDone(msg);
        }
        start();
        Object filtered = filter(msg);
        if (filtered == null) {
            return futureDone(msg);
        }
        BroadcasterFuture<Object> future = new BroadcasterFuture<Object>(filtered, 1);
        Deliver delivery = new Deliver(filtered, r, future, msg);
        dispatchMessages(delivery);
        return future;
    }

    /**
     * Registers a message to be pushed later through
     * {@link #broadcastOnResume(AtmosphereResource)}.
     *
     * @param msg the message to register
     * @return a future sized to the current number of resources; already completed when this
     *         broadcaster is destroyed or the message was filtered out
     */
    @Override
    public Future<Object> broadcastOnResume(Object msg) {
        if (destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"broadcastOnResume(T msg)");
            return futureDone(msg);
        }
        start();
        Object filtered = filter(msg);
        if (filtered == null) {
            return futureDone(msg);
        }
        BroadcasterFuture<Object> future = new BroadcasterFuture<Object>(filtered, this.resources.size());
        Deliver delivery = new Deliver(filtered, future, msg);
        this.broadcastOnResume.offer(delivery);
        return future;
    }

    /**
     * Pushes every registered on-resume delivery to the given resource, synchronously.
     *
     * <p>The registered deliveries are cleared once this broadcaster has no resources left.
     *
     * @param r the resource that receives the registered deliveries
     */
    protected void broadcastOnResume(AtmosphereResource r) {
        for (Deliver pending : this.broadcastOnResume) {
            pending.async = false;
            Deliver forResource = new Deliver(r, pending);
            push(forResource);
        }
        if (this.resources.isEmpty()) {
            this.broadcastOnResume.clear();
        }
    }

    /**
     * Broadcasts a message to a subset of resources.
     *
     * @param msg    the message to broadcast
     * @param subset the resources that should receive the message
     * @return a future sized to {@code subset}; already completed when this broadcaster is
     *         destroyed or the message was filtered out
     */
    @Override
    public Future<Object> broadcast(Object msg, Set<AtmosphereResource> subset) {
        if (destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"broadcast(T msg, Set<AtmosphereResource> subset)");
            return futureDone(msg);
        }
        start();
        Object filtered = filter(msg);
        if (filtered == null) {
            return futureDone(msg);
        }
        BroadcasterFuture<Object> future = new BroadcasterFuture<Object>(null, filtered, subset.size());
        Deliver delivery = new Deliver(filtered, subset, future, msg);
        dispatchMessages(delivery);
        return future;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     * Converted monitor instructions to comments
     * Lifted jumps to return sites
     */
    /**
     * Adds a resource to this broadcaster, after enforcing the scope, the suspend policy and
     * the duplicate checks.
     *
     * <p>NOTE(review): this body is decompiler output. The header above states that monitor
     * instructions were converted to comments, so the {@code MONITORENTER}/{@code MONITOREXIT}
     * lines below mark locking that the compiled class performs but this source does not.
     *
     * @param r the resource to add
     * @return this broadcaster
     * @throws IllegalStateException      if the scope is REQUEST and a resource was already added
     * @throws RejectedExecutionException if the resource limit is reached and the policy is REJECT
     */
    @Override
    public Broadcaster addAtmosphereResource(AtmosphereResource r) {
        try {
            if (this.destroyed.get()) {
                logger.debug(DESTROYED, (Object)this.getID(), (Object)"addAtmosphereResource(AtmosphereResource r");
                DefaultBroadcaster defaultBroadcaster = this;
                return defaultBroadcaster;
            }
            this.start();
            // A REQUEST-scoped broadcaster accepts a single add; getAndSet flags the first one.
            if (this.scope == Broadcaster.SCOPE.REQUEST && this.requestScoped.getAndSet(true)) {
                throw new IllegalStateException("Broadcaster " + this + " cannot be used as its scope is set to REQUEST");
            }
            // Resource limit reached: FIFO resumes the head of the queue, REJECT refuses the add.
            if (this.maxSuspendResource.get() > 0L && (long)this.resources.size() >= this.maxSuspendResource.get()) {
                if (this.policy == Broadcaster.POLICY.FIFO) {
                    AtmosphereResource resource = this.resources.poll();
                    try {
                        logger.warn("Too many resource. Forcing resume of {} ", (Object)resource.uuid());
                        resource.resume();
                    }
                    catch (Throwable t) {
                        logger.warn("failed to resume resource {} ", (Object)resource, (Object)t);
                    }
                } else if (this.policy == Broadcaster.POLICY.REJECT) {
                    throw new RejectedExecutionException(String.format("Maximum suspended AtmosphereResources %s", this.maxSuspendResource));
                }
            }
            if (!r.isSuspended()) {
                logger.warn("AtmosphereResource {} is not suspended. If cached messages exists, this may cause unexpected situation. Suspend first", (Object)r.uuid());
            }
            // The resource is already registered here (skipped in backward-compatible mode).
            if (!this.backwardCompatible && this.resources.contains(r)) {
                boolean duplicate;
                // Despite its name, the flag is false only for a WEBSOCKET resource whose request
                // has no INJECTED_ATMOSPHERE_RESOURCE attribute; that case returns immediately.
                boolean bl = duplicate = r.transport() != AtmosphereResource.TRANSPORT.WEBSOCKET || ((AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(r)).getRequest(false).getAttribute(FrameworkConfig.INJECTED_ATMOSPHERE_RESOURCE) != null;
                if (!duplicate) {
                    logger.debug("Duplicate resource {}", (Object)r.uuid());
                    DefaultBroadcaster defaultBroadcaster = this;
                    return defaultBroadcaster;
                }
                // If the factory holds a different instance under the same uuid, close that one
                // and continue adding r; otherwise r is a plain duplicate.
                AtmosphereResourceImpl dup = (AtmosphereResourceImpl)this.config.resourcesFactory().find(r.uuid());
                if (dup != null && dup != r) {
                    if (!dup.isPendingClose()) {
                        logger.warn("Duplicate resource {}. Could be caused by a dead connection not detected by your server. Replacing the old one with the fresh one", (Object)r.uuid());
                    } else {
                        logger.debug("Not yet closed resource still active {}", (Object)r.uuid());
                    }
                    ((AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(dup)).dirtyClose();
                } else {
                    logger.debug("Duplicate resource {}", (Object)r.uuid());
                    DefaultBroadcaster defaultBroadcaster = this;
                    return defaultBroadcaster;
                }
            }
            // NOTE(review): this compares a Class with the String returned by getName(), which is
            // never equal, so the condition is always true and this branch is always taken.
            if (!this.bc.getBroadcasterCache().getClass().equals(BroadcasterCache.DEFAULT.getClass().getName())) {
                ConcurrentLinkedQueue<AtmosphereResource> concurrentLinkedQueue = this.resources;
                // MONITORENTER : concurrentLinkedQueue
                this.cacheAndSuspend(r);
                // MONITOREXIT : concurrentLinkedQueue
                return this;
            }
            this.cacheAndSuspend(r);
            return this;
        }
        finally {
            // NOTE(review): as written this only reads awaitBarrier into an unused local and has
            // no effect; presumably the residue of a synchronized block on awaitBarrier that the
            // decompiler dropped — confirm against the original source.
            if (!this.resources.isEmpty()) {
                Object[] objectArray = this.awaitBarrier;
            }
        }
    }

    /**
     * Pushes any cached messages to a resource and, unless that push reports the resource as
     * resumed, associates the resource with this broadcaster.
     *
     * <p>For a WEBSOCKET resource whose request carries a parent uuid (and when not in
     * backward-compatible mode), the parent resource is added instead if the factory finds it
     * and it is not registered yet; if the parent cannot be found, {@code r} itself is added.
     *
     * @param r the resource to associate
     */
    protected void cacheAndSuspend(AtmosphereResource r) {
        if (!isAtmosphereResourceValid(r)) {
            logger.debug("Unable to add AtmosphereResource {}", (Object)r.uuid());
            return;
        }
        if (checkCachedAndPush(r, r.getAtmosphereResourceEvent())) {
            return;
        }
        // Writing cached messages may have invalidated the resource; check again.
        if (!isAtmosphereResourceValid(r)) {
            logger.debug("Unable to add AtmosphereResource {} to {}", (Object)r.uuid(), (Object)this.name);
            return;
        }
        logger.trace("Associating AtmosphereResource {} with Broadcaster {}", (Object)r.uuid(), (Object)this.getID());
        String parentUUID = null;
        if (r.transport().equals((Object)AtmosphereResource.TRANSPORT.WEBSOCKET)) {
            parentUUID = (String)AtmosphereResourceImpl.class.cast(r).getRequest(false).getAttribute("org.atmosphere.cpr.AtmosphereResource.suspended.uuid");
        }
        if (this.backwardCompatible || parentUUID == null) {
            notifyAndAdd(r);
            return;
        }
        AtmosphereResource parent = this.config.resourcesFactory().find(parentUUID);
        if (parent == null) {
            notifyAndAdd(r);
        } else if (!this.resources.contains(parent)) {
            notifyAndAdd(parent);
        } else {
            logger.trace("AtmosphereResource {} was already mapped to {}", (Object)r.uuid(), (Object)parentUUID);
        }
    }

    /**
     * Registers a resource with this broadcaster, registers this broadcaster with the resource,
     * and then notifies the broadcaster listeners of the addition.
     *
     * @param r the resource to add
     */
    protected void notifyAndAdd(AtmosphereResource r) {
        resources.add(r);
        r.addBroadcaster(this);
        notifyOnAddAtmosphereResourceListener(r);
    }

    /**
     * Tells whether a resource can still be written to.
     *
     * @param r the resource to check
     * @return {@code true} if the resource is neither resumed nor cancelled and is in scope
     */
    private boolean isAtmosphereResourceValid(AtmosphereResource r) {
        if (r.isResumed() || r.isCancelled()) {
            return false;
        }
        return AtmosphereResourceImpl.class.cast(r).isInScope();
    }

    /**
     * Notifies the broadcaster listeners of completion and completes the given future.
     *
     * @param f the future to complete; {@code null} is ignored
     */
    protected void entryDone(BroadcasterFuture<?> f) {
        notifyBroadcastListener();
        if (f == null) {
            return;
        }
        f.done();
    }

    /**
     * Calls {@code onComplete} on every broadcaster listener. An exception thrown by one
     * listener is logged and does not stop the remaining listeners from being called.
     */
    protected void notifyBroadcastListener() {
        for (final BroadcasterListener listener : this.broadcasterListeners) {
            try {
                listener.onComplete(this);
            }
            catch (Exception failure) {
                logger.warn("", failure);
            }
        }
    }

    /**
     * Calls {@code onAddAtmosphereResource} on every broadcaster listener. An exception thrown
     * by one listener is logged and does not stop the remaining listeners from being called.
     *
     * @param r the resource that was added
     */
    protected void notifyOnAddAtmosphereResourceListener(AtmosphereResource r) {
        for (final BroadcasterListener listener : this.broadcasterListeners) {
            try {
                listener.onAddAtmosphereResource(this, r);
            }
            catch (Exception failure) {
                logger.warn("", failure);
            }
        }
    }

    /**
     * Calls {@code onRemoveAtmosphereResource} on every broadcaster listener. An exception thrown
     * by one listener is logged and does not stop the remaining listeners from being called.
     *
     * @param r the resource that was removed
     */
    protected void notifyOnRemoveAtmosphereResourceListener(AtmosphereResource r) {
        for (final BroadcasterListener listener : this.broadcasterListeners) {
            try {
                listener.onRemoveAtmosphereResource(this, r);
            }
            catch (Exception failure) {
                logger.warn("", failure);
            }
        }
    }

    /**
     * Calls {@code onMessage} on every broadcaster listener. An exception thrown by one
     * listener is logged and does not stop the remaining listeners from being called.
     *
     * @param deliver the delivery passed to the listeners
     */
    protected void notifyOnMessage(Deliver deliver) {
        for (final BroadcasterListener listener : this.broadcasterListeners) {
            try {
                listener.onMessage(this, deliver);
            }
            catch (Exception failure) {
                logger.warn("", failure);
            }
        }
    }

    /**
     * Removes a resource from this broadcaster and completes any pending future stored on its
     * request.
     *
     * @param r the resource to remove
     * @return this broadcaster
     */
    @Override
    public Broadcaster removeAtmosphereResource(AtmosphereResource r) {
        final boolean executeDone = true;
        return removeAtmosphereResource(r, executeDone);
    }

    /**
     * Removes a resource from this broadcaster.
     *
     * <p>If the resource was registered, it is excluded from the cache when still suspended,
     * the broadcaster listeners are notified and its write queue is dropped. The resource is
     * always told to forget this broadcaster, registered or not. When {@code executeDone} is
     * set, a future stored under this broadcaster's id in the request attributes that is
     * neither done nor cancelled is removed from the request and completed.
     *
     * @param r           the resource to remove
     * @param executeDone whether to complete the pending future found on the request
     * @return this broadcaster
     */
    protected Broadcaster removeAtmosphereResource(AtmosphereResource r, boolean executeDone) {
        if (destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"removeAtmosphereResource(AtmosphereResource r)");
            return this;
        }
        boolean removed = this.resources.remove(r);
        if (!removed) {
            logger.trace("Unable to remove {} from {}", (Object)r.uuid(), (Object)this.getID());
            r.removeBroadcaster(this);
            return this;
        }
        if (r.isSuspended()) {
            logger.trace("Excluded from {} : {}", (Object)this.getID(), (Object)r.uuid());
            this.bc.getBroadcasterCache().excludeFromCache(this.getID(), r);
        }
        notifyOnRemoveAtmosphereResourceListener(r);
        r.removeBroadcaster(this);
        logger.trace("Removing AtmosphereResource {} for Broadcaster {}", (Object)r.uuid(), (Object)this.name);
        this.writeQueues.remove(r.uuid());
        if (!executeDone) {
            return this;
        }
        AtmosphereResourceImpl impl = AtmosphereResourceImpl.class.cast(r);
        BroadcasterFuture<?> pending = (BroadcasterFuture<?>)impl.getRequest(false).getAttribute(this.getID());
        if (pending != null && !pending.isDone() && !pending.isCancelled()) {
            impl.getRequest(false).removeAttribute(this.getID());
            entryDone(pending);
        }
        return this;
    }

    /**
     * Replaces the configuration used by this broadcaster.
     *
     * @param bc the new configuration
     */
    @Override
    public void setBroadcasterConfig(BroadcasterConfig bc) {
        this.bc = bc;
    }

    /**
     * Returns the configuration currently used by this broadcaster.
     *
     * @return the configuration
     */
    @Override
    public BroadcasterConfig getBroadcasterConfig() {
        return bc;
    }

    /**
     * Registers a message as a delayed broadcast without scheduling it: delegates to
     * {@link #delayBroadcast(Object, long, TimeUnit)} with a zero delay and no time unit.
     *
     * @param o the message to delay
     * @return the future returned by the three-argument overload
     */
    @Override
    public Future<Object> delayBroadcast(Object o) {
        final long noDelay = 0L;
        return delayBroadcast(o, noDelay, null);
    }

    /**
     * Registers a message as a delayed broadcast and, when {@code delay} is positive, schedules
     * it to be pushed after that delay.
     *
     * <p>The scheduled task removes the delivery from the delayed queue. If {@code o} is a
     * {@link Callable}, the task calls it, filters the result and pushes it when the filter lets
     * it through. If that fails, the exception is logged and the task falls through to filtering
     * and pushing {@code o} itself, which is also what it does for a non-{@code Callable}
     * message.
     *
     * @param o     the message, or a {@code Callable} producing it
     * @param delay the delay before the push; values {@code <= 0} only register the delivery
     * @param t     the unit of {@code delay}
     * @return the future created for the filtered message, or {@code null} when this broadcaster
     *         is destroyed or the message was filtered out
     */
    @Override
    public Future<Object> delayBroadcast(final Object o, long delay, TimeUnit t) {
        if (this.destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"delayBroadcast(final T o, long delay, TimeUnit t)");
            return null;
        }
        this.start();
        Object msg = this.filter(o);
        if (msg == null) {
            return null;
        }
        final BroadcasterFuture<Object> future = new BroadcasterFuture<Object>(msg);
        final Deliver e = new Deliver(msg, future, o);
        if (delay > 0L) {
            ScheduledFuture<Object> f = this.bc.getScheduledExecutorService().schedule(new Callable<Object>(){

                @Override
                public Object call() throws Exception {
                    DefaultBroadcaster.this.delayedBroadcast.remove(e);
                    if (Callable.class.isAssignableFrom(o.getClass())) {
                        try {
                            Object r = ((Callable)Callable.class.cast(o)).call();
                            Object msg = DefaultBroadcaster.this.filter(r);
                            if (msg != null) {
                                Deliver deliver = new Deliver(msg, future, r);
                                DefaultBroadcaster.this.push(deliver);
                            }
                            return msg;
                        }
                        catch (Exception e1) {
                            // Log the exception itself; the Deliver 'e' was logged here before,
                            // which hid the failure.
                            logger.error("", (Throwable)e1);
                        }
                    }
                    Object msg = DefaultBroadcaster.this.filter(o);
                    Deliver e2 = new Deliver(msg, future, o);
                    DefaultBroadcaster.this.push(e2);
                    return msg;
                }
            }, delay, t);
            e.future = new BroadcasterFuture<Object>(f, msg);
        }
        this.delayedBroadcast.offer(e);
        return future;
    }

    /**
     * Convenience overload that delegates to
     * {@link #scheduleFixedBroadcast(Object, long, long, TimeUnit)} with an initial wait of
     * {@code 0}.
     *
     * @param o      the message, or a {@link Callable} producing the message
     * @param period the delay between successive executions
     * @param t      the unit of {@code period}
     * @return whatever the four-argument overload returns
     */
    @Override
    public Future<Object> scheduleFixedBroadcast(Object o, long period, TimeUnit t) {
        return this.scheduleFixedBroadcast(o, 0L, period, t);
    }

    /**
     * Schedules {@code o} to be broadcast repeatedly with a fixed delay between executions.
     *
     * <p>On every execution: if {@code o} is a {@link Callable} it is invoked, its result is
     * filtered and pushed (nothing is pushed when the filter returns {@code null}). Otherwise,
     * or if the {@code Callable} threw, {@code o} itself is filtered and pushed.
     *
     * @param o       the message, or a {@link Callable} producing the message
     * @param waitFor the initial delay before the first execution
     * @param period  the delay between the end of one execution and the start of the next
     * @param t       the unit of {@code waitFor} and {@code period}
     * @return the future of the periodic task, or {@code null} when this broadcaster is
     *         destroyed, {@code period} is {@code 0}, {@code t} is {@code null}, or the filter
     *         rejected the message
     */
    @Override
    public Future<Object> scheduleFixedBroadcast(final Object o, long waitFor, long period, TimeUnit t) {
        if (this.destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"scheduleFixedBroadcast(final Object o, long waitFor, long period, TimeUnit t)");
            return null;
        }
        this.start();
        if (period == 0L || t == null) {
            return null;
        }
        Object msg = this.filter(o);
        if (msg == null) {
            return null;
        }
        final BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(msg);
        // scheduleWithFixedDelay returns ScheduledFuture<?>, which is not assignable to
        // Future<Object> without a cast. The periodic task never yields a value, so the
        // unchecked cast cannot expose a wrongly typed result.
        @SuppressWarnings("unchecked")
        Future<Object> scheduled = (Future<Object>)this.bc.getScheduledExecutorService().scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                if (Callable.class.isAssignableFrom(o.getClass())) {
                    try {
                        Object r = ((Callable<?>)o).call();
                        Object produced = DefaultBroadcaster.this.filter(r);
                        if (produced != null) {
                            Deliver deliver = new Deliver(produced, f, r);
                            DefaultBroadcaster.this.push(deliver);
                        }
                        return;
                    }
                    catch (Exception e) {
                        logger.error("", e);
                    }
                }
                // Reached for non-Callable messages and when the Callable failed:
                // broadcast the original object instead.
                Object filtered = DefaultBroadcaster.this.filter(o);
                Deliver e = new Deliver(filtered, f, o);
                DefaultBroadcaster.this.push(e);
            }
        }, waitFor, period, t);
        return scheduled;
    }

    /**
     * Builds a multi-line description made of this broadcaster's name, the number of
     * entries in {@code resources}, and the configured broadcaster cache.
     *
     * @return the textual description
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("\n\tName: ").append(this.name);
        text.append("\n\tAtmosphereResource: ").append(this.resources.size());
        text.append("\n\tBroadcasterCache ").append(this.bc.getBroadcasterCache());
        return text.toString();
    }

    /**
     * Converts {@code period}, expressed in {@code tu}, to milliseconds.
     *
     * @param period the duration to convert; {@code -1} is returned unchanged
     * @param tu     the unit {@code period} is expressed in; must not be {@code null} unless
     *               {@code period} is {@code -1}
     * @return {@code period} in milliseconds, or {@code -1} when {@code period} is {@code -1}
     */
    private long translateTimeUnit(long period, TimeUnit tu) {
        // -1 is passed through untouched rather than being converted.
        if (period == -1L) {
            return period;
        }
        // toMillis(d) is defined as MILLISECONDS.convert(d, this) for every unit.
        return tu.toMillis(period);
    }

    /**
     * Invokes {@code onPreDestroy(this)} on every registered {@link BroadcasterListener}.
     *
     * <p>A listener throwing a {@code BroadcastListenerException} stops the iteration at once.
     * Any other {@link RuntimeException} is logged at warn level and the remaining listeners
     * are still invoked.
     *
     * @return {@code true} if a listener threw a {@code BroadcastListenerException},
     *         {@code false} otherwise
     */
    boolean notifyOnPreDestroy() {
        for (BroadcasterListener listener : this.broadcasterListeners) {
            try {
                listener.onPreDestroy(this);
            }
            catch (RuntimeException failure) {
                boolean listenerException = BroadcasterListener.BroadcastListenerException.class.isInstance(failure);
                if (!listenerException) {
                    logger.warn("onPreDestroy", failure);
                    continue;
                }
                logger.trace("onPreDestroy", failure);
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the collection of {@link BroadcasterListener}s held by this broadcaster.
     *
     * @return the internal collection itself, not a copy
     */
    public Collection<BroadcasterListener> broadcasterListeners() {
        return this.broadcasterListeners;
    }

    /**
     * Returns the {@link BroadcasterLifeCyclePolicy} currently stored on this broadcaster.
     *
     * @return the stored policy reference
     */
    public BroadcasterLifeCyclePolicy lifeCyclePolicy() {
        return this.lifeCyclePolicy;
    }

    /**
     * Returns the queue of {@link BroadcasterLifeCyclePolicyListener}s held by this broadcaster.
     *
     * @return the internal queue itself, not a copy
     */
    public ConcurrentLinkedQueue<BroadcasterLifeCyclePolicyListener> lifeCycleListeners() {
        return this.lifeCycleListeners;
    }

    /**
     * Returns the queue of {@link Deliver} entries held in the {@code messages} field.
     *
     * @return the internal queue itself, not a copy
     */
    public BlockingQueue<Deliver> messages() {
        return this.messages;
    }

    /**
     * Returns the map of {@link WriteQueue}s held by this broadcaster.
     *
     * @return the internal map itself, not a copy
     */
    // NOTE(review): keys look like AtmosphereResource uuids (entries are removed by r.uuid()
    // elsewhere in this class) - confirm against the code that populates the map.
    public ConcurrentHashMap<String, WriteQueue> writeQueues() {
        return this.writeQueues;
    }

    /**
     * Returns the {@link Broadcaster.POLICY} currently stored on this broadcaster.
     *
     * @return the stored policy value
     */
    public Broadcaster.POLICY policy() {
        return this.policy;
    }

    /**
     * Returns the current value read from the {@code outOfOrderBroadcastSupported} holder.
     *
     * @return the flag's value at the time of the call
     */
    public boolean outOfOrderBroadcastSupported() {
        return this.outOfOrderBroadcastSupported.get();
    }

    /**
     * Returns the {@link AtomicBoolean} stored in the {@code recentActivity} field.
     *
     * @return the internal flag object itself, so callers can both read and modify it
     */
    public AtomicBoolean recentActivity() {
        return this.recentActivity;
    }

    /**
     * Returns the {@link LifecycleHandler} currently stored on this broadcaster.
     *
     * @return the stored handler reference
     */
    public LifecycleHandler lifecycleHandler() {
        return this.lifecycleHandler;
    }

    /**
     * Replaces the {@link LifecycleHandler} stored on this broadcaster.
     *
     * @param lifecycleHandler the handler to store; assigned as-is without validation
     * @return this broadcaster, to allow call chaining
     */
    public DefaultBroadcaster lifecycleHandler(LifecycleHandler lifecycleHandler) {
        this.lifecycleHandler = lifecycleHandler;
        return this;
    }

    /**
     * Returns the future stored in the {@code currentLifecycleTask} field.
     *
     * @return the stored future reference
     */
    public Future<?> currentLifecycleTask() {
        return this.currentLifecycleTask;
    }

    /**
     * Replaces the future stored in the {@code currentLifecycleTask} field.
     *
     * @param currentLifecycleTask the future to store; assigned as-is, the previous value is
     *                             neither cancelled nor inspected here
     * @return this broadcaster, to allow call chaining
     */
    public DefaultBroadcaster currentLifecycleTask(Future<?> currentLifecycleTask) {
        this.currentLifecycleTask = currentLifecycleTask;
        return this;
    }

    /**
     * Mutable holder bundling a target resource, the message to write to it, the associated
     * future, the original (pre-filter) message, an optional cache entry, and a counter
     * shared with other tokens.
     */
    protected static final class AsyncWriteToken {
        AtmosphereResource resource;
        Object msg;
        BroadcasterFuture future;
        Object originalMessage;
        CacheMessage cache;
        AtomicInteger count;

        /**
         * Creates a token without a cache entry ({@code cache} stays {@code null}).
         */
        public AsyncWriteToken(AtmosphereResource resource, Object msg, BroadcasterFuture future, Object originalMessage, AtomicInteger count) {
            this(resource, msg, future, originalMessage, null, count);
        }

        /**
         * Creates a token carrying all six values; every argument is stored as-is.
         */
        public AsyncWriteToken(AtmosphereResource resource, Object msg, BroadcasterFuture future, Object originalMessage, CacheMessage cache, AtomicInteger count) {
            this.count = count;
            this.cache = cache;
            this.originalMessage = originalMessage;
            this.future = future;
            this.msg = msg;
            this.resource = resource;
        }

        /**
         * Drops the references to the resource, message, future and original message.
         * The {@code cache} and {@code count} fields are left untouched.
         */
        public void destroy() {
            this.originalMessage = null;
            this.future = null;
            this.msg = null;
            this.resource = null;
        }

        /**
         * Decrements the shared counter.
         *
         * @return {@code true} when the decrement brought the counter to exactly zero
         */
        public boolean lastBroadcasted() {
            int remaining = this.count.decrementAndGet();
            return remaining == 0;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("AsyncWriteToken{resource=");
            text.append(this.resource);
            text.append(", msg=").append(this.msg);
            text.append(", future=").append(this.future);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * Callable whose behaviour depends on how many times it has been invoked.
     *
     * <p>The first invocation performs the write by calling {@code invokeOnStateChange}.
     * A later invocation that arrives before the first one has finished treats the write as
     * timed out: it interrupts the I/O thread, reports an {@link IOException} through
     * {@code onException} and cancels the resource. A later invocation that arrives after the
     * first one has finished does nothing.
     */
    // NOTE(review): presumably the same instance is submitted once for the write and once as a
    // delayed timeout task - confirm against the code that creates WriteOperation.
    final class WriteOperation
    implements Callable<Object> {
        private final AtmosphereResource r;
        private final AtmosphereResourceEvent e;
        private AtomicBoolean completed = new AtomicBoolean();
        private AtomicBoolean executed = new AtomicBoolean();
        private final Thread ioThread;

        private WriteOperation(AtmosphereResource r, AtmosphereResourceEvent e, Thread ioThread) {
            this.ioThread = ioThread;
            this.e = e;
            this.r = r;
        }

        /**
         * Runs the write on the first call, or honours the write timeout on a later call made
         * while the write is still in progress.
         *
         * @return always {@code null}
         */
        @Override
        public Object call() throws Exception {
            boolean firstInvocation = !this.completed.getAndSet(true);
            if (firstInvocation) {
                DefaultBroadcaster.this.invokeOnStateChange(this.r, this.e);
                logger.trace("Cancelling Write timeout {} for {}", (Object)DefaultBroadcaster.this.writeTimeoutInSecond, (Object)this.r.uuid());
                this.executed.set(true);
                return null;
            }
            if (this.executed.get()) {
                // The write already finished; nothing to time out.
                return null;
            }
            try {
                this.ioThread.interrupt();
            }
            catch (Throwable t) {
                logger.trace("I/O failure, unable to interrupt the thread", t);
            }
            logger.trace("Honoring Write timeout {} for {}", (Object)DefaultBroadcaster.this.writeTimeoutInSecond, (Object)this.r.uuid());
            IOException timeout = new IOException("Unable to write after " + DefaultBroadcaster.this.writeTimeoutInSecond);
            DefaultBroadcaster.this.onException(timeout, this.r);
            AtmosphereResourceImpl impl = (AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(this.r);
            impl.cancel();
            return null;
        }

        /**
         * Intentionally empty; performs no action.
         */
        public void interrupt() {
        }
    }

    /**
     * Queue of pending {@link AsyncWriteToken}s tagged with a uuid, plus a {@code monitored}
     * flag.
     */
    public static final class WriteQueue {
        final BlockingQueue<AsyncWriteToken> queue = new LinkedBlockingQueue<AsyncWriteToken>();
        final AtomicBoolean monitored = new AtomicBoolean();
        final String uuid;

        private WriteQueue(String uuid) {
            this.uuid = uuid;
        }

        /**
         * Renders every token currently in the queue via its {@code toString()}.
         *
         * @return a new list with one string per queued token, in the queue's iteration order
         */
        public List<String> asString() {
            List<String> rendered = new ArrayList<String>();
            Iterator<AsyncWriteToken> tokens = this.queue.iterator();
            while (tokens.hasNext()) {
                rendered.add(tokens.next().toString());
            }
            return rendered;
        }
    }
}

