/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.mq.server;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.SortedSet;
import java.util.TreeSet;
import javax.jms.JMSException;
import org.jboss.logging.Logger;
import org.jboss.mq.AcknowledgementRequest;
import org.jboss.mq.DestinationFullException;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.Subscription;
import org.jboss.mq.pm.Tx;
import org.jboss.mq.selectors.Selector;
import org.jboss.mq.server.BasicQueueParameters;
import org.jboss.mq.server.ClientConsumer;
import org.jboss.mq.server.JMSDestinationManager;
import org.jboss.mq.server.MessageCounter;
import org.jboss.mq.server.MessageReference;
import org.jboss.mq.server.RoutedMessage;
import org.jboss.mq.server.SimpleTimer;
import org.jboss.mq.server.SimpleTimerTask;

public class BasicQueue {
    // Class logger. The ternary is the decompiled form of a class literal, cached in the synthetic field declared at the end of this field list.
    static final Logger log = Logger.getLogger((Class)(class$org$jboss$mq$server$BasicQueue == null ? (class$org$jboss$mq$server$BasicQueue = BasicQueue.class$("org.jboss.mq.server.BasicQueue")) : class$org$jboss$mq$server$BasicQueue));
    // MessageReference instances waiting for a consumer. This object is also the monitor used
    // when the three "unacked" maps and removedSubscribers are read or modified.
    SortedSet messages = new TreeSet();
    // Runs EnqueueMessageTask (delayed delivery) and ExpireMessageTask (expiry). Its monitor
    // guards increments/decrements of scheduledMessageCount.
    SimpleTimer messageTimer = new SimpleTimer();
    // Incremented when a message is scheduled for later delivery, decremented when its EnqueueMessageTask has run.
    int scheduledMessageCount = 0;
    // Source of the persistence manager and the message cache used throughout this class.
    JMSDestinationManager server;
    // Subscription instances waiting for a message; synchronized on itself.
    HashSet receivers = new HashSet();
    // Text returned by getDescription() and used in the "Maximum size ... exceeded" warning.
    String description;
    // Null until createMessageCounter(...) is called; incremented by AddMessagePostCommitTask.
    MessageCounter counter;
    // AcknowledgementRequest -> UnackedMessageInfo for every delivered-but-unacknowledged message.
    HashMap unacknowledgedMessages = new HashMap();
    // MessageReference -> AcknowledgementRequest (reverse view of the map above).
    HashMap unackedByMessageRef = new HashMap();
    // Subscription -> HashMap(MessageReference -> AcknowledgementRequest); an entry exists only while non-empty.
    HashMap unackedBySubscription = new HashMap();
    // Subscriptions whose removal was delayed because they still had unacknowledged messages.
    HashSet removedSubscribers = new HashSet();
    // Queue settings; only maxDepth is read in this class.
    BasicQueueParameters parameters;
    // Decompiler-generated cache backing the class literal used by the logger initializer.
    static /* synthetic */ Class class$org$jboss$mq$server$BasicQueue;

    /**
     * Creates a queue that stores the given collaborators for later use.
     *
     * @param server      destination manager supplying the persistence manager and message cache
     * @param description text later returned by {@link #getDescription()}
     * @param parameters  queue settings; {@code maxDepth} is consulted when messages are added
     * @throws JMSException declared in the signature; the assignments below do not throw it
     */
    public BasicQueue(JMSDestinationManager server, String description, BasicQueueParameters parameters) throws JMSException {
        this.parameters = parameters;
        this.description = description;
        this.server = server;
    }

    /**
     * @return the description string supplied to the constructor
     */
    public String getDescription() {
        return description;
    }

    /**
     * Reports how many subscriptions are currently waiting for a message.
     * The size is read without taking the {@code receivers} monitor.
     *
     * @return the current size of the {@code receivers} set
     */
    public int getReceiversCount() {
        return receivers.size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Takes a snapshot of the waiting subscriptions while holding the
     * {@code receivers} monitor.
     *
     * @return a new list containing every element of {@code receivers}
     */
    public ArrayList getReceivers() {
        synchronized (this.receivers) {
            ArrayList snapshot = new ArrayList(this.receivers);
            return snapshot;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Tells whether at least one subscription is waiting on this queue.
     *
     * @return {@code true} when {@code receivers} is non-empty
     */
    public boolean isInUse() {
        synchronized (this.receivers) {
            return !this.receivers.isEmpty();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Offers the subscription the first queued message it accepts; when none
     * is found the subscription is parked in {@code receivers}.
     *
     * <p>Expired messages met during the scan are removed and dropped. A
     * {@link JMSException} raised for one message is logged at info level and
     * the scan moves on to the next message.
     *
     * @param sub the subscription asking for a message
     */
    public void addReceiver(Subscription sub) {
        final boolean traceOn = log.isTraceEnabled();
        synchronized (this.messages) {
            for (Iterator pending = this.messages.iterator(); pending.hasNext(); ) {
                MessageReference candidate = (MessageReference)pending.next();
                try {
                    if (candidate.isExpired()) {
                        pending.remove();
                        if (traceOn) {
                            log.trace((Object)("message expired: " + candidate));
                        }
                        this.dropMessage(candidate);
                    } else if (sub.accepts(candidate.getHeaders())) {
                        // Hand over first, dequeue second: a failure while queueing leaves the message in place.
                        this.queueMessageForSending(sub, candidate);
                        pending.remove();
                        return;
                    }
                }
                catch (JMSException ignore) {
                    log.info((Object)"Caught unusual exception in addToReceivers.", (Throwable)ignore);
                }
            }
        }
        // Nothing deliverable right now: wait for the next incoming message.
        this.addToReceivers(sub);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Detaches a subscription from this queue.
     *
     * <p>The subscription is first taken out of {@code receivers}. If it still
     * has unacknowledged messages it is remembered in
     * {@code removedSubscribers}; otherwise its client consumer is told
     * immediately that the subscription is gone.
     *
     * @param sub the subscription being removed
     */
    public void removeSubscriber(Subscription sub) {
        final boolean traceOn = log.isTraceEnabled();
        this.removeReceiver(sub);
        synchronized (this.receivers) {
            synchronized (this.messages) {
                if (!this.hasUnackedMessages(sub)) {
                    if (traceOn) {
                        log.trace((Object)("Removing subscriber " + sub));
                    }
                    ((ClientConsumer)sub.clientConsumer).removeRemovedSubscription(sub.subscriptionId);
                    return;
                }
                if (traceOn) {
                    log.trace((Object)("Delaying removal of subscriber is has unacked messages " + sub));
                }
                this.removedSubscribers.add(sub);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Moves every waiting subscription that belongs to the given consumer out
     * of {@code receivers} and registers it with the consumer as a blocked
     * subscription (with a second argument of {@code 0L}).
     *
     * @param clientConsumer the consumer that has stopped
     */
    public void clientConsumerStopped(ClientConsumer clientConsumer) {
        synchronized (this.receivers) {
            for (Iterator waiting = this.receivers.iterator(); waiting.hasNext(); ) {
                Subscription candidate = (Subscription)waiting.next();
                if (candidate.clientConsumer.equals(clientConsumer)) {
                    clientConsumer.addBlockedSubscription(candidate, 0L);
                    waiting.remove();
                }
            }
        }
    }

    /**
     * Reports the number of messages currently held in {@code messages}.
     * The size is read without taking the {@code messages} monitor.
     *
     * @return the current size of the {@code messages} set
     */
    public int getQueueDepth() {
        return messages.size();
    }

    /**
     * @return the current value of {@code scheduledMessageCount}, read without locking
     */
    public int getScheduledMessageCount() {
        return scheduledMessageCount;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a message for addition to this queue as part of a transaction.
     *
     * <p>When {@code parameters.maxDepth} is positive and the queue already
     * holds that many messages, the message is dropped, a warning is logged
     * and a {@link DestinationFullException} is thrown. Otherwise two tasks
     * are handed to the transaction manager: a post-rollback task and a
     * post-commit task for this message.
     *
     * @param mes  the message to add
     * @param txId the transaction the tasks are attached to
     * @throws JMSException if the queue is full, or if registering the tasks fails
     */
    public void addMessage(MessageReference mes, Tx txId) throws JMSException {
        if (this.parameters.maxDepth > 0) {
            synchronized (this.messages) {
                boolean full = this.messages.size() >= this.parameters.maxDepth;
                if (full) {
                    this.dropMessage(mes);
                    String message = "Maximum size " + this.parameters.maxDepth + " exceeded for " + this.description;
                    log.warn((Object)message);
                    throw new DestinationFullException(message);
                }
            }
        }
        Runnable onRollback = new AddMessagePostRollBackTask(mes);
        this.server.getPersistenceManager().getTxManager().addPostRollbackTask(txId, onRollback);
        Runnable onCommit = new AddMessagePostCommitTask(mes);
        this.server.getPersistenceManager().getTxManager().addPostCommitTask(txId, onCommit);
    }

    /**
     * Puts a message into the queue immediately by delegating to
     * {@code internalAddMessage}; no transaction tasks are registered.
     *
     * @param mes the message to restore
     */
    public void restoreMessage(MessageReference mes) {
        internalAddMessage(mes);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the messages currently queued, optionally filtered by a selector.
     * Messages are not removed from the queue.
     *
     * @param selector selector expression, or {@code null} to return every queued message
     * @return the matching messages in the iteration order of {@code messages}
     * @throws JMSException if the selector cannot be built or a message cannot be read
     */
    public SpyMessage[] browse(String selector) throws JMSException {
        if (selector != null) {
            // The selector is built before the lock is taken.
            Selector filter = new Selector(selector);
            ArrayList<SpyMessage> matched = new ArrayList<SpyMessage>();
            synchronized (this.messages) {
                for (Iterator it = this.messages.iterator(); it.hasNext(); ) {
                    MessageReference ref = (MessageReference)it.next();
                    if (filter.test(ref.getHeaders())) {
                        matched.add(ref.getMessage());
                    }
                }
            }
            return matched.toArray(new SpyMessage[matched.size()]);
        }
        synchronized (this.messages) {
            SpyMessage[] all = new SpyMessage[this.messages.size()];
            int slot = 0;
            for (Iterator it = this.messages.iterator(); it.hasNext(); ) {
                all[slot++] = ((MessageReference)it.next()).getMessage();
            }
            return all;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Synchronously takes one message for the given subscription.
     *
     * <p>Two mutually exclusive strategies are used:
     * <ul>
     *   <li>a subscription with no selector and {@code noLocal == false} takes
     *       the first non-expired message at the head of the queue;</li>
     *   <li>any other subscription scans the queue and takes the first
     *       non-expired message it {@code accepts}.</li>
     * </ul>
     * Expired messages met on the way are removed and dropped. The selected
     * message is recorded as unacknowledged for the subscription.
     *
     * <p>The strategies must not both run: the head-of-queue path has already
     * removed its message from {@code messages}, so letting the scan pick a
     * second message would overwrite the first and lose it.
     *
     * @param sub  the receiving subscription
     * @param wait when {@code true} and no message is available, the
     *             subscription is added to {@code receivers}
     * @return the received message, or {@code null} if none was available
     * @throws JMSException if the subscription or a message cannot be read
     */
    public SpyMessage receive(Subscription sub, boolean wait) throws JMSException {
        boolean trace = log.isTraceEnabled();
        MessageReference messageRef = null;
        synchronized (this.receivers) {
            if (sub.getSelector() == null && !sub.noLocal) {
                // Not picky: the first live message is the one.
                synchronized (this.messages) {
                    while (!this.messages.isEmpty()) {
                        MessageReference head = (MessageReference)this.messages.first();
                        this.messages.remove(head);
                        if (!head.isExpired()) {
                            messageRef = head;
                            break;
                        }
                        if (trace) {
                            log.trace((Object)("message expired: " + head));
                        }
                        this.dropMessage(head);
                    }
                }
            } else {
                // Picky: walk the queue until something is accepted.
                synchronized (this.messages) {
                    Iterator i = this.messages.iterator();
                    while (i.hasNext()) {
                        MessageReference mr = (MessageReference)i.next();
                        if (mr.isExpired()) {
                            i.remove();
                            if (trace) {
                                log.trace((Object)("message expired: " + mr));
                            }
                            this.dropMessage(mr);
                            continue;
                        }
                        if (!sub.accepts(mr.getHeaders())) continue;
                        messageRef = mr;
                        i.remove();
                        break;
                    }
                }
            }
            if (messageRef == null) {
                if (wait) {
                    this.addToReceivers(sub);
                }
            } else {
                this.setupMessageAcknowledgement(sub, messageRef);
            }
        }
        if (messageRef == null) {
            return null;
        }
        return messageRef.getMessage();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Resolves an outstanding delivery, either positively (ack) or negatively
     * (nack), inside the given transaction.
     *
     * <p>The bookkeeping entry for {@code item} is removed from all three
     * unacked maps; a request with no entry is ignored. For a nack a
     * {@code RestoreMessageTask} is registered to run after commit. For an ack
     * a persistent message is removed from the persistence manager, a
     * {@code RestoreMessageTask} is registered for rollback and a
     * {@code RemoveMessageTask} for commit. Finally a delayed subscriber
     * removal is completed if this was the subscription's last unacked message.
     *
     * @param item the acknowledgement request; {@code isAck} selects ack or nack
     * @param txId the transaction the follow-up tasks are attached to
     * @throws JMSException if the persistence manager or transaction manager fails
     */
    public void acknowledge(AcknowledgementRequest item, Tx txId) throws JMSException {
        UnackedMessageInfo info;
        synchronized (this.messages) {
            info = (UnackedMessageInfo)this.unacknowledgedMessages.remove(item);
            if (info == null) {
                return;
            }
            this.unackedByMessageRef.remove(info.messageRef);
            HashMap perSubscription = (HashMap)this.unackedBySubscription.get(info.sub);
            perSubscription.remove(info.messageRef);
            if (perSubscription.isEmpty()) {
                this.unackedBySubscription.remove(info.sub);
            }
        }
        MessageReference ref = info.messageRef;
        Runnable task;
        if (item.isAck) {
            if (ref.isPersistent()) {
                this.server.getPersistenceManager().remove(ref, txId);
            }
            task = new RestoreMessageTask(ref);
            this.server.getPersistenceManager().getTxManager().addPostRollbackTask(txId, task);
            task = new RemoveMessageTask(ref);
            this.server.getPersistenceManager().getTxManager().addPostCommitTask(txId, task);
        } else {
            task = new RestoreMessageTask(ref);
            this.server.getPersistenceManager().getTxManager().addPostCommitTask(txId, task);
        }
        synchronized (this.receivers) {
            synchronized (this.messages) {
                this.checkRemovedSubscribers(info.sub);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Passes every acknowledgement request still recorded for the subscription
     * to {@code acknowledge} with a {@code null} transaction.
     *
     * <p>A copy of the subscription's map is iterated because
     * {@code acknowledge} modifies the live map. Failures for individual
     * messages are logged at debug level and do not stop the loop.
     *
     * @param sub the subscription whose outstanding messages are processed
     */
    public void nackMessages(Subscription sub) {
        synchronized (this.receivers) {
            synchronized (this.messages) {
                HashMap outstanding = (HashMap)this.unackedBySubscription.get(sub);
                if (outstanding == null) {
                    return;
                }
                int nacked = 0;
                for (Iterator it = ((HashMap)outstanding.clone()).values().iterator(); it.hasNext(); ) {
                    AcknowledgementRequest item = (AcknowledgementRequest)it.next();
                    try {
                        this.acknowledge(item, null);
                        nacked++;
                    }
                    catch (JMSException ignore) {
                        log.debug((Object)("Unable to nack message: " + item), (Throwable)ignore);
                    }
                }
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Nacked " + nacked + " messages for removed subscription " + sub));
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Empties the queue.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>the timer is cleared and {@code scheduledMessageCount} is reset;
     *       the reset happens under the {@code messageTimer} monitor, the same
     *       monitor used wherever the counter is incremented or decremented;</li>
     *   <li>every outstanding acknowledgement request is passed to
     *       {@code acknowledge} with a {@code null} transaction (a copy of the
     *       key set is iterated because {@code acknowledge} mutates the map);</li>
     *   <li>every message left in {@code messages} is removed and dropped.</li>
     * </ol>
     * A failure while resolving one outstanding request is logged at debug
     * level and does not abort the purge.
     *
     * @throws JMSException declared for callers; per-message failures are logged, not rethrown
     */
    public void removeAllMessages() throws JMSException {
        this.messageTimer.clear();
        synchronized (this.messageTimer) {
            this.scheduledMessageCount = 0;
        }
        synchronized (this.receivers) {
            synchronized (this.messages) {
                Iterator outstanding = ((HashMap)this.unacknowledgedMessages.clone()).keySet().iterator();
                while (outstanding.hasNext()) {
                    AcknowledgementRequest item = (AcknowledgementRequest)outstanding.next();
                    try {
                        this.acknowledge(item, null);
                    }
                    catch (JMSException e) {
                        // Best effort: keep purging, but leave a trace of what failed.
                        log.debug((Object)("Unable to nack message: " + item), (Throwable)e);
                    }
                }
                Iterator remaining = this.messages.iterator();
                while (remaining.hasNext()) {
                    MessageReference message = (MessageReference)remaining.next();
                    remaining.remove();
                    this.dropMessage(message);
                }
            }
        }
    }

    /**
     * Creates a {@link MessageCounter} for this queue and stores it in
     * {@code counter}, replacing any previous one. All arguments are forwarded
     * unchanged to the {@code MessageCounter} constructor.
     *
     * @param name         forwarded to the counter
     * @param subscription forwarded to the counter
     * @param topic        forwarded to the counter
     * @param durable      forwarded to the counter
     * @param daycountmax  forwarded to the counter
     */
    public void createMessageCounter(String name, String subscription, boolean topic, boolean durable, int daycountmax) {
        MessageCounter created = new MessageCounter(name, subscription, this, topic, durable, daycountmax);
        this.counter = created;
    }

    /**
     * @return the counter created by {@code createMessageCounter}, or {@code null} if none was created
     */
    public MessageCounter getMessageCounter() {
        return counter;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Adds the subscription to the set of waiting receivers under the
     * {@code receivers} monitor.
     *
     * @param sub the subscription to park
     */
    protected void addToReceivers(Subscription sub) {
        synchronized (this.receivers) {
            receivers.add(sub);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes the subscription from the set of waiting receivers under the
     * {@code receivers} monitor; a subscription that is not present is ignored.
     *
     * @param sub the subscription to remove
     */
    protected void removeReceiver(Subscription sub) {
        synchronized (this.receivers) {
            receivers.remove(sub);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Places a message into the queue, or hands it straight to a waiting receiver.
     *
     * <p>Decision order:
     * <ol>
     *   <li>a scheduled delivery time in the future: schedule an
     *       {@code EnqueueMessageTask} and count it as scheduled;</li>
     *   <li>already expired: drop it;</li>
     *   <li>a waiting receiver accepts it: queue it for sending and remove
     *       that receiver from {@code receivers};</li>
     *   <li>otherwise: store it in {@code messages} and, when it has an
     *       expiration time, schedule an {@code ExpireMessageTask}.</li>
     * </ol>
     * A {@link JMSException} in steps 3-4 is logged and the message is dropped.
     *
     * @param message the message to enqueue
     */
    private void internalAddMessage(MessageReference message) {
        final boolean traceOn = log.isTraceEnabled();
        final long deliverAt = message.messageScheduledDelivery;
        if (deliverAt > 0L && deliverAt > System.currentTimeMillis()) {
            EnqueueMessageTask delayed = new EnqueueMessageTask(message);
            this.messageTimer.schedule(delayed, deliverAt);
            synchronized (this.messageTimer) {
                this.scheduledMessageCount++;
            }
            if (traceOn) {
                log.trace((Object)("scheduled message at " + new Date(deliverAt) + ": " + message));
            }
            return;
        }
        if (message.isExpired()) {
            if (traceOn) {
                log.trace((Object)("message expired: " + message));
            }
            this.dropMessage(message);
            return;
        }
        try {
            synchronized (this.receivers) {
                for (Iterator waiting = this.receivers.iterator(); waiting.hasNext(); ) {
                    Subscription candidate = (Subscription)waiting.next();
                    if (candidate.accepts(message.getHeaders())) {
                        this.queueMessageForSending(candidate, message);
                        waiting.remove();
                        return;
                    }
                }
                // Nobody wants it right now: keep it for a later receiver.
                synchronized (this.messages) {
                    this.messages.add(message);
                    if (message.messageExpiration > 0L) {
                        ExpireMessageTask expiry = new ExpireMessageTask(message);
                        this.messageTimer.schedule(expiry, message.messageExpiration);
                    }
                }
            }
        }
        catch (JMSException e) {
            log.error((Object)"Caught unusual exception in internalAddMessage.", (Throwable)e);
            this.dropMessage(message);
        }
    }

    /**
     * Records the message as unacknowledged for the subscription and passes it
     * to the subscription's client consumer wrapped in a {@link RoutedMessage}.
     *
     * <p>The subscription id is boxed with {@link Integer#valueOf} rather than
     * the deprecated {@code Integer} constructor.
     *
     * @param sub     the subscription that will receive the message
     * @param message the message to deliver
     * @throws JMSException if the acknowledgement bookkeeping or the consumer hand-off fails
     */
    protected void queueMessageForSending(Subscription sub, MessageReference message) throws JMSException {
        this.setupMessageAcknowledgement(sub, message);
        RoutedMessage r = new RoutedMessage();
        r.message = message;
        r.subscriptionId = Integer.valueOf(sub.subscriptionId);
        ((ClientConsumer)sub.clientConsumer).queueMessageForSending(r);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a delivered message as unacknowledged.
     *
     * <p>An {@link AcknowledgementRequest} with {@code isAck == false} is built
     * from the message's destination and id and the subscription id, then
     * indexed three ways under the {@code messages} monitor: by request, by
     * message reference, and by subscription.
     *
     * @param sub        the subscription the message was given to
     * @param messageRef the delivered message
     * @throws JMSException if the message or its JMS headers cannot be read
     */
    protected void setupMessageAcknowledgement(Subscription sub, MessageReference messageRef) throws JMSException {
        SpyMessage payload = messageRef.getMessage();
        AcknowledgementRequest request = new AcknowledgementRequest();
        request.destination = payload.getJMSDestination();
        request.messageID = payload.getJMSMessageID();
        request.subscriberId = sub.subscriptionId;
        request.isAck = false;
        synchronized (this.messages) {
            this.unacknowledgedMessages.put(request, new UnackedMessageInfo(messageRef, sub));
            this.unackedByMessageRef.put(messageRef, request);
            HashMap<MessageReference, AcknowledgementRequest> perSubscription = (HashMap<MessageReference, AcknowledgementRequest>)this.unackedBySubscription.get(sub);
            if (perSubscription == null) {
                perSubscription = new HashMap<MessageReference, AcknowledgementRequest>();
                this.unackedBySubscription.put(sub, perSubscription);
            }
            perSubscription.put(messageRef, request);
        }
    }

    /**
     * Discards a message: a persistent message is first removed from the
     * persistence manager, then the message is removed from the message cache.
     *
     * <p>No exception escapes. A failure to remove from the persistent store is
     * logged as a warning and the cache removal is still attempted; any other
     * {@link JMSException} is logged as a warning.
     *
     * @param message the message to discard
     */
    protected void dropMessage(MessageReference message) {
        try {
            if (message.isPersistent()) {
                this.removeFromStoreQuietly(message);
            }
            this.server.getMessageCache().remove(message);
        }
        catch (JMSException e) {
            log.warn((Object)("Error dropping message " + message), (Throwable)e);
        }
    }

    /**
     * Removes the message from the persistence manager, logging a failure as a
     * warning. The warning prints the full message when it can be loaded and
     * falls back to the reference itself when it cannot.
     */
    private void removeFromStoreQuietly(MessageReference message) {
        try {
            this.server.getPersistenceManager().remove(message, null);
        }
        catch (JMSException storeFailure) {
            Object shown;
            try {
                shown = message.getMessage();
            }
            catch (JMSException unreadable) {
                shown = message;
            }
            log.warn((Object)("Message removed from queue, but not from the persistent store: " + shown), (Throwable)storeFailure);
        }
    }

    /**
     * Completes a delayed subscriber removal once the subscription has no
     * unacknowledged messages left. Does nothing when the subscription is not
     * in {@code removedSubscribers} or still has unacked messages.
     *
     * @param sub the subscription to check
     */
    private void checkRemovedSubscribers(Subscription sub) {
        boolean traceOn = log.isTraceEnabled();
        if (!this.removedSubscribers.contains(sub) || this.hasUnackedMessages(sub)) {
            return;
        }
        if (traceOn) {
            log.trace((Object)("Removing subscriber " + sub));
        }
        this.removedSubscribers.remove(sub);
        ((ClientConsumer)sub.clientConsumer).removeRemovedSubscription(sub.subscriptionId);
    }

    /**
     * @param sub the subscription to look up
     * @return {@code true} when {@code unackedBySubscription} has an entry for the subscription
     */
    private boolean hasUnackedMessages(Subscription sub) {
        return unackedBySubscription.containsKey(sub);
    }

    /**
     * Compiler-generated helper behind pre-Java-5 class literals: loads the
     * named class and converts a lookup failure into a
     * {@link NoClassDefFoundError} carrying the original message.
     *
     * @param x0 fully qualified class name
     * @return the loaded class
     */
    static /* synthetic */ Class class$(String x0) {
        Class resolved;
        try {
            resolved = Class.forName(x0);
        }
        catch (ClassNotFoundException x1) {
            throw new NoClassDefFoundError(x1.getMessage());
        }
        return resolved;
    }

    /**
     * Pairs a delivered message with the subscription it was delivered to;
     * stored as the value of {@code unacknowledgedMessages}.
     */
    private static class UnackedMessageInfo {
        /** The message awaiting acknowledgement. */
        public MessageReference messageRef;
        /** The subscription that received the message. */
        public Subscription sub;

        public UnackedMessageInfo(MessageReference messageRef, Subscription sub) {
            this.sub = sub;
            this.messageRef = messageRef;
        }
    }

    /**
     * Timer task that evicts a message from the queue when its expiration
     * time is reached.
     */
    private class ExpireMessageTask
    extends SimpleTimerTask {
        private MessageReference messageRef;

        public ExpireMessageTask(MessageReference messageRef) {
            this.messageRef = messageRef;
        }

        /**
         * Removes the message from {@code messages}. If it was still there,
         * the message is dropped; if it was no longer queued, nothing else
         * happens.
         */
        public void run() {
            boolean wasQueued;
            synchronized (BasicQueue.this.messages) {
                wasQueued = BasicQueue.this.messages.remove(this.messageRef);
            }
            if (!wasQueued) {
                return;
            }
            if (log.isTraceEnabled()) {
                log.trace((Object)("message expired: " + this.messageRef));
            }
            BasicQueue.this.dropMessage(this.messageRef);
        }
    }

    /**
     * Timer task that enqueues a message whose delivery was scheduled for a
     * later time.
     */
    private class EnqueueMessageTask
    extends SimpleTimerTask {
        private MessageReference messageRef;

        public EnqueueMessageTask(MessageReference messageRef) {
            this.messageRef = messageRef;
        }

        /**
         * Adds the message through {@code internalAddMessage}, then decrements
         * {@code scheduledMessageCount} under the timer's monitor.
         */
        public void run() {
            final BasicQueue queue = BasicQueue.this;
            if (log.isTraceEnabled()) {
                log.trace((Object)("running message: " + this.messageRef));
            }
            queue.internalAddMessage(this.messageRef);
            synchronized (queue.messageTimer) {
                queue.scheduledMessageCount--;
            }
        }
    }

    class RemoveMessageTask
    implements Runnable {
        MessageReference message;

        RemoveMessageTask(MessageReference m) {
            this.message = m;
        }

        public void run() {
            try {
                BasicQueue.this.server.getMessageCache().remove(this.message);
            }
            catch (JMSException e) {
                log.error((Object)"Could not remove an acknowleged message from the message cache: ", (Throwable)e);
            }
        }
    }

    /**
     * Task that puts a delivered message back on the queue. {@code acknowledge}
     * registers it after commit of a nack and after rollback of an ack.
     *
     * <p>Before re-queueing, the message is flagged as redelivered, its
     * {@code JMS_JBOSS_REDELIVERY_COUNT} property is set to 1 or incremented,
     * and, when {@code JMS_JBOSS_REDELIVERY_DELAY} is present, its scheduled
     * delivery time is pushed that many milliseconds into the future.
     */
    class RestoreMessageTask
    implements Runnable {
        MessageReference message;

        RestoreMessageTask(MessageReference m) {
            this.message = m;
        }

        /**
         * Updates the redelivery headers, persists the change for persistent
         * messages, and re-adds the message. A {@link JMSException} while
         * updating is logged as an error; the message is re-added regardless.
         */
        public void run() {
            if (log.isTraceEnabled()) {
                log.trace((Object)("Restoring message: " + this.message));
            }
            try {
                SpyMessage spyMessage = this.message.getMessage();
                spyMessage.setJMSRedelivered(true);
                if (spyMessage.propertyExists("JMS_JBOSS_REDELIVERY_DELAY")) {
                    log.trace((Object)"message has redelivery delay");
                    long delay = spyMessage.getLongProperty("JMS_JBOSS_REDELIVERY_DELAY");
                    this.message.messageScheduledDelivery = System.currentTimeMillis() + delay;
                }
                // First redelivery starts the counter at 1; later ones add to the stored value.
                int redeliveries = 1;
                if (spyMessage.propertyExists("JMS_JBOSS_REDELIVERY_COUNT")) {
                    redeliveries = spyMessage.getIntProperty("JMS_JBOSS_REDELIVERY_COUNT") + 1;
                }
                // Integer.valueOf replaces the deprecated Integer(int) constructor.
                spyMessage.header.jmsProperties.put("JMS_JBOSS_REDELIVERY_COUNT", Integer.valueOf(redeliveries));
                this.message.invalidate();
                if (this.message.isPersistent()) {
                    BasicQueue.this.server.getPersistenceManager().update(this.message, null);
                }
            }
            catch (JMSException e) {
                log.error((Object)"Caught unusual exception in restoreMessageTask.", (Throwable)e);
            }
            BasicQueue.this.internalAddMessage(this.message);
        }
    }

    /**
     * Task registered by {@code addMessage} to run after commit: enqueues the
     * message and increments the message counter when one exists.
     */
    class AddMessagePostCommitTask
    implements Runnable {
        MessageReference message;

        AddMessagePostCommitTask(MessageReference m) {
            this.message = m;
        }

        /** Adds the message to the queue, then counts it if a counter was created. */
        public void run() {
            BasicQueue.this.internalAddMessage(this.message);
            if (BasicQueue.this.counter == null) {
                return;
            }
            BasicQueue.this.counter.incrementCounter();
        }
    }

    class AddMessagePostRollBackTask
    implements Runnable {
        MessageReference message;

        AddMessagePostRollBackTask(MessageReference m) {
            this.message = m;
        }

        public void run() {
            try {
                BasicQueue.this.server.getMessageCache().remove(this.message);
            }
            catch (JMSException e) {
                log.error((Object)"Could not remove message from the message cache after an add rollback: ", (Throwable)e);
            }
        }
    }
}

