/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.operators.NotificationLite;
import rx.schedulers.ImmediateScheduler;
import rx.schedulers.TestScheduler;
import rx.schedulers.TrampolineScheduler;
import rx.subscriptions.Subscriptions;

public class OperatorObserveOnBounded<T>
implements Observable.Operator<T, T> {
    private final Scheduler scheduler;
    private final int bufferSize;
    private final NotificationLite<T> on = NotificationLite.instance();

    public OperatorObserveOnBounded(Scheduler scheduler, int bufferSize) {
        this.scheduler = scheduler;
        this.bufferSize = OperatorObserveOnBounded.roundToNextPowerOfTwoIfNecessary(bufferSize);
    }

    public OperatorObserveOnBounded(Scheduler scheduler) {
        this(scheduler, 1);
    }

    private static int roundToNextPowerOfTwoIfNecessary(int num) {
        if ((num & -num) == num) {
            return num;
        }
        int result = 1;
        while (num != 0) {
            num >>= 1;
            result <<= 1;
        }
        return result;
    }

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (this.scheduler instanceof ImmediateScheduler) {
            return child;
        }
        if (this.scheduler instanceof TrampolineScheduler) {
            return child;
        }
        if (this.scheduler instanceof TestScheduler) {
            return child;
        }
        return new ObserveOnSubscriber(child);
    }

    private static class InterruptibleBlockingQueue<E> {
        private final Semaphore semaphore;
        private volatile boolean interrupted = false;
        private final E[] buffer;
        private AtomicLong tail = new AtomicLong();
        private AtomicLong head = new AtomicLong();
        private final int capacity;
        private final int mask;

        public InterruptibleBlockingQueue(int size) {
            this.semaphore = new Semaphore(size);
            this.capacity = size;
            this.mask = size - 1;
            this.buffer = new Object[size];
        }

        public void interrupt() {
            this.interrupted = true;
            this.semaphore.release();
        }

        public void addBlocking(E e) throws InterruptedException {
            if (this.interrupted) {
                throw new InterruptedException("Interrupted by Unsubscribe");
            }
            this.semaphore.acquire();
            if (this.interrupted) {
                throw new InterruptedException("Interrupted by Unsubscribe");
            }
            if (e == null) {
                throw new IllegalArgumentException("Can not put null");
            }
            if (this.offer(e)) {
                return;
            }
            throw new IllegalStateException("Queue is full");
        }

        private boolean offer(E e) {
            long _t = this.tail.get();
            if (_t - this.head.get() == (long)this.capacity) {
                return false;
            }
            int index = (int)(_t & (long)this.mask);
            this.buffer[index] = e;
            this.tail.lazySet(_t + 1L);
            return true;
        }

        public E poll() {
            if (this.interrupted) {
                return null;
            }
            long _h = this.head.get();
            if (this.tail.get() == _h) {
                return null;
            }
            int index = (int)(_h & (long)this.mask);
            E v = this.buffer[index];
            this.buffer[index] = null;
            this.head.lazySet(_h + 1L);
            if (v != null) {
                this.semaphore.release();
            }
            return v;
        }

        public int size() {
            long currentHead;
            long currentTail;
            int size;
            do {
                currentHead = this.head.get();
            } while ((size = (int)((currentTail = this.tail.get()) - currentHead)) > this.buffer.length);
            return size;
        }
    }

    private class ObserveOnSubscriber
    extends Subscriber<T> {
        final Subscriber<? super T> observer;
        private volatile Scheduler.Inner recursiveScheduler;
        private final InterruptibleBlockingQueue<Object> queue;
        final AtomicLong counter;

        public ObserveOnSubscriber(Subscriber<? super T> observer) {
            super(observer);
            this.queue = new InterruptibleBlockingQueue(OperatorObserveOnBounded.this.bufferSize);
            this.counter = new AtomicLong(0L);
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            block2: {
                try {
                    this.queue.addBlocking(OperatorObserveOnBounded.this.on.next(t));
                    this.schedule();
                }
                catch (InterruptedException e) {
                    if (this.isUnsubscribed()) break block2;
                    this.onError(e);
                }
            }
        }

        @Override
        public void onCompleted() {
            try {
                this.queue.addBlocking(OperatorObserveOnBounded.this.on.completed());
                this.schedule();
            }
            catch (InterruptedException e) {
                this.onError(e);
            }
        }

        @Override
        public void onError(Throwable e) {
            try {
                this.queue.addBlocking(OperatorObserveOnBounded.this.on.error(e));
                this.schedule();
            }
            catch (InterruptedException e2) {
                this.observer.onError(e2);
            }
        }

        protected void schedule() {
            if (this.counter.getAndIncrement() == 0L) {
                if (this.recursiveScheduler == null) {
                    this.add(Subscriptions.create(new Action0(){

                        @Override
                        public void call() {
                            ObserveOnSubscriber.this.queue.interrupt();
                        }
                    }));
                    this.add(OperatorObserveOnBounded.this.scheduler.schedule(new Action1<Scheduler.Inner>(){

                        @Override
                        public void call(Scheduler.Inner inner) {
                            ObserveOnSubscriber.this.recursiveScheduler = inner;
                            ObserveOnSubscriber.this.pollQueue();
                        }
                    }));
                } else {
                    this.recursiveScheduler.schedule(new Action1<Scheduler.Inner>(){

                        @Override
                        public void call(Scheduler.Inner inner) {
                            ObserveOnSubscriber.this.pollQueue();
                        }
                    });
                }
            }
        }

        private void pollQueue() {
            do {
                Object v = this.queue.poll();
                OperatorObserveOnBounded.this.on.accept(this.observer, v);
            } while (this.counter.decrementAndGet() > 0L);
        }
    }
}

