/*
 * Decompiled with CFR 0.152.
 */
package rx.apache.http;

import org.apache.http.HttpResponse;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.nio.client.HttpAsyncClient;
import org.apache.http.nio.client.methods.HttpAsyncMethods;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.apache.http.ObservableHttpResponse;
import rx.apache.http.consumers.ResponseConsumerDelegate;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

/**
 * Holds an {@link Observable.OnSubscribeFunc} describing an asynchronous HTTP request and
 * exposes it as an {@link Observable} through {@link #toObservable()}.
 *
 * <p>Instances are obtained from the static {@code createGet} / {@code createRequest}
 * factories. The request itself is issued from inside the stored subscribe function, i.e.
 * when that function is invoked, not when the factory is called.
 *
 * @param <T> type of the items delivered to observers
 */
public class ObservableHttp<T> {
    /** Subscribe function that every observable produced by {@link #toObservable()} forwards to. */
    private final Observable.OnSubscribeFunc<T> onSubscribe;

    /** Only reachable through the static factories of this class. */
    private ObservableHttp(Observable.OnSubscribeFunc<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    /** Wraps the given subscribe function in a new {@code ObservableHttp}. */
    private static <T> ObservableHttp<T> create(Observable.OnSubscribeFunc<T> subscribeFunc) {
        return new ObservableHttp<T>(subscribeFunc);
    }

    /**
     * Builds an {@link Observable} whose subscribe function forwards each observer to the
     * function held by this instance and returns that function's {@link Subscription}.
     *
     * @return a new observable backed by this instance's subscribe function
     */
    public Observable<T> toObservable() {
        final Observable.OnSubscribeFunc<T> delegate = this.onSubscribe;
        Observable.OnSubscribeFunc<T> forwarding = new Observable.OnSubscribeFunc<T>() {

            @Override
            public Subscription onSubscribe(Observer<? super T> subscriber) {
                return delegate.onSubscribe(subscriber);
            }
        };
        return Observable.create(forwarding);
    }

    /**
     * Convenience factory for a GET request: the producer comes from
     * {@link HttpAsyncMethods#createGet(String)} and is handed to
     * {@link #createRequest(HttpAsyncRequestProducer, HttpAsyncClient)}.
     *
     * @param uri    target of the GET request
     * @param client client used to execute the request
     * @return an {@code ObservableHttp} describing the request
     */
    public static ObservableHttp<ObservableHttpResponse> createGet(String uri, HttpAsyncClient client) {
        HttpAsyncRequestProducer getProducer = HttpAsyncMethods.createGet(uri);
        return ObservableHttp.createRequest(getProducer, client);
    }

    /**
     * Same as {@link #createRequest(HttpAsyncRequestProducer, HttpAsyncClient, HttpContext)}
     * with a newly created {@link BasicHttpContext} as the context.
     *
     * @param requestProducer producer of the request to execute
     * @param client          client used to execute the request
     * @return an {@code ObservableHttp} describing the request
     */
    public static ObservableHttp<ObservableHttpResponse> createRequest(HttpAsyncRequestProducer requestProducer, HttpAsyncClient client) {
        HttpContext freshContext = new BasicHttpContext();
        return ObservableHttp.createRequest(requestProducer, client, freshContext);
    }

    /**
     * Creates an {@code ObservableHttp} whose subscribe function calls
     * {@code client.execute(...)} with the given producer and context.
     *
     * <p>For each observer the subscribe function creates a {@link CompositeSubscription},
     * passes it together with the observer to a new {@link ResponseConsumerDelegate}, and
     * adds to it the subscription obtained from {@code Subscriptions.from(...)} on the
     * future returned by {@code execute}. The composite is what the observer gets back.
     * NOTE(review): presumably unsubscribing cancels the in-flight request via that future;
     * confirm against {@code Subscriptions.from}.
     *
     * @param requestProducer producer of the request to execute
     * @param client          client used to execute the request
     * @param context         HTTP context passed to {@code client.execute}
     * @return an {@code ObservableHttp} describing the request
     */
    public static ObservableHttp<ObservableHttpResponse> createRequest(final HttpAsyncRequestProducer requestProducer, final HttpAsyncClient client, final HttpContext context) {
        Observable.OnSubscribeFunc<ObservableHttpResponse> executeOnSubscribe = new Observable.OnSubscribeFunc<ObservableHttpResponse>() {

            @Override
            public Subscription onSubscribe(final Observer<? super ObservableHttpResponse> observer) {
                final CompositeSubscription requestSubscription = new CompositeSubscription();
                // Consumer is created before the callback, matching the argument order of execute().
                final ResponseConsumerDelegate responseConsumer = new ResponseConsumerDelegate(observer, requestSubscription);
                final FutureCallback<HttpResponse> terminalCallback = ObservableHttp.terminalCallbackFor(observer);
                requestSubscription.add(Subscriptions.from(client.execute(requestProducer, responseConsumer, context, terminalCallback)));
                return requestSubscription;
            }
        };
        return ObservableHttp.create(executeOnSubscribe);
    }

    /**
     * Builds the callback that turns the outcome of the request future into a terminal
     * observer notification: completion and cancellation both call {@code onCompleted()},
     * failure calls {@code onError(ex)}.
     */
    private static FutureCallback<HttpResponse> terminalCallbackFor(final Observer<? super ObservableHttpResponse> observer) {
        return new FutureCallback<HttpResponse>() {

            @Override
            public void completed(HttpResponse result) {
                observer.onCompleted();
            }

            @Override
            public void failed(Exception ex) {
                observer.onError(ex);
            }

            @Override
            public void cancelled() {
                // Cancellation is reported as a normal completion, not as an error.
                observer.onCompleted();
            }
        };
    }
}

