/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observables.ConnectableObservable;
import rx.operators.SafeObservableSubscription;
import rx.operators.SafeObserver;
import rx.subjects.Subject;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

public class OperationMulticast {
    /**
     * Wraps {@code source} and {@code subject} in a connectable observable.
     *
     * <p>Subscribers to the returned observable are subscribed to {@code subject};
     * {@code source} is only subscribed (feeding {@code subject}) once
     * {@link ConnectableObservable#connect()} is called on the result.
     *
     * @param source  the observable whose notifications are pushed into {@code subject}
     * @param subject the subject that receives the source notifications and serves subscribers
     * @param <T>     the element type accepted by the subject
     * @param <R>     the element type emitted to subscribers
     * @return a connectable observable backed by {@code subject}
     */
    public static <T, R> ConnectableObservable<R> multicast(Observable<? extends T> source, Subject<? super T, ? extends R> subject) {
        final ConnectableObservable<R> connectable = new MulticastConnectableObservable<T, R>(source, subject);
        return connectable;
    }

    /**
     * Builds an observable that, for every subscriber, creates a fresh subject via
     * {@code subjectFactory}, multicasts {@code source} through it, and subscribes the
     * subscriber to whatever {@code selector} derives from the multicasted sequence.
     *
     * @param source         the observable to multicast
     * @param subjectFactory called once per subscription to obtain the subject that relays {@code source}
     * @param selector       maps the multicasted observable to the observable the subscriber actually receives
     * @param <TInput>        the element type of {@code source}
     * @param <TIntermediate> the element type emitted by the subject
     * @param <TResult>       the element type of the resulting observable
     * @return an observable whose subscription logic is a {@code MulticastSubscribeFunc}
     */
    public static <TInput, TIntermediate, TResult> Observable<TResult> multicast(Observable<? extends TInput> source, Func0<? extends Subject<? super TInput, ? extends TIntermediate>> subjectFactory, Func1<? super Observable<TIntermediate>, ? extends Observable<TResult>> selector) {
        // Explicit type arguments instead of a raw type, so the result is a checked
        // Observable<TResult> rather than an unchecked conversion.
        return Observable.create(new MulticastSubscribeFunc<TInput, TIntermediate, TResult>(source, subjectFactory, selector));
    }

    /**
     * Subscription function used by the selector-based {@code multicast} overload.
     *
     * <p>Each call to {@link #onSubscribe(Observer)} obtains a new subject from the factory,
     * wraps the source and that subject in a connectable observable, applies the result
     * selector to it, subscribes the observer to the selected observable, and finally
     * connects the connectable observable to the source.
     *
     * @param <TInput>        the element type of the source
     * @param <TIntermediate> the element type emitted by the subject
     * @param <TResult>       the element type delivered to the observer
     */
    private static final class MulticastSubscribeFunc<TInput, TIntermediate, TResult>
    implements Observable.OnSubscribeFunc<TResult> {
        final Observable<? extends TInput> source;
        final Func0<? extends Subject<? super TInput, ? extends TIntermediate>> subjectFactory;
        final Func1<? super Observable<TIntermediate>, ? extends Observable<TResult>> resultSelector;

        /**
         * @param source         the observable to multicast
         * @param subjectFactory produces the subject used for one subscription
         * @param resultSelector derives the observable to subscribe to from the multicasted sequence
         */
        public MulticastSubscribeFunc(Observable<? extends TInput> source, Func0<? extends Subject<? super TInput, ? extends TIntermediate>> subjectFactory, Func1<? super Observable<TIntermediate>, ? extends Observable<TResult>> resultSelector) {
            this.source = source;
            this.subjectFactory = subjectFactory;
            this.resultSelector = resultSelector;
        }

        /**
         * Sets up one multicast pipeline for {@code t1}.
         *
         * @param t1 the observer to subscribe to the selected observable
         * @return a composite subscription holding both the observer's subscription and the
         *         connection to the source; or an empty subscription if the factory, the
         *         connectable construction, or the selector threw, in which case the
         *         throwable has been passed to {@code t1.onError}
         */
        @Override
        public Subscription onSubscribe(Observer<? super TResult> t1) {
            Observable<TResult> observable;
            // Declared as the parameterized supertype so connect() can be called
            // without casting through the raw ConnectableObservable type.
            ConnectableObservable<TIntermediate> connectable;
            try {
                Subject<? super TInput, ? extends TIntermediate> subject = this.subjectFactory.call();
                connectable = new MulticastConnectableObservable<TInput, TIntermediate>(this.source, subject);
                observable = this.resultSelector.call(connectable);
            }
            catch (Throwable t) {
                // Failures from user-supplied functions are reported to the observer, not thrown.
                t1.onError(t);
                return Subscriptions.empty();
            }
            CompositeSubscription csub = new CompositeSubscription();
            // Subscribe the observer first, then connect, so the observer is attached
            // before the source starts pushing into the subject.
            csub.add(observable.unsafeSubscribe(new SafeObserver<TResult>(new SafeObservableSubscription(csub), t1)));
            csub.add(connectable.connect());
            return csub;
        }
    }

    /**
     * Connectable observable that relays a source through a subject.
     *
     * <p>Subscribing to this observable subscribes to the subject. Calling
     * {@link #connect()} subscribes a relay to the source that forwards every
     * notification to the subject.
     *
     * @param <T> the element type accepted by the subject
     * @param <R> the element type emitted to subscribers
     */
    private static class MulticastConnectableObservable<T, R>
    extends ConnectableObservable<R> {
        /** Monitor held for every read and write of {@link #subscription}. */
        private final Object lock = new Object();
        private final Observable<? extends T> source;
        private final Subject<? super T, ? extends R> subject;
        /** Subscription to {@link #source} while connected; {@code null} otherwise. */
        private Subscription subscription;

        /**
         * @param source  the observable subscribed to on {@link #connect()}
         * @param subject the subject that receives source notifications and serves subscribers
         */
        public MulticastConnectableObservable(Observable<? extends T> source, final Subject<? super T, ? extends R> subject) {
            super(new Observable.OnSubscribe<R>(){

                @Override
                public void call(Subscriber<? super R> downstream) {
                    subject.unsafeSubscribe(downstream);
                }
            });
            this.source = source;
            this.subject = subject;
        }

        /**
         * Subscribes the relay to the source unless a source subscription is already held.
         *
         * @return a subscription whose {@code unsubscribe} releases whatever source
         *         subscription is held at that moment and clears it, allowing a later
         *         {@code connect()} to subscribe to the source again
         */
        @Override
        public Subscription connect() {
            synchronized (lock) {
                if (subscription == null) {
                    subscription = source.unsafeSubscribe(relayToSubject());
                }
            }
            return Subscriptions.create(disconnectAction());
        }

        /** Creates a subscriber that forwards each notification unchanged to the subject. */
        private Subscriber<T> relayToSubject() {
            return new Subscriber<T>(){

                @Override
                public void onNext(T value) {
                    MulticastConnectableObservable.this.subject.onNext(value);
                }

                @Override
                public void onError(Throwable error) {
                    MulticastConnectableObservable.this.subject.onError(error);
                }

                @Override
                public void onCompleted() {
                    MulticastConnectableObservable.this.subject.onCompleted();
                }
            };
        }

        /** Creates the action that, under the lock, unsubscribes from the source and clears the field. */
        private Action0 disconnectAction() {
            return new Action0(){

                @Override
                public void call() {
                    synchronized (MulticastConnectableObservable.this.lock) {
                        Subscription active = MulticastConnectableObservable.this.subscription;
                        if (active == null) {
                            return;
                        }
                        active.unsubscribe();
                        MulticastConnectableObservable.this.subscription = null;
                    }
                }
            };
        }
    }
}

