/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.core.io.buffer;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.Channel;
import java.nio.channels.Channels;
import java.nio.channels.CompletionHandler;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

public abstract class DataBufferUtils {
    private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release;

    public static Flux<DataBuffer> readInputStream(Callable<InputStream> inputStreamSupplier, DataBufferFactory dataBufferFactory, int bufferSize) {
        Assert.notNull(inputStreamSupplier, "'inputStreamSupplier' must not be null");
        return DataBufferUtils.readByteChannel(() -> Channels.newChannel((InputStream)inputStreamSupplier.call()), dataBufferFactory, bufferSize);
    }

    public static Flux<DataBuffer> readByteChannel(Callable<ReadableByteChannel> channelSupplier, DataBufferFactory dataBufferFactory, int bufferSize) {
        Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
        Assert.notNull((Object)dataBufferFactory, "'dataBufferFactory' must not be null");
        Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
        return Flux.using(channelSupplier, channel -> {
            ReadableByteChannelGenerator generator = new ReadableByteChannelGenerator((ReadableByteChannel)channel, dataBufferFactory, bufferSize);
            return Flux.generate((Consumer)generator);
        }, DataBufferUtils::closeChannel).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
    }

    public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier, DataBufferFactory dataBufferFactory, int bufferSize) {
        return DataBufferUtils.readAsynchronousFileChannel(channelSupplier, 0L, dataBufferFactory, bufferSize);
    }

    public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier, long position, DataBufferFactory dataBufferFactory, int bufferSize) {
        Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
        Assert.notNull((Object)dataBufferFactory, "'dataBufferFactory' must not be null");
        Assert.isTrue(position >= 0L, "'position' must be >= 0");
        Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
        DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
        ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
        Flux result = Flux.using(channelSupplier, channel -> Flux.create(sink2 -> {
            AsynchronousFileChannelReadCompletionHandler completionHandler = new AsynchronousFileChannelReadCompletionHandler((AsynchronousFileChannel)channel, (FluxSink<DataBuffer>)sink2, position, dataBufferFactory, bufferSize);
            channel.read(byteBuffer, position, dataBuffer, completionHandler);
            sink2.onDispose(completionHandler::dispose);
        }), DataBufferUtils::closeChannel);
        return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
    }

    public static Flux<DataBuffer> read(Resource resource, DataBufferFactory dataBufferFactory, int bufferSize) {
        return DataBufferUtils.read(resource, 0L, dataBufferFactory, bufferSize);
    }

    public static Flux<DataBuffer> read(Resource resource, long position, DataBufferFactory dataBufferFactory, int bufferSize) {
        try {
            if (resource.isFile()) {
                File file = resource.getFile();
                return DataBufferUtils.readAsynchronousFileChannel(() -> AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ), position, dataBufferFactory, bufferSize);
            }
        }
        catch (IOException file) {
        }
        Flux<DataBuffer> result = DataBufferUtils.readByteChannel(resource::readableChannel, dataBufferFactory, bufferSize);
        return position == 0L ? result : DataBufferUtils.skipUntilByteCount(result, position);
    }

    public static Flux<DataBuffer> write(Publisher<DataBuffer> source2, OutputStream outputStream) {
        Assert.notNull(source2, "'source' must not be null");
        Assert.notNull((Object)outputStream, "'outputStream' must not be null");
        WritableByteChannel channel = Channels.newChannel(outputStream);
        return DataBufferUtils.write(source2, channel);
    }

    public static Flux<DataBuffer> write(Publisher<DataBuffer> source2, WritableByteChannel channel) {
        Assert.notNull(source2, "'source' must not be null");
        Assert.notNull((Object)channel, "'channel' must not be null");
        Flux flux = Flux.from(source2);
        return Flux.create(sink2 -> {
            WritableByteChannelSubscriber subscriber = new WritableByteChannelSubscriber((FluxSink<DataBuffer>)sink2, channel);
            sink2.onDispose((Disposable)subscriber);
            flux.subscribe((CoreSubscriber)subscriber);
        });
    }

    public static Flux<DataBuffer> write(Publisher<DataBuffer> source2, AsynchronousFileChannel channel) {
        return DataBufferUtils.write(source2, channel, 0L);
    }

    public static Flux<DataBuffer> write(Publisher<DataBuffer> source2, AsynchronousFileChannel channel, long position) {
        Assert.notNull(source2, "'source' must not be null");
        Assert.notNull((Object)channel, "'channel' must not be null");
        Assert.isTrue(position >= 0L, "'position' must be >= 0");
        Flux flux = Flux.from(source2);
        return Flux.create(sink2 -> {
            AsynchronousFileChannelWriteCompletionHandler completionHandler = new AsynchronousFileChannelWriteCompletionHandler((FluxSink<DataBuffer>)sink2, channel, position);
            sink2.onDispose((Disposable)completionHandler);
            flux.subscribe((CoreSubscriber)completionHandler);
        });
    }

    static void closeChannel(@Nullable Channel channel) {
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
    }

    public static Flux<DataBuffer> takeUntilByteCount(Publisher<DataBuffer> publisher, long maxByteCount) {
        Assert.notNull(publisher, "Publisher must not be null");
        Assert.isTrue(maxByteCount >= 0L, "'maxByteCount' must be a positive number");
        return Flux.defer(() -> {
            AtomicLong countDown = new AtomicLong(maxByteCount);
            return Flux.from((Publisher)publisher).map(buffer -> {
                long remainder = countDown.addAndGet(-buffer.readableByteCount());
                if (remainder < 0L) {
                    int length = buffer.readableByteCount() + (int)remainder;
                    return buffer.slice(0, length);
                }
                return buffer;
            }).takeUntil(buffer -> countDown.get() <= 0L);
        });
    }

    public static Flux<DataBuffer> skipUntilByteCount(Publisher<DataBuffer> publisher, long maxByteCount) {
        Assert.notNull(publisher, "Publisher must not be null");
        Assert.isTrue(maxByteCount >= 0L, "'maxByteCount' must be a positive number");
        return Flux.defer(() -> {
            AtomicLong countDown = new AtomicLong(maxByteCount);
            return Flux.from((Publisher)publisher).skipUntil(buffer -> {
                long remainder = countDown.addAndGet(-buffer.readableByteCount());
                return remainder < 0L;
            }).map(buffer -> {
                long remainder = countDown.get();
                if (remainder < 0L) {
                    countDown.set(0L);
                    int start = buffer.readableByteCount() + (int)remainder;
                    int length = (int)(-remainder);
                    return buffer.slice(start, length);
                }
                return buffer;
            });
        }).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
    }

    public static <T extends DataBuffer> T retain(T dataBuffer) {
        if (dataBuffer instanceof PooledDataBuffer) {
            return (T)((PooledDataBuffer)dataBuffer).retain();
        }
        return dataBuffer;
    }

    public static boolean release(@Nullable DataBuffer dataBuffer) {
        PooledDataBuffer pooledDataBuffer;
        if (dataBuffer instanceof PooledDataBuffer && (pooledDataBuffer = (PooledDataBuffer)dataBuffer).isAllocated()) {
            return pooledDataBuffer.release();
        }
        return false;
    }

    public static Consumer<DataBuffer> releaseConsumer() {
        return RELEASE_CONSUMER;
    }

    public static Mono<DataBuffer> join(Publisher<DataBuffer> dataBuffers) {
        Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
        return Flux.from(dataBuffers).collectList().filter(list -> !list.isEmpty()).map(list -> ((DataBuffer)list.get(0)).factory().join((List<? extends DataBuffer>)list)).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
    }

    private static class AsynchronousFileChannelWriteCompletionHandler
    extends BaseSubscriber<DataBuffer>
    implements CompletionHandler<Integer, ByteBuffer> {
        private final FluxSink<DataBuffer> sink;
        private final AsynchronousFileChannel channel;
        private final AtomicBoolean completed = new AtomicBoolean();
        private final AtomicReference<Throwable> error = new AtomicReference();
        private final AtomicLong position;
        private final AtomicReference<DataBuffer> dataBuffer = new AtomicReference();

        public AsynchronousFileChannelWriteCompletionHandler(FluxSink<DataBuffer> sink2, AsynchronousFileChannel channel, long position) {
            this.sink = sink2;
            this.channel = channel;
            this.position = new AtomicLong(position);
        }

        protected void hookOnSubscribe(Subscription subscription) {
            this.request(1L);
        }

        protected void hookOnNext(DataBuffer value) {
            if (!this.dataBuffer.compareAndSet(null, value)) {
                throw new IllegalStateException();
            }
            ByteBuffer byteBuffer = value.asByteBuffer();
            this.channel.write(byteBuffer, this.position.get(), byteBuffer, this);
        }

        protected void hookOnError(Throwable throwable) {
            this.error.set(throwable);
            if (this.dataBuffer.get() == null) {
                this.sink.error(throwable);
            }
        }

        protected void hookOnComplete() {
            this.completed.set(true);
            if (this.dataBuffer.get() == null) {
                this.sink.complete();
            }
        }

        @Override
        public void completed(Integer written, ByteBuffer byteBuffer) {
            long pos = this.position.addAndGet(written.intValue());
            if (byteBuffer.hasRemaining()) {
                this.channel.write(byteBuffer, pos, byteBuffer, this);
                return;
            }
            this.sinkDataBuffer();
            Throwable throwable = this.error.get();
            if (throwable != null) {
                this.sink.error(throwable);
            } else if (this.completed.get()) {
                this.sink.complete();
            } else {
                this.request(1L);
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer byteBuffer) {
            this.sinkDataBuffer();
            this.sink.error(exc);
        }

        private void sinkDataBuffer() {
            DataBuffer dataBuffer = this.dataBuffer.get();
            Assert.state(dataBuffer != null, "DataBuffer should not be null");
            this.sink.next((Object)dataBuffer);
            this.dataBuffer.set(null);
        }
    }

    private static class WritableByteChannelSubscriber
    extends BaseSubscriber<DataBuffer> {
        private final FluxSink<DataBuffer> sink;
        private final WritableByteChannel channel;

        public WritableByteChannelSubscriber(FluxSink<DataBuffer> sink2, WritableByteChannel channel) {
            this.sink = sink2;
            this.channel = channel;
        }

        protected void hookOnSubscribe(Subscription subscription) {
            this.request(1L);
        }

        protected void hookOnNext(DataBuffer dataBuffer) {
            try {
                ByteBuffer byteBuffer = dataBuffer.asByteBuffer();
                while (byteBuffer.hasRemaining()) {
                    this.channel.write(byteBuffer);
                }
                this.sink.next((Object)dataBuffer);
                this.request(1L);
            }
            catch (IOException ex) {
                this.sink.next((Object)dataBuffer);
                this.sink.error((Throwable)ex);
            }
        }

        protected void hookOnError(Throwable throwable) {
            this.sink.error(throwable);
        }

        protected void hookOnComplete() {
            this.sink.complete();
        }
    }

    private static class AsynchronousFileChannelReadCompletionHandler
    implements CompletionHandler<Integer, DataBuffer> {
        private final AsynchronousFileChannel channel;
        private final FluxSink<DataBuffer> sink;
        private final DataBufferFactory dataBufferFactory;
        private final int bufferSize;
        private final AtomicLong position;
        private final AtomicBoolean disposed = new AtomicBoolean();

        public AsynchronousFileChannelReadCompletionHandler(AsynchronousFileChannel channel, FluxSink<DataBuffer> sink2, long position, DataBufferFactory dataBufferFactory, int bufferSize) {
            this.channel = channel;
            this.sink = sink2;
            this.position = new AtomicLong(position);
            this.dataBufferFactory = dataBufferFactory;
            this.bufferSize = bufferSize;
        }

        @Override
        public void completed(Integer read2, DataBuffer dataBuffer) {
            if (read2 != -1) {
                long pos = this.position.addAndGet(read2.intValue());
                dataBuffer.writePosition(read2);
                this.sink.next((Object)dataBuffer);
                if (!this.disposed.get()) {
                    DataBuffer newDataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
                    ByteBuffer newByteBuffer = newDataBuffer.asByteBuffer(0, this.bufferSize);
                    this.channel.read(newByteBuffer, pos, newDataBuffer, this);
                }
            } else {
                DataBufferUtils.release(dataBuffer);
                this.sink.complete();
            }
        }

        @Override
        public void failed(Throwable exc, DataBuffer dataBuffer) {
            DataBufferUtils.release(dataBuffer);
            this.sink.error(exc);
        }

        public void dispose() {
            this.disposed.set(true);
        }
    }

    private static class ReadableByteChannelGenerator
    implements Consumer<SynchronousSink<DataBuffer>> {
        private final ReadableByteChannel channel;
        private final DataBufferFactory dataBufferFactory;
        private final int bufferSize;

        public ReadableByteChannelGenerator(ReadableByteChannel channel, DataBufferFactory dataBufferFactory, int bufferSize) {
            this.channel = channel;
            this.dataBufferFactory = dataBufferFactory;
            this.bufferSize = bufferSize;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void accept(SynchronousSink<DataBuffer> sink2) {
            boolean release = true;
            DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
            try {
                ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, dataBuffer.capacity());
                int read2 = this.channel.read(byteBuffer);
                if (read2 >= 0) {
                    dataBuffer.writePosition(read2);
                    release = false;
                    sink2.next((Object)dataBuffer);
                } else {
                    sink2.complete();
                }
            }
            catch (IOException ex) {
                sink2.error((Throwable)ex);
            }
            finally {
                if (release) {
                    DataBufferUtils.release(dataBuffer);
                }
            }
        }
    }
}

