/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.mutiny.context;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.StrictMultiSubscriber;
import io.smallrye.mutiny.infrastructure.MultiInterceptor;
import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import java.util.Objects;
import java.util.concurrent.Executor;
import org.eclipse.microprofile.context.ThreadContext;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * {@link MultiInterceptor} that routes {@link Multi} subscriptions and the signals delivered to
 * subscribers through an {@link Executor} obtained from
 * {@link ThreadContext#currentContextExecutor()}.
 *
 * <p>The executor is requested each time a {@code Multi} is created and each time a subscriber is
 * attached. NOTE(review): per the MicroProfile Context Propagation documentation, that executor
 * runs tasks on the calling thread with the context captured when the executor was created —
 * confirm against the {@code ThreadContext} implementation in use.
 *
 * <p>Subclasses supply the {@link ThreadContext} through {@link #getThreadContext()}.
 */
public abstract class ContextPropagationMultiInterceptor implements MultiInterceptor {

    /**
     * Wraps {@code subscriber} so that every signal it receives is dispatched through an executor
     * obtained from the current {@link ThreadContext}.
     *
     * @param instance   the publisher being subscribed to (not used by this implementation)
     * @param subscriber the downstream subscriber to wrap
     * @param <T>        the item type
     * @return a subscriber forwarding all signals to {@code subscriber} via the executor
     */
    @Override
    public <T> Subscriber<? super T> onSubscription(Publisher<? extends T> instance, Subscriber<? super T> subscriber) {
        Executor executor = getThreadContext().currentContextExecutor();
        return new ContextPropagationSubscriber<T>(executor, subscriber);
    }

    /**
     * Wraps {@code multi} so that subscribing to it is dispatched through an executor obtained
     * from the current {@link ThreadContext} at the time this method is called.
     *
     * @param multi the newly created {@code Multi}
     * @param <T>   the item type
     * @return a {@code Multi} delegating subscription to {@code multi} via the executor
     */
    @Override
    public <T> Multi<T> onMultiCreation(Multi<T> multi) {
        Executor executor = getThreadContext().currentContextExecutor();
        return new ContextPropagationMulti<T>(executor, multi);
    }

    /**
     * Provides the {@link ThreadContext} from which executors are obtained.
     *
     * @return the thread context to use
     */
    protected abstract ThreadContext getThreadContext();

    /**
     * Subscriber decorator that hands every signal to the wrapped subscriber through the
     * configured {@link Executor}.
     *
     * @param <T> the item type
     */
    public static class ContextPropagationSubscriber<T> implements Subscriber<T> {
        private final Executor executor;
        private final Subscriber<? super T> subscriber;

        /**
         * @param executor   the executor used to deliver each signal
         * @param subscriber the downstream subscriber receiving the signals
         */
        public ContextPropagationSubscriber(Executor executor, Subscriber<? super T> subscriber) {
            this.executor = executor;
            this.subscriber = subscriber;
        }

        /** Forwards the subscription to the downstream subscriber via the executor. */
        @Override
        public void onSubscribe(Subscription subscription) {
            this.executor.execute(() -> this.subscriber.onSubscribe(subscription));
        }

        /**
         * Forwards the item to the downstream subscriber via the executor.
         *
         * @throws NullPointerException if {@code item} is {@code null}
         */
        @Override
        public void onNext(T item) {
            // Reject null on the calling thread, before anything is handed to the executor.
            Objects.requireNonNull(item);
            this.executor.execute(() -> this.subscriber.onNext(item));
        }

        /**
         * Forwards the failure to the downstream subscriber via the executor.
         *
         * @throws NullPointerException if {@code failure} is {@code null}
         */
        @Override
        public void onError(Throwable failure) {
            // Reject null on the calling thread, before anything is handed to the executor.
            Objects.requireNonNull(failure);
            this.executor.execute(() -> this.subscriber.onError(failure));
        }

        /** Forwards the completion signal to the downstream subscriber via the executor. */
        @Override
        public void onComplete() {
            this.executor.execute(this.subscriber::onComplete);
        }
    }

    /**
     * {@code Multi} decorator that performs the subscription to the wrapped {@code Multi} through
     * the configured {@link Executor}.
     *
     * @param <T> the item type
     */
    private static class ContextPropagationMulti<T> extends AbstractMulti<T> {
        private final Executor executor;
        private final Multi<T> multi;

        /**
         * @param executor the executor used to run the upstream subscription
         * @param multi    the upstream {@code Multi} to delegate to
         */
        public ContextPropagationMulti(Executor executor, Multi<T> multi) {
            this.executor = executor;
            this.multi = multi;
        }

        /**
         * Subscribes {@code subscriber} to the wrapped {@code Multi} via the executor.
         *
         * @throws NullPointerException if {@code subscriber} is {@code null}
         */
        @Override
        public void subscribe(Subscriber<? super T> subscriber) {
            Objects.requireNonNull(subscriber);
            this.executor.execute(() -> {
                if (subscriber instanceof MultiSubscriber) {
                    this.multi.subscribe(subscriber);
                } else {
                    // Plain subscribers are adapted first.
                    // NOTE(review): StrictMultiSubscriber presumably enforces the Reactive Streams
                    // rules for non-Mutiny subscribers — confirm against its implementation.
                    this.multi.subscribe(new StrictMultiSubscriber<>(subscriber));
                }
            });
        }
    }
}

