/*
 * Decompiled with CFR 0.152.
 */
package org.mule.providers.vm;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import org.mule.MuleManager;
import org.mule.config.i18n.CoreMessages;
import org.mule.impl.MuleMessage;
import org.mule.providers.AbstractMessageDispatcher;
import org.mule.providers.vm.VMConnector;
import org.mule.providers.vm.VMMessageReceiver;
import org.mule.providers.vm.i18n.VMMessages;
import org.mule.transaction.TransactionCallback;
import org.mule.transaction.TransactionTemplate;
import org.mule.transformers.simple.ObjectToByteArray;
import org.mule.umo.UMOEvent;
import org.mule.umo.UMOMessage;
import org.mule.umo.endpoint.UMOEndpointURI;
import org.mule.umo.endpoint.UMOImmutableEndpoint;
import org.mule.umo.provider.DispatchException;
import org.mule.umo.provider.NoReceiverForEndpointException;
import org.mule.umo.provider.UMOStreamMessageAdapter;
import org.mule.util.queue.Queue;
import org.mule.util.queue.QueueSession;

public class VMMessageDispatcher
extends AbstractMessageDispatcher {
    private final VMConnector connector;
    private final ObjectToByteArray objectToByteArray;

    public VMMessageDispatcher(UMOImmutableEndpoint endpoint) {
        super(endpoint);
        this.connector = (VMConnector)endpoint.getConnector();
        this.objectToByteArray = new ObjectToByteArray();
    }

    protected UMOMessage doReceive(long timeout) throws Exception {
        if (!this.connector.isQueueEvents()) {
            throw new UnsupportedOperationException("Receive requested on VM Connector, but queueEvents is false");
        }
        QueueSession queueSession = this.connector.getQueueSession();
        Queue queue = queueSession.getQueue(this.endpoint.getEndpointURI().getAddress());
        if (queue == null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("No queue with name " + this.endpoint.getEndpointURI().getAddress()));
            }
            return null;
        }
        UMOEvent event = null;
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("Waiting for a message on " + this.endpoint.getEndpointURI().getAddress()));
        }
        try {
            event = (UMOEvent)queue.poll(timeout);
        }
        catch (InterruptedException e) {
            this.logger.error((Object)("Failed to receive event from queue: " + this.endpoint.getEndpointURI()));
        }
        if (event != null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Event received: " + event));
            }
            return event.getMessage();
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("No event received after " + timeout + " ms"));
        }
        return null;
    }

    protected void doDispatch(final UMOEvent event) throws Exception {
        UMOEndpointURI endpointUri = event.getEndpoint().getEndpointURI();
        if (endpointUri == null) {
            throw new DispatchException(CoreMessages.objectIsNull("Endpoint"), event.getMessage(), event.getEndpoint());
        }
        if (this.connector.isQueueEvents()) {
            QueueSession session = this.connector.getQueueSession();
            Queue queue = session.getQueue(endpointUri.getAddress());
            queue.put(event);
        } else {
            final VMMessageReceiver receiver = this.connector.getReceiver(event.getEndpoint().getEndpointURI());
            if (receiver == null) {
                this.logger.warn((Object)("No receiver for endpointUri: " + event.getEndpoint().getEndpointURI()));
                return;
            }
            if (event.isStreaming()) {
                PipedInputStream in = new PipedInputStream();
                PipedOutputStream out = new PipedOutputStream(in);
                UMOStreamMessageAdapter sma = this.connector.getStreamMessageAdapter(in, out);
                sma.write(event);
            }
            TransactionTemplate tt = new TransactionTemplate(receiver.getEndpoint().getTransactionConfig(), this.connector.getExceptionListener());
            TransactionCallback cb = new TransactionCallback(){

                public Object doInTransaction() throws Exception {
                    receiver.onEvent(event);
                    return null;
                }
            };
            tt.execute(cb);
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("dispatched Event on endpointUri: " + endpointUri));
        }
    }

    protected UMOMessage doSend(final UMOEvent event) throws Exception {
        UMOMessage retMessage = null;
        UMOEndpointURI endpointUri = event.getEndpoint().getEndpointURI();
        final VMMessageReceiver receiver = this.connector.getReceiver(endpointUri);
        if (receiver == null) {
            if (this.connector.isQueueEvents()) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug((Object)("Writing to queue as there is no receiver on connector: " + this.connector.getName() + ", for endpointUri: " + event.getEndpoint().getEndpointURI()));
                }
                this.doDispatch(event);
                return null;
            }
            throw new NoReceiverForEndpointException(VMMessages.noReceiverForEndpoint(this.connector.getName(), event.getEndpoint().getEndpointURI()));
        }
        if (event.isStreaming()) {
            PipedInputStream in = new PipedInputStream();
            PipedOutputStream out = new PipedOutputStream(in);
            UMOStreamMessageAdapter sma = this.connector.getStreamMessageAdapter(in, out);
            sma.write(event);
        }
        TransactionTemplate tt = new TransactionTemplate(receiver.getEndpoint().getTransactionConfig(), this.connector.getExceptionListener());
        TransactionCallback cb = new TransactionCallback(){

            public Object doInTransaction() throws Exception {
                return receiver.onCall(event);
            }
        };
        retMessage = (UMOMessage)tt.execute(cb);
        if (event.isStreaming() && retMessage != null) {
            InputStream in = retMessage.getPayload() instanceof InputStream ? (InputStream)retMessage.getPayload() : new ByteArrayInputStream((byte[])this.objectToByteArray.transform(retMessage.getPayload()));
            UMOStreamMessageAdapter sma = this.connector.getStreamMessageAdapter(in, null);
            retMessage = new MuleMessage((Object)sma, retMessage);
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("sent event on endpointUri: " + event.getEndpoint().getEndpointURI()));
        }
        return retMessage;
    }

    protected void doDispose() {
    }

    protected void doConnect() throws Exception {
        if (this.connector.isQueueEvents()) {
            MuleManager.getConfiguration().getQueueProfile().configureQueue(this.endpoint.getEndpointURI().getAddress());
        }
    }

    protected void doDisconnect() throws Exception {
    }
}

