/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.common.util;

import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;
import reactor.util.context.ContextView;
import ru.tinkoff.kora.common.Context;

public class ByteBufferFluxInputStream
extends InputStream
implements CoreSubscriber<ByteBuffer> {
    private final BlockingQueue<Signal<ByteBuffer>> queue = new ArrayBlockingQueue<Signal<ByteBuffer>>(4);
    private final AtomicInteger demand = new AtomicInteger(1);
    private final reactor.util.context.Context context = Context.Reactor.inject((ContextView)reactor.util.context.Context.empty(), Context.current());
    private ByteBuffer currentBuffer = null;
    private volatile Subscription subscription = null;
    private volatile boolean completed = false;

    public ByteBufferFluxInputStream(Flux<ByteBuffer> byteBufferFlux) {
        byteBufferFlux.subscribe((CoreSubscriber)this);
    }

    @Override
    public int read() {
        byte[] b = new byte[1];
        int read = this.read(b, 0, 1);
        if (read <= 0) {
            return -1;
        }
        return b[0];
    }

    @Override
    public int read(@Nonnull byte[] b, int off, int len) {
        if (this.completed) {
            return -1;
        }
        while (this.currentBuffer == null || !this.currentBuffer.hasRemaining()) {
            if (this.demand.compareAndSet(0, 1)) {
                this.subscription.request(1L);
            }
            try {
                Signal<ByteBuffer> signal = this.queue.take();
                if (signal.isOnNext()) {
                    this.currentBuffer = (ByteBuffer)signal.get();
                    continue;
                }
                if (signal.isOnError()) {
                    this.completed = true;
                    throw this.toRuntimeException(signal.getThrowable());
                }
                if (!signal.isOnComplete()) continue;
                this.completed = true;
                return -1;
            }
            catch (InterruptedException e) {
                this.completed = true;
                if (this.subscription != null) {
                    this.subscription.cancel();
                }
                throw this.toRuntimeException(e);
            }
        }
        int realLen = Math.min(len, this.currentBuffer.remaining());
        this.currentBuffer.get(b, off, realLen);
        if (!this.currentBuffer.hasRemaining()) {
            this.currentBuffer = null;
        }
        return realLen;
    }

    @Override
    public void close() {
        if (this.completed) {
            return;
        }
        this.completed = true;
        this.subscription.cancel();
        this.currentBuffer = null;
        this.queue.clear();
    }

    private RuntimeException toRuntimeException(Throwable throwable) {
        Throwable unwrap = Exceptions.unwrap((Throwable)throwable);
        if (unwrap instanceof RuntimeException) {
            RuntimeException re = (RuntimeException)unwrap;
            return re;
        }
        return new RuntimeException(unwrap);
    }

    public void onSubscribe(Subscription s) {
        this.subscription = s;
        if (this.completed) {
            s.cancel();
        } else {
            s.request(1L);
        }
    }

    public void onNext(ByteBuffer byteBuffer) {
        if (this.completed) {
            return;
        }
        this.demand.decrementAndGet();
        this.queue.offer((Signal<ByteBuffer>)Signal.next((Object)byteBuffer));
    }

    public void onError(Throwable t) {
        this.queue.offer((Signal<ByteBuffer>)Signal.error((Throwable)t));
    }

    public void onComplete() {
        this.queue.offer((Signal<ByteBuffer>)Signal.complete());
    }

    @Nonnull
    public reactor.util.context.Context currentContext() {
        return this.context;
    }
}

