/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.extension;

import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.smallrye.reactive.messaging.annotations.Emitter;
import io.smallrye.reactive.messaging.annotations.OnOverflow;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Emitter} implementation backed by an RxJava {@link Flowable}.
 *
 * <p>The publisher is created eagerly in the constructor, but the underlying
 * {@link FlowableEmitter} only becomes available once the publisher is subscribed.
 * Until then {@link #send(Object)}, {@link #complete()} and {@link #error(Exception)}
 * fail with an {@link IllegalStateException}.
 *
 * <p>All state-changing operations are {@code synchronized} on this instance.
 *
 * @param <T> the payload type accepted by {@link #send(Object)}
 */
public class EmitterImpl<T>
implements Emitter<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EmitterImpl.class);

    /** Emitter captured on the first subscription; {@code null} while nothing is subscribed. */
    private final AtomicReference<FlowableEmitter<Message<? extends T>>> internal = new AtomicReference<>();
    private final Flowable<Message<? extends T>> publisher;

    /**
     * Creates the emitter and the publisher it feeds.
     *
     * @param name              emitter name, only used in the "buffer full" log message
     * @param overFlowStrategy  name of an {@link OnOverflow.Strategy} constant, or {@code null} to use a
     *                          buffer of {@code defaultBufferSize} items that fails when full
     * @param bufferSize        buffer capacity for the {@code BUFFER} strategy; a value {@code <= 0}
     *                          leaves the buffer without an explicit capacity
     * @param defaultBufferSize buffer capacity used when {@code overFlowStrategy} is {@code null}
     * @throws IllegalArgumentException if {@code overFlowStrategy} does not name a supported strategy
     */
    EmitterImpl(String name, String overFlowStrategy, long bufferSize, long defaultBufferSize) {
        // Only the first subscriber gets wired to this emitter; any later one is failed immediately.
        FlowableOnSubscribe<Message<? extends T>> deferred = fe -> {
            if (!this.internal.compareAndSet(null, fe)) {
                fe.onError(new Exception("Emitter already created"));
            }
        };
        if (overFlowStrategy == null) {
            this.publisher = failWhenFull(
                    Flowable.create(deferred, BackpressureStrategy.BUFFER), defaultBufferSize, name);
        } else {
            // valueOf itself throws IllegalArgumentException for a name that is not a Strategy constant.
            this.publisher = forStrategy(
                    OnOverflow.Strategy.valueOf(overFlowStrategy), deferred, bufferSize, name);
        }
    }

    /** Maps an overflow strategy onto the matching RxJava back-pressure configuration. */
    private static <M> Flowable<M> forStrategy(OnOverflow.Strategy strategy, FlowableOnSubscribe<M> source,
            long bufferSize, String name) {
        switch (strategy) {
            case BUFFER: {
                Flowable<M> buffered = Flowable.create(source, BackpressureStrategy.BUFFER);
                return bufferSize > 0L ? failWhenFull(buffered, bufferSize, name) : buffered;
            }
            case DROP: {
                return Flowable.create(source, BackpressureStrategy.DROP);
            }
            case FAIL: {
                return Flowable.create(source, BackpressureStrategy.ERROR);
            }
            case LATEST: {
                return Flowable.create(source, BackpressureStrategy.LATEST);
            }
            case NONE: {
                return Flowable.create(source, BackpressureStrategy.MISSING);
            }
            default: {
                throw new IllegalArgumentException("Invalid back pressure strategy: " + strategy);
            }
        }
    }

    /** Adds a buffer of {@code capacity} items that logs and signals an error on overflow. */
    private static <M> Flowable<M> failWhenFull(Flowable<M> source, long capacity, String name) {
        return source.onBackpressureBuffer(
                capacity,
                () -> LOGGER.error("Buffer full for emitter {}", name),
                BackpressureOverflowStrategy.ERROR);
    }

    /**
     * @return the publisher fed by this emitter
     */
    public Publisher<Message<? extends T>> getPublisher() {
        return this.publisher;
    }

    /**
     * @return {@code true} once a subscriber has been attached to the publisher
     */
    boolean isConnected() {
        return this.internal.get() != null;
    }

    /**
     * Emits {@code msg} downstream. A payload that already is a {@link Message} is forwarded
     * as-is; anything else is wrapped with {@link Message#of(Object)}.
     *
     * @param msg the payload or message to emit, must not be {@code null}
     * @return this emitter
     * @throws IllegalArgumentException if {@code msg} is {@code null}
     * @throws IllegalStateException    if no subscriber is attached yet or the stream is cancelled
     */
    @Override
    public synchronized Emitter<T> send(T msg) {
        if (msg == null) {
            throw new IllegalArgumentException("`null` is not a valid value");
        }
        FlowableEmitter<Message<? extends T>> emitter = this.verify();
        if (msg instanceof Message) {
            // The instanceof check guards the raw type only; the payload type is erased at runtime.
            @SuppressWarnings("unchecked")
            Message<? extends T> asMessage = (Message<? extends T>) msg;
            emitter.onNext(asMessage);
        } else {
            emitter.onNext(Message.of(msg));
        }
        return this;
    }

    /**
     * Returns the downstream emitter, failing if it cannot accept signals.
     *
     * @throws IllegalStateException if no subscriber is attached yet or the stream is cancelled
     */
    private synchronized FlowableEmitter<Message<? extends T>> verify() {
        FlowableEmitter<Message<? extends T>> emitter = this.internal.get();
        if (emitter == null) {
            throw new IllegalStateException("Stream not yet connected");
        }
        if (emitter.isCancelled()) {
            throw new IllegalStateException("Stream has been terminated");
        }
        return emitter;
    }

    /**
     * Signals completion downstream.
     *
     * @throws IllegalStateException if no subscriber is attached yet or the stream is cancelled
     */
    @Override
    public synchronized void complete() {
        this.verify().onComplete();
    }

    /**
     * Signals a failure downstream.
     *
     * @param e the failure to propagate, must not be {@code null}
     * @throws IllegalArgumentException if {@code e} is {@code null}
     * @throws IllegalStateException    if no subscriber is attached yet or the stream is cancelled
     */
    @Override
    public synchronized void error(Exception e) {
        if (e == null) {
            throw new IllegalArgumentException("`null` is not a valid exception");
        }
        this.verify().onError(e);
    }

    /**
     * @return {@code true} if no subscriber is attached yet or the downstream has cancelled
     */
    @Override
    public synchronized boolean isCancelled() {
        FlowableEmitter<Message<? extends T>> emitter = this.internal.get();
        return emitter == null || emitter.isCancelled();
    }

    /**
     * @return {@code true} if the stream is not cancelled and the downstream has outstanding demand
     */
    @Override
    public boolean isRequested() {
        FlowableEmitter<Message<? extends T>> emitter = this.internal.get();
        // isCancelled() is true while emitter is null, so the short-circuit protects the dereference.
        return !this.isCancelled() && emitter.requested() > 0L;
    }
}

