/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.operators.ChunkedOperation;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

public final class OperationWindow
extends ChunkedOperation {
    public static <T> Func0<Window<T>> windowMaker() {
        return new Func0<Window<T>>(){

            @Override
            public Window<T> call() {
                return new Window();
            }
        };
    }

    public static <T, TClosing> Observable.OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> source, final Func0<? extends Observable<? extends TClosing>> windowClosingSelector) {
        return new Observable.OnSubscribeFunc<Observable<T>>(){

            @Override
            public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
                ChunkedOperation.NonOverlappingChunks windows = new ChunkedOperation.NonOverlappingChunks(observer, OperationWindow.windowMaker());
                ChunkedOperation.ObservableBasedSingleChunkCreator creator = new ChunkedOperation.ObservableBasedSingleChunkCreator(windows, windowClosingSelector);
                return source.unsafeSubscribe(new ChunkedOperation.ChunkObserver(windows, observer, creator));
            }
        };
    }

    public static <T, TOpening, TClosing> Observable.OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> source, final Observable<? extends TOpening> windowOpenings, final Func1<? super TOpening, ? extends Observable<? extends TClosing>> windowClosingSelector) {
        return new Observable.OnSubscribeFunc<Observable<T>>(){

            @Override
            public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
                ChunkedOperation.OverlappingChunks windows = new ChunkedOperation.OverlappingChunks(observer, OperationWindow.windowMaker());
                ChunkedOperation.ObservableBasedMultiChunkCreator creator = new ChunkedOperation.ObservableBasedMultiChunkCreator(windows, windowOpenings, windowClosingSelector);
                return source.unsafeSubscribe(new ChunkedOperation.ChunkObserver(windows, observer, creator));
            }
        };
    }

    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, int count) {
        return OperationWindow.window(source, count, count);
    }

    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> source, final int count, final int skip) {
        return new Observable.OnSubscribeFunc<Observable<T>>(){

            @Override
            public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
                ChunkedOperation.SizeBasedChunks chunks = new ChunkedOperation.SizeBasedChunks(observer, OperationWindow.windowMaker(), count);
                ChunkedOperation.SkippingChunkCreator creator = new ChunkedOperation.SkippingChunkCreator(chunks, skip);
                return source.unsafeSubscribe(new ChunkedOperation.ChunkObserver(chunks, observer, creator));
            }
        };
    }

    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit) {
        return OperationWindow.window(source, timespan, unit, Schedulers.threadPoolForComputation());
    }

    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> source, final long timespan, final TimeUnit unit, final Scheduler scheduler) {
        return new Observable.OnSubscribeFunc<Observable<T>>(){

            @Override
            public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
                ChunkedOperation.NonOverlappingChunks windows = new ChunkedOperation.NonOverlappingChunks(observer, OperationWindow.windowMaker());
                ChunkedOperation.TimeBasedChunkCreator creator = new ChunkedOperation.TimeBasedChunkCreator(windows, timespan, unit, scheduler);
                return source.unsafeSubscribe(new ChunkedOperation.ChunkObserver(windows, observer, creator));
            }
        };
    }

    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit, int count) {
        return OperationWindow.window(source, timespan, unit, count, Schedulers.threadPoolForComputation());
    }

    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> source, final long timespan, final TimeUnit unit, final int count, final Scheduler scheduler) {
        return new Observable.OnSubscribeFunc<Observable<T>>(){

            @Override
            public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
                ChunkedOperation.TimeAndSizeBasedChunks chunks = new ChunkedOperation.TimeAndSizeBasedChunks(observer, OperationWindow.windowMaker(), count, timespan, unit, scheduler);
                ChunkedOperation.SingleChunkCreator creator = new ChunkedOperation.SingleChunkCreator(chunks);
                return source.unsafeSubscribe(new ChunkedOperation.ChunkObserver(chunks, observer, creator));
            }
        };
    }

    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, long timespan, long timeshift, TimeUnit unit) {
        return OperationWindow.window(source, timespan, timeshift, unit, Schedulers.threadPoolForComputation());
    }

    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> source, final long timespan, final long timeshift, final TimeUnit unit, final Scheduler scheduler) {
        return new Observable.OnSubscribeFunc<Observable<T>>(){

            @Override
            public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
                ChunkedOperation.TimeBasedChunks windows = new ChunkedOperation.TimeBasedChunks(observer, OperationWindow.windowMaker(), timespan, unit, scheduler);
                ChunkedOperation.TimeBasedChunkCreator creator = new ChunkedOperation.TimeBasedChunkCreator(windows, timeshift, unit, scheduler);
                return source.unsafeSubscribe(new ChunkedOperation.ChunkObserver(windows, observer, creator));
            }
        };
    }

    public static <T, U> Observable.OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, Observable<U> boundary) {
        return new WindowViaObservable<T, U>(source, boundary);
    }

    private static final class WindowViaObservable<T, U>
    implements Observable.OnSubscribeFunc<Observable<T>> {
        final Observable<? extends T> source;
        final Observable<U> boundary;

        public WindowViaObservable(Observable<? extends T> source, Observable<U> boundary) {
            this.source = source;
            this.boundary = boundary;
        }

        @Override
        public Subscription onSubscribe(Observer<? super Observable<T>> t1) {
            CompositeSubscription csub = new CompositeSubscription();
            SourceObserver so = new SourceObserver(t1, csub);
            try {
                t1.onNext(so.subject);
            }
            catch (Throwable t) {
                t1.onError(t);
                return Subscriptions.empty();
            }
            csub.add(this.source.unsafeSubscribe(so));
            if (!csub.isUnsubscribed()) {
                csub.add(this.boundary.unsafeSubscribe(new BoundaryObserver(so)));
            }
            return csub;
        }

        private static final class BoundaryObserver<T, U>
        extends Subscriber<U> {
            final SourceObserver<T> so;

            public BoundaryObserver(SourceObserver<T> so) {
                this.so = so;
            }

            @Override
            public void onNext(U args) {
                this.so.replace();
            }

            @Override
            public void onError(Throwable e) {
                this.so.onError(e);
            }

            @Override
            public void onCompleted() {
                this.so.onCompleted();
            }
        }

        private static final class SourceObserver<T>
        extends Subscriber<T> {
            final Observer<? super Observable<T>> observer;
            final Subscription cancel;
            final Object guard;
            Subject<T, T> subject;

            public SourceObserver(Observer<? super Observable<T>> observer, Subscription cancel) {
                this.observer = observer;
                this.cancel = cancel;
                this.guard = new Object();
                this.subject = this.create();
            }

            Subject<T, T> create() {
                return PublishSubject.create();
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onNext(T args) {
                Object object = this.guard;
                synchronized (object) {
                    if (this.subject == null) {
                        return;
                    }
                    this.subject.onNext(args);
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onError(Throwable e) {
                Object object = this.guard;
                synchronized (object) {
                    if (this.subject == null) {
                        return;
                    }
                    Subject<T, T> s = this.subject;
                    this.subject = null;
                    s.onError(e);
                    this.observer.onError(e);
                }
                this.cancel.unsubscribe();
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onCompleted() {
                Object object = this.guard;
                synchronized (object) {
                    if (this.subject == null) {
                        return;
                    }
                    Subject<T, T> s = this.subject;
                    this.subject = null;
                    s.onCompleted();
                    this.observer.onCompleted();
                }
                this.cancel.unsubscribe();
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void replace() {
                try {
                    Object object = this.guard;
                    synchronized (object) {
                        if (this.subject == null) {
                            return;
                        }
                        Subject<T, T> s = this.subject;
                        s.onCompleted();
                        this.subject = this.create();
                        this.observer.onNext(this.subject);
                    }
                }
                catch (Throwable t) {
                    this.onError(t);
                }
            }
        }
    }

    protected static class Window<T>
    extends ChunkedOperation.Chunk<T, Observable<T>> {
        protected Window() {
        }

        @Override
        public Observable<T> getContents() {
            return Observable.from(this.contents);
        }
    }
}

