/*
 * Decompiled with CFR 0.152.
 */
package de.zalando.paradox.nakadi.consumer.core.http;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler;
import de.zalando.paradox.nakadi.consumer.core.http.HttpResponseChunk;
import de.zalando.paradox.nakadi.consumer.core.utils.ThrowableUtils;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.schedulers.Schedulers;

public class HttpReactiveReceiver
implements Closeable {
    private final Logger log;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private Subscription subscription;
    private final HttpReactiveHandler eventHandler;
    private final Scheduler scheduler;

    public HttpReactiveReceiver(HttpReactiveHandler eventHandler) {
        this.eventHandler = eventHandler;
        this.log = eventHandler.getLogger(this.getClass());
        this.scheduler = Schedulers.io();
    }

    @VisibleForTesting
    HttpReactiveReceiver(HttpReactiveHandler eventHandler, Scheduler scheduler) {
        this.eventHandler = eventHandler;
        this.log = eventHandler.getLogger(this.getClass());
        this.scheduler = scheduler;
    }

    public void init() {
        this.log.info("Starting HTTP event receiver");
        if (!this.running.compareAndSet(false, true)) {
            this.log.info("HTTP reactive receiver is already running");
            return;
        }
        this.eventHandler.init();
        Observable responses = this.eventHandler.createRequest();
        responses = responses.subscribeOn(this.scheduler);
        responses = responses.unsubscribeOn(this.scheduler);
        responses = this.handleSubscription(responses);
        responses = this.handleRestart(responses);
        Action1 error = t -> this.log.error("Subscription handler error [{}] / [{}] ", new Object[]{t.getClass().getName(), ExceptionUtils.getMessage((Throwable)t), t});
        Action0 compete = () -> this.log.info("Subscription handler completed");
        this.subscription = responses.subscribe(this.getAction(), error, compete);
    }

    private <T> Observable<T> handleSubscription(Observable<T> observable) {
        return observable.doOnSubscribe(() -> {
            this.log.debug("Handler subscription started");
            this.eventHandler.onStarted();
        }).doOnUnsubscribe(() -> {
            this.log.debug("Handler subscription finished");
            this.eventHandler.onFinished();
        });
    }

    private <T> Observable<T> handleRestart(Observable<T> observable) {
        return observable.retryWhen(o -> o.compose(this.zipWithFlatMap("retry"))).repeatWhen(o -> o.compose(this.zipWithFlatMap("repeat")));
    }

    private <T> Observable.Transformer<T, Long> zipWithFlatMap(String reason) {
        return observable -> observable.zipWith(Observable.range((int)1, (int)Integer.MAX_VALUE), (t, repeatAttempt) -> {
            if (t instanceof Throwable) {
                this.log.warn("Exception [{}]", (Object)ExceptionUtils.getMessage((Throwable)((Throwable)t)));
            }
            return repeatAttempt;
        }).flatMap(repeatAttempt -> {
            long retryAfterMillis = this.eventHandler.getRetryAfterMillis();
            Preconditions.checkArgument((retryAfterMillis > 0L ? 1 : 0) != 0, (Object)"RetryAfterMillis must be greater than 0");
            this.log.debug("Restart after [{}] running [{}] reason [{}] attempt : [{}]", new Object[]{retryAfterMillis, this.running.get(), reason, repeatAttempt});
            return Observable.timer((long)retryAfterMillis, (TimeUnit)TimeUnit.MILLISECONDS);
        }).takeUntil(stopPredicate -> !this.running.get());
    }

    private Action1<HttpResponseChunk> getAction() {
        return chunk -> {
            block5: {
                if (this.running.get()) {
                    try {
                        if (chunk.getStatusCode() == 200) {
                            this.log.trace("Chunk response event [{}]", (Object)chunk.getContent());
                            this.eventHandler.onResponse(chunk.getContent());
                            break block5;
                        }
                        this.log.error("Chunk response error [{}] / [{}]", (Object)chunk.getStatusCode(), (Object)chunk.getContent());
                        this.eventHandler.onErrorResponse(chunk.getStatusCode(), chunk.getContent());
                    }
                    catch (Throwable t) {
                        this.log.error("Unexpected handler error [{}]", (Object)ExceptionUtils.getMessage((Throwable)t));
                        ThrowableUtils.throwException(t);
                    }
                } else {
                    this.log.error("Receiving payload but not running");
                }
            }
        };
    }

    @Override
    public void close() throws IOException {
        this.log.info("Stopping HTTP event receiver");
        if (!this.running.compareAndSet(true, false)) {
            this.log.debug("HTTP reactive receiver is already stopped");
            return;
        }
        if (null != this.subscription) {
            try {
                this.subscription.unsubscribe();
            }
            finally {
                this.subscription = null;
            }
        }
        this.eventHandler.close();
    }

    public boolean isRunning() {
        return this.running.get();
    }

    public boolean isSubscribed() {
        Subscription s = this.subscription;
        return null != s && !s.isUnsubscribed();
    }
}

