/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.mq;

import java.util.LinkedList;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.transaction.xa.XAException;
import org.jboss.logging.Logger;
import org.jboss.mq.SpyConsumer;
import org.jboss.mq.SpyEncapsulatedMessage;
import org.jboss.mq.SpyJMSException;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.SpySession;
import org.jboss.mq.Subscription;

public class SpyMessageConsumer
implements MessageConsumer,
SpyConsumer,
Runnable {
    static Logger log = Logger.getLogger((Class)(class$org$jboss$mq$SpyMessageConsumer == null ? (class$org$jboss$mq$SpyMessageConsumer = SpyMessageConsumer.class$("org.jboss.mq.SpyMessageConsumer")) : class$org$jboss$mq$SpyMessageConsumer));
    public SpySession session;
    public Subscription subscription = new Subscription();
    protected boolean closed;
    protected Object stateLock = new Object();
    protected boolean receiving = false;
    protected boolean waitingForMessage = false;
    protected boolean listening = false;
    protected Thread listenerThread = null;
    MessageListener messageListener;
    LinkedList messages;
    boolean sessionConsumer;
    static /* synthetic */ Class class$org$jboss$mq$SpyMessageConsumer;

    SpyMessageConsumer(SpySession s, boolean sessionConsumer) {
        this.session = s;
        this.sessionConsumer = sessionConsumer;
        this.messageListener = null;
        this.closed = false;
        this.messages = new LinkedList();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setMessageListener(MessageListener listener) throws JMSException {
        if (this.closed) {
            throw new IllegalStateException("The MessageConsumer is closed");
        }
        Object object = this.stateLock;
        synchronized (object) {
            if (this.receiving) {
                throw new JMSException("Another thread is already in receive.");
            }
            boolean oldListening = this.listening;
            this.listening = listener != null;
            this.messageListener = listener;
            if (!this.sessionConsumer && this.listening && !oldListening && this.listenerThread == null) {
                this.listenerThread = new Thread((Runnable)this, "MessageListenerThread - " + this.subscription.destination.getName());
                this.listenerThread.start();
            }
        }
    }

    public String getMessageSelector() throws JMSException {
        if (this.closed) {
            throw new IllegalStateException("The MessageConsumer is closed");
        }
        return this.subscription.messageSelector;
    }

    public MessageListener getMessageListener() throws JMSException {
        if (this.closed) {
            throw new IllegalStateException("The MessageConsumer is closed");
        }
        return this.messageListener;
    }

    public Subscription getSubscription() {
        return this.subscription;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
     * Unable to fully structure code
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public Message receive() throws JMSException {
        if (this.closed) {
            throw new IllegalStateException("The MessageConsumer is closed");
        }
        var1_1 = this.stateLock;
        synchronized (var1_1) {
            if (this.receiving) {
                throw new JMSException("Another thread is already in receive.");
            }
            if (this.listening) {
                throw new JMSException("A message listener is already registered");
            }
            this.receiving = true;
        }
        var2_2 = this.messages;
        synchronized (var2_2) {
            message = this.getMessage();
            if (message != null) {
                var4_4 = this.stateLock;
                synchronized (var4_4) {
                    this.receiving = false;
                    return message;
                }
            }
            while ((msg = this.session.connection.receive(this.subscription, 0L)) != null) {
                mes /* !! */  = this.preProcessMessage(msg);
                if (mes /* !! */  == null) continue;
                var6_8 = this.stateLock;
                synchronized (var6_8) {
                    this.receiving = false;
                    return mes /* !! */ ;
                }
            }
            try {
                this.waitingForMessage = true;
lbl27:
                // 2 sources

                while (true) {
                    if (this.closed) {
                        msg = null;
                        this.waitingForMessage = false;
                        mes /* !! */  = this.stateLock;
                    }
                    ** GOTO lbl-1000
                    break;
                }
            }
            catch (InterruptedException e) {
                newE = new SpyJMSException("Receive interupted");
                newE.setLinkedException(e);
                throw newE;
            }
            catch (Throwable var10_10) {
                this.waitingForMessage = false;
                var11_11 = this.stateLock;
                synchronized (var11_11) {
                    this.receiving = false;
                    throw var10_10;
                }
            }
            synchronized (mes /* !! */ ) {
                this.receiving = false;
                return msg;
            }
lbl-1000:
            // 1 sources

            {
                mes = this.getMessage();
                if (mes == null) ** GOTO lbl-1000
                mes /* !! */  = mes;
                this.waitingForMessage = false;
                var6_9 = this.stateLock;
            }
            synchronized (var6_9) {
                this.receiving = false;
                return mes /* !! */ ;
            }
lbl-1000:
            // 1 sources

            {
                this.messages.wait();
                ** continue;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
     * Unable to fully structure code
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public Message receive(long timeOut) throws JMSException {
        if (timeOut == 0L) {
            return this.receive();
        }
        if (this.closed) {
            throw new IllegalStateException("The MessageConsumer is closed");
        }
        var3_2 = this.stateLock;
        synchronized (var3_2) {
            if (this.receiving) {
                throw new JMSException("Another thread is already in receive.");
            }
            if (this.listening) {
                throw new JMSException("A message listener is already registered");
            }
            this.receiving = true;
        }
        endTime = System.currentTimeMillis() + timeOut;
        var6_4 = this.messages;
        synchronized (var6_4) {
            message = this.getMessage();
            if (message != null) {
                var8_6 = this.stateLock;
                synchronized (var8_6) {
                    this.receiving = false;
                    return message;
                }
            }
            while ((msg = this.session.connection.receive(this.subscription, timeOut)) != null) {
                mes = this.preProcessMessage(msg);
                if (mes == null) continue;
                var10_14 = this.stateLock;
                synchronized (var10_14) {
                    this.receiving = false;
                    return mes;
                }
            }
            try {
                this.waitingForMessage = true;
lbl30:
                // 2 sources

                while (true) {
                    if (this.closed) {
                        msg = null;
                        this.waitingForMessage = false;
                        mes = this.stateLock;
                    }
                    ** GOTO lbl-1000
                    break;
                }
            }
            catch (InterruptedException e) {
                newE = new SpyJMSException("Receive interupted");
                newE.setLinkedException(e);
                throw newE;
            }
            catch (Throwable var15_18) {
                this.waitingForMessage = false;
                var16_19 = this.stateLock;
                synchronized (var16_19) {
                    this.receiving = false;
                    throw var15_18;
                }
            }
            synchronized (mes) {
                this.receiving = false;
                return msg;
            }
lbl-1000:
            // 1 sources

            {
                mes = this.getMessage();
                if (mes == null) ** GOTO lbl-1000
                mes = mes;
                this.waitingForMessage = false;
                var10_15 = this.stateLock;
            }
            synchronized (var10_15) {
                this.receiving = false;
                return mes;
            }
lbl-1000:
            // 1 sources

            {
                att = endTime - System.currentTimeMillis();
                if (att > 0L) ** GOTO lbl-1000
                var11_16 = null;
                this.waitingForMessage = false;
                var12_17 = this.stateLock;
            }
            synchronized (var12_17) {
                this.receiving = false;
                return var11_16;
            }
lbl-1000:
            // 1 sources

            {
                this.messages.wait(att);
                ** continue;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Message receiveNoWait() throws JMSException {
        SpyMessage msg;
        if (this.closed) {
            throw new IllegalStateException("The MessageConsumer is closed");
        }
        Object object = this.stateLock;
        synchronized (object) {
            if (this.receiving) {
                throw new JMSException("Another thread is already in receive.");
            }
            if (this.listening) {
                throw new JMSException("A message listener is already registered");
            }
            this.receiving = true;
        }
        LinkedList linkedList = this.messages;
        synchronized (linkedList) {
            Message mes = this.getMessage();
            if (mes != null) {
                Object object2 = this.stateLock;
                synchronized (object2) {
                    this.receiving = false;
                }
                return mes;
            }
        }
        while ((msg = this.session.connection.receive(this.subscription, -1L)) != null) {
            Message mes = this.preProcessMessage(msg);
            if (mes == null) continue;
            Object object3 = this.stateLock;
            synchronized (object3) {
                this.receiving = false;
            }
            return mes;
        }
        Object object4 = this.stateLock;
        synchronized (object4) {
            this.receiving = false;
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() throws JMSException {
        log.debug((Object)"Message consumer closing.");
        LinkedList linkedList = this.messages;
        synchronized (linkedList) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.messages.notify();
        }
        if (this.listenerThread != null && !Thread.currentThread().equals(this.listenerThread)) {
            try {
                this.listenerThread.join();
            }
            catch (InterruptedException e) {
                // empty catch block
            }
        }
        if (!this.sessionConsumer) {
            this.session.removeConsumer(this);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addMessage(SpyMessage message) throws JMSException {
        if (this.closed) {
            log.debug((Object)"WARNING: NACK issued. The message consumer was closed.");
            this.session.connection.send(message.getAcknowledgementRequest(false));
            return;
        }
        if (this.subscription.accepts(message.header)) {
            if (this.sessionConsumer) {
                this.sessionConsumerProcessMessage(message);
            } else {
                LinkedList linkedList = this.messages;
                synchronized (linkedList) {
                    if (this.waitingForMessage) {
                        this.messages.addLast(message);
                        this.messages.notifyAll();
                    } else {
                        log.debug((Object)"WARNING: NACK issued. The message consumer was not waiting for a message.");
                        this.session.connection.send(message.getAcknowledgementRequest(false));
                    }
                }
            }
        } else {
            log.debug((Object)"WARNING: NACK issued. The subscription did not accept the message");
            this.session.connection.send(message.getAcknowledgementRequest(false));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        SpyMessage mes = null;
        try {
            while (true) {
                block32: {
                    boolean recovered;
                    SpyMessage message;
                    block33: {
                        MessageListener thisListener;
                        if (mes == null) {
                            LinkedList linkedList = this.messages;
                            synchronized (linkedList) {
                                if (this.closed) {
                                    this.waitingForMessage = false;
                                    break;
                                }
                                if (this.messages.isEmpty()) {
                                    mes = this.session.connection.receive(this.subscription, 0L);
                                }
                                if (mes == null) {
                                    this.waitingForMessage = true;
                                    while (this.messages.isEmpty() && !this.closed || !this.session.running) {
                                        try {
                                            this.messages.wait();
                                        }
                                        catch (InterruptedException e) {
                                            // empty catch block
                                        }
                                    }
                                    if (this.closed) {
                                        this.waitingForMessage = false;
                                        break;
                                    }
                                    mes = (SpyMessage)this.messages.removeFirst();
                                    this.waitingForMessage = false;
                                }
                            }
                            mes.session = this.session;
                            if (!mes.isOutdated()) continue;
                            mes.doAcknowledge();
                            mes = null;
                            continue;
                        }
                        Object e = this.stateLock;
                        synchronized (e) {
                            if (!this.isListening()) {
                                if (mes != null) {
                                    this.session.connection.send(mes.getAcknowledgementRequest(false));
                                }
                                this.listenerThread = null;
                                mes = null;
                                break;
                            }
                            thisListener = this.messageListener;
                        }
                        message = mes;
                        if (mes instanceof SpyEncapsulatedMessage) {
                            message = ((SpyEncapsulatedMessage)mes).getMessage();
                        }
                        if (this.session.transacted) {
                            this.session.connection.spyXAResourceManager.ackMessage(this.session.getCurrentTransactionId(), mes);
                        }
                        try {
                            this.session.addUnacknowlegedMessage(message);
                            thisListener.onMessage((Message)message);
                        }
                        catch (RuntimeException e2) {
                            log.warn((Object)("Message listener " + thisListener + " threw a RuntimeException."));
                        }
                        if (this.session.transacted) break block32;
                        if (this.session.acknowledgeMode == 1) break block33;
                        if (this.session.acknowledgeMode != 3) break block32;
                    }
                    LinkedList linkedList = this.messages;
                    synchronized (linkedList) {
                        recovered = this.messages.contains(message);
                    }
                    if (!recovered) {
                        mes.doAcknowledge();
                    }
                }
                mes = null;
            }
        }
        catch (JMSException e) {
            log.warn((Object)"Message consumer closing due to error in listening thread.", (Throwable)e);
            try {
                this.close();
            }
            catch (Exception ignore) {}
        }
    }

    public String toString() {
        return "SpyMessageConsumer: " + this.subscription;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean isListening() {
        Object object = this.stateLock;
        synchronized (object) {
            return this.listening;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void sessionConsumerProcessMessage(SpyMessage message) throws JMSException {
        block20: {
            block21: {
                block19: {
                    MessageListener thisListener;
                    message.session = this.session;
                    if (message.isOutdated()) {
                        log.debug((Object)"I dropped a message (timeout)");
                        message.doAcknowledge();
                        return;
                    }
                    Object object = this.stateLock;
                    synchronized (object) {
                        thisListener = this.messageListener;
                    }
                    Object anonymousTXID = null;
                    if (this.session.transacted) {
                        if (this.session.getCurrentTransactionId() == null) {
                            anonymousTXID = this.session.connection.spyXAResourceManager.startTx();
                            this.session.setCurrentTransactionId(anonymousTXID);
                        }
                        this.session.connection.spyXAResourceManager.ackMessage(this.session.getCurrentTransactionId(), message);
                    }
                    if (thisListener != null) {
                        SpyMessage mes = message;
                        if (message instanceof SpyEncapsulatedMessage) {
                            mes = ((SpyEncapsulatedMessage)message).getMessage();
                        }
                        this.session.addUnacknowlegedMessage(mes);
                        thisListener.onMessage((Message)mes);
                    }
                    if (!this.session.transacted) break block19;
                    if (anonymousTXID != null && this.session.getCurrentTransactionId() == anonymousTXID) {
                        try {
                            try {
                                this.session.connection.spyXAResourceManager.endTx(anonymousTXID, true);
                                this.session.connection.spyXAResourceManager.rollback(anonymousTXID);
                            }
                            catch (XAException e) {
                                log.error((Object)"Could not rollback", (Throwable)e);
                                Object var7_9 = null;
                                this.session.unsetCurrentTransactionId(anonymousTXID);
                            }
                            Object var7_8 = null;
                            this.session.unsetCurrentTransactionId(anonymousTXID);
                        }
                        catch (Throwable throwable) {
                            Object var7_10 = null;
                            this.session.unsetCurrentTransactionId(anonymousTXID);
                            throw throwable;
                        }
                        throw new SpyJMSException("Messaged delivery was not controled by a Transaction Manager");
                    }
                    break block20;
                }
                if (this.session.acknowledgeMode == 1) break block21;
                if (this.session.acknowledgeMode != 3) break block20;
            }
            message.doAcknowledge();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void restartProcessing() {
        LinkedList linkedList = this.messages;
        synchronized (linkedList) {
            this.messages.notifyAll();
        }
    }

    Message getMessage() {
        LinkedList linkedList = this.messages;
        synchronized (linkedList) {
            while (true) {
                try {
                    SpyMessage mes;
                    Message rc;
                    do {
                        if (this.messages.size() != 0) continue;
                        return null;
                    } while ((rc = this.preProcessMessage(mes = (SpyMessage)this.messages.removeFirst())) == null);
                    return rc;
                }
                catch (Exception e) {
                    log.error((Object)"Ignoring error", (Throwable)e);
                    continue;
                }
                break;
            }
        }
    }

    Message preProcessMessage(SpyMessage message) throws JMSException {
        block5: {
            block7: {
                block8: {
                    block6: {
                        message.session = this.session;
                        this.session.addUnacknowlegedMessage(message);
                        if (message.isOutdated()) {
                            log.debug((Object)"I dropped a message (timeout)");
                            message.doAcknowledge();
                            return null;
                        }
                        if (this.isListening()) break block5;
                        if (!this.session.transacted) break block6;
                        this.session.connection.spyXAResourceManager.ackMessage(this.session.getCurrentTransactionId(), message);
                        break block7;
                    }
                    if (this.session.acknowledgeMode == 1) break block8;
                    if (this.session.acknowledgeMode != 3) break block7;
                }
                message.doAcknowledge();
            }
            if (message instanceof SpyEncapsulatedMessage) {
                return ((SpyEncapsulatedMessage)message).getMessage();
            }
            return message;
        }
        return message;
    }

    static /* synthetic */ Class class$(String x0) {
        try {
            return Class.forName(x0);
        }
        catch (ClassNotFoundException x1) {
            throw new NoClassDefFoundError(x1.getMessage());
        }
    }
}

