/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.schedulers.Schedulers;
import rx.schedulers.TimeInterval;

/**
 * Operator that wraps every item emitted by a source {@link Observable} in a
 * {@link TimeInterval} carrying the item together with the time elapsed since
 * the previous emission, as measured by a {@link Scheduler}'s clock.
 */
public class OperationTimeInterval {
    /**
     * Creates the time-interval operator using {@link Schedulers#immediate()}
     * as the time source.
     *
     * @param source the observable whose emissions are to be timed
     * @param <T> the item type of the source
     * @return a subscribe function that emits one {@link TimeInterval} per source item
     */
    public static <T> Observable.OnSubscribeFunc<TimeInterval<T>> timeInterval(Observable<? extends T> source) {
        return OperationTimeInterval.timeInterval(source, Schedulers.immediate());
    }

    /**
     * Creates the time-interval operator using the given scheduler as the time source.
     *
     * @param source the observable whose emissions are to be timed
     * @param scheduler the scheduler whose {@code now()} value is used to compute intervals
     * @param <T> the item type of the source
     * @return a subscribe function that emits one {@link TimeInterval} per source item
     */
    public static <T> Observable.OnSubscribeFunc<TimeInterval<T>> timeInterval(final Observable<? extends T> source, final Scheduler scheduler) {
        return new Observable.OnSubscribeFunc<TimeInterval<T>>(){

            @Override
            public Subscription onSubscribe(Observer<? super TimeInterval<T>> observer) {
                // A fresh observer per subscription, so each subscriber gets its own
                // baseline timestamp. Parameterized explicitly to avoid a raw type.
                return source.unsafeSubscribe(new TimeIntervalObserver<T>(observer, scheduler));
            }
        };
    }

    /**
     * Subscriber that forwards each source item downstream wrapped in a
     * {@link TimeInterval}, and passes terminal notifications through unchanged.
     *
     * @param <T> the item type of the source
     */
    private static class TimeIntervalObserver<T>
    extends Subscriber<T> {
        private final Observer<? super TimeInterval<T>> observer;
        private final Scheduler scheduler;
        // Scheduler time of the previous emission (or of construction, before the first item).
        private long lastTimestamp;

        /**
         * @param observer the downstream observer receiving the timed items
         * @param scheduler the scheduler whose {@code now()} value is used as the clock
         */
        public TimeIntervalObserver(Observer<? super TimeInterval<T>> observer, Scheduler scheduler) {
            this.observer = observer;
            this.scheduler = scheduler;
            // The first interval is measured from the moment this observer is created.
            this.lastTimestamp = scheduler.now();
        }

        /**
         * Emits the item downstream along with the difference between the current
         * scheduler time and the time recorded at the previous emission.
         */
        @Override
        public void onNext(T args) {
            long nowTimestamp = this.scheduler.now();
            this.observer.onNext(new TimeInterval<T>(nowTimestamp - this.lastTimestamp, args));
            this.lastTimestamp = nowTimestamp;
        }

        /** Forwards normal completion to the downstream observer. */
        @Override
        public void onCompleted() {
            this.observer.onCompleted();
        }

        /**
         * Forwards the source's error to the downstream observer.
         *
         * @param e the error signalled by the source
         */
        @Override
        public void onError(Throwable e) {
            // Propagate the failure; reporting it as onCompleted would silently hide the error.
            this.observer.onError(e);
        }
    }
}

