/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.common.util;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import ru.tinkoff.kora.common.Context;

/**
 * Static helpers for running blocking work on a dedicated Reactor scheduler and for
 * collapsing a stream of {@link ByteBuffer} chunks into a single buffer or byte array.
 */
public final class ReactorUtils {
    /** Lazily created scheduler shared by every {@code ioMono} call; {@code null} until first use. */
    private static final AtomicReference<Scheduler> CACHED_ELASTIC = new AtomicReference<>();

    private ReactorUtils() {
    }

    /**
     * Returns the shared scheduler, creating it on first use.
     *
     * <p>The scheduler is a bounded-elastic one named {@code "kora-io-"} with a thread cap of
     * {@code min(max(availableProcessors, 2) * 8, 64)}, a queued-task cap of
     * {@link Integer#MAX_VALUE}, a TTL of 60 seconds and daemon threads.
     *
     * @return the cached scheduler instance
     */
    private static Scheduler ioScheduler() {
        Scheduler cached = CACHED_ELASTIC.get();
        if (cached != null) {
            return cached;
        }
        int maxThreads = Math.min(Math.max(Runtime.getRuntime().availableProcessors(), 2) * 8, 64);
        Scheduler created = Schedulers.newBoundedElastic(maxThreads, Integer.MAX_VALUE, "kora-io-", 60, true);
        if (CACHED_ELASTIC.compareAndSet(null, created)) {
            return created;
        }
        // Another thread published its scheduler first: discard ours and use the winner.
        created.dispose();
        return CACHED_ELASTIC.get();
    }

    /**
     * Creates a {@link Mono} that, on subscription, schedules {@code function} on the shared
     * scheduler and signals its result to the subscriber.
     *
     * <p>Anything thrown by {@code function} is delivered as an error signal. The
     * {@code Disposable} returned by {@code schedule(...)} is not retained.
     *
     * @param function the computation to run
     * @param <T>      the result type
     * @return a mono completing with the value returned by {@code function}
     */
    public static <T> Mono<T> ioMono(Supplier<T> function) {
        return Mono.create(sink -> ioScheduler().schedule(() -> runInContext(sink, function)));
    }

    /**
     * Variant of {@link #ioMono(Supplier)} for work that produces no value.
     *
     * @param function the action to run
     * @return a mono whose supplier returns {@code null} once {@code function} has run
     */
    public static Mono<Void> ioMono(Runnable function) {
        // A typed local keeps overload resolution unambiguous between Supplier and Runnable.
        Supplier<Void> asSupplier = () -> {
            function.run();
            return null;
        };
        return ioMono(asSupplier);
    }

    /**
     * Concatenates the remaining bytes of every buffer emitted by {@code flux} into one buffer.
     *
     * <p>Each step allocates a new buffer and drains both the accumulated and the incoming
     * buffer into it, so the source buffers are consumed.
     *
     * @param flux the source of chunks
     * @return a mono emitting the concatenated buffer, rewound to position 0
     */
    public static Mono<ByteBuffer> toByteBufferMono(Flux<ByteBuffer> flux) {
        return flux.reduce(ByteBuffer.allocate(0), ReactorUtils::concat);
    }

    /**
     * Same as {@link #toByteBufferMono(Flux)} for an arbitrary {@link Publisher}.
     *
     * @param flux the source of chunks
     * @return a mono emitting the concatenated buffer, rewound to position 0
     */
    public static Mono<ByteBuffer> toByteBufferMono(Publisher<ByteBuffer> flux) {
        return toByteBufferMono(Flux.from(flux));
    }

    /**
     * Concatenates the remaining bytes of every buffer emitted by {@code flux} into a byte array.
     *
     * @param flux the source of chunks
     * @return a mono emitting all bytes in emission order
     */
    public static Mono<byte[]> toByteArrayMono(Flux<ByteBuffer> flux) {
        return flux.reduce(new byte[0], (bytes, byteBuffer) ->
            appendTo(bytes, byteBuffer, bytes.length + byteBuffer.remaining()));
    }

    /**
     * Concatenates the buffers emitted by {@code flux} into a byte array of at most
     * {@code limit} bytes.
     *
     * <p>The chunk that crosses the limit is copied only partially; once the accumulated array
     * has reached {@code limit} bytes, later chunks are left untouched and the array is
     * returned unchanged.
     *
     * @param flux  the source of chunks
     * @param limit the maximum length of the resulting array
     * @return a mono emitting at most the first {@code limit} bytes
     */
    public static Mono<byte[]> toByteArrayMono(Flux<ByteBuffer> flux, int limit) {
        return flux.reduce(new byte[0], (bytes, byteBuffer) -> {
            if (bytes.length >= limit) {
                return bytes;
            }
            // The sum is computed as a long so it cannot overflow before being clamped to limit.
            int newLength = (int) Math.min((long) bytes.length + byteBuffer.remaining(), limit);
            return appendTo(bytes, byteBuffer, newLength);
        });
    }

    /**
     * Same as {@link #toByteArrayMono(Flux)} for an arbitrary {@link Publisher}.
     *
     * @param flux the source of chunks
     * @return a mono emitting all bytes in emission order
     */
    public static Mono<byte[]> toByteArrayMono(Publisher<ByteBuffer> flux) {
        return toByteArrayMono(Flux.from(flux));
    }

    /** Allocates a buffer holding the remaining bytes of {@code head} followed by those of {@code tail}. */
    private static ByteBuffer concat(ByteBuffer head, ByteBuffer tail) {
        ByteBuffer joined = ByteBuffer.allocate(head.remaining() + tail.remaining());
        joined.put(head).put(tail);
        joined.rewind();
        return joined;
    }

    /**
     * Returns a copy of {@code bytes} grown to {@code newLength}, with the added tail filled
     * from {@code chunk}. Only {@code newLength - bytes.length} bytes are read from the chunk,
     * so the copy never runs past the end of the new array.
     */
    private static byte[] appendTo(byte[] bytes, ByteBuffer chunk, int newLength) {
        byte[] grown = Arrays.copyOf(bytes, newLength);
        chunk.get(grown, bytes.length, newLength - bytes.length);
        return grown;
    }

    /**
     * Runs {@code function} with the context taken from the sink's context view injected,
     * forwards the result or the thrown error to {@code sink}, and re-injects the context
     * that was current before the call.
     */
    private static <T> void runInContext(MonoSink<T> sink, Supplier<T> function) {
        Context oldCtx = Context.current();
        try {
            Context.Reactor.current(sink.contextView()).inject();
            sink.success(function.get());
        } catch (Throwable e) {
            sink.error(e);
        } finally {
            oldCtx.inject();
        }
    }
}

