/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.block.stream;

import alluxio.client.block.stream.DataWriter;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.CanceledException;
import alluxio.exception.status.DeadlineExceededException;
import alluxio.grpc.RequestType;
import alluxio.network.protocol.RPCProtoMessage;
import alluxio.proto.dataserver.Protocol;
import alluxio.proto.status.Status;
import alluxio.resource.LockResource;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.com.google.common.base.Throwables;
import alluxio.shaded.client.io.netty.buffer.ByteBuf;
import alluxio.shaded.client.io.netty.channel.Channel;
import alluxio.shaded.client.io.netty.channel.ChannelFuture;
import alluxio.shaded.client.io.netty.channel.ChannelFutureListener;
import alluxio.shaded.client.io.netty.channel.ChannelHandlerContext;
import alluxio.shaded.client.io.netty.channel.ChannelInboundHandlerAdapter;
import alluxio.shaded.client.javax.annotation.concurrent.GuardedBy;
import alluxio.shaded.client.javax.annotation.concurrent.NotThreadSafe;
import alluxio.util.CommonUtils;
import alluxio.util.proto.ProtoMessage;
import alluxio.wire.WorkerNetAddress;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public final class NettyDataWriter
implements DataWriter {
    private static final Logger LOG = LoggerFactory.getLogger(NettyDataWriter.class);
    private static final int MAX_PACKETS_IN_FLIGHT = Configuration.getInt(PropertyKey.USER_NETWORK_NETTY_WRITER_BUFFER_SIZE_PACKETS);
    private static final long WRITE_TIMEOUT_MS = Configuration.getMs(PropertyKey.USER_NETWORK_NETTY_TIMEOUT_MS);
    private static final long CLOSE_TIMEOUT_MS = Configuration.getMs(PropertyKey.USER_NETWORK_NETTY_WRITER_CLOSE_TIMEOUT_MS);
    private final FileSystemContext mContext;
    private final Channel mChannel;
    private final WorkerNetAddress mAddress;
    private final long mLength;
    private final Protocol.WriteRequest mPartialRequest;
    private final long mPacketSize;
    private boolean mClosed;
    private final ReentrantLock mLock = new ReentrantLock();
    @GuardedBy(value="mLock")
    private long mPosToWrite;
    @GuardedBy(value="mLock")
    private long mPosToQueue;
    @GuardedBy(value="mLock")
    private Throwable mPacketWriteException;
    @GuardedBy(value="mLock")
    private boolean mDone;
    @GuardedBy(value="mLock")
    private boolean mEOFSent;
    @GuardedBy(value="mLock")
    private boolean mCancelSent;
    private final Condition mDoneOrFailed = this.mLock.newCondition();
    private final Condition mBufferNotFullOrFailed = this.mLock.newCondition();
    private final Condition mBufferEmptyOrFailed = this.mLock.newCondition();

    public static NettyDataWriter create(FileSystemContext context, WorkerNetAddress address, long id, long length, RequestType type, OutStreamOptions options) throws IOException {
        long packetSize = Configuration.getBytes(PropertyKey.USER_NETWORK_NETTY_WRITER_PACKET_SIZE_BYTES);
        Channel nettyChannel = context.acquireNettyChannel(address);
        return new NettyDataWriter(context, address, id, length, packetSize, type, options, nettyChannel);
    }

    private Protocol.RequestType getRequestType(RequestType requestType) {
        switch (requestType) {
            case ALLUXIO_BLOCK: {
                return Protocol.RequestType.ALLUXIO_BLOCK;
            }
            case UFS_FILE: {
                return Protocol.RequestType.UFS_FILE;
            }
            case UFS_FALLBACK_BLOCK: {
                throw new UnsupportedOperationException("Unsupported request type UFS_FALLBACK_BLOCK. ");
            }
        }
        throw new UnsupportedOperationException("Request type needs to be specified. ");
    }

    private NettyDataWriter(FileSystemContext context, WorkerNetAddress address, long id, long length, long packetSize, RequestType type, OutStreamOptions options, Channel channel) {
        this.mContext = context;
        this.mAddress = address;
        this.mLength = length;
        Protocol.WriteRequest.Builder builder = Protocol.WriteRequest.newBuilder().setId(id).setTier(options.getWriteTier()).setType(this.getRequestType(type));
        if (type == RequestType.UFS_FILE) {
            Protocol.CreateUfsFileOptions ufsFileOptions = Protocol.CreateUfsFileOptions.newBuilder().setUfsPath(options.getUfsPath()).setOwner(options.getOwner()).setGroup(options.getGroup()).setMode(options.getMode().toShort()).setMountId(options.getMountId()).build();
            builder.setCreateUfsFileOptions(ufsFileOptions);
        }
        this.mPartialRequest = builder.buildPartial();
        this.mPacketSize = packetSize;
        this.mChannel = channel;
        this.mChannel.pipeline().addLast(new PacketWriteResponseHandler());
    }

    @Override
    public long pos() {
        try (LockResource lr = new LockResource(this.mLock);){
            long l = this.mPosToQueue;
            return l;
        }
    }

    @Override
    public Optional<String> getUfsContentHash() {
        return Optional.empty();
    }

    /*
     * Exception decompiling
     */
    @Override
    public void writeChunk(ByteBuf buf) throws IOException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [9[DOLOOP]], but top level block is 2[TRYBLOCK]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    @Override
    public void cancel() {
        if (this.mClosed) {
            return;
        }
        this.sendCancel();
    }

    @Override
    public void flush() throws IOException {
        this.mChannel.flush();
        try (LockResource lr = new LockResource(this.mLock);){
            do {
                if (this.mPosToWrite == this.mPosToQueue) {
                    return;
                }
                if (this.mPacketWriteException == null) continue;
                Throwables.propagateIfPossible(this.mPacketWriteException, IOException.class);
                throw AlluxioStatusException.fromCheckedException(this.mPacketWriteException);
            } while (this.mBufferEmptyOrFailed.await(WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS));
            throw new DeadlineExceededException(String.format("Timeout flushing to %s for request %s after %dms.", this.mAddress, this.mPartialRequest, WRITE_TIMEOUT_MS));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CanceledException(e);
        }
    }

    /*
     * Exception decompiling
     */
    @Override
    public void close() throws IOException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [10[DOLOOP]], but top level block is 3[TRYBLOCK]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private boolean tooManyPacketsInFlight() {
        return this.mPosToQueue - this.mPosToWrite >= (long)MAX_PACKETS_IN_FLIGHT * this.mPacketSize;
    }

    private void sendEof() {
        long pos;
        try (LockResource lr = new LockResource(this.mLock);){
            if (this.mEOFSent || this.mCancelSent) {
                return;
            }
            this.mEOFSent = true;
            pos = this.mPosToQueue;
        }
        Protocol.WriteRequest writeRequest = this.mPartialRequest.toBuilder().setOffset(pos).setEof(true).build();
        this.mChannel.writeAndFlush(new RPCProtoMessage(new ProtoMessage(writeRequest), null)).addListener(new EofOrCancelListener());
    }

    private void sendCancel() {
        long pos;
        try (LockResource lr = new LockResource(this.mLock);){
            if (this.mEOFSent || this.mCancelSent) {
                return;
            }
            this.mCancelSent = true;
            pos = this.mPosToQueue;
        }
        Protocol.WriteRequest writeRequest = this.mPartialRequest.toBuilder().setOffset(pos).setCancel(true).build();
        this.mChannel.writeAndFlush(new RPCProtoMessage(new ProtoMessage(writeRequest), null)).addListener(new EofOrCancelListener());
    }

    @Override
    public int chunkSize() {
        return (int)this.mPacketSize;
    }

    @GuardedBy(value="mLock")
    private void updateException(Throwable e) {
        if (this.mPacketWriteException == null || this.mPacketWriteException == e) {
            this.mPacketWriteException = e;
        } else {
            this.mPacketWriteException.addSuppressed(e);
        }
    }

    private final class EofOrCancelListener
    implements ChannelFutureListener {
        EofOrCancelListener() {
        }

        @Override
        public void operationComplete(ChannelFuture future) {
            if (!future.isSuccess()) {
                future.channel().close();
                try (LockResource lr = new LockResource(NettyDataWriter.this.mLock);){
                    NettyDataWriter.this.updateException(future.cause());
                    NettyDataWriter.this.mDoneOrFailed.signal();
                    NettyDataWriter.this.mBufferNotFullOrFailed.signal();
                    NettyDataWriter.this.mBufferEmptyOrFailed.signal();
                }
            }
        }
    }

    private final class WriteListener
    implements ChannelFutureListener {
        private final long mPosToWriteUncommitted;

        WriteListener(long posToWriteUncommitted) {
            this.mPosToWriteUncommitted = posToWriteUncommitted;
        }

        @Override
        public void operationComplete(ChannelFuture future) {
            if (!future.isSuccess()) {
                future.channel().close();
            }
            boolean shouldSendEOF = false;
            try (LockResource lr = new LockResource(NettyDataWriter.this.mLock);){
                Preconditions.checkState(this.mPosToWriteUncommitted - NettyDataWriter.this.mPosToWrite <= NettyDataWriter.this.mPacketSize, "Some packet is not acked.");
                Preconditions.checkState(this.mPosToWriteUncommitted <= NettyDataWriter.this.mLength);
                NettyDataWriter.this.mPosToWrite = this.mPosToWriteUncommitted;
                if (future.cause() != null) {
                    NettyDataWriter.this.updateException(future.cause());
                    NettyDataWriter.this.mDoneOrFailed.signal();
                    NettyDataWriter.this.mBufferNotFullOrFailed.signal();
                    NettyDataWriter.this.mBufferEmptyOrFailed.signal();
                    return;
                }
                if (NettyDataWriter.this.mPosToWrite == NettyDataWriter.this.mPosToQueue) {
                    NettyDataWriter.this.mBufferEmptyOrFailed.signal();
                }
                if (!NettyDataWriter.this.tooManyPacketsInFlight()) {
                    NettyDataWriter.this.mBufferNotFullOrFailed.signal();
                }
                if (NettyDataWriter.this.mPosToWrite == NettyDataWriter.this.mLength) {
                    shouldSendEOF = true;
                }
            }
            if (shouldSendEOF) {
                NettyDataWriter.this.sendEof();
            }
        }
    }

    private final class PacketWriteResponseHandler
    extends ChannelInboundHandlerAdapter {
        PacketWriteResponseHandler() {
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
            Preconditions.checkState(this.acceptMessage(msg), "Incorrect response type %s.", msg);
            RPCProtoMessage response = (RPCProtoMessage)msg;
            if (response.getMessage().asResponse().getStatus() != Status.PStatus.CANCELLED) {
                CommonUtils.unwrapResponseFrom(response.getMessage().asResponse(), ctx.channel());
            }
            try (LockResource lr = new LockResource(NettyDataWriter.this.mLock);){
                NettyDataWriter.this.mDone = true;
                NettyDataWriter.this.mDoneOrFailed.signal();
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            LOG.error("Exception is caught when writing block {} to channel {}:", new Object[]{NettyDataWriter.this.mPartialRequest.getId(), ctx.channel(), cause});
            try (LockResource lr = new LockResource(NettyDataWriter.this.mLock);){
                NettyDataWriter.this.updateException(cause);
                NettyDataWriter.this.mBufferNotFullOrFailed.signal();
                NettyDataWriter.this.mDoneOrFailed.signal();
                NettyDataWriter.this.mBufferEmptyOrFailed.signal();
            }
            ctx.close();
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) {
            LOG.warn("Channel {} is closed.", (Object)ctx.channel());
            try (LockResource lr = new LockResource(NettyDataWriter.this.mLock);){
                if (!NettyDataWriter.this.mDone) {
                    NettyDataWriter.this.updateException(new IOException(String.format("Channel %s is closed when writing block %d.", ctx.channel(), NettyDataWriter.this.mPartialRequest.getId())));
                    NettyDataWriter.this.mBufferNotFullOrFailed.signal();
                    NettyDataWriter.this.mDoneOrFailed.signal();
                    NettyDataWriter.this.mBufferEmptyOrFailed.signal();
                }
            }
            ctx.fireChannelUnregistered();
        }

        private boolean acceptMessage(Object msg) {
            if (msg instanceof RPCProtoMessage) {
                return ((RPCProtoMessage)msg).getMessage().isResponse();
            }
            return false;
        }
    }
}

