/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
import rx.schedulers.Timestamped;

public final class OperationTakeLast {
    public static <T> Observable.OnSubscribeFunc<T> takeLast(final Observable<? extends T> items, final int count) {
        return new Observable.OnSubscribeFunc<T>(){

            @Override
            public Subscription onSubscribe(Observer<? super T> observer) {
                return new TakeLast(items, count).onSubscribe(observer);
            }
        };
    }

    public static <T> Observable.OnSubscribeFunc<T> takeLast(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
        return new TakeLastTimed<T>(source, -1, time, unit, scheduler);
    }

    public static <T> Observable.OnSubscribeFunc<T> takeLast(Observable<? extends T> source, int count, long time, TimeUnit unit, Scheduler scheduler) {
        return new TakeLastTimed<T>(source, count, time, unit, scheduler);
    }

    static final class TakeLastTimedObserver<T>
    extends Subscriber<T> {
        final Observer<? super T> observer;
        final Subscription cancel;
        final long ageMillis;
        final Scheduler scheduler;
        final int count;
        final Deque<Timestamped<T>> buffer = new LinkedList<Timestamped<T>>();

        public TakeLastTimedObserver(Observer<? super T> observer, Subscription cancel, int count, long ageMillis, Scheduler scheduler) {
            this.observer = observer;
            this.cancel = cancel;
            this.ageMillis = ageMillis;
            this.scheduler = scheduler;
            this.count = count;
        }

        protected void runEvictionPolicy(long now) {
            Timestamped<T> v;
            while (this.count >= 0 && this.buffer.size() > this.count) {
                this.buffer.pollFirst();
            }
            while (!this.buffer.isEmpty() && (v = this.buffer.peekFirst()).getTimestampMillis() < now - this.ageMillis) {
                this.buffer.pollFirst();
            }
        }

        @Override
        public void onNext(T args) {
            long t = this.scheduler.now();
            this.buffer.add(new Timestamped<T>(t, args));
            this.runEvictionPolicy(t);
        }

        @Override
        public void onError(Throwable e) {
            this.buffer.clear();
            this.observer.onError(e);
            this.cancel.unsubscribe();
        }

        protected boolean emitBuffer() {
            for (Timestamped<T> v : this.buffer) {
                try {
                    this.observer.onNext(v.getValue());
                }
                catch (Throwable t) {
                    this.buffer.clear();
                    this.observer.onError(t);
                    return false;
                }
            }
            this.buffer.clear();
            return true;
        }

        @Override
        public void onCompleted() {
            this.runEvictionPolicy(this.scheduler.now());
            if (this.emitBuffer()) {
                this.observer.onCompleted();
            }
            this.cancel.unsubscribe();
        }
    }

    static final class TakeLastTimed<T>
    implements Observable.OnSubscribeFunc<T> {
        final Observable<? extends T> source;
        final long ageMillis;
        final Scheduler scheduler;
        final int count;

        public TakeLastTimed(Observable<? extends T> source, int count, long time, TimeUnit unit, Scheduler scheduler) {
            this.source = source;
            this.ageMillis = unit.toMillis(time);
            this.scheduler = scheduler;
            this.count = count;
        }

        @Override
        public Subscription onSubscribe(Observer<? super T> t1) {
            SafeObservableSubscription sas = new SafeObservableSubscription();
            sas.wrap(this.source.unsafeSubscribe(new TakeLastTimedObserver<T>(t1, sas, this.count, this.ageMillis, this.scheduler)));
            return sas;
        }
    }

    private static class TakeLast<T>
    implements Observable.OnSubscribeFunc<T> {
        private final int count;
        private final Observable<? extends T> items;
        private final SafeObservableSubscription subscription = new SafeObservableSubscription();

        TakeLast(Observable<? extends T> items, int count) {
            this.count = count;
            this.items = items;
        }

        @Override
        public Subscription onSubscribe(Observer<? super T> observer) {
            if (this.count < 0) {
                throw new IndexOutOfBoundsException("count could not be negative");
            }
            return this.subscription.wrap(this.items.unsafeSubscribe(new ItemObserver(observer)));
        }

        private class ItemObserver
        extends Subscriber<T> {
            private Deque<T> deque = new LinkedList();
            private final Observer<? super T> observer;
            private final ReentrantLock lock = new ReentrantLock();

            public ItemObserver(Observer<? super T> observer) {
                this.observer = observer;
            }

            @Override
            public void onCompleted() {
                try {
                    for (Object value : this.deque) {
                        this.observer.onNext(value);
                    }
                    this.observer.onCompleted();
                }
                catch (Throwable e) {
                    this.observer.onError(e);
                }
            }

            @Override
            public void onError(Throwable e) {
                this.observer.onError(e);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onNext(T value) {
                if (TakeLast.this.count == 0) {
                    return;
                }
                this.lock.lock();
                try {
                    this.deque.offerLast(value);
                    if (this.deque.size() > TakeLast.this.count) {
                        this.deque.removeFirst();
                    }
                }
                catch (Throwable e) {
                    this.observer.onError(e);
                    TakeLast.this.subscription.unsubscribe();
                }
                finally {
                    this.lock.unlock();
                }
            }
        }
    }
}

