/*
 * Decompiled with CFR 0.152.
 */
package com.googlecode.mobilityrpc.network.impl.tcp;

import com.googlecode.mobilityrpc.network.ConnectionId;
import com.googlecode.mobilityrpc.network.impl.ConnectionErrorHandler;
import com.googlecode.mobilityrpc.network.impl.ConnectionInternal;
import com.googlecode.mobilityrpc.network.impl.ConnectionStateListener;
import com.googlecode.mobilityrpc.network.impl.IncomingMessageHandler;
import com.googlecode.mobilityrpc.network.impl.MessageProvider;
import com.googlecode.mobilityrpc.network.impl.tcp.IncomingByteStreamReader;
import com.googlecode.mobilityrpc.network.impl.tcp.OutgoingByteStreamWriter;
import com.googlecode.mobilityrpc.network.impl.tcp.StreamClosedException;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A {@link ConnectionInternal} implementation that runs over a TCP {@link Socket}.
 * <p>
 * Outgoing messages are placed on an in-memory {@link LinkedBlockingQueue}, from which an
 * {@link OutgoingByteStreamWriter} wrapping the socket's output stream pulls them. The socket's
 * input stream is wrapped by an {@link IncomingByteStreamReader}, which is handed the
 * {@link IncomingMessageHandler} supplied at construction time.
 * <p>
 * Any error reported by the reader or the writer is logged and then triggers {@link #destroy()}.
 * Because both components can report errors, {@code destroy()} is written so that its teardown
 * steps run at most once.
 */
public class TCPConnection
implements ConnectionInternal {
    private final Logger logger = Logger.getLogger(this.getClass().getName());
    private final Socket socket;
    private final ConnectionId connectionId;
    private final BlockingQueue<byte[]> outgoingMessageQueue = new LinkedBlockingQueue<byte[]>();
    private final IncomingMessageHandler incomingMessageHandler;
    private final ConnectionStateListener connectionStateListener;
    /** Flipped to {@code true} by the first effective call to {@link #destroy()}. */
    private final AtomicBoolean destroyed = new AtomicBoolean(false);
    // volatile: written by init(), read by destroy(), which the error handlers may invoke
    // from a thread other than the one that called init().
    private volatile IncomingByteStreamReader incomingByteStreamReader = null;
    private volatile OutgoingByteStreamWriter outgoingByteStreamWriter = null;

    /**
     * Creates a connection around an existing socket. No streams are opened and nothing is
     * started here; call {@link #init()} to do that.
     *
     * @param socket the socket whose input and output streams this connection will use
     * @param connectionId the identifier of this connection, used in log messages and returned by
     *        {@link #getConnectionId()}
     * @param incomingMessageHandler passed to the {@link IncomingByteStreamReader} created by {@link #init()}
     * @param connectionStateListener notified via {@code notifyConnectionClosed} when this connection is destroyed
     */
    public TCPConnection(Socket socket, ConnectionId connectionId, IncomingMessageHandler incomingMessageHandler, ConnectionStateListener connectionStateListener) {
        this.socket = socket;
        this.connectionId = connectionId;
        this.incomingMessageHandler = incomingMessageHandler;
        this.connectionStateListener = connectionStateListener;
    }

    /**
     * @return the identifier supplied to the constructor
     */
    @Override
    public ConnectionId getConnectionId() {
        return this.connectionId;
    }

    /**
     * Adds a message to the outgoing queue. The message is queued regardless of whether
     * {@link #init()} has been called yet.
     *
     * @param message the bytes to send
     */
    @Override
    public void enqueueOutgoingMessage(byte[] message) {
        this.outgoingMessageQueue.add(message);
        if (this.logger.isLoggable(Level.FINER)) {
            this.logger.log(Level.FINER, "Enqueued outgoing message for connection id '" + this.connectionId + "': " + message.length + " bytes");
        }
    }

    /**
     * Creates the reader and writer around the socket's streams and starts both.
     * <p>
     * The reader and writer are only published to this object's fields once both have been
     * constructed successfully, so a failure while constructing either one leaves the connection
     * un-initialised rather than half-initialised.
     *
     * @throws IllegalStateException if this connection was already initialised, or if the reader
     *         or the writer could not be constructed (the original exception is the cause)
     */
    @Override
    public void init() {
        if (this.incomingByteStreamReader != null || this.outgoingByteStreamWriter != null) {
            throw new IllegalStateException("Already initialised.");
        }
        final IncomingByteStreamReader reader;
        try {
            reader = new IncomingByteStreamReader(this.connectionId, this.socket.getInputStream(), this.incomingMessageHandler, this.newIncomingErrorHandler());
        }
        catch (Exception e) {
            throw new IllegalStateException("Failed to initialize IncomingByteStreamReader for: " + this.connectionId, e);
        }
        final OutgoingByteStreamWriter writer;
        try {
            writer = new OutgoingByteStreamWriter(this.connectionId, this.socket.getOutputStream(), this.newOutgoingMessageProvider(), this.newOutgoingErrorHandler());
        }
        catch (Exception e) {
            throw new IllegalStateException("Failed to initialize OutgoingByteStreamWriter for: " + this.connectionId, e);
        }
        // Publish both fields before starting either component, so that a destroy() triggered
        // by an early error sees a fully initialised connection.
        this.incomingByteStreamReader = reader;
        this.outgoingByteStreamWriter = writer;
        reader.start();
        writer.start();
        this.logger.log(Level.FINER, "Initialized TCP connection for: {0}", this.connectionId);
    }

    /**
     * Shuts down the reader and the writer and notifies the {@link ConnectionStateListener}.
     * <p>
     * Does nothing if the connection was never initialised. Once the connection is initialised,
     * only the first call has any effect; later calls (for example from the second of the two
     * error handlers) return immediately. Each teardown step is attempted even if an earlier
     * one throws.
     */
    @Override
    public void destroy() {
        final IncomingByteStreamReader reader = this.incomingByteStreamReader;
        final OutgoingByteStreamWriter writer = this.outgoingByteStreamWriter;
        if (reader == null || writer == null) {
            // Never initialised: nothing to tear down, and init() must remain callable.
            return;
        }
        if (!this.destroyed.compareAndSet(false, true)) {
            // Another call already performed (or is performing) the teardown.
            return;
        }
        try {
            reader.shutdown();
        }
        finally {
            try {
                writer.shutdown();
            }
            finally {
                this.connectionStateListener.notifyConnectionClosed(this);
            }
        }
    }

    /**
     * Builds the error handler given to the reader: logs the error, then destroys this connection.
     * A {@link StreamClosedException} is logged at a low level (with the stack trace only when
     * FINEST is enabled); anything else is logged as a warning.
     */
    private ConnectionErrorHandler newIncomingErrorHandler() {
        return new ConnectionErrorHandler(){

            @Override
            public void handle(Exception e) {
                if (e instanceof StreamClosedException) {
                    if (TCPConnection.this.logger.isLoggable(Level.FINEST)) {
                        TCPConnection.this.logger.log(Level.FINEST, "Stream closed explicitly by remote side, closing connection: " + TCPConnection.this.connectionId, e);
                    } else {
                        TCPConnection.this.logger.log(Level.FINE, "Stream closed explicitly by remote side, closing connection (enable finest-level logging for more detail): {0}", TCPConnection.this.connectionId);
                    }
                } else {
                    TCPConnection.this.logger.log(Level.WARNING, "Exception in IncomingByteStreamReader, closing connection: " + TCPConnection.this.connectionId, e);
                }
                TCPConnection.this.destroy();
            }
        };
    }

    /**
     * Builds the message source given to the writer: blocks until a message is available on the
     * outgoing queue. If the waiting thread is interrupted, its interrupt status is restored
     * before an {@link IllegalStateException} wrapping the interruption is thrown.
     */
    private MessageProvider<byte[]> newOutgoingMessageProvider() {
        return new MessageProvider<byte[]>(){

            @Override
            public byte[] getNextMessage() {
                try {
                    return TCPConnection.this.outgoingMessageQueue.take();
                }
                catch (InterruptedException e) {
                    // Catching InterruptedException clears the flag; restore it so code further
                    // up the stack can still observe that the thread was interrupted.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to take message fom outgoing message queue", e);
                }
            }
        };
    }

    /**
     * Builds the error handler given to the writer: logs the error as a warning, then destroys
     * this connection.
     */
    private ConnectionErrorHandler newOutgoingErrorHandler() {
        return new ConnectionErrorHandler(){

            @Override
            public void handle(Exception e) {
                TCPConnection.this.logger.log(Level.WARNING, "Exception in OutgoingByteStreamWriter, closing connection: " + TCPConnection.this.connectionId, e);
                TCPConnection.this.destroy();
            }
        };
    }
}

