/*
 * Decompiled with CFR 0.152.
 */
package rx.apache.http.consumers;

import java.io.IOException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpException;
import org.apache.http.HttpResponse;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
import org.apache.http.protocol.HttpContext;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.apache.http.ObservableHttpResponse;
import rx.apache.http.consumers.ExpandableByteBuffer;
import rx.apache.http.consumers.ResponseDelegate;
import rx.subscriptions.CompositeSubscription;

/**
 * Response consumer that lets {@link BasicAsyncResponseConsumer} buffer the whole
 * response and then publishes it to an {@link Observer} as a single
 * {@link ObservableHttpResponse} whose content observable emits the body as one
 * {@code byte[]}.
 *
 * <p>Each underscore-prefixed {@link ResponseDelegate} method forwards to the
 * inherited method of the same name without the underscore.
 */
class ResponseConsumerBasic extends BasicAsyncResponseConsumer implements ResponseDelegate {
    /** Initial buffer capacity used when the entity does not report its content length. */
    private static final int DEFAULT_BUFFER_CAPACITY = 4096;

    private final Observer<? super ObservableHttpResponse> observer;
    private final CompositeSubscription parentSubscription;

    /**
     * @param observer           receives the {@link ObservableHttpResponse} once the result is built
     * @param parentSubscription checked while content arrives to detect unsubscription, and
     *                           returned to subscribers of the content observable
     */
    public ResponseConsumerBasic(Observer<? super ObservableHttpResponse> observer, CompositeSubscription parentSubscription) {
        this.observer = observer;
        this.parentSubscription = parentSubscription;
    }

    /** Forwards the response head to the inherited {@code onResponseReceived}. */
    @Override
    public void _onResponseReceived(HttpResponse response) throws HttpException, IOException {
        this.onResponseReceived(response);
    }

    /**
     * Forwards incoming content to the inherited {@code onContentReceived}, unless the
     * parent subscription has been unsubscribed, in which case the channel is shut down.
     *
     * @param decoder source of the incoming content
     * @param ioctrl  I/O control of the underlying connection
     * @throws IOException if consuming the content fails
     */
    @Override
    public void _onContentReceived(ContentDecoder decoder, IOControl ioctrl) throws IOException {
        if (this.parentSubscription.isUnsubscribed()) {
            ioctrl.shutdown();
            // Nobody is listening and the channel has just been shut down, so do not
            // keep decoding from it.
            return;
        }
        this.onContentReceived(decoder, ioctrl);
    }

    /** Forwards the enclosed entity to the inherited {@code onEntityEnclosed}. */
    @Override
    public void _onEntityEnclosed(HttpEntity entity, ContentType contentType) throws IOException {
        this.onEntityEnclosed(entity, contentType);
    }

    /**
     * Builds the final response via the inherited {@code buildResult} and pushes it to the
     * observer wrapped in an {@link ObservableHttpResponse}.
     *
     * <p>The content observable reads the entity on subscription and emits it as a single
     * {@code byte[]} followed by completion. A response without an entity yields an empty
     * array. Failures while reading are delivered through {@code onError}.
     *
     * @param context the HTTP context passed on to {@code buildResult}
     * @return the response produced by {@code buildResult}
     * @throws Exception if {@code buildResult} fails
     */
    @Override
    public HttpResponse _buildResult(HttpContext context) throws Exception {
        final HttpResponse response = this.buildResult(context);
        Observable<byte[]> contentObservable = Observable.create(new Observable.OnSubscribeFunc<byte[]>(){

            @Override
            public Subscription onSubscribe(Observer<? super byte[]> o) {
                Subscription subscription = ResponseConsumerBasic.this.parentSubscription;
                // getEntity() is null for responses that carry no body (e.g. HEAD, 204, 304).
                HttpEntity entity = response.getEntity();
                long length = entity == null ? 0L : entity.getContentLength();
                if (length > Integer.MAX_VALUE) {
                    o.onError(new IllegalStateException("Content Length too large for a byte[] => " + length));
                    return subscription;
                }
                try {
                    byte[] body;
                    if (entity == null) {
                        body = new byte[0];
                    } else {
                        // getContentLength() is negative when the length is unknown (e.g. chunked
                        // transfer); a negative value must not be used as a buffer capacity.
                        // NOTE(review): relies on ExpandableByteBuffer growing past its initial
                        // capacity and getBytes() returning only the bytes read - confirm.
                        int capacity = length < 0 ? DEFAULT_BUFFER_CAPACITY : (int) length;
                        ExpandableByteBuffer buf = new ExpandableByteBuffer(capacity);
                        buf.consumeInputStream(entity.getContent());
                        body = (byte[]) buf.getBytes();
                    }
                    o.onNext(body);
                    o.onCompleted();
                }
                catch (Throwable e) {
                    o.onError(e);
                }
                return subscription;
            }
        });
        this.observer.onNext(new ObservableHttpResponse(response, contentObservable));
        return response;
    }

    /** Forwards to the inherited {@code releaseResources}. */
    @Override
    public void _releaseResources() {
        this.releaseResources();
    }
}

