/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.azurebfs.services;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.io.ElasticByteBufferPool;

/**
 * An {@link OutputStream} that buffers written bytes in memory and uploads them
 * to a single path through an {@link AbfsClient}.
 *
 * <p>Bytes are collected in a buffer of {@code bufferSize} bytes. When the buffer
 * fills up, or when a flush is requested, the buffered bytes are handed to a
 * thread pool that calls {@code client.append(...)}. Every submitted upload is
 * tracked as a {@link WriteOperation}. {@code client.flush(...)} is called from
 * {@link #hsync()}, {@link #hflush()} and {@link #close()} after all tracked
 * uploads have completed, and from {@link #flush()} without waiting for them.
 *
 * <p>Once an upload or an interrupt has produced an error, that error is stored
 * in {@code lastError} and rethrown by subsequent write and flush calls.
 * After {@link #close()} the stored error is {@code "Stream is closed!"}.
 */
public class AbfsOutputStream
extends OutputStream
implements Syncable,
StreamCapabilities {
    /** HTTP status code that is translated into a {@link FileNotFoundException}. */
    private static final int HTTP_NOT_FOUND = 404;

    private final AbfsClient client;
    private final String path;
    /** Offset at which the next buffer handed to the service will be appended. */
    private long position;
    private boolean closed;
    /** When {@code false}, {@code flush}, {@code hsync} and {@code hflush} are no-ops. */
    private final boolean supportFlush;
    /** Sticky error; once set it is rethrown by {@link #maybeThrowLastError()}. */
    private volatile IOException lastError;
    /** Offset passed to the most recent successful {@code client.flush} call. */
    private long lastFlushOffset;
    /**
     * Sum of the lengths of the write operations that have been observed as
     * completed and removed from {@link #writeOperations}.
     *
     * <p>NOTE(review): this counter starts at 0 while {@link #position} starts at
     * the constructor's {@code position} argument, so the two only agree when the
     * stream is opened at position 0 - confirm the asynchronous flush path is
     * correct for streams opened at a non-zero position.
     */
    private long lastTotalAppendOffset = 0L;
    private final int bufferSize;
    /** Buffer currently being filled; replaced with a fresh one on every upload. */
    private byte[] buffer;
    /** Number of valid bytes in {@link #buffer}. */
    private int bufferIndex;
    private final int maxConcurrentRequestCount;
    /** Submitted uploads, in submission order. */
    private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
    private final ThreadPoolExecutor threadExecutor;
    private final ExecutorCompletionService<Void> completionService;
    private final ElasticByteBufferPool byteBufferPool = new ElasticByteBufferPool();

    /**
     * Creates a stream that appends to {@code path} starting at {@code position}.
     *
     * @param client       client used for the append and flush calls
     * @param path         path passed to every client call
     * @param position     offset of the first byte that will be appended
     * @param bufferSize   size in bytes of the in-memory write buffer
     * @param supportFlush whether {@code flush}/{@code hsync}/{@code hflush} do anything
     */
    public AbfsOutputStream(AbfsClient client, String path, long position, int bufferSize, boolean supportFlush) {
        this.client = client;
        this.path = path;
        this.position = position;
        this.closed = false;
        this.supportFlush = supportFlush;
        this.lastError = null;
        this.lastFlushOffset = 0L;
        this.bufferSize = bufferSize;
        this.buffer = this.byteBufferPool.getBuffer(false, bufferSize).array();
        this.bufferIndex = 0;
        this.writeOperations = new ConcurrentLinkedDeque<>();
        this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
        this.threadExecutor = new ThreadPoolExecutor(
                this.maxConcurrentRequestCount,
                this.maxConcurrentRequestCount,
                10L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
    }

    /**
     * Reports whether this stream supports the named capability.
     *
     * @param capability capability name, compared case-insensitively
     * @return the {@code supportFlush} setting for {@code "hsync"} and
     *         {@code "hflush"}; {@code false} for every other name
     */
    public boolean hasCapability(String capability) {
        switch (capability.toLowerCase(Locale.ENGLISH)) {
            case "hsync":
            case "hflush": {
                return this.supportFlush;
            }
        }
        return false;
    }

    /**
     * Writes the low eight bits of {@code byteVal} as a single byte.
     *
     * @param byteVal value whose low-order byte is written
     * @throws IOException if a previous operation failed or the stream is closed
     */
    @Override
    public void write(int byteVal) throws IOException {
        this.write(new byte[]{(byte)(byteVal & 0xFF)});
    }

    /**
     * Copies {@code length} bytes from {@code data} into the internal buffer,
     * submitting the buffer for upload each time it becomes full.
     *
     * @param data   source array; must not be {@code null}
     * @param off    index of the first byte to write
     * @param length number of bytes to write
     * @throws IOException               if a previous operation failed or the stream is closed
     * @throws IllegalArgumentException  if {@code data} is {@code null}
     * @throws IndexOutOfBoundsException if {@code off}/{@code length} do not describe
     *                                   a range inside {@code data}
     */
    @Override
    public synchronized void write(byte[] data, int off, int length) throws IOException {
        this.maybeThrowLastError();
        Preconditions.checkArgument(data != null, "null data");
        if (off < 0 || length < 0 || length > data.length - off) {
            throw new IndexOutOfBoundsException();
        }
        int currentOffset = off;
        int writableBytes = this.bufferSize - this.bufferIndex;
        int numberOfBytesToWrite = length;
        while (numberOfBytesToWrite > 0) {
            if (writableBytes <= numberOfBytesToWrite) {
                // The remaining input fills the buffer: top it up and upload it.
                System.arraycopy(data, currentOffset, this.buffer, this.bufferIndex, writableBytes);
                this.bufferIndex += writableBytes;
                this.writeCurrentBufferToService();
                currentOffset += writableBytes;
                numberOfBytesToWrite -= writableBytes;
            } else {
                // The remaining input fits with room to spare: just buffer it.
                System.arraycopy(data, currentOffset, this.buffer, this.bufferIndex, numberOfBytesToWrite);
                this.bufferIndex += numberOfBytesToWrite;
                numberOfBytesToWrite = 0;
            }
            writableBytes = this.bufferSize - this.bufferIndex;
        }
    }

    /** Rethrows the stored error, if any. */
    private void maybeThrowLastError() throws IOException {
        if (this.lastError != null) {
            throw this.lastError;
        }
    }

    /**
     * Submits the buffered bytes for upload and, if completed uploads exist beyond
     * the last flushed offset, calls {@code client.flush} for them. Does not wait
     * for uploads that are still running. No-op when flush is not supported.
     *
     * @throws IOException if a previous or current operation failed
     */
    @Override
    public void flush() throws IOException {
        if (this.supportFlush) {
            this.flushInternalAsync();
        }
    }

    /**
     * Submits the buffered bytes, waits for every tracked upload and then calls
     * {@code client.flush} at the current position. No-op when flush is not supported.
     *
     * @throws IOException if a previous or current operation failed
     */
    public void hsync() throws IOException {
        if (this.supportFlush) {
            this.flushInternal(false);
        }
    }

    /**
     * Behaves exactly like {@link #hsync()}.
     *
     * @throws IOException if a previous or current operation failed
     */
    public void hflush() throws IOException {
        if (this.supportFlush) {
            this.flushInternal(false);
        }
    }

    /**
     * Flushes all data with the close flag set and shuts the thread pool down.
     * Calling this on an already closed stream does nothing.
     *
     * <p>Whether or not the final flush succeeds, the stream is marked closed,
     * the buffer is released, the tracked operations are cleared and the stored
     * error becomes {@code "Stream is closed!"}.
     *
     * @throws IOException if a previous operation failed or the final flush fails
     */
    @Override
    public synchronized void close() throws IOException {
        if (this.closed) {
            return;
        }
        try {
            this.flushInternal(true);
            this.threadExecutor.shutdown();
        } finally {
            this.lastError = new IOException("Stream is closed!");
            this.buffer = null;
            this.bufferIndex = 0;
            this.closed = true;
            this.writeOperations.clear();
            // Only reached with the pool still running when flushInternal threw.
            if (!this.threadExecutor.isShutdown()) {
                this.threadExecutor.shutdownNow();
            }
        }
    }

    /** Uploads the current buffer, waits for all uploads, then flushes at {@code position}. */
    private synchronized void flushInternal(boolean isClose) throws IOException {
        this.maybeThrowLastError();
        this.writeCurrentBufferToService();
        this.flushWrittenBytesToService(isClose);
    }

    /** Uploads the current buffer and flushes only what has already completed. */
    private synchronized void flushInternalAsync() throws IOException {
        this.maybeThrowLastError();
        this.writeCurrentBufferToService();
        this.flushWrittenBytesToServiceAsync();
    }

    /**
     * Hands the buffered bytes to the thread pool as an append at the current
     * position and starts a fresh buffer. Does nothing when the buffer is empty.
     *
     * @throws IOException if waiting for capacity is interrupted, or an already
     *                     completed upload is found to have failed
     */
    private synchronized void writeCurrentBufferToService() throws IOException {
        if (this.bufferIndex == 0) {
            return;
        }
        final byte[] bytes = this.buffer;
        final int bytesLength = this.bufferIndex;
        this.buffer = this.byteBufferPool.getBuffer(false, this.bufferSize).array();
        this.bufferIndex = 0;
        final long offset = this.position;
        this.position += (long)bytesLength;
        // Throttle: do not let the executor's backlog grow without bound.
        if (this.threadExecutor.getQueue().size() >= this.maxConcurrentRequestCount * 2) {
            this.waitForTaskToComplete();
        }
        Future<Void> job = this.completionService.submit(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                AbfsOutputStream.this.client.append(AbfsOutputStream.this.path, offset, bytes, 0, bytesLength);
                // Reached only when the append did not throw.
                AbfsOutputStream.this.byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
                return null;
            }
        });
        this.writeOperations.add(new WriteOperation(job, offset, bytesLength));
        this.shrinkWriteOperationQueue();
    }

    /**
     * Waits for every tracked upload to finish and then calls {@code client.flush}
     * at the current position.
     *
     * @param isClose forwarded to {@code client.flush}
     * @throws FileNotFoundException if an upload failed with an
     *                               {@link AbfsRestOperationException} whose status is 404
     * @throws IOException           if an upload failed for any other reason (the
     *                               error is also stored as the sticky error), or
     *                               the flush call fails
     */
    private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
        for (WriteOperation writeOperation : this.writeOperations) {
            try {
                writeOperation.task.get();
            } catch (Exception ex) {
                Throwable cause = ex.getCause();
                if (cause instanceof AbfsRestOperationException
                        && ((AbfsRestOperationException)cause).getStatusCode() == HTTP_NOT_FOUND) {
                    throw new FileNotFoundException(ex.getMessage());
                }
                // Report the service exception itself rather than the wrapper
                // the Future put around it.
                Throwable reported = ex;
                if (cause instanceof AzureBlobFileSystemException) {
                    reported = (AzureBlobFileSystemException)cause;
                }
                this.lastError = new IOException(reported);
                throw this.lastError;
            }
        }
        this.flushWrittenBytesToServiceInternal(this.position, false, isClose);
    }

    /**
     * Drops completed uploads from the queue and, if that advanced the completed
     * total beyond the last flushed offset, calls {@code client.flush} for it.
     */
    private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
        this.shrinkWriteOperationQueue();
        if (this.lastTotalAppendOffset > this.lastFlushOffset) {
            this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true, false);
        }
    }

    /**
     * Calls {@code client.flush} and records {@code offset} as the last flushed
     * offset when the call succeeds.
     *
     * @throws FileNotFoundException if the client reports status 404
     * @throws IOException           wrapping any other client exception
     */
    private synchronized void flushWrittenBytesToServiceInternal(long offset, boolean retainUncommitedData, boolean isClose) throws IOException {
        try {
            this.client.flush(this.path, offset, retainUncommitedData, isClose);
        } catch (AzureBlobFileSystemException ex) {
            if (ex instanceof AbfsRestOperationException
                    && ((AbfsRestOperationException)ex).getStatusCode() == HTTP_NOT_FOUND) {
                throw new FileNotFoundException(ex.getMessage());
            }
            throw new IOException(ex);
        }
        this.lastFlushOffset = offset;
    }

    /**
     * Removes finished uploads from the head of the queue, adding their lengths
     * to {@code lastTotalAppendOffset}. Stops at the first upload that is still
     * running. A failed upload is stored as the sticky error and rethrown.
     */
    private synchronized void shrinkWriteOperationQueue() throws IOException {
        try {
            while (this.writeOperations.peek() != null && this.writeOperations.peek().task.isDone()) {
                // get() on a finished task returns immediately or throws its failure.
                this.writeOperations.peek().task.get();
                this.lastTotalAppendOffset += this.writeOperations.peek().length;
                this.writeOperations.remove();
            }
        } catch (Exception e) {
            this.lastError = e.getCause() instanceof AzureBlobFileSystemException ? (AzureBlobFileSystemException)e.getCause() : new IOException(e);
            throw this.lastError;
        }
    }

    /**
     * Removes every already-completed task from the completion service; if there
     * were none, blocks until one task completes. The outcome of the tasks is not
     * inspected here.
     *
     * @throws IOException an {@link InterruptedIOException} (also stored as the
     *                     sticky error) if the wait is interrupted
     */
    private void waitForTaskToComplete() throws IOException {
        boolean completed = false;
        while (this.completionService.poll() != null) {
            completed = true;
        }
        if (!completed) {
            try {
                this.completionService.take();
            } catch (InterruptedException e) {
                this.lastError = (IOException)new InterruptedIOException(e.toString()).initCause(e);
                throw this.lastError;
            }
        }
    }

    /**
     * Test hook that exposes {@link #waitForTaskToComplete()}.
     *
     * @throws IOException if the wait is interrupted
     */
    @VisibleForTesting
    public synchronized void waitForPendingUploads() throws IOException {
        this.waitForTaskToComplete();
    }

    /** A submitted upload: its future plus the offset and length it covers. */
    private static class WriteOperation {
        private final Future<Void> task;
        private final long startOffset;
        private final long length;

        /**
         * @param task        future of the submitted upload; must not be {@code null}
         * @param startOffset offset of the upload; must be non-negative
         * @param length      number of bytes uploaded; must be non-negative
         */
        WriteOperation(Future<Void> task, long startOffset, long length) {
            Preconditions.checkNotNull(task, "task");
            Preconditions.checkArgument(startOffset >= 0L, "startOffset");
            Preconditions.checkArgument(length >= 0L, "length");
            this.task = task;
            this.startOffset = startOffset;
            this.length = length;
        }
    }
}

