/*
 * Decompiled with CFR 0.152.
 */
package org.granite.gravity;

import flex.messaging.messages.AsyncMessage;
import flex.messaging.messages.Message;
import java.io.IOException;
import java.io.ObjectOutput;
import java.io.OutputStream;
import java.net.SocketException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.granite.context.AMFContextImpl;
import org.granite.context.GraniteContext;
import org.granite.gravity.AsyncHttpContext;
import org.granite.gravity.AsyncPublishedMessage;
import org.granite.gravity.AsyncPublisher;
import org.granite.gravity.AsyncReceiver;
import org.granite.gravity.Channel;
import org.granite.gravity.ChannelFactory;
import org.granite.gravity.Gravity;
import org.granite.gravity.GravityConfig;
import org.granite.gravity.MessagePublishingException;
import org.granite.gravity.MessageReceivingException;
import org.granite.gravity.Subscription;
import org.granite.gravity.udp.UdpReceiver;
import org.granite.gravity.udp.UdpReceiverFactory;
import org.granite.logging.Logger;
import org.granite.messaging.webapp.HttpGraniteContext;
import org.granite.util.ContentType;

/**
 * Base {@link Channel} implementation holding two lock-protected queues: messages waiting to
 * be published through this channel and messages received for this channel's client.
 *
 * <p>Received messages are delivered either through a {@link UdpReceiver} (when one has been
 * created for this channel) or by serializing them to the HTTP response of an
 * {@link AsyncHttpContext} supplied by the concrete subclass.
 */
public abstract class AbstractChannel
implements Channel {
    private static final Logger log = Logger.getLogger(AbstractChannel.class);

    protected final String id;
    /** Session id of the Granite context current at construction time, or {@code null} if there was none. */
    protected final String sessionId;
    protected final String clientType;
    protected final Gravity gravity;
    protected final ChannelFactory<? extends Channel> factory;
    /** Subscriptions of this channel, keyed by subscription id. */
    protected final ConcurrentMap<String, Subscription> subscriptions = new ConcurrentHashMap<String, Subscription>();
    /** Messages waiting to be published; guarded by {@link #publishedQueueLock}. */
    protected LinkedList<AsyncPublishedMessage> publishedQueue = new LinkedList<AsyncPublishedMessage>();
    protected final Lock publishedQueueLock = new ReentrantLock();
    /** Messages waiting to be sent to the client; guarded by {@link #receivedQueueLock}. */
    protected LinkedList<AsyncMessage> receivedQueue = new LinkedList<AsyncMessage>();
    protected final Lock receivedQueueLock = new ReentrantLock();
    protected final AsyncPublisher publisher;
    protected final AsyncReceiver httpReceiver;
    /** UDP delivery path; {@code null} while the channel delivers over HTTP. */
    protected UdpReceiver udpReceiver = null;

    /**
     * Creates a channel bound to the given Gravity instance.
     *
     * @param gravity    the Gravity instance this channel belongs to
     * @param id         the channel id; must not be {@code null}
     * @param factory    the factory that created this channel
     * @param clientType the client type reported by {@link #getClientType()}
     * @throws NullPointerException if {@code id} is {@code null}
     */
    protected AbstractChannel(Gravity gravity, String id, ChannelFactory<? extends Channel> factory, String clientType) {
        if (id == null) {
            throw new NullPointerException("id cannot be null");
        }
        this.id = id;
        GraniteContext graniteContext = GraniteContext.getCurrentInstance();
        this.clientType = clientType;
        this.sessionId = graniteContext != null ? graniteContext.getSessionId() : null;
        this.gravity = gravity;
        this.factory = factory;
        this.publisher = new AsyncPublisher(this);
        this.httpReceiver = new AsyncReceiver(this);
    }

    /**
     * Consulted by {@link #receive(AsyncMessage)} to decide whether the HTTP receiver task
     * should be queued right after a message has been enqueued.
     *
     * @return {@code true} if the HTTP receiver should be queued now
     */
    protected abstract boolean hasAsyncHttpContext();

    /**
     * Supplies the HTTP context used by {@link #runReceived(AsyncHttpContext)} when none was
     * passed in.
     *
     * @return the context to write to, or {@code null} if none can be obtained
     */
    protected abstract AsyncHttpContext acquireAsyncHttpContext();

    /**
     * Called once this channel is done with an HTTP context.
     *
     * <p>NOTE(review): {@link #runReceived(AsyncHttpContext)} may call this with {@code null}
     * when it returns early without having acquired a context; implementations presumably
     * tolerate that - confirm against subclasses.
     *
     * @param var1 the context to release (possibly {@code null}, see note)
     */
    protected abstract void releaseAsyncHttpContext(AsyncHttpContext var1);

    @Override
    public String getId() {
        return this.id;
    }

    @Override
    public String getClientType() {
        return this.clientType;
    }

    /** @return the factory this channel was created with */
    public ChannelFactory<? extends Channel> getFactory() {
        return this.factory;
    }

    @Override
    public Gravity getGravity() {
        return this.gravity;
    }

    /**
     * Registers a subscription under {@code subscriptionId} unless one is already registered
     * under that id.
     *
     * @return the already registered subscription if there is one, otherwise the new one
     */
    @Override
    public Subscription addSubscription(String destination, String subTopicId, String subscriptionId, boolean noLocal) {
        Subscription subscription = new Subscription(this, destination, subTopicId, subscriptionId, noLocal);
        Subscription present = this.subscriptions.putIfAbsent(subscriptionId, subscription);
        return present != null ? present : subscription;
    }

    /** @return a live view of the subscriptions currently registered on this channel */
    @Override
    public Collection<Subscription> getSubscriptions() {
        return this.subscriptions.values();
    }

    /** @return the removed subscription, or {@code null} if none was registered under that id */
    @Override
    public Subscription removeSubscription(String subscriptionId) {
        return this.subscriptions.remove(subscriptionId);
    }

    /**
     * Appends a message to the published queue and queues the publisher task on Gravity.
     *
     * @param message the message to publish; must not be {@code null}
     * @throws NullPointerException if {@code message} is {@code null}
     */
    @Override
    public void publish(AsyncPublishedMessage message) throws MessagePublishingException {
        if (message == null) {
            throw new NullPointerException("message cannot be null");
        }
        this.publishedQueueLock.lock();
        try {
            this.publishedQueue.add(message);
        }
        finally {
            this.publishedQueueLock.unlock();
        }
        this.publisher.queue(this.getGravity());
    }

    /** @return {@code true} if at least one message is waiting in the published queue */
    @Override
    public boolean hasPublishedMessage() {
        this.publishedQueueLock.lock();
        try {
            return !this.publishedQueue.isEmpty();
        }
        finally {
            this.publishedQueueLock.unlock();
        }
    }

    /**
     * Publishes every message currently in the published queue.
     *
     * <p>The queue is swapped for a fresh one under the lock, so the messages themselves are
     * published without holding it. A failure on one message is logged and does not prevent
     * the remaining ones from being published.
     *
     * @return {@code false} if the queue was empty, {@code true} otherwise
     */
    @Override
    public boolean runPublish() {
        LinkedList<AsyncPublishedMessage> pending;
        this.publishedQueueLock.lock();
        try {
            if (this.publishedQueue.isEmpty()) {
                return false;
            }
            pending = this.publishedQueue;
            this.publishedQueue = new LinkedList<AsyncPublishedMessage>();
        }
        finally {
            this.publishedQueueLock.unlock();
        }
        for (AsyncPublishedMessage message : pending) {
            try {
                message.publish(this);
            }
            catch (Exception e) {
                log.error(e, "Error while trying to publish message: %s", message);
            }
        }
        return true;
    }

    /**
     * Accepts a message destined to this channel's client.
     *
     * <p>If a UDP receiver is set, the message is handed to it directly (and dropped if that
     * receiver is closed). Otherwise the message is appended to the received queue and, when
     * {@link #hasAsyncHttpContext()} is {@code true}, the HTTP receiver task is queued.
     *
     * @param message the message to deliver; must not be {@code null}
     * @throws NullPointerException      if {@code message} is {@code null}
     * @throws MessageReceivingException if the received queue has reached the configured
     *                                   maximum number of messages per channel
     */
    @Override
    public void receive(AsyncMessage message) throws MessageReceivingException {
        if (message == null) {
            throw new NullPointerException("message cannot be null");
        }
        Gravity gravity = this.getGravity();

        // Read the field once: runReceived() and destroy() may set it to null concurrently,
        // which would otherwise cause an NPE between the null check and the calls below.
        UdpReceiver receiver = this.udpReceiver;
        if (receiver != null) {
            if (receiver.isClosed()) {
                return;
            }
            try {
                receiver.receive(message);
            }
            catch (MessageReceivingException e) {
                if (e.getCause() instanceof SocketException) {
                    // Unreachable peer: close the receiver, logged at debug level only.
                    log.debug(e, "Closing unreachable UDP channel %s", this.getId());
                    receiver.close(false);
                } else {
                    log.error(e, "Cannot access UDP channel %s", this.getId());
                }
            }
            return;
        }

        this.receivedQueueLock.lock();
        try {
            if (this.receivedQueue.size() + 1 > gravity.getGravityConfig().getMaxMessagesQueuedPerChannel()) {
                throw new MessageReceivingException((Message)message, "Could not queue message (channel's queue is full) for channel: " + this);
            }
            this.receivedQueue.add(message);
        }
        finally {
            this.receivedQueueLock.unlock();
        }
        if (this.hasAsyncHttpContext()) {
            this.httpReceiver.queue(gravity);
        }
    }

    /** @return {@code true} if at least one message is waiting in the received queue */
    @Override
    public boolean hasReceivedMessage() {
        this.receivedQueueLock.lock();
        try {
            return !this.receivedQueue.isEmpty();
        }
        finally {
            this.receivedQueueLock.unlock();
        }
    }

    /** Equivalent to {@code runReceived(null)}. */
    @Override
    public boolean runReceive() {
        return this.runReceived(null);
    }

    /**
     * Creates the serializer used to write messages to the client.
     *
     * @param context the Granite context whose configuration provides the serializer
     * @param os      the stream the serializer writes to
     * @return a new AMF3 serializer obtained from the context's Granite configuration
     */
    public ObjectOutput newSerializer(GraniteContext context, OutputStream os) {
        return context.getGraniteConfig().newAMF3Serializer(os);
    }

    /** @return the MIME type set on responses written by this channel (AMF) */
    public String getSerializerContentType() {
        return ContentType.AMF.mimeType();
    }

    /**
     * Creates this channel's UDP receiver if there is none (or if the current one is closed)
     * and writes the receiver's acknowledgement of the connect message to the HTTP response.
     *
     * <p>The HTTP context is always released at the end, whatever the outcome. An
     * {@link IOException} while writing the acknowledgement is logged, not propagated.
     *
     * @param factory          the factory used to create the receiver
     * @param asyncHttpContext the HTTP context carrying the UDP connect request
     */
    protected void createUdpReceiver(UdpReceiverFactory factory, AsyncHttpContext asyncHttpContext) {
        OutputStream os = null;
        try {
            Message connectMessage = asyncHttpContext.getConnectMessage();
            if (this.udpReceiver == null || this.udpReceiver.isClosed()) {
                this.udpReceiver = factory.newReceiver(this, asyncHttpContext.getRequest(), connectMessage);
            }
            AsyncMessage reply = this.udpReceiver.acknowledge(connectMessage);

            HttpServletResponse response = asyncHttpContext.getResponse();
            HttpGraniteContext context = this.bindGraniteContext(this.gravity, asyncHttpContext);
            this.prepareResponse(response);

            os = response.getOutputStream();
            ObjectOutput serializer = this.newSerializer(context, os);
            serializer.writeObject(new AsyncMessage[]{reply});
            os.flush();
            response.flushBuffer();
        }
        catch (IOException e) {
            log.error(e, "Could not send UDP connect acknowledgement to channel: %s", this);
        }
        finally {
            releaseGraniteContextQuietly();
            try {
                closeQuietly(os);
            }
            finally {
                this.releaseAsyncHttpContext(asyncHttpContext);
            }
        }
    }

    /**
     * Sends all queued received messages to the client over HTTP.
     *
     * <p>When a context is supplied and Gravity has a UDP receiver factory, a UDP connect
     * request is handled by {@link #createUdpReceiver} instead; any other request closes and
     * discards the current UDP receiver before falling back to HTTP delivery.
     *
     * <p>On an {@link IOException} the messages are put back at the head of the queue when
     * retry-on-error is enabled, unless that would exceed the configured maximum queue size,
     * in which case they are dropped with a warning.
     *
     * @param asyncHttpContext the HTTP context to write to, or {@code null} to obtain one from
     *                         {@link #acquireAsyncHttpContext()}
     * @return {@code false} if the queue was empty or no HTTP context could be acquired;
     *         {@code true} otherwise (including when sending failed with an I/O error)
     */
    @Override
    public boolean runReceived(AsyncHttpContext asyncHttpContext) {
        Gravity gravity = this.getGravity();
        if (asyncHttpContext != null && gravity.hasUdpReceiverFactory()) {
            UdpReceiverFactory factory = gravity.getUdpReceiverFactory();
            if (factory.isUdpConnectRequest(asyncHttpContext.getConnectMessage())) {
                this.createUdpReceiver(factory, asyncHttpContext);
                return true;
            }
            if (this.udpReceiver != null) {
                if (!this.udpReceiver.isClosed()) {
                    this.udpReceiver.close(false);
                }
                this.udpReceiver = null;
            }
        }

        // A context handed in by the caller is not released here; only one acquired below is.
        boolean httpAsParam = asyncHttpContext != null;
        LinkedList<AsyncMessage> messages = null;
        OutputStream os = null;
        try {
            this.receivedQueueLock.lock();
            try {
                if (this.receivedQueue.isEmpty()) {
                    return false;
                }
                if (asyncHttpContext == null) {
                    asyncHttpContext = this.acquireAsyncHttpContext();
                    if (asyncHttpContext == null) {
                        return false;
                    }
                }
                // Take ownership of the queued messages and leave an empty queue behind.
                messages = this.receivedQueue;
                this.receivedQueue = new LinkedList<AsyncMessage>();
            }
            finally {
                this.receivedQueueLock.unlock();
            }

            HttpServletResponse response = asyncHttpContext.getResponse();
            String correlationId = asyncHttpContext.getConnectMessage().getMessageId();
            AsyncMessage[] messagesArray = new AsyncMessage[messages.size()];
            int i = 0;
            for (AsyncMessage message : messages) {
                message.setCorrelationId(correlationId);
                messagesArray[i++] = message;
            }

            HttpGraniteContext context = this.bindGraniteContext(gravity, asyncHttpContext);
            this.prepareResponse(response);

            os = response.getOutputStream();
            ObjectOutput serializer = this.newSerializer(context, os);
            log.debug("<< [MESSAGES for channel=%s] %s", this, messagesArray);
            serializer.writeObject(messagesArray);
            os.flush();
            response.flushBuffer();
            return true;
        }
        catch (IOException e) {
            log.warn(e, "Could not send messages to channel: %s (retrying later)", this);
            GravityConfig gravityConfig = this.getGravity().getGravityConfig();
            if (gravityConfig.isRetryOnError()) {
                this.receivedQueueLock.lock();
                try {
                    if (this.receivedQueue.size() + messages.size() > gravityConfig.getMaxMessagesQueuedPerChannel()) {
                        log.warn("Channel %s has reached its maximum queue capacity %s (throwing %s messages)", this, gravityConfig.getMaxMessagesQueuedPerChannel(), messages.size());
                    } else {
                        // Re-insert at the head so the unsent messages keep their original order.
                        this.receivedQueue.addAll(0, messages);
                    }
                }
                finally {
                    this.receivedQueueLock.unlock();
                }
            }
            return true;
        }
        finally {
            releaseGraniteContextQuietly();
            try {
                closeQuietly(os);
            }
            finally {
                if (!httpAsParam) {
                    this.releaseAsyncHttpContext(asyncHttpContext);
                }
            }
        }
    }

    /** Equivalent to {@code destroy(false)}. */
    public void destroy() {
        this.destroy(false);
    }

    /**
     * Cancels this channel's publisher and receiver tasks, clears its subscriptions and, in
     * all cases, closes and discards the UDP receiver if there is one.
     *
     * @param timeout passed through to {@link UdpReceiver#close(boolean)}
     */
    @Override
    public void destroy(boolean timeout) {
        try {
            Gravity gravity = this.getGravity();
            gravity.cancel(this.publisher);
            gravity.cancel(this.httpReceiver);
            this.subscriptions.clear();
        }
        finally {
            if (this.udpReceiver != null) {
                if (!this.udpReceiver.isClosed()) {
                    this.udpReceiver.close(timeout);
                }
                this.udpReceiver = null;
            }
        }
    }

    /**
     * Queues the HTTP receiver task if there is at least one received message waiting.
     *
     * @return {@code true} if the receiver was queued, {@code false} if the queue was empty
     */
    protected boolean queueReceiver() {
        if (this.hasReceivedMessage()) {
            this.httpReceiver.queue(this.getGravity());
            return true;
        }
        return false;
    }

    /** Two channels are equal when they have the same id. */
    @Override
    public boolean equals(Object obj) {
        return obj instanceof Channel && this.id.equals(((Channel)obj).getId());
    }

    @Override
    public int hashCode() {
        return this.id.hashCode();
    }

    @Override
    public String toString() {
        return this.getClass().getName() + " {id=" + this.id + ", subscriptions=" + this.subscriptions.values() + "}";
    }

    /** Creates the thread-bound Granite context for this request and sets its current AMF3 message to the connect message. */
    private HttpGraniteContext bindGraniteContext(Gravity gravity, AsyncHttpContext asyncHttpContext) {
        HttpServletRequest request = asyncHttpContext.getRequest();
        HttpServletResponse response = asyncHttpContext.getResponse();
        HttpGraniteContext context = HttpGraniteContext.createThreadIntance(gravity.getGraniteConfig(), gravity.getServicesConfig(), null, request, response);
        ((AMFContextImpl)context.getAMFContext()).setCurrentAmf3Message(asyncHttpContext.getConnectMessage());
        return context;
    }

    /** Sets the status, content type and no-cache headers shared by every response this channel writes. */
    private void prepareResponse(HttpServletResponse response) {
        response.setStatus(200);
        response.setContentType(this.getSerializerContentType());
        // "Expires" is the standard HTTP header name; the former "Expire" is not a defined header.
        response.setDateHeader("Expires", 0L);
        response.setHeader("Cache-Control", "no-store");
    }

    /** Releases the thread-bound Granite context, ignoring any failure. */
    private static void releaseGraniteContextQuietly() {
        try {
            GraniteContext.release();
        }
        catch (Exception ignored) {
            // Best-effort cleanup: a failure here must not mask the outcome of the delivery.
        }
    }

    /** Closes the stream if it is non-null; an I/O failure is logged and otherwise ignored. */
    private static void closeQuietly(OutputStream os) {
        if (os == null) {
            return;
        }
        try {
            os.close();
        }
        catch (IOException e) {
            log.warn(e, "Could not close output stream (ignored)", new Object[0]);
        }
    }
}

