/*
 * Decompiled with CFR 0.152.
 */
package de.zalando.paradox.nakadi.consumer.core.http.okhttp;

import com.google.common.base.Stopwatch;
import de.zalando.paradox.nakadi.consumer.core.AuthorizationValueProvider;
import de.zalando.paradox.nakadi.consumer.core.http.HttpGetRequest;
import de.zalando.paradox.nakadi.consumer.core.http.HttpResponseChunk;
import de.zalando.paradox.nakadi.consumer.core.utils.ThrowableUtils;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import okhttp3.Call;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okio.BufferedSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;

public class RxHttpRequest {
    private static final Logger LOGGER = LoggerFactory.getLogger(RxHttpRequest.class);
    private static final String AUTHORIZATION_HEADER = "Authorization";
    static final String BATCH_SPLITTER = "\n";
    private final OkHttpClient client;
    private final AuthorizationValueProvider authorizationValueProvider;

    public RxHttpRequest(long readTimeoutMillis, @Nullable AuthorizationValueProvider authorizationValueProvider) {
        this.authorizationValueProvider = authorizationValueProvider;
        this.client = new OkHttpClient.Builder().readTimeout(readTimeoutMillis, TimeUnit.MILLISECONDS).build();
    }

    public Observable<HttpResponseChunk> createRequest(HttpGetRequest requestProducer) {
        Func0 resourceFactory = () -> {
            Request request = null;
            try {
                request = RxHttpRequest.getRequest(requestProducer.getUrl(), this.withAuthorization(requestProducer.getHeaders()));
                LOGGER.info("Request [{}]", (Object)request);
                Call call = this.client.newCall(request);
                Response response = call.execute();
                LOGGER.debug("Received response with code [{}] and headers [{}]", (Object)response.code(), (Object)response.headers());
                return new HttpCall(call, response);
            }
            catch (Throwable t) {
                LOGGER.error("Encountered error while making request [{}] [{}]", (Object)request, (Object)ExceptionUtils.getMessage((Throwable)t));
                ThrowableUtils.throwException(t);
                return null;
            }
        };
        Func1 observableFactory = httpCall -> {
            BufferedSource source = ((HttpCall)httpCall).response.body().source();
            Scanner scanner = new Scanner(source.inputStream(), "UTF-8").useDelimiter(BATCH_SPLITTER);
            Spliterator<String> splt = Spliterators.spliterator(scanner, Long.MAX_VALUE, 272);
            Stream stream = (Stream)StreamSupport.stream(splt, false).onClose(scanner::close);
            int code = ((HttpCall)httpCall).response.code();
            Observable alternate = code != 200 ? Observable.just((Object)new HttpResponseChunk(code, "")) : Observable.empty();
            return Observable.from(stream::iterator).map(s -> {
                if (StringUtils.isNotEmpty((CharSequence)s)) {
                    return new HttpResponseChunk(code, (String)s);
                }
                LOGGER.warn("Received empty content");
                return new HttpResponseChunk(code, "");
            }).switchIfEmpty(alternate);
        };
        Action1 disposeAction = httpCall -> {
            LOGGER.debug("Dispose request [{}]", (Object)requestProducer.getUrl());
            httpCall.dispose();
        };
        AtomicReference stopWatchRef = new AtomicReference();
        return Observable.using((Func0)resourceFactory, (Func1)observableFactory, (Action1)disposeAction, (boolean)true).doOnSubscribe(() -> stopWatchRef.set(Stopwatch.createStarted())).doOnTerminate(() -> {
            Stopwatch stopWatch = (Stopwatch)stopWatchRef.get();
            LOGGER.info("Processed in [{}] [{}]", (Object)(null != stopWatch ? stopWatch.stop().toString() : "?"), (Object)requestProducer.getUrl());
        });
    }

    private Map<String, String> withAuthorization(Map<String, String> headers) {
        if (null == this.authorizationValueProvider || headers.containsKey(AUTHORIZATION_HEADER)) {
            return headers;
        }
        HashMap<String, String> result = new HashMap<String, String>(headers);
        String value = (String)this.authorizationValueProvider.get();
        result.put(AUTHORIZATION_HEADER, value);
        return result;
    }

    private static Request getRequest(URL url, Map<String, String> headers) {
        return new Request.Builder().url(url).headers(Headers.of(headers)).build();
    }

    private static class HttpCall {
        private final Call call;
        private final Response response;
        private final long threadId;

        HttpCall(Call call, Response response) {
            this.call = call;
            this.response = response;
            this.threadId = Thread.currentThread().getId();
        }

        void dispose() {
            if (this.threadId == Thread.currentThread().getId()) {
                if (null != this.response.body()) {
                    try {
                        this.response.body().close();
                    }
                    catch (Throwable t) {
                        LOGGER.error("Dispose error request [{}]", (Object)this.call.request(), (Object)t);
                    }
                }
            } else {
                try {
                    if (this.call.isCanceled()) {
                        LOGGER.warn("Already cancelled request [{}]", (Object)this.call.request());
                    } else {
                        LOGGER.info("Cancel request [{}]", (Object)this.call.request());
                        this.call.cancel();
                    }
                }
                catch (Throwable t) {
                    LOGGER.error("Cancel error request [{}]", (Object)this.call.request(), (Object)t);
                }
            }
        }
    }
}

