/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.mutiny.operators.multi.builders;

import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.multi.builders.BaseMultiEmitter;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscription;

public class SerializedMultiEmitter<T>
implements MultiEmitter<T>,
MultiSubscriber<T> {
    private final AtomicInteger wip = new AtomicInteger();
    private final BaseMultiEmitter<T> downstream;
    private final AtomicReference<Throwable> failure = new AtomicReference();
    private final Queue<T> queue = Queues.createMpscQueue();
    private volatile boolean done;

    SerializedMultiEmitter(BaseMultiEmitter<T> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void onSubscribe(Subscription s) {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onItem(T item) {
        if (this.downstream.isCancelled() || this.done) {
            return;
        }
        if (item == null) {
            this.onFailure(new NullPointerException("`onItem` called with `null`"));
            return;
        }
        if (this.wip.compareAndSet(0, 1)) {
            this.downstream.emit(item);
            if (this.wip.decrementAndGet() == 0) {
                return;
            }
        } else {
            Queue<T> q;
            Queue<T> queue = q = this.queue;
            synchronized (queue) {
                q.offer(item);
            }
            if (this.wip.getAndIncrement() != 0) {
                return;
            }
        }
        this.drainLoop();
    }

    @Override
    public void onFailure(Throwable failure) {
        if (this.downstream.isCancelled() || this.done) {
            Infrastructure.handleDroppedException(failure);
            return;
        }
        if (failure == null) {
            failure = new NullPointerException("failure cannot be `null`");
        }
        if (this.failure.compareAndSet(null, failure)) {
            this.done = true;
            this.drain();
        }
    }

    @Override
    public void onCompletion() {
        if (this.downstream.isCancelled() || this.done) {
            return;
        }
        this.done = true;
        this.drain();
    }

    void drain() {
        if (this.wip.getAndIncrement() == 0) {
            this.drainLoop();
        }
    }

    void drainLoop() {
        BaseMultiEmitter<T> emitter = this.downstream;
        Queue<T> q = this.queue;
        int missed = 1;
        while (true) {
            boolean isEmpty;
            if (emitter.isCancelled()) {
                q.clear();
                return;
            }
            if (this.failure.get() != null) {
                q.clear();
                emitter.fail(this.failure.getAndSet(Subscriptions.TERMINATED));
                return;
            }
            boolean isDone = this.done;
            T item = q.poll();
            boolean bl = isEmpty = item == null;
            if (isDone && isEmpty) {
                emitter.complete();
                return;
            }
            if (!isEmpty) {
                emitter.emit(item);
                continue;
            }
            if ((missed = this.wip.addAndGet(-missed)) == 0) break;
        }
    }

    @Override
    public MultiEmitter<T> emit(T item) {
        this.onItem(item);
        return this;
    }

    @Override
    public void fail(Throwable failure) {
        this.onFailure(failure);
    }

    @Override
    public void complete() {
        this.onCompletion();
    }

    @Override
    public MultiEmitter<T> onTermination(Runnable onTermination) {
        this.downstream.onTermination(onTermination);
        return this;
    }

    @Override
    public boolean isCancelled() {
        return this.downstream.isCancelled();
    }

    @Override
    public long requested() {
        return this.downstream.requested();
    }
}

