/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.reactivex.impl;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.vertx.core.Handler;
import io.vertx.core.streams.ReadStream;
import java.util.ArrayDeque;
import java.util.function.Function;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class ReadStreamSubscriber<R, J>
implements Subscriber<R>,
ReadStream<J> {
    private static final Runnable NOOP_ACTION = () -> {};
    private static final Throwable DONE_SENTINEL = new Throwable();
    public static final int BUFFER_SIZE = 16;
    private final Function<R, J> adapter;
    private Handler<Void> endHandler;
    private Handler<Throwable> exceptionHandler;
    private Handler<J> elementHandler;
    private boolean paused = false;
    private Throwable completed;
    private ArrayDeque<R> pending = new ArrayDeque();
    private int requested = 0;
    private Subscription subscription;

    public static <R, J> ReadStream<J> asReadStream(Flowable<R> flowable, Function<R, J> adapter) {
        ReadStreamSubscriber<R, J> observer = new ReadStreamSubscriber<R, J>(adapter);
        flowable.subscribe(observer);
        return observer;
    }

    public static <R, J> ReadStream<J> asReadStream(Observable<R> observable, Function<R, J> adapter) {
        return ReadStreamSubscriber.asReadStream(observable.toFlowable(BackpressureStrategy.BUFFER), adapter);
    }

    public ReadStreamSubscriber(Function<R, J> adapter) {
        this.adapter = adapter;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ReadStream<J> handler(Handler<J> handler) {
        ReadStreamSubscriber readStreamSubscriber = this;
        synchronized (readStreamSubscriber) {
            this.elementHandler = handler;
        }
        this.checkStatus();
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ReadStream<J> pause() {
        ReadStreamSubscriber readStreamSubscriber = this;
        synchronized (readStreamSubscriber) {
            this.paused = true;
        }
        return this;
    }

    @Override
    public ReadStream<J> fetch(long amount) {
        throw new UnsupportedOperationException("todo");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ReadStream<J> resume() {
        ReadStreamSubscriber readStreamSubscriber = this;
        synchronized (readStreamSubscriber) {
            this.paused = false;
        }
        this.checkStatus();
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onSubscribe(Subscription s) {
        ReadStreamSubscriber readStreamSubscriber = this;
        synchronized (readStreamSubscriber) {
            this.subscription = s;
        }
        this.checkStatus();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void checkStatus() {
        Runnable action = NOOP_ACTION;
        while (true) {
            J adapted;
            Handler<J> handler;
            ReadStreamSubscriber readStreamSubscriber = this;
            synchronized (readStreamSubscriber) {
                if (this.paused || (handler = this.elementHandler) == null || this.pending.size() <= 0) {
                    if (this.completed != null) {
                        if (this.pending.isEmpty()) {
                            Throwable result;
                            Handler<Throwable> onError;
                            if (this.completed != DONE_SENTINEL) {
                                onError = this.exceptionHandler;
                                result = this.completed;
                                this.exceptionHandler = null;
                            } else {
                                onError = null;
                                result = null;
                            }
                            Handler<Void> onCompleted = this.endHandler;
                            this.endHandler = null;
                            action = () -> {
                                try {
                                    if (onError != null) {
                                        onError.handle(result);
                                    }
                                }
                                finally {
                                    if (onCompleted != null) {
                                        onCompleted.handle(null);
                                    }
                                }
                            };
                        }
                    } else if (this.elementHandler != null && this.requested < 8) {
                        int request = 16 - this.requested;
                        action = () -> this.subscription.request(request);
                        this.requested = 16;
                    }
                    break;
                }
                --this.requested;
                R item = this.pending.poll();
                adapted = this.adapter.apply(item);
            }
            handler.handle(adapted);
        }
        action.run();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ReadStream<J> endHandler(Handler<Void> handler) {
        ReadStreamSubscriber readStreamSubscriber = this;
        synchronized (readStreamSubscriber) {
            if (this.completed == null || this.pending.size() > 0) {
                this.endHandler = handler;
            } else if (handler != null) {
                throw new IllegalStateException();
            }
        }
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ReadStream<J> exceptionHandler(Handler<Throwable> handler) {
        ReadStreamSubscriber readStreamSubscriber = this;
        synchronized (readStreamSubscriber) {
            if (this.completed == null || this.pending.size() > 0) {
                this.exceptionHandler = handler;
            } else if (handler != null) {
                throw new IllegalStateException();
            }
        }
        return this;
    }

    @Override
    public void onComplete() {
        this.onError(DONE_SENTINEL);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onError(Throwable e) {
        ReadStreamSubscriber readStreamSubscriber = this;
        synchronized (readStreamSubscriber) {
            if (this.completed != null) {
                return;
            }
            this.completed = e;
        }
        this.checkStatus();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onNext(R item) {
        ReadStreamSubscriber readStreamSubscriber = this;
        synchronized (readStreamSubscriber) {
            this.pending.add(item);
        }
        this.checkStatus();
    }
}

