/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.extension;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.BackPressureStrategy;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.smallrye.reactive.messaging.extension.EmitterConfiguration;
import io.smallrye.reactive.messaging.extension.ThrowingEmitter;
import io.smallrye.reactive.messaging.helpers.BroadcastHelper;
import io.smallrye.reactive.messaging.helpers.NoStackTraceException;
import io.smallrye.reactive.messaging.i18n.ProviderExceptions;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.reactivestreams.Publisher;

public abstract class AbstractEmitter<T> {
    public static final NoStackTraceException NO_SUBSCRIBER_EXCEPTION = new NoStackTraceException("Unable to process message - no subscriber");
    protected final AtomicReference<MultiEmitter<? super Message<? extends T>>> internal = new AtomicReference();
    protected final Multi<Message<? extends T>> publisher;
    protected final String name;
    protected final AtomicReference<Throwable> synchronousFailure = new AtomicReference();
    private final OnOverflow.Strategy overflow;

    public AbstractEmitter(EmitterConfiguration config, long defaultBufferSize) {
        Multi<Message<T>> tempPublisher;
        this.name = config.name;
        this.overflow = config.overflowBufferStrategy;
        if (defaultBufferSize <= 0L) {
            throw ProviderExceptions.ex.illegalArgumentForDefaultBuffer();
        }
        Consumer<MultiEmitter<? super Message<? extends T>>> deferred = fe -> {
            MultiEmitter previous = this.internal.getAndSet((MultiEmitter<Message<T>>)fe);
            if (previous != null) {
                previous.complete();
            }
        };
        if (config.overflowBufferStrategy == null) {
            Multi multi = Multi.createFrom().emitter(deferred, BackPressureStrategy.BUFFER);
            tempPublisher = this.getPublisherUsingBufferStrategy(defaultBufferSize, multi);
        } else {
            tempPublisher = this.getPublisherForStrategy(config.overflowBufferStrategy, config.overflowBufferSize, defaultBufferSize, deferred);
        }
        this.publisher = config.broadcast ? BroadcastHelper.broadcastPublisher(tempPublisher, config.numberOfSubscriberBeforeConnecting) : tempPublisher;
    }

    public synchronized void complete() {
        MultiEmitter<Message<T>> emitter = this.verify();
        if (emitter != null) {
            emitter.complete();
        }
    }

    public synchronized void error(Exception e) {
        if (e == null) {
            throw ProviderExceptions.ex.illegalArgumentForException("null");
        }
        MultiEmitter<Message<T>> emitter = this.verify();
        if (emitter != null) {
            emitter.fail(e);
        }
    }

    public synchronized boolean isCancelled() {
        MultiEmitter<Message<T>> emitter = this.internal.get();
        return emitter == null || emitter.isCancelled();
    }

    public boolean hasRequests() {
        MultiEmitter<Message<T>> emitter = this.internal.get();
        return !this.isCancelled() && emitter.requested() > 0L;
    }

    Multi<Message<? extends T>> getPublisherForStrategy(OnOverflow.Strategy overFlowStrategy, long bufferSize, long defaultBufferSize, Consumer<MultiEmitter<? super Message<? extends T>>> deferred) {
        switch (overFlowStrategy) {
            case BUFFER: {
                if (bufferSize > 0L) {
                    return ThrowingEmitter.create(deferred, bufferSize);
                }
                return ThrowingEmitter.create(deferred, defaultBufferSize);
            }
            case UNBOUNDED_BUFFER: {
                return Multi.createFrom().emitter(deferred, BackPressureStrategy.BUFFER);
            }
            case THROW_EXCEPTION: {
                return ThrowingEmitter.create(deferred, 0L);
            }
            case DROP: {
                return Multi.createFrom().emitter(deferred, BackPressureStrategy.DROP);
            }
            case FAIL: {
                return Multi.createFrom().emitter(deferred, BackPressureStrategy.ERROR);
            }
            case LATEST: {
                return Multi.createFrom().emitter(deferred, BackPressureStrategy.LATEST);
            }
            case NONE: {
                return Multi.createFrom().emitter(deferred, BackPressureStrategy.IGNORE);
            }
        }
        throw ProviderExceptions.ex.illegalArgumentForBackPressure(overFlowStrategy);
    }

    Multi<Message<? extends T>> getPublisherUsingBufferStrategy(long defaultBufferSize, Multi<Message<? extends T>> stream) {
        int size = (int)defaultBufferSize;
        return stream.onOverflow().buffer(size - 2).onFailure().invoke(this.synchronousFailure::set);
    }

    public Publisher<Message<? extends T>> getPublisher() {
        return this.publisher;
    }

    protected synchronized void emit(Message<? extends T> message) {
        if (message == null) {
            throw ProviderExceptions.ex.illegalArgumentForNullValue();
        }
        MultiEmitter<Message<Message<? extends T>>> emitter = this.verify();
        if (emitter == null) {
            if (this.overflow == OnOverflow.Strategy.DROP) {
                message.nack(NO_SUBSCRIBER_EXCEPTION);
            }
            return;
        }
        if (this.synchronousFailure.get() != null) {
            throw ProviderExceptions.ex.incomingNotFoundForEmitter(this.synchronousFailure.get());
        }
        if (emitter.isCancelled()) {
            throw ProviderExceptions.ex.illegalStateForDownstreamCancel();
        }
        emitter.emit(message);
        if (this.synchronousFailure.get() != null) {
            throw ProviderExceptions.ex.illegalStateForEmitterWhileEmitting(this.synchronousFailure.get());
        }
    }

    protected MultiEmitter<? super Message<? extends T>> verify() {
        MultiEmitter<Message<T>> emitter = this.internal.get();
        if (emitter == null) {
            if (this.overflow == OnOverflow.Strategy.DROP) {
                return null;
            }
            throw ProviderExceptions.ex.noEmitterForChannel(this.name);
        }
        if (emitter.isCancelled()) {
            throw ProviderExceptions.ex.illegalStateForCancelledSubscriber(this.name);
        }
        return emitter;
    }
}

