/*
 * Decompiled with CFR 0.152.
 */
package ch.squaredesk.nova.comm;

import ch.squaredesk.nova.tuples.Pair;
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class BackpressuredStreamFromAsyncSource<MessageType> {
    private static final int DEFAULT_MESSAGE_BUFFER_SIZE = 1;
    private final BlockingQueue<MessageType> queue;
    private final Runnable closeAction;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);

    public BackpressuredStreamFromAsyncSource() {
        this(1, null);
    }

    public BackpressuredStreamFromAsyncSource(Runnable closeAction) {
        this(1, closeAction);
    }

    private BackpressuredStreamFromAsyncSource(int messageBufferSize, Runnable closeAction) {
        this.queue = new DisruptorBlockingQueue(messageBufferSize);
        this.closeAction = closeAction;
    }

    public void onNext(MessageType element) {
        if (this.shutdown.get()) {
            throw new IllegalStateException("Stream closed");
        }
        try {
            this.queue.put(element);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    public void onComplete() {
        this.shutdown.set(true);
    }

    public Flowable<MessageType> toFlowable() {
        Flowable theFlowable = Flowable.generate(() -> new Pair(this.queue, (Object)this.shutdown), (queueShutdownPair, emitter) -> {
            Object element = null;
            while (!((AtomicBoolean)queueShutdownPair._2).get() && element == null) {
                try {
                    element = ((BlockingQueue)queueShutdownPair._1).poll(100L, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e) {
                    emitter.onComplete();
                    return;
                }
            }
            if (element != null) {
                emitter.onNext(element);
            } else {
                emitter.onComplete();
            }
        }, queueShutdownPair -> {
            if (this.closeAction != null) {
                this.closeAction.run();
            }
        });
        return theFlowable.subscribeOn(Schedulers.io());
    }
}

