/*
 * Decompiled with CFR 0.152.
 */
package org.hornetq.core.protocol.proton;

import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.Map;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transaction.Discharge;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointError;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.impl.LinkImpl;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.apache.qpid.proton.jms.EncodedMessage;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.protocol.proton.HornetQAMQPProtocolMessageBundle;
import org.hornetq.core.protocol.proton.ProtonDeliveryHandler;
import org.hornetq.core.protocol.proton.ProtonRemotingConnection;
import org.hornetq.core.protocol.proton.ProtonSession;
import org.hornetq.core.protocol.proton.ProtonUtils;
import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPException;
import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPIllegalStateException;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationListener;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferDecoder;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.utils.UUIDGenerator;
import org.jboss.netty.channel.ChannelHandler;

/**
 * {@link ProtocolManager} for AMQP connections driven by the Qpid Proton engine.
 *
 * <p>It creates {@link ProtonRemotingConnection}s and {@link ProtonSession}s, hands raw
 * inbound bytes to the connection, and moves messages between Proton links and the
 * HornetQ server session.
 */
public class ProtonProtocolManager
implements ProtocolManager,
NotificationListener {
    public static final EnumSet<EndpointState> UNINITIALIZED = EnumSet.of(EndpointState.UNINITIALIZED);
    public static final EnumSet<EndpointState> INITIALIZED = EnumSet.complementOf(UNINITIALIZED);
    public static final EnumSet<EndpointState> ACTIVE = EnumSet.of(EndpointState.ACTIVE);
    public static final EnumSet<EndpointState> CLOSED = EnumSet.of(EndpointState.CLOSED);
    public static final EnumSet<EndpointState> ANY_ENDPOINT_STATE = EnumSet.of(EndpointState.CLOSED, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);
    private final HornetQServer server;

    /**
     * @param server the server whose storage manager and executor factory this manager uses
     */
    public ProtonProtocolManager(HornetQServer server) {
        this.server = server;
    }

    /**
     * Wraps a transport connection in a new {@link ProtonRemotingConnection}.
     *
     * @param acceptorUsed the acceptor the connection arrived on
     * @param connection   the underlying transport connection
     * @return a connection entry stamped with the current time and a value of 60000
     *         (presumably the connection TTL in milliseconds - confirm against ConnectionEntry)
     */
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
        ProtonRemotingConnection conn = new ProtonRemotingConnection(acceptorUsed, connection, this);
        return new ConnectionEntry((RemotingConnection)conn, null, System.currentTimeMillis(), 60000L);
    }

    /** No-op: this manager keeps no named handlers. */
    public void removeHandler(String name) {
    }

    /**
     * Always returns {@code -1}; the buffer is not inspected.
     */
    public int isReadyToHandle(HornetQBuffer buffer) {
        return -1;
    }

    /**
     * Copies the readable bytes of {@code buffer} into a frame and passes it to the
     * Proton connection.
     *
     * @param connection must be a {@link ProtonRemotingConnection}
     * @param buffer     the bytes received from the transport
     */
    public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer) {
        ProtonRemotingConnection protonRemotingConnection = (ProtonRemotingConnection)connection;
        protonRemotingConnection.setDataReceived();
        // Size by readableBytes(), not capacity(): readBytes(byte[]) fills the whole
        // destination array, so an array larger than the readable region would ask
        // the buffer for bytes that were never received.
        byte[] frame = new byte[buffer.readableBytes()];
        buffer.readBytes(frame);
        protonRemotingConnection.handleFrame(frame);
    }

    /** No-op: no channel handlers are registered for this protocol. */
    public void addChannelHandlers(String protocol, Map<String, ChannelHandler> handlers, BufferDecoder decoder) {
    }

    /**
     * @return always {@code false}
     */
    public boolean isSupportsWebsockets(String protocol) {
        return false;
    }

    /** No-op: notifications are ignored. */
    public void onNotification(Notification notification) {
    }

    /**
     * Creates an empty server message with a freshly generated id and an initial
     * size argument of 512.
     */
    public ServerMessageImpl createServerMessage() {
        return new ServerMessageImpl(this.server.getStorageManager().generateUniqueID(), 512);
    }

    /**
     * Reads an inbound delivery from {@code receiver}, converts it to a server message
     * and sends it through the session. Once the storage manager reports the pending
     * operations complete, one credit is granted back and the delivery is settled.
     *
     * <p>The whole method runs while holding the connection's delivery lock.
     *
     * @param receiver      the link the delivery arrived on
     * @param buffer        accumulation buffer for the delivery's bytes; cleared once a
     *                      full delivery has been extracted
     * @param delivery      the Proton delivery being processed
     * @param connection    the owning connection (provides the delivery lock)
     * @param protonSession the session used to send to the server
     * @param address       if non-null, overrides the address on the converted message
     * @throws Exception if conversion or the send fails
     */
    public void handleMessage(final Receiver receiver, HornetQBuffer buffer, final Delivery delivery, final ProtonRemotingConnection connection, ProtonSession protonSession, String address) throws Exception {
        synchronized (connection.getDeliveryLock()) {
            byte[] bytes = ProtonProtocolManager.drainDelivery(receiver, buffer);
            if (bytes == null) {
                // Last recv returned 0: leave what was read in the buffer and wait.
                return;
            }
            EncodedMessage encodedMessage = new EncodedMessage((long)delivery.getMessageFormat(), bytes, 0, bytes.length);
            ServerMessageImpl message = ProtonUtils.INBOUND.transform(connection, encodedMessage);
            if (address != null) {
                message.setAddress(new SimpleString(address));
            }
            protonSession.getServerSession().send((ServerMessage)message, true);
            this.server.getStorageManager().afterCompleteOperations(new IOAsyncTask(){

                public void done() {
                    // Re-acquire the delivery lock: this callback is invoked by the
                    // storage manager, not necessarily by the thread that holds it now.
                    synchronized (connection.getDeliveryLock()) {
                        receiver.flow(1);
                        delivery.settle();
                    }
                }

                public void onError(int errorCode, String errorMessage) {
                    ((LinkImpl)receiver).setLocalError(new EndpointError("" + errorCode, errorMessage));
                }
            });
        }
    }

    /**
     * Sends an encoded message to a consumer over {@code sender} and flushes the
     * connection.
     *
     * <p>The delivery is created and the bytes are sent under the connection's delivery
     * lock. When the storage manager reports completion the delivery is either settled
     * immediately (with one credit added back) if {@code preSettle} is true, or the
     * sender is advanced to the next delivery.
     *
     * @param sender         the link to deliver on
     * @param tag            the delivery tag
     * @param encodedMessage the encoded payload
     * @param message        the server message, stored as the delivery's context
     * @param connection     the owning connection
     * @param preSettle      whether to settle without waiting for the peer
     */
    public void handleDelivery(final Sender sender, byte[] tag, EncodedMessage encodedMessage, ServerMessage message, ProtonRemotingConnection connection, final boolean preSettle) {
        synchronized (connection.getDeliveryLock()) {
            final Delivery delivery = sender.delivery(tag, 0, tag.length);
            delivery.setContext((Object)message);
            // NOTE(review): offset 0 is used rather than the encoded message's own
            // array offset - fine if the payload always starts at index 0; confirm.
            sender.send(encodedMessage.getArray(), 0, encodedMessage.getLength());
            this.server.getStorageManager().afterCompleteOperations(new IOAsyncTask(){

                public void done() {
                    if (preSettle) {
                        delivery.settle();
                        ((LinkImpl)sender).addCredit(1);
                    } else {
                        sender.advance();
                    }
                }

                public void onError(int errorCode, String errorMessage) {
                    ((LinkImpl)sender).setLocalError(new EndpointError("" + errorCode, errorMessage));
                }
            });
        }
        connection.write();
    }

    /**
     * Accepts a newly attached link by mirroring the remote source/target locally and
     * registering it with the session.
     *
     * <ul>
     *   <li>Receiver whose remote target is a {@link Coordinator}: the session is
     *       initialised with {@code true} and a transaction handler is added.</li>
     *   <li>Any other receiver: registered as a producer and given 100 credits.</li>
     *   <li>Sender: registered as a consumer and one delivery is offered.</li>
     * </ul>
     */
    void handleNewLink(Link link, ProtonSession protonSession) throws HornetQAMQPException {
        link.setSource(link.getRemoteSource());
        link.setTarget(link.getRemoteTarget());
        if (link instanceof Receiver) {
            Receiver receiver = (Receiver)link;
            if (link.getRemoteTarget() instanceof Coordinator) {
                protonSession.initialise(true);
                Coordinator coordinator = (Coordinator)link.getRemoteTarget();
                protonSession.addTransactionHandler(coordinator, receiver);
            } else {
                protonSession.initialise(false);
                protonSession.addProducer(receiver);
                receiver.flow(100);
            }
        } else {
            protonSession.initialise(false);
            Sender sender = (Sender)link;
            protonSession.addConsumer(sender);
            sender.offer(1);
        }
    }

    /**
     * Creates a session with a generated UUID name and a new storage operation context
     * bound to an executor from the server's executor factory.
     */
    public ProtonSession createSession(ProtonRemotingConnection protonConnection, TransportImpl protonTransport) throws HornetQAMQPException {
        String name = UUIDGenerator.getInstance().generateStringUUID();
        return new ProtonSession(name, protonConnection, this, this.server.getStorageManager().newContext(this.server.getExecutorFactory().getExecutor()), this.server, protonTransport);
    }

    /**
     * Re-mirrors the remote source/target on an active link and asks the link's
     * {@link ProtonDeliveryHandler} (stored as the link context) to check its state.
     */
    void handleActiveLink(Link link) throws HornetQAMQPException {
        link.setSource(link.getRemoteSource());
        link.setTarget(link.getRemoteTarget());
        ProtonDeliveryHandler handler = (ProtonDeliveryHandler)link.getContext();
        handler.checkState();
    }

    /**
     * Handles a delivery on a transaction-coordinator link.
     *
     * <p>A {@link Declare} is answered with a {@link Declared} disposition carrying the
     * id of the session's current transaction. A {@link Discharge} rolls the session
     * back when its fail flag is true and commits otherwise. In both cases the delivery
     * is then settled. Any other body value is ignored.
     *
     * @throws HornetQAMQPIllegalStateException if the rollback or commit fails
     */
    public void handleTransaction(Receiver receiver, HornetQBuffer buffer, Delivery delivery, ProtonSession protonSession) throws HornetQAMQPIllegalStateException {
        byte[] bytes = ProtonProtocolManager.drainDelivery(receiver, buffer);
        if (bytes == null) {
            return;
        }
        MessageImpl msg = new MessageImpl();
        msg.decode(bytes, 0, bytes.length);
        Object action = ((AmqpValue)msg.getBody()).getValue();
        if (action instanceof Declare) {
            Transaction tx = protonSession.getServerSession().getCurrentTransaction();
            Declared declared = new Declared();
            declared.setTxnId(new Binary(this.longToBytes(tx.getID())));
            delivery.disposition((DeliveryState)declared);
            delivery.settle();
        } else if (action instanceof Discharge) {
            Discharge discharge = (Discharge)action;
            // An absent (null) fail flag is treated as "not failed", i.e. commit.
            if (Boolean.TRUE.equals(discharge.getFail())) {
                try {
                    protonSession.getServerSession().rollback(false);
                }
                catch (Exception e) {
                    throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage());
                }
            } else {
                // Commit only when the discharge did not fail; a rolled-back
                // transaction must not be committed afterwards.
                try {
                    protonSession.getServerSession().commit();
                }
                catch (Exception e) {
                    throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
                }
            }
            delivery.settle();
        }
    }

    /**
     * Encodes {@code x} as 8 bytes in {@link ByteBuffer}'s default (big-endian) order.
     */
    public byte[] longToBytes(long x) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.putLong(x);
        return buffer.array();
    }

    /**
     * Pulls all currently available bytes of the receiver's current delivery into
     * {@code buffer}; if the final {@code recv} returned a negative value, advances the
     * receiver and returns the accumulated bytes, clearing the buffer.
     *
     * @return the accumulated bytes, or {@code null} if the final {@code recv} returned 0
     *         (presumably meaning the delivery is not complete yet - bytes read so far
     *         stay in {@code buffer} and the receiver is not advanced)
     */
    private static byte[] drainDelivery(Receiver receiver, HornetQBuffer buffer) {
        int count;
        byte[] chunk = new byte[1024];
        while ((count = receiver.recv(chunk, 0, chunk.length)) > 0) {
            buffer.writeBytes(chunk, 0, count);
        }
        if (count == 0) {
            return null;
        }
        receiver.advance();
        byte[] bytes = new byte[buffer.readableBytes()];
        buffer.readBytes(bytes);
        buffer.clear();
        return bytes;
    }
}

