/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.mutiny.operators.multi;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.operators.multi.AbstractMultiOperator;
import io.smallrye.mutiny.operators.multi.MultiOperatorProcessor;
import io.smallrye.mutiny.subscription.BackPressureFailure;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.reactivestreams.Subscription;

public class MultiEmitOnOp<T>
extends AbstractMultiOperator<T, T> {
    private final Executor executor;
    private final Supplier<? extends Queue<T>> queueSupplier = Queues.get(Queues.BUFFER_S);

    public MultiEmitOnOp(Multi<? extends T> upstream, Executor executor) {
        super(upstream);
        this.executor = ParameterValidation.nonNull(executor, "executor");
    }

    @Override
    public void subscribe(MultiSubscriber<? super T> downstream) {
        ParameterValidation.nonNullNpe(downstream, "subscriber");
        this.upstream.subscribe().withSubscriber(new MultiEmitOnProcessor<T>(downstream, this.executor, this.queueSupplier));
    }

    static final class MultiEmitOnProcessor<T>
    extends MultiOperatorProcessor<T, T>
    implements Runnable {
        private final Executor executor;
        private final int limit;
        private final Queue<T> queue;
        private volatile boolean cancelled;
        private volatile boolean done;
        private final AtomicReference<Throwable> failure = new AtomicReference();
        private final AtomicInteger wip = new AtomicInteger();
        private final AtomicLong requested = new AtomicLong();
        private long produced;

        MultiEmitOnProcessor(MultiSubscriber<? super T> downstream, Executor executor, Supplier<? extends Queue<T>> queueSupplier) {
            super(downstream);
            this.executor = executor;
            this.limit = 16;
            this.queue = queueSupplier.get();
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            if (this.upstream.compareAndSet(null, subscription)) {
                this.downstream.onSubscribe(this);
                subscription.request(16L);
            } else {
                subscription.cancel();
            }
        }

        @Override
        public void onItem(T t) {
            if (this.done) {
                return;
            }
            if (!this.queue.offer(t)) {
                Subscriptions.cancel(this.upstream);
                this.onFailure(new BackPressureFailure("Queue is full, the upstream didn't enforce the requests"));
                this.done = true;
            } else {
                this.schedule();
            }
        }

        @Override
        public void onFailure(Throwable throwable) {
            if (!this.done || !this.cancelled) {
                this.done = true;
                this.failure.set(throwable);
                this.schedule();
            }
        }

        @Override
        public void onCompletion() {
            if (!this.done || !this.cancelled) {
                this.done = true;
                this.schedule();
            }
        }

        @Override
        public void request(long n) {
            if (n > 0L) {
                if (!this.done || !this.cancelled) {
                    Subscriptions.add(this.requested, n);
                    this.schedule();
                }
            } else {
                this.onFailure(Subscriptions.getInvalidRequestException());
            }
        }

        @Override
        public void cancel() {
            if (this.cancelled) {
                return;
            }
            this.cancelled = true;
            Subscriptions.cancel(this.upstream);
            if (this.wip.getAndIncrement() == 0) {
                this.queue.clear();
            }
        }

        void schedule() {
            block3: {
                if (this.wip.getAndIncrement() != 0) {
                    return;
                }
                try {
                    this.executor.execute(this);
                }
                catch (RejectedExecutionException rejected) {
                    Subscription s = this.upstream.getAndSet(Subscriptions.CANCELLED);
                    if (s == Subscriptions.CANCELLED) break block3;
                    this.done = true;
                    Subscriptions.cancel(this.upstream);
                    this.queue.clear();
                    this.downstream.onFailure(rejected);
                    super.cancel();
                }
            }
        }

        @Override
        public void run() {
            int missed = 1;
            Queue<T> q = this.queue;
            long emitted = this.produced;
            while (true) {
                long requests = this.requested.get();
                while (emitted != requests) {
                    boolean empty;
                    boolean wasDone = this.done;
                    T item = q.poll();
                    boolean bl = empty = item == null;
                    if (this.isDoneOrCancelled(wasDone, empty)) {
                        return;
                    }
                    if (empty) break;
                    this.downstream.onItem(item);
                    if (++emitted != (long)this.limit) continue;
                    if (requests != Long.MAX_VALUE) {
                        requests = this.requested.addAndGet(-emitted);
                    }
                    super.request(emitted);
                    emitted = 0L;
                }
                if (emitted == requests && this.isDoneOrCancelled(this.done, q.isEmpty())) {
                    return;
                }
                int w = this.wip.get();
                if (missed == w) {
                    this.produced = emitted;
                    if ((missed = this.wip.addAndGet(-missed)) != 0) continue;
                    break;
                }
                missed = w;
            }
        }

        boolean isDoneOrCancelled(boolean upstreamDone, boolean queueEmpty) {
            if (this.cancelled) {
                this.queue.clear();
                return true;
            }
            Throwable maybeFailure = this.failure.get();
            if (upstreamDone && maybeFailure != null) {
                this.downstream.onFailure(maybeFailure);
                return true;
            }
            if (upstreamDone && queueEmpty) {
                this.downstream.onCompletion();
                return true;
            }
            return false;
        }
    }
}

