/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func1;

/**
 * Implements the "take until" operator: relays values from a source sequence
 * until a second sequence signals that relaying should stop.
 *
 * <p>Both sequences are wrapped so that they emit {@link Notification} objects,
 * the two wrapped sequences are merged, and the merged stream is cut with
 * {@code takeWhile} at the first notification whose {@code halt} flag is set.
 * The remaining notifications are then unwrapped back to plain values.
 */
public class OperationTakeUntil {
    /**
     * Returns an observable that relays the values of {@code source} until a
     * halt notification is seen.
     *
     * <p>A halt notification is produced when {@code source} completes, or when
     * {@code other} emits any value or completes. An error from either sequence
     * is forwarded through {@code onError}.
     *
     * @param source the sequence whose values are relayed
     * @param other  the sequence whose first value or completion stops the relay
     * @param <T>    the value type of {@code source} and of the result
     * @param <E>    the value type of {@code other}; its values are never relayed
     * @return an observable of the values of {@code source} seen before the first halt
     */
    public static <T, E> Observable<T> takeUntil(Observable<? extends T> source, Observable<? extends E> other) {
        Observable<Notification<T>> s = Observable.create(new SourceObservable<T>(source));
        Observable<Notification<T>> o = Observable.create(new OtherObservable<T, E>(other));
        Observable<Notification<T>> result = Observable.merge(s, o);
        return result.takeWhile(new Func1<Notification<T>, Boolean>(){

            @Override
            public Boolean call(Notification<T> notification) {
                // Keep relaying only while no halt marker has been seen.
                return !notification.halt;
            }
        }).map(new Func1<Notification<T>, T>(){

            @Override
            public T call(Notification<T> notification) {
                // Unwrap the carrier back into the plain source value.
                return notification.value;
            }
        });
    }

    /**
     * Adapts the "other" sequence into a stream of halt notifications: every
     * value it emits, and its completion, becomes {@code Notification.halt()}.
     *
     * @param <T> the value type carried by the notifications (from the source side)
     * @param <E> the value type of the wrapped sequence; the values are discarded
     */
    private static class OtherObservable<T, E>
    implements Observable.OnSubscribeFunc<Notification<T>> {
        private final Observable<? extends E> sequence;

        private OtherObservable(Observable<? extends E> sequence) {
            this.sequence = sequence;
        }

        @Override
        public Subscription onSubscribe(final Observer<? super Notification<T>> notificationObserver) {
            return this.sequence.unsafeSubscribe(new Subscriber<E>(){

                @Override
                public void onCompleted() {
                    // Completion of the other sequence also stops the relay.
                    notificationObserver.onNext(Notification.<T>halt());
                }

                @Override
                public void onError(Throwable e) {
                    notificationObserver.onError(e);
                }

                @Override
                public void onNext(E args) {
                    // The emitted value is irrelevant; only the signal matters.
                    notificationObserver.onNext(Notification.<T>halt());
                }
            });
        }
    }

    /**
     * Adapts the source sequence into a stream of notifications: each value is
     * wrapped with {@code Notification.value(...)} and completion is turned into
     * {@code Notification.halt()}.
     *
     * @param <T> the value type of the wrapped sequence
     */
    private static class SourceObservable<T>
    implements Observable.OnSubscribeFunc<Notification<T>> {
        private final Observable<? extends T> sequence;

        private SourceObservable(Observable<? extends T> sequence) {
            this.sequence = sequence;
        }

        @Override
        public Subscription onSubscribe(final Observer<? super Notification<T>> notificationObserver) {
            return this.sequence.unsafeSubscribe(new Subscriber<T>(){

                @Override
                public void onCompleted() {
                    // Completion is sent as a halt value rather than onCompleted.
                    notificationObserver.onNext(Notification.<T>halt());
                }

                @Override
                public void onError(Throwable e) {
                    notificationObserver.onError(e);
                }

                @Override
                public void onNext(T args) {
                    notificationObserver.onNext(Notification.<T>value(args));
                }
            });
        }
    }

    /**
     * Immutable carrier that is either a value from the source sequence
     * ({@code halt == false}) or a stop marker ({@code halt == true}, with a
     * {@code null} value).
     *
     * @param <T> the type of the carried value
     */
    private static class Notification<T> {
        private final boolean halt;
        private final T value;

        /** Creates a notification carrying {@code value} with the halt flag cleared. */
        public static <T> Notification<T> value(T value) {
            return new Notification<T>(false, value);
        }

        /** Creates a stop marker: halt flag set, no value. */
        public static <T> Notification<T> halt() {
            return new Notification<T>(true, null);
        }

        private Notification(boolean halt, T value) {
            this.halt = halt;
            this.value = value;
        }
    }
}

