/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.igfs;

import java.io.DataInput;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
import org.apache.ignite.igfs.IgfsOutOfSpaceException;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
import org.apache.ignite.internal.processors.igfs.IgfsAckMessage;
import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
import org.apache.ignite.internal.processors.igfs.IgfsBlockLocationImpl;
import org.apache.ignite.internal.processors.igfs.IgfsBlocksMessage;
import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
import org.apache.ignite.internal.processors.igfs.IgfsFileAffinityRange;
import org.apache.ignite.internal.processors.igfs.IgfsFileMap;
import org.apache.ignite.internal.processors.igfs.IgfsFileWorkerBatch;
import org.apache.ignite.internal.processors.igfs.IgfsManager;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.processors.igfs.data.IgfsDataPutProcessor;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;

public class IgfsDataManager
extends IgfsManager {
    /** Data cache typed for IGFS block keys and raw block payloads; assigned in {@code onKernalStart0()}. */
    private IgniteInternalCache<IgfsBlockKey, byte[]> dataCachePrj;
    /** Same cache instance as {@link #dataCachePrj}, viewed with {@code Object} keys/values; used for affinity lookups. */
    private IgniteInternalCache<Object, Object> dataCache;
    /** Latch released once the data cache preloader start future completes; awaited by {@code awaitInit()}. */
    private CountDownLatch dataCacheStartLatch;
    /** Size in bytes of one block group: configured block size multiplied by {@link #grpSize}. */
    private long grpBlockSize;
    /** Number of blocks per affinity group; 1 when the cache does not use {@code IgfsGroupDataBlocksKeyMapper}. */
    private int grpSize;
    /** Writer used by the {@code ByteBuffer} flavour of {@code storeDataBlocks}. */
    private ByteBufferBlocksWriter byteBufWriter = new ByteBufferBlocksWriter();
    /** Writer used by the {@code DataInput} flavour of {@code storeDataBlocks}. */
    private DataInputBlocksWriter dataInputWriter = new DataInputBlocksWriter();
    /** Write completion futures of files currently being written, keyed by file ID. */
    private ConcurrentMap<IgniteUuid, WriteCompletionFuture> pendingWrites = new ConcurrentHashMap<IgniteUuid, WriteCompletionFuture>();
    /** Counter used to generate candidate affinity keys in {@code nextAffinityKey}. */
    private AtomicLong affKeyGen = new AtomicLong();
    /** Counter used to generate batch (request) IDs in {@code processBatch}. */
    private AtomicLong reqIdCtr = new AtomicLong();
    /** Communication topic on which block and acknowledgement messages are exchanged. */
    private Object topic;
    /** Worker that deletes file data asynchronously. */
    private AsyncDeleteWorker delWorker;
    /** Name of the data cache taken from the IGFS configuration. */
    private String dataCacheName;
    /** In-flight reads from the secondary file system, keyed by block key, so concurrent readers share one read. */
    private final ConcurrentHashMap<IgfsBlockKey, IgniteInternalFuture<byte[]>> rmtReadFuts = new ConcurrentHashMap<IgfsBlockKey, IgniteInternalFuture<byte[]>>();

    /**
     * Blocks the calling thread until the data cache start latch has been released.
     *
     * @throws IgniteInterruptedException If the thread is interrupted while waiting.
     */
    void awaitInit() {
        CountDownLatch startLatch = this.dataCacheStartLatch;
        try {
            startLatch.await();
        }
        catch (InterruptedException interrupted) {
            throw new IgniteInterruptedException(interrupted);
        }
    }

    /**
     * Starts the manager: creates the start latch, resolves the communication topic,
     * registers the message and discovery listeners, creates the delete worker and
     * remembers the data cache name.
     *
     * @throws IgniteCheckedException Declared by the overridden method.
     */
    @Override
    protected void start0() throws IgniteCheckedException {
        this.dataCacheStartLatch = new CountDownLatch(1);
        String igfsName = this.igfsCtx.configuration().getName();
        // Unnamed IGFS uses the shared topic, a named one gets its own sub-topic.
        this.topic = F.isEmpty(igfsName) ? GridTopic.TOPIC_IGFS : GridTopic.TOPIC_IGFS.topic(igfsName);
        this.igfsCtx.kernalContext().io().addMessageListener(this.topic, new GridMessageListener(){

            @Override
            public void onMessage(UUID nodeId, Object msg, byte plc) {
                if (msg instanceof IgfsBlocksMessage) {
                    IgfsDataManager.this.processBlocksMessage(nodeId, (IgfsBlocksMessage)msg);
                } else if (msg instanceof IgfsAckMessage) {
                    IgfsDataManager.this.processAckMessage(nodeId, (IgfsAckMessage)msg);
                }
            }
        });
        // NOTE(review): 11 and 12 are presumably EventType.EVT_NODE_LEFT and EVT_NODE_FAILED - confirm.
        this.igfsCtx.kernalContext().event().addLocalEventListener(new GridLocalEventListener(){

            @Override
            public void onEvent(Event evt) {
                assert (evt.type() == 12 || evt.type() == 11);
                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
                if (IgfsDataManager.this.igfsCtx.igfsNode(discoEvt.eventNode())) {
                    // Report the same node the futures are failed for; the previous message used
                    // evt.node(), which is not the node this handler acts upon.
                    UUID leftNodeId = discoEvt.eventNode().id();
                    for (WriteCompletionFuture future : IgfsDataManager.this.pendingWrites.values()) {
                        future.onError(leftNodeId, new ClusterTopologyCheckedException("Node left grid before write completed: " + leftNodeId));
                    }
                }
            }
        }, 11, 12);
        this.delWorker = new AsyncDeleteWorker(this.igfsCtx.kernalContext().igniteInstanceName(), "igfs-" + igfsName + "-delete-worker", this.log);
        this.dataCacheName = this.igfsCtx.configuration().getDataCacheConfiguration().getName();
    }

    /**
     * Resolves the data cache, computes the block group geometry, arranges for the
     * start latch to be released when the cache preloader start future completes,
     * and launches the delete worker thread.
     *
     * @throws IgniteCheckedException If the data cache cannot be obtained.
     */
    @Override
    protected void onKernalStart0() throws IgniteCheckedException {
        this.dataCachePrj = this.igfsCtx.kernalContext().cache().getOrStartCache(this.dataCacheName);
        assert (this.dataCachePrj != null);
        // Generic types are invariant, so the typed cache cannot be assigned directly to the
        // Object/Object view; the cast is safe because both fields refer to the same instance.
        @SuppressWarnings("unchecked")
        IgniteInternalCache<Object, Object> untypedCache = (IgniteInternalCache<Object, Object>)(IgniteInternalCache<?, ?>)this.dataCachePrj;
        this.dataCache = untypedCache;
        AffinityKeyMapper mapper = this.igfsCtx.kernalContext().cache().internalCache(this.dataCacheName).configuration().getAffinityMapper();
        // Without the group mapper every block forms its own group.
        this.grpSize = mapper instanceof IgfsGroupDataBlocksKeyMapper ? ((IgfsGroupDataBlocksKeyMapper)mapper).getGroupSize() : 1;
        this.grpBlockSize = (long)this.igfsCtx.configuration().getBlockSize() * (long)this.grpSize;
        assert (this.grpBlockSize != 0L);
        this.igfsCtx.kernalContext().cache().internalCache(this.dataCacheName).preloader().startFuture().listen((IgniteInClosure<IgniteInternalFuture<Object>>)new CI1<IgniteInternalFuture<Object>>(){

            @Override
            public void apply(IgniteInternalFuture<Object> f) {
                // Unblocks awaitInit() callers.
                IgfsDataManager.this.dataCacheStartLatch.countDown();
            }
        });
        new Thread(this.delWorker).start();
    }

    /**
     * Stops the delete worker and waits for it to finish.
     *
     * @param cancel When {@code true} the worker is cancelled, otherwise it is asked to stop.
     */
    @Override
    protected void onKernalStop0(boolean cancel) {
        if (!cancel) {
            this.delWorker.stop();
        } else {
            this.delWorker.cancel();
        }
        try {
            U.join(this.delWorker);
        }
        catch (IgniteInterruptedCheckedException interrupted) {
            // Stopping continues even if the wait was interrupted.
            this.log.warning("Got interrupter while waiting for delete worker to stop (will continue stopping).", interrupted);
        }
    }

    /**
     * @return Value reported by the data cache's {@code igfsDataSpaceUsed()}.
     */
    public long spaceSize() {
        long usedSpace = this.dataCachePrj.igfsDataSpaceUsed();
        return usedSpace;
    }

    /**
     * @return Maximum size configured for the data cache's data region, or {@code 0}
     *      when there is no region or the configured value is not positive.
     */
    public long maxSpaceSize() {
        DataRegion region = this.dataCachePrj.context().dataRegion();
        if (region == null) {
            return 0L;
        }
        long configuredMax = region.config().getMaxSize();
        return Math.max(configuredMax, 0L);
    }

    /**
     * Produces an affinity key that the data cache maps to the local node.
     *
     * @param prevAffKey Previously used key; returned as is when it still maps to the local node.
     * @return Affinity key mapped to the local node, or {@code null} when the local node
     *      is not an affinity node of the data cache.
     */
    public IgniteUuid nextAffinityKey(@Nullable IgniteUuid prevAffKey) {
        if (!this.dataCache.context().affinityNode()) {
            return null;
        }
        UUID locNodeId = this.igfsCtx.kernalContext().localNodeId();
        boolean prevStillLocal = prevAffKey != null && this.dataCache.affinity().mapKeyToNode(prevAffKey).isLocal();
        if (prevStillLocal) {
            return prevAffKey;
        }
        // Keep generating candidates until one lands on the local node.
        while (true) {
            IgniteUuid candidate = new IgniteUuid(locNodeId, this.affKeyGen.getAndIncrement());
            if (this.dataCache.affinity().mapKeyToNode(candidate).isLocal()) {
                return candidate;
            }
        }
    }

    /**
     * @param affinityKey Key to map.
     * @return Node the data cache affinity maps the key to.
     */
    public ClusterNode affinityNode(Object affinityKey) {
        ClusterNode mapped = this.dataCache.affinity().mapKeyToNode(affinityKey);
        return mapped;
    }

    /**
     * @param affinityKey Key to map.
     * @return Primary and backup nodes the data cache affinity maps the key to.
     */
    public Collection<ClusterNode> affinityNodes(Object affinityKey) {
        Collection<ClusterNode> owners = this.dataCache.affinity().mapKeyToPrimaryAndBackups(affinityKey);
        return owners;
    }

    /**
     * Creates a data streamer for the data cache, applying the per-node batch size and
     * parallel batch count from the IGFS configuration when they are positive.
     *
     * @return Configured streamer; the caller is responsible for closing it.
     */
    private IgniteDataStreamer<IgfsBlockKey, byte[]> dataStreamer() {
        DataStreamerImpl<IgfsBlockKey, byte[]> streamer = this.igfsCtx.kernalContext().dataStream().dataStreamer(this.dataCachePrj.name());
        FileSystemConfiguration fsCfg = this.igfsCtx.configuration();
        if (fsCfg.getPerNodeBatchSize() > 0) {
            streamer.perNodeBufferSize(fsCfg.getPerNodeBatchSize());
        }
        if (fsCfg.getPerNodeParallelBatchCount() > 0) {
            streamer.perNodeParallelOperations(fsCfg.getPerNodeParallelBatchCount());
        }
        streamer.receiver(DataStreamerCacheUpdaters.batchedSorted());
        return streamer;
    }

    /**
     * Asynchronously reads a file data block from the data cache, falling back to the
     * secondary file system reader (when one is supplied) if the cache has no such block.
     * <p>
     * A block fetched from the secondary file system is stored back into the cache.
     * Concurrent misses for the same key share a single secondary read through
     * {@code rmtReadFuts}.
     *
     * @param fileInfo File info; must not be {@code null}.
     * @param path Path of the file; used for logging and for the secondary read.
     * @param blockIdx Zero-based block index; must not be negative.
     * @param secReader Optional secondary file system reader.
     * @return Future yielding the block bytes; the bytes are {@code null} when the block is
     *      absent from the cache and no secondary reader was given.
     * @throws IgniteCheckedException Declared for callers; failures of the read itself are
     *      reported through the returned future.
     */
    @Nullable
    public IgniteInternalFuture<byte[]> dataBlock(final IgfsEntryInfo fileInfo, final IgfsPath path, final long blockIdx, final @Nullable IgfsSecondaryFileSystemPositionedReadable secReader) throws IgniteCheckedException {
        assert (fileInfo != null);
        assert (blockIdx >= 0L);
        final IgfsBlockKey key = this.blockKey(blockIdx, fileInfo);
        // Log only when the block is NOT owned by the local node; the check used to be inverted
        // and reported local blocks as "non-local".
        if (this.log.isDebugEnabled() && !this.dataCache.affinity().isPrimaryOrBackup(this.igfsCtx.kernalContext().discovery().localNode(), key)) {
            this.log.debug("Reading non-local data block [path=" + path + ", fileInfo=" + fileInfo + ", blockIdx=" + blockIdx + ']');
        }
        IgniteInternalFuture<byte[]> fut = this.dataCachePrj.getAsync(key);
        if (secReader != null) {
            // NOTE(review): policy 6 is presumably the IGFS pool - confirm against GridIoPolicy.
            Executor exec = this.igfsCtx.kernalContext().pools().poolForPolicy((byte)6);
            fut = fut.chain(new CX1<IgniteInternalFuture<byte[]>, byte[]>(){

                @Override
                public byte[] applyx(IgniteInternalFuture<byte[]> fut) throws IgniteCheckedException {
                    byte[] res = fut.get();
                    if (res == null) {
                        GridFutureAdapter<byte[]> rmtReadFut = new GridFutureAdapter<byte[]>();
                        IgniteInternalFuture<byte[]> oldRmtReadFut = IgfsDataManager.this.rmtReadFuts.putIfAbsent(key, rmtReadFut);
                        if (oldRmtReadFut == null) {
                            // This thread owns the secondary read for the key.
                            try {
                                res = IgfsDataManager.this.secondaryDataBlock(path, blockIdx, secReader, fileInfo.blockSize());
                                // Release waiters before the (potentially slow) cache update.
                                rmtReadFut.onDone(res);
                                IgfsDataManager.this.putBlock(fileInfo.blockSize(), key, res);
                            }
                            catch (IgniteCheckedException e) {
                                rmtReadFut.onDone(e);
                                throw e;
                            }
                            finally {
                                boolean rmv = IgfsDataManager.this.rmtReadFuts.remove(key, rmtReadFut);
                                assert (rmv);
                            }
                        } else {
                            // Another thread is already reading this block; reuse its result.
                            res = oldRmtReadFut.get();
                            IgfsDataManager.this.igfsCtx.metrics().addReadBlocks(1, 0);
                        }
                    } else {
                        IgfsDataManager.this.igfsCtx.metrics().addReadBlocks(1, 0);
                    }
                    return res;
                }
            }, exec);
        } else {
            this.igfsCtx.metrics().addReadBlocks(1, 0);
        }
        return fut;
    }

    /**
     * Reads one block from the secondary file system.
     * <p>
     * Reading continues until the block is full or the reader returns a negative count;
     * a short block is returned trimmed to the number of bytes actually read.
     *
     * @param path File path; used only for logging.
     * @param blockIdx Zero-based block index.
     * @param secReader Secondary file system reader.
     * @param blockSize Block size in bytes.
     * @return Bytes read, at most {@code blockSize} long.
     * @throws IgniteCheckedException If the reader throws an {@link IOException}.
     */
    @Nullable
    public byte[] secondaryDataBlock(IgfsPath path, long blockIdx, IgfsSecondaryFileSystemPositionedReadable secReader, int blockSize) throws IgniteCheckedException {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Reading non-local data block in the secondary file system [path=" + path + ", blockIdx=" + blockIdx + ']');
        }
        long blockPos = blockIdx * (long)blockSize;
        byte[] buf = new byte[blockSize];
        int filled = 0;
        try {
            while (filled < blockSize) {
                int cnt = secReader.read(blockPos + (long)filled, buf, filled, blockSize - filled);
                if (cnt < 0) {
                    break;
                }
                filled += cnt;
            }
        }
        catch (IOException ioErr) {
            throw new IgniteCheckedException("Failed to read data due to secondary file system exception: " + ioErr.getMessage(), ioErr);
        }
        byte[] block = filled == blockSize ? buf : Arrays.copyOf(buf, filled);
        this.igfsCtx.metrics().addReadBlocks(1, 1);
        return block;
    }

    /**
     * Stores a block in the data cache: a full-size block is put directly, a shorter one
     * goes through {@code IgfsDataPutProcessor}.
     *
     * @param blockSize Expected full block size.
     * @param key Block key.
     * @param data Block bytes; must not be longer than {@code blockSize}.
     * @throws IgniteCheckedException If the cache operation fails.
     */
    private void putBlock(int blockSize, IgfsBlockKey key, byte[] data) throws IgniteCheckedException {
        boolean partial = data.length < blockSize;
        if (!partial) {
            assert (data.length == blockSize);
            this.dataCachePrj.put(key, data);
            return;
        }
        this.dataCachePrj.invoke(key, new IgfsDataPutProcessor(data), new Object[0]);
    }

    /**
     * Registers a write completion future for a file that is about to be written.
     *
     * @param fileId ID of the file being written.
     * @return Newly registered completion future.
     */
    public IgniteInternalFuture<Boolean> writeStart(IgniteUuid fileId) {
        WriteCompletionFuture created = new WriteCompletionFuture(fileId);
        WriteCompletionFuture existing = this.pendingWrites.putIfAbsent(fileId, created);
        assert (existing == null) : "Opened write that is being concurrently written: " + fileId;
        if (this.log.isDebugEnabled()) {
            this.log.debug("Registered write completion future for file output stream [fileId=" + fileId + ", fut=" + created + ']');
        }
        return created;
    }

    /**
     * Marks the pending write of the given file, if any, as waiting for its last acknowledgement.
     *
     * @param fileId ID of the file whose write is being closed.
     * @throws IgniteCheckedException Declared for callers.
     */
    public void writeClose(IgniteUuid fileId) throws IgniteCheckedException {
        WriteCompletionFuture pending = this.pendingWrites.get(fileId);
        if (pending == null) {
            return;
        }
        pending.markWaitingLastAck();
    }

    /**
     * Stores data blocks read from a byte buffer by delegating to the byte buffer writer;
     * the number of bytes to store is the buffer's remaining count.
     *
     * @param fileInfo File info.
     * @param reservedLen Reserved length passed through to the writer.
     * @param remainder Optional remainder passed through to the writer.
     * @param remainderLen Remainder length passed through to the writer.
     * @param data Source buffer.
     * @param flush Flush flag passed through to the writer.
     * @param affinityRange Affinity range passed through to the writer.
     * @param batch Optional batch passed through to the writer.
     * @return Whatever the writer returns (may be {@code null}).
     * @throws IgniteCheckedException If the writer fails.
     */
    @Nullable
    public byte[] storeDataBlocks(IgfsEntryInfo fileInfo, long reservedLen, @Nullable byte[] remainder, int remainderLen, ByteBuffer data, boolean flush, IgfsFileAffinityRange affinityRange, @Nullable IgfsFileWorkerBatch batch) throws IgniteCheckedException {
        ByteBufferBlocksWriter writer = this.byteBufWriter;
        int srcLen = data.remaining();
        return writer.storeDataBlocks(fileInfo, reservedLen, remainder, remainderLen, data, srcLen, flush, affinityRange, batch);
    }

    /**
     * Stores data blocks read from a {@link DataInput} by delegating to the data input writer.
     *
     * @param fileInfo File info.
     * @param reservedLen Reserved length passed through to the writer.
     * @param remainder Optional remainder passed through to the writer.
     * @param remainderLen Remainder length passed through to the writer.
     * @param in Source input.
     * @param len Number of bytes passed through to the writer.
     * @param flush Flush flag passed through to the writer.
     * @param affinityRange Affinity range passed through to the writer.
     * @param batch Optional batch passed through to the writer.
     * @return Whatever the writer returns (may be {@code null}).
     * @throws IgniteCheckedException If the writer fails.
     * @throws IOException Declared for callers.
     */
    @Nullable
    public byte[] storeDataBlocks(IgfsEntryInfo fileInfo, long reservedLen, @Nullable byte[] remainder, int remainderLen, DataInput in, int len, boolean flush, IgfsFileAffinityRange affinityRange, @Nullable IgfsFileWorkerBatch batch) throws IgniteCheckedException, IOException {
        DataInputBlocksWriter writer = this.dataInputWriter;
        return writer.storeDataBlocks(fileInfo, reservedLen, remainder, remainderLen, in, len, flush, affinityRange, batch);
    }

    /**
     * Schedules asynchronous deletion of a file's data.
     *
     * @param fileInfo Info of the entry whose data should be removed.
     * @return Future from the delete worker, or an already finished future when the
     *      entry is not a file.
     */
    public IgniteInternalFuture<Object> delete(IgfsEntryInfo fileInfo) {
        if (fileInfo.isFile()) {
            return this.delWorker.deleteAsync(fileInfo);
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Cannot delete content of not-data file: " + fileInfo);
        }
        return new GridFinishedFuture<Object>();
    }

    /**
     * Builds the cache key of a file block.
     * <p>
     * The affinity key is taken from the file info when present, otherwise from the
     * file map (looked up by the block's byte offset) when there is one, otherwise it is {@code null}.
     *
     * @param blockIdx Zero-based block index.
     * @param fileInfo File info.
     * @return Block key.
     */
    public IgfsBlockKey blockKey(long blockIdx, IgfsEntryInfo fileInfo) {
        IgniteUuid affKey;
        if (fileInfo.affinityKey() != null) {
            affKey = fileInfo.affinityKey();
        } else if (fileInfo.fileMap() != null) {
            long blockOff = blockIdx * (long)fileInfo.blockSize();
            affKey = fileInfo.fileMap().affinityKey(blockOff, false);
        } else {
            affKey = null;
        }
        return new IgfsBlockKey(fileInfo.id(), affKey, fileInfo.evictExclude(), blockIdx);
    }

    /**
     * Removes the blocks of a file range through a data streamer.
     *
     * @param fileInfo File info.
     * @param range Affinity range whose blocks (keyed with the range's affinity key) are removed.
     * @param cleanNonColocated When {@code true}, the same block indexes keyed with a
     *      {@code null} affinity key are removed as well.
     */
    public void cleanBlocks(IgfsEntryInfo fileInfo, IgfsFileAffinityRange range, boolean cleanNonColocated) {
        long firstIdx = range.startOffset() / (long)fileInfo.blockSize();
        long lastIdx = range.endOffset() / (long)fileInfo.blockSize();
        if (this.log.isDebugEnabled()) {
            this.log.debug("Cleaning blocks [fileInfo=" + fileInfo + ", range=" + range + ", cleanNonColocated=" + cleanNonColocated + ", startIdx=" + firstIdx + ", endIdx=" + lastIdx + ']');
        }
        try (IgniteDataStreamer<IgfsBlockKey, byte[]> streamer = this.dataStreamer();){
            long cur = firstIdx;
            while (cur <= lastIdx) {
                streamer.removeData(new IgfsBlockKey(fileInfo.id(), range.affinityKey(), fileInfo.evictExclude(), cur));
                if (cleanNonColocated) {
                    streamer.removeData(new IgfsBlockKey(fileInfo.id(), null, fileInfo.evictExclude(), cur));
                }
                ++cur;
            }
        }
        catch (IgniteException err) {
            // Failures are logged, not propagated.
            this.log.error("Failed to clean up file range [fileInfo=" + fileInfo + ", range=" + range + ']', err);
        }
    }

    /*
     * Unable to fully structure code
     */
    public void spreadBlocks(IgfsEntryInfo fileInfo, IgfsFileAffinityRange range) {
        startIdx = range.startOffset() / (long)fileInfo.blockSize();
        endIdx = range.endOffset() / (long)fileInfo.blockSize();
        try {
            ldr = this.dataStreamer();
            var8_7 = null;
            try {
                bytesProcessed = 0L;
                for (idx = startIdx; idx <= endIdx; ++idx) {
                    colocatedKey = new IgfsBlockKey(fileInfo.id(), range.affinityKey(), fileInfo.evictExclude(), idx);
                    key = new IgfsBlockKey(fileInfo.id(), null, fileInfo.evictExclude(), idx);
                    block = this.dataCachePrj.get(colocatedKey);
                    if (block != null) {
                        if (block.length != fileInfo.blockSize()) {
                            tx = this.dataCachePrj.txStartEx(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
                            var17_16 = null;
                            try {
                                vals = this.dataCachePrj.getAll(F.asList(new IgfsBlockKey[]{colocatedKey, key}));
                                val = vals.get(colocatedKey);
                                if (val != null) {
                                    this.putBlock(fileInfo.blockSize(), key, val);
                                    tx.commit();
                                }
                                if (!this.log.isDebugEnabled()) ** GOTO lbl42
                                this.log.debug("Failed to find colocated file block for spread (will ignore) [fileInfo=" + fileInfo + ", range=" + range + ", startIdx=" + startIdx + ", endIdx=" + endIdx + ", idx=" + idx + ']');
                            }
                            catch (Throwable var18_19) {
                                var17_16 = var18_19;
                                throw var18_19;
                            }
                            finally {
                                if (tx != null) {
                                    if (var17_16 != null) {
                                        try {
                                            tx.close();
                                        }
                                        catch (Throwable var18_18) {
                                            var17_16.addSuppressed(var18_18);
                                        }
                                    } else {
                                        tx.close();
                                    }
                                }
                            }
                        } else {
                            ldr.addData(key, block);
                        }
lbl42:
                        // 4 sources

                        if ((bytesProcessed += (long)block.length) < this.igfsCtx.configuration().getFragmentizerThrottlingBlockLength()) continue;
                        ldr.flush();
                        bytesProcessed = 0L;
                        U.sleep(this.igfsCtx.configuration().getFragmentizerThrottlingDelay());
                        continue;
                    }
                    if (!this.log.isDebugEnabled()) continue;
                    this.log.debug("Failed to find colocated file block for spread (will ignore) [fileInfo=" + fileInfo + ", range=" + range + ", startIdx=" + startIdx + ", endIdx=" + endIdx + ", idx=" + idx + ']');
                }
            }
            catch (Throwable var9_10) {
                var8_7 = var9_10;
                throw var9_10;
            }
            finally {
                if (ldr != null) {
                    if (var8_7 != null) {
                        try {
                            ldr.close();
                        }
                        catch (Throwable var9_9) {
                            var8_7.addSuppressed(var9_9);
                        }
                    } else {
                        ldr.close();
                    }
                }
            }
        }
        catch (IgniteCheckedException e) {
            this.log.error("Failed to clean up file range [fileInfo=" + fileInfo + ", range=" + range + ']', e);
        }
    }

    /**
     * Resolves block locations of a file region without limiting the length of a single location.
     *
     * @param info File info.
     * @param start Start offset of the region.
     * @param len Length of the region.
     * @return Block locations covering the region.
     * @throws IgniteCheckedException If the lookup fails.
     */
    public Collection<IgfsBlockLocation> affinity(IgfsEntryInfo info, long start, long len) throws IgniteCheckedException {
        final long noMaxLen = 0L;
        return this.affinity(info, start, len, noMaxLen);
    }

    /**
     * Resolves block locations of a file region.
     * <p>
     * When the file has its own affinity key the whole region maps to that key's nodes.
     * Otherwise the region is walked across the file map ranges: parts covered by a range
     * use the range's affinity key, the rest is resolved per block group by
     * {@code affinity0}. Adjacent locations with the same node list are merged.
     *
     * @param info File info; must describe a file.
     * @param start Start offset of the region; must not be negative.
     * @param len Length of the region; must not be negative.
     * @param maxLen Maximum length of one location; values {@code <= 0} mean no limit. A positive
     *      value is rounded down to a multiple of the block size, but not below one block.
     * @return Block locations covering the region (empty when {@code len} is zero).
     * @throws IgniteCheckedException Declared for callers.
     */
    public Collection<IgfsBlockLocation> affinity(IgfsEntryInfo info, long start, long len, long maxLen) throws IgniteCheckedException {
        assert (info.isFile()) : "Failed to get affinity (not a file): " + info;
        assert (start >= 0L) : "Start position should not be negative: " + start;
        assert (len >= 0L) : "Part length should not be negative: " + len;
        if (this.log.isDebugEnabled()) {
            this.log.debug("Calculating affinity for file [info=" + info + ", start=" + start + ", len=" + len + ']');
        }
        if (len == 0L) {
            return Collections.emptyList();
        }
        if (maxLen > 0L) {
            // Align the limit to whole blocks, keeping at least one block.
            maxLen -= maxLen % (long)info.blockSize();
            if (maxLen < (long)info.blockSize()) {
                maxLen = info.blockSize();
            }
        } else {
            maxLen = 0L;
        }
        if (info.affinityKey() != null) {
            LinkedList<IgfsBlockLocation> res = new LinkedList<IgfsBlockLocation>();
            this.splitBlocks(start, len, maxLen, this.dataCache.affinity().mapKeyToPrimaryAndBackups(new IgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(), 0L)), res);
            return res;
        }
        LinkedList<IgfsBlockLocation> res = new LinkedList<IgfsBlockLocation>();
        if (info.fileMap().ranges().isEmpty()) {
            this.affinity0(info, start, len, maxLen, res);
            return res;
        }
        long pos = start;
        long end = start + len;
        for (IgfsFileAffinityRange range : info.fileMap().ranges()) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Checking range [range=" + range + ", pos=" + pos + ']');
            }
            if (range.less(pos)) {
                // Resolve the gap in front of the range by block groups.
                long partEnd = Math.min(end, range.startOffset());
                this.affinity0(info, pos, partEnd - pos, maxLen, res);
                pos = partEnd;
            }
            IgfsBlockLocation last = res.peekLast();
            if (range.belongs(pos)) {
                long partEnd = Math.min(range.endOffset() + 1L, end);
                Collection<ClusterNode> affNodes = this.dataCache.affinity().mapKeyToPrimaryAndBackups(range.affinityKey());
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Calculated affinity for range [start=" + pos + ", end=" + partEnd + ", nodes=" + F.nodeIds(affNodes) + ", range=" + range + ", affNodes=" + F.nodeIds(affNodes) + ']');
                }
                if (last != null && this.equal(last.nodeIds(), F.viewReadOnly(affNodes, F.node2id(), new IgnitePredicate[0]))) {
                    // Same nodes as the previous location: merge and re-split.
                    res.removeLast();
                    this.splitBlocks(last.start(), last.length() + partEnd - pos, maxLen, affNodes, res);
                } else {
                    this.splitBlocks(pos, partEnd - pos, maxLen, affNodes, res);
                }
                pos = partEnd;
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Finished range check [range=" + range + ", pos=" + pos + ", res=" + res + ']');
            }
            if (pos == end) {
                break;
            }
        }
        if (pos != end) {
            // Tail after the last range. affinity0 expects a length, not an end offset;
            // passing 'end' here over-reported the region whenever pos > 0.
            this.affinity0(info, pos, end - pos, maxLen, res);
        }
        return res;
    }

    /**
     * Resolves block locations of a file region one block group at a time and appends
     * them to {@code res}, merging a group with the previous location when both map to
     * the same nodes.
     *
     * @param info File info.
     * @param start Start offset of the region.
     * @param len Length of the region.
     * @param maxLen Maximum length of one location; {@code <= 0} means no limit.
     * @param res Result deque to append to.
     * @throws IgfsException If the region spans more than {@code Integer.MAX_VALUE} groups.
     */
    private void affinity0(IgfsEntryInfo info, long start, long len, long maxLen, Deque<IgfsBlockLocation> res) {
        long firstGrpIdx = start / this.grpBlockSize;
        // Exclusive upper bound: index of the group right after the one holding the last byte.
        long limitGrpIdx = (start + len + this.grpBlockSize - 1L) / this.grpBlockSize;
        if (limitGrpIdx - firstGrpIdx > Integer.MAX_VALUE) {
            throw new IgfsException("Failed to get affinity (range is too wide) [info=" + info + ", start=" + start + ", len=" + len + ']');
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Mapping file region [fileInfo=" + info + ", start=" + start + ", len=" + len + ']');
        }
        long lastGrpIdx = limitGrpIdx - 1L;
        long grpIdx = firstGrpIdx;
        while (grpIdx < limitGrpIdx) {
            long offInGrp;
            long chunkLen;
            if (grpIdx == firstGrpIdx) {
                // First group may start in the middle and may also be the only one.
                offInGrp = start % this.grpBlockSize;
                chunkLen = Math.min(this.grpBlockSize - offInGrp, len);
            } else if (grpIdx == lastGrpIdx) {
                offInGrp = 0L;
                chunkLen = (start + len - 1L) % this.grpBlockSize + 1L;
            } else {
                offInGrp = 0L;
                chunkLen = this.grpBlockSize;
            }
            IgfsBlockKey grpKey = new IgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(), grpIdx * (long)this.grpSize);
            Collection<ClusterNode> owners = this.dataCache.affinity().mapKeyToPrimaryAndBackups(grpKey);
            if (this.log.isDebugEnabled()) {
                this.log.debug("Mapped key to nodes [key=" + grpKey + ", nodes=" + F.nodeIds(owners) + ", blockStart=" + offInGrp + ", blockLen=" + chunkLen + ']');
            }
            IgfsBlockLocation prev = res.peekLast();
            boolean sameNodes = prev != null && this.equal(prev.nodeIds(), F.viewReadOnly(owners, F.node2id(), new IgnitePredicate[0]));
            if (sameNodes) {
                res.removeLast();
                this.splitBlocks(prev.start(), prev.length() + chunkLen, maxLen, owners, res);
            } else {
                this.splitBlocks(grpIdx * this.grpBlockSize + offInGrp, chunkLen, maxLen, owners, res);
            }
            ++grpIdx;
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Calculated file affinity [info=" + info + ", start=" + start + ", len=" + len + ", res=" + res + ']');
        }
    }

    /**
     * Appends locations covering {@code [start, start + len)} to {@code res}: a single
     * location when no limit is set, otherwise consecutive pieces of at most {@code maxLen} bytes.
     *
     * @param start Start offset.
     * @param len Length to cover.
     * @param maxLen Maximum length of one location; {@code <= 0} means no limit.
     * @param nodes Nodes owning the region.
     * @param res Collection to append to.
     */
    private void splitBlocks(long start, long len, long maxLen, Collection<ClusterNode> nodes, Collection<IgfsBlockLocation> res) {
        if (maxLen <= 0L) {
            res.add(new IgfsBlockLocationImpl(start, len, nodes));
            return;
        }
        long end = start + len;
        long cur = start;
        while (cur < end) {
            long pieceLen = Math.min(maxLen, end - cur);
            res.add(new IgfsBlockLocationImpl(cur, pieceLen, nodes));
            cur += pieceLen;
        }
    }

    /**
     * @return Size in bytes of one block group (block size multiplied by group size).
     */
    public long groupBlockSize() {
        long size = this.grpBlockSize;
        return size;
    }

    /**
     * Compares two ID collections element by element in iteration order.
     *
     * @param one First collection.
     * @param two Second collection.
     * @return {@code true} when both have the same size and equal elements at every position.
     */
    private boolean equal(Collection<UUID> one, Collection<UUID> two) {
        int cnt = one.size();
        if (cnt != two.size()) {
            return false;
        }
        Iterator<UUID> left = one.iterator();
        Iterator<UUID> right = two.iterator();
        for (int remaining = cnt; remaining > 0; --remaining) {
            UUID a = left.next();
            UUID b = right.next();
            if (!a.equals(b)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Sends a batch of blocks to the node that should store them and registers the
     * request with the file's write completion future.
     * <p>
     * For a remote node the blocks are sent as a message; for the local node they are
     * stored through the IGFS thread pool and acknowledged when the store finishes.
     *
     * @param fileId ID of the file being written.
     * @param node Target node.
     * @param blocks Blocks to store.
     * @throws IgniteCheckedException If the write completion future has already failed.
     */
    private void processBatch(IgniteUuid fileId, ClusterNode node, final Map<IgfsBlockKey, byte[]> blocks) throws IgniteCheckedException {
        final long reqId = this.reqIdCtr.getAndIncrement();
        final WriteCompletionFuture writeFut = this.pendingWrites.get(fileId);
        if (writeFut == null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Missing completion future for file write request (most likely exception occurred which will be thrown upon stream close) [nodeId=" + node.id() + ", fileId=" + fileId + ']');
            }
            return;
        }
        // A finished future here means the write already failed; get() rethrows that failure.
        if (writeFut.isDone()) {
            writeFut.get();
        }
        writeFut.onWriteRequest(node.id(), reqId);
        final UUID targetId = node.id();
        if (node.isLocal()) {
            this.igfsCtx.runInIgfsThreadPool(new Runnable(){

                @Override
                public void run() {
                    IgfsDataManager.this.storeBlocksAsync(blocks).listen(new CI1<IgniteInternalFuture<?>>(){

                        @Override
                        public void apply(IgniteInternalFuture<?> storeFut) {
                            try {
                                storeFut.get();
                                writeFut.onWriteAck(targetId, reqId);
                            }
                            catch (IgniteCheckedException storeErr) {
                                writeFut.onError(targetId, storeErr);
                            }
                        }
                    });
                }
            });
            return;
        }
        IgfsBlocksMessage blocksMsg = new IgfsBlocksMessage(fileId, reqId, blocks);
        try {
            this.igfsCtx.send(targetId, this.topic, blocksMsg, (byte)6);
        }
        catch (IgniteCheckedException sendErr) {
            // A send failure is reported through the completion future, not thrown.
            writeFut.onError(targetId, sendErr);
        }
    }

    /**
     * Writes data into an existing block starting at a given offset.
     * <p>
     * A key without an affinity key is updated in place. A colocated key written from
     * offset zero is stored directly. Otherwise both the colocated key and its
     * non-colocated counterpart are read inside a pessimistic repeatable-read transaction
     * and every one that holds a value is updated.
     *
     * @param fileId File ID (not used by this method).
     * @param colocatedKey Block key.
     * @param startOff Offset inside the block at which the data starts.
     * @param data Bytes to write.
     * @param blockSize Full block size.
     * @throws IgniteCheckedException If neither key holds previous data or a cache operation fails.
     */
    private void processPartialBlockWrite(IgniteUuid fileId, IgfsBlockKey colocatedKey, int startOff, byte[] data, int blockSize) throws IgniteCheckedException {
        if (colocatedKey.affinityKey() == null) {
            this.dataCachePrj.invoke(colocatedKey, new UpdateProcessor(startOff, data), new Object[0]);
            return;
        }
        if (startOff == 0) {
            this.putBlock(blockSize, colocatedKey, data);
            return;
        }
        // The block may live under either key, so check both under one transaction.
        IgfsBlockKey key = new IgfsBlockKey(colocatedKey.getFileId(), null, colocatedKey.evictExclude(), colocatedKey.blockId());
        try (GridNearTxLocal tx = this.dataCachePrj.txStartEx(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);){
            // Keys of the result are block keys (the decompiler emitted an array key type here).
            Map<IgfsBlockKey, byte[]> vals = this.dataCachePrj.getAll(F.asList(new IgfsBlockKey[]{colocatedKey, key}));
            boolean hasVal = false;
            UpdateProcessor transformClos = new UpdateProcessor(startOff, data);
            if (vals.get(colocatedKey) != null) {
                this.dataCachePrj.invoke(colocatedKey, transformClos, new Object[0]);
                hasVal = true;
            }
            if (vals.get(key) != null) {
                this.dataCachePrj.invoke(key, transformClos, new Object[0]);
                hasVal = true;
            }
            if (!hasVal) {
                throw new IgniteCheckedException("Failed to write partial block (no previous data was found in cache) [key=" + colocatedKey + ", relaxedKey=" + key + ", startOff=" + startOff + ", dataLen=" + data.length + ']');
            }
            tx.commit();
        }
    }

    /**
     * Starts an asynchronous put of all given blocks into the data cache.
     *
     * @param blocks Blocks to store; must not be empty.
     * @return Future of the asynchronous put operation.
     */
    private IgniteInternalFuture<?> storeBlocksAsync(Map<IgfsBlockKey, byte[]> blocks) {
        assert (!blocks.isEmpty());
        IgniteInternalFuture<?> putFut = this.dataCachePrj.putAllAsync(blocks);
        return putFut;
    }

    /**
     * Stores the blocks carried by a blocks message and, once the store completes (successfully or not),
     * sends an acknowledgement back to the sender. A store failure is carried inside the acknowledgement.
     *
     * @param nodeId ID of the node that sent the blocks.
     * @param blocksMsg Message with the blocks to store.
     */
    private void processBlocksMessage(final UUID nodeId, final IgfsBlocksMessage blocksMsg) {
        IgniteInternalFuture<?> storeFut = this.storeBlocksAsync(blocksMsg.blocks());
        storeFut.listen(new CI1<IgniteInternalFuture<?>>(){

            @Override
            public void apply(IgniteInternalFuture<?> fut) {
                // Outcome of the store: null on success, the failure otherwise.
                IgniteCheckedException storeErr;
                try {
                    fut.get();
                    storeErr = null;
                }
                catch (IgniteCheckedException e) {
                    storeErr = e;
                }
                IgfsAckMessage ack = new IgfsAckMessage(blocksMsg.fileId(), blocksMsg.id(), storeErr);
                try {
                    IgfsDataManager.this.igfsCtx.send(nodeId, IgfsDataManager.this.topic, ack, (byte)6);
                }
                catch (IgniteCheckedException e) {
                    // Failure to acknowledge is only logged.
                    U.warn(IgfsDataManager.this.log, "Failed to send batch acknowledgement (did node leave the grid?) [nodeId=" + nodeId + ", fileId=" + blocksMsg.fileId() + ", batchId=" + blocksMsg.id() + ']', e);
                }
            }
        });
    }

    /**
     * Handles a write acknowledgement: unmarshals it and forwards either the acknowledgement or the
     * reported error to the pending write future of the corresponding file, if there is one.
     *
     * @param nodeId ID of the node that sent the acknowledgement.
     * @param ackMsg Acknowledgement message.
     */
    private void processAckMessage(UUID nodeId, IgfsAckMessage ackMsg) {
        try {
            ackMsg.finishUnmarshal(this.igfsCtx.kernalContext().config().getMarshaller(), null);
        }
        catch (IgniteCheckedException e) {
            U.error(this.log, "Failed to unmarshal message (will ignore): " + ackMsg, e);
            return;
        }
        IgniteUuid fileId = ackMsg.fileId();
        WriteCompletionFuture fut = (WriteCompletionFuture)this.pendingWrites.get(fileId);
        if (fut == null) {
            // Nothing is waiting for this acknowledgement.
            if (this.log.isDebugEnabled()) {
                this.log.debug("Received write acknowledgement for non-existent write future (most likely future was failed) [nodeId=" + nodeId + ", fileId=" + fileId + ']');
            }
            return;
        }
        IgniteCheckedException ackErr = ackMsg.error();
        if (ackErr == null) {
            fut.onWriteAck(nodeId, ackMsg.id());
        } else {
            fut.onError(nodeId, ackErr);
        }
    }

    /**
     * Builds the cache key for a block of the given file, choosing the affinity key as follows:
     * the file's own affinity key if set; otherwise {@code null} when no local range is given;
     * otherwise the file map's key when the block start is "less" than the range; otherwise the
     * range's key, expanding the range first if the block start does not belong to it.
     *
     * @param block Block index.
     * @param fileInfo File info.
     * @param locRange Local affinity range, may be {@code null}; may be expanded by this call.
     * @return Block key.
     */
    private IgfsBlockKey createBlockKey(long block, IgfsEntryInfo fileInfo, IgfsFileAffinityRange locRange) {
        IgniteUuid affKey;
        if (fileInfo.affinityKey() != null) {
            affKey = fileInfo.affinityKey();
        } else if (locRange == null) {
            affKey = null;
        } else {
            long blockStart = block * (long)fileInfo.blockSize();
            if (locRange.less(blockStart)) {
                affKey = fileInfo.fileMap().affinityKey(blockStart, false);
            } else {
                if (!locRange.belongs(blockStart)) {
                    locRange.expand(blockStart, fileInfo.blockSize());
                }
                affKey = locRange.affinityKey();
            }
        }
        return new IgfsBlockKey(fileInfo.id(), affKey, fileInfo.evictExclude(), block);
    }

    /**
     * Blocks until the pending write future of the given file has no outstanding acknowledgements.
     * Returns immediately when no write future is registered for the file.
     *
     * @param fileId File ID.
     * @throws IgniteInterruptedCheckedException If the wait is interrupted.
     */
    void awaitAllAcksReceived(IgniteUuid fileId) throws IgniteInterruptedCheckedException {
        WriteCompletionFuture pending = (WriteCompletionFuture)this.pendingWrites.get(fileId);
        if (pending == null) {
            return;
        }
        pending.awaitAllAcksReceived();
    }

    /**
     * Future tracking the write batches of one file that are still waiting for an acknowledgement.
     * Each registered batch ID is mapped to the node it was sent to; the future completes with
     * {@code true} once it has been marked as awaiting the last ack and no batches remain, or with
     * an error as soon as a node with outstanding batches reports a failure.
     */
    private class WriteCompletionFuture
    extends GridFutureAdapter<Boolean> {
        /** ID of the file being written. */
        private final IgniteUuid fileId;
        /** Outstanding batches: batch ID to target node ID. */
        private final ConcurrentMap<Long, UUID> ackMap = new ConcurrentHashMap<Long, UUID>();
        /** Lock guarding {@link #allAcksRcvCond}. */
        private final Lock lock = new ReentrantLock();
        /** Signalled whenever {@link #ackMap} is observed or made empty. */
        private final Condition allAcksRcvCond = this.lock.newCondition();
        /** Set once no more batches will be registered. */
        private volatile boolean awaitingLast;

        /**
         * @param fileId File ID, must not be {@code null}.
         */
        private WriteCompletionFuture(IgniteUuid fileId) {
            assert (fileId != null);
            this.fileId = fileId;
        }

        /**
         * Waits until there are no outstanding batches.
         *
         * @throws IgniteInterruptedCheckedException If the wait is interrupted.
         */
        public void awaitAllAcksReceived() throws IgniteInterruptedCheckedException {
            this.lock.lock();
            try {
                while (true) {
                    if (this.ackMap.isEmpty()) {
                        break;
                    }
                    U.await(this.allAcksRcvCond);
                }
            }
            finally {
                this.lock.unlock();
            }
        }

        /**
         * Completes the future, first unregistering it from the pending writes map.
         * Does nothing and returns {@code false} if the future is already done.
         */
        @Override
        public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
            if (this.isDone()) {
                return false;
            }
            IgfsDataManager.this.pendingWrites.remove(this.fileId, this);
            return super.onDone(res, err);
        }

        /**
         * Registers a batch as waiting for acknowledgement, unless the future is already done.
         *
         * @param nodeId Node the batch is sent to.
         * @param batchId Batch ID.
         */
        private void onWriteRequest(UUID nodeId, long batchId) {
            if (this.isDone()) {
                return;
            }
            UUID pushedOut = this.ackMap.putIfAbsent(batchId, nodeId);
            assert (pushedOut == null);
        }

        /**
         * @param nodeId Node ID.
         * @return {@code true} if at least one outstanding batch targets the given node.
         */
        private boolean hasPendingAcks(UUID nodeId) {
            assert (nodeId != null);
            for (UUID target : this.ackMap.values()) {
                if (nodeId.equals(target)) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Fails the future if the given node still has outstanding batches; otherwise ignored.
         *
         * @param nodeId Node that reported the error.
         * @param e Reported error.
         */
        private void onError(UUID nodeId, IgniteCheckedException e) {
            if (!this.hasPendingAcks(nodeId)) {
                return;
            }
            // Drop everything that is outstanding and release waiters before failing.
            this.ackMap.clear();
            this.signalNoAcks();
            String msg = e.hasCause(IgfsOutOfSpaceException.class)
                ? "Failed to write data (not enough space on node): " + nodeId
                : "Failed to wait for write completion (write failed on node): " + nodeId;
            this.onDone(new IgniteCheckedException(msg, e));
        }

        /**
         * Records an acknowledgement for a batch; when it was the last outstanding one, waiters are
         * signalled and, if the future was marked as awaiting the last ack, the future is completed.
         *
         * @param nodeId Acknowledging node.
         * @param batchId Acknowledged batch ID.
         */
        private void onWriteAck(UUID nodeId, long batchId) {
            if (this.isDone()) {
                return;
            }
            boolean rmv = this.ackMap.remove(batchId, nodeId);
            assert (rmv) : "Received acknowledgement message for not registered batch [nodeId=" + nodeId + ", batchId=" + batchId + ']';
            if (!this.ackMap.isEmpty()) {
                return;
            }
            this.signalNoAcks();
            if (this.awaitingLast) {
                this.onDone(true);
            }
        }

        /**
         * Wakes up all threads blocked in {@link #awaitAllAcksReceived()}.
         */
        private void signalNoAcks() {
            this.lock.lock();
            try {
                this.allAcksRcvCond.signalAll();
            }
            finally {
                this.lock.unlock();
            }
        }

        /**
         * Marks the future as awaiting the last acknowledgement and completes it right away when
         * nothing is outstanding.
         */
        private void markWaitingLastAck() {
            this.awaitingLast = true;
            IgniteLogger logger = IgfsDataManager.this.log;
            if (logger.isDebugEnabled()) {
                logger.debug("Marked write completion future as awaiting last ack: " + this.fileId);
            }
            if (this.ackMap.isEmpty()) {
                this.onDone(true);
            }
        }
    }

    /**
     * Worker that removes the data blocks of deleted files in the background.
     * <p>
     * Requests are queued as (future, file info) pairs. For each request the worker removes the
     * file's blocks through a data streamer and then completes the request's future. A request
     * carrying the {@link #stopInfo} sentinel stops the worker. On exit, all requests still in the
     * queue are cancelled.
     */
    private class AsyncDeleteWorker
    extends GridWorker {
        /** Sentinel entry: a request carrying this exact instance asks the worker to stop. */
        private final IgfsEntryInfo stopInfo;
        /** Queue of pending delete requests. */
        private final BlockingQueue<IgniteBiTuple<GridFutureAdapter<Object>, IgfsEntryInfo>> delReqs;

        /**
         * @param igniteInstanceName Ignite instance name.
         * @param name Worker name.
         * @param log Logger.
         */
        protected AsyncDeleteWorker(String igniteInstanceName, String name, IgniteLogger log) {
            super(igniteInstanceName, name, log);
            this.delReqs = new LinkedBlockingQueue<IgniteBiTuple<GridFutureAdapter<Object>, IgfsEntryInfo>>();
            this.stopInfo = IgfsUtils.createDirectory(IgniteUuid.randomUuid());
        }

        /**
         * Asks the worker to stop by enqueueing the stop sentinel.
         */
        private void stop() {
            this.delReqs.offer(F.t(new GridFutureAdapter<Object>(), this.stopInfo));
        }

        /**
         * Enqueues a request to remove the blocks of the given file.
         *
         * @param info Info of the file whose blocks should be removed.
         * @return Future completed when the request has been processed, or cancelled if the worker
         *      stops before reaching it.
         */
        private IgniteInternalFuture<Object> deleteAsync(IgfsEntryInfo info) {
            GridFutureAdapter<Object> fut = new GridFutureAdapter<Object>();
            this.delReqs.offer(F.t(fut, info));
            return fut;
        }

        /**
         * Processes delete requests until cancelled or until the stop sentinel is received.
         *
         * @throws InterruptedException If interrupted while waiting for a request.
         * @throws IgniteInterruptedCheckedException Declared by the overridden method.
         */
        @Override
        protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
            try {
                while (!this.isCancelled()) {
                    IgniteBiTuple<GridFutureAdapter<Object>, IgfsEntryInfo> req = this.delReqs.take();
                    GridFutureAdapter<Object> fut = req.get1();
                    IgfsEntryInfo fileInfo = req.get2();
                    // Identity check: only the sentinel instance stops the worker.
                    if (fileInfo == this.stopInfo) {
                        fut.onDone();
                        break;
                    }
                    IgniteDataStreamer<IgfsBlockKey, byte[]> ldr = IgfsDataManager.this.dataStreamer();
                    try {
                        this.removeMappedBlocks(ldr, fileInfo);
                    }
                    catch (IgniteInterruptedException ignored) {
                        // Interruption is not reported; the cleanup below still runs exactly once.
                    }
                    catch (IgniteException e) {
                        this.log.error("Failed to remove file contents: " + fileInfo, e);
                    }
                    finally {
                        this.removeFileKeyedBlocksAndClose(ldr, fileInfo, fut);
                    }
                }
            }
            finally {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Stopping asynchronous igfs file delete thread: " + this.name());
                }
                // Cancel whatever is still queued; stop as soon as the queue is drained.
                for (IgniteBiTuple<GridFutureAdapter<Object>, IgfsEntryInfo> rest = this.delReqs.poll(); rest != null; rest = this.delReqs.poll()) {
                    rest.get1().onCancelled();
                }
            }
        }

        /**
         * First pass: removes every block under the affinity key resolved from the file map (if the
         * file has a map) and, when such a key exists, also under the key without an affinity key.
         *
         * @param ldr Data streamer used for removal.
         * @param fileInfo File whose blocks are removed.
         */
        private void removeMappedBlocks(IgniteDataStreamer<IgfsBlockKey, byte[]> ldr, IgfsEntryInfo fileInfo) {
            IgfsFileMap map = fileInfo.fileMap();
            long size = fileInfo.blocksCount();
            for (long block = 0L; block < size; ++block) {
                IgniteUuid affKey = map == null ? null : map.affinityKey(block * (long)fileInfo.blockSize(), true);
                ldr.removeData(new IgfsBlockKey(fileInfo.id(), affKey, fileInfo.evictExclude(), block));
                if (affKey != null) {
                    ldr.removeData(new IgfsBlockKey(fileInfo.id(), null, fileInfo.evictExclude(), block));
                }
            }
        }

        /**
         * Second pass and cleanup: removes every block under the file-level affinity key, then closes
         * the streamer and completes the request future. The future is completed even when removal or
         * closing fails; such failures are logged.
         *
         * @param ldr Data streamer used for removal; closed by this method.
         * @param fileInfo File whose blocks are removed.
         * @param fut Future of the request being processed.
         */
        private void removeFileKeyedBlocksAndClose(IgniteDataStreamer<IgfsBlockKey, byte[]> ldr, IgfsEntryInfo fileInfo, GridFutureAdapter<Object> fut) {
            try {
                IgniteUuid fileId = fileInfo.id();
                long size = fileInfo.blocksCount();
                for (long block = 0L; block < size; ++block) {
                    ldr.removeData(new IgfsBlockKey(fileId, fileInfo.affinityKey(), fileInfo.evictExclude(), block));
                }
            }
            catch (IgniteException e) {
                this.log.error("Failed to remove file contents: " + fileInfo, e);
            }
            finally {
                try {
                    ldr.close(this.isCancelled());
                }
                catch (IgniteException e) {
                    this.log.error("Failed to stop data streamer while shutting down igfs async delete thread.", e);
                }
                finally {
                    fut.onDone();
                }
            }
        }
    }

    /**
     * Entry processor that writes a byte chunk into a block value at a fixed offset, growing the
     * value when it is missing, empty, or shorter than {@code start + data.length}.
     */
    @GridInternal
    private static final class UpdateProcessor
    implements EntryProcessor<IgfsBlockKey, byte[], Void>,
    Externalizable {
        private static final long serialVersionUID = 0L;
        /** Offset in the block value at which the chunk is written. */
        private int start;
        /** Chunk to write. */
        private byte[] data;

        /**
         * Empty constructor required by {@link Externalizable}.
         */
        public UpdateProcessor() {
        }

        /**
         * @param start Non-negative offset at which {@code data} is written.
         * @param data Chunk to write, must not be {@code null}.
         */
        private UpdateProcessor(int start, byte[] data) {
            assert (start >= 0);
            assert (data != null);
            // Guards against int overflow of the resulting length.
            assert (start + data.length >= 0) : "Too much data [start=" + start + ", data.length=" + data.length + ']';
            this.start = start;
            this.data = data;
        }

        /**
         * Copies the chunk into the entry value at the configured offset and stores the result
         * back into the entry.
         *
         * @return Always {@code null}.
         */
        @Override
        public Void process(MutableEntry<IgfsBlockKey, byte[]> entry, Object ... args) {
            byte[] current = entry.getValue();
            int chunkLen = this.data.length;
            int requiredLen = this.start + chunkLen;
            byte[] target;
            if (current == null || current.length == 0) {
                // Nothing to preserve.
                target = new byte[requiredLen];
            } else if (current.length < requiredLen) {
                // Grow, keeping the existing content.
                target = new byte[requiredLen];
                U.arrayCopy(current, 0, target, 0, current.length);
            } else {
                target = current;
            }
            U.arrayCopy(this.data, 0, target, this.start, chunkLen);
            entry.setValue(target);
            return null;
        }

        /** Writes the offset followed by the chunk. */
        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeInt(this.start);
            U.writeByteArray(out, this.data);
        }

        /** Reads the offset followed by the chunk. */
        @Override
        public void readExternal(ObjectInput in) throws IOException {
            this.start = in.readInt();
            this.data = U.readByteArray(in);
        }

        @Override
        public String toString() {
            return S.toString(UpdateProcessor.class, this, "start", this.start, "data.length", this.data.length);
        }
    }

    /**
     * Blocks writer whose source is a {@link DataInput}.
     */
    private class DataInputBlocksWriter
    extends BlocksWriter<DataInput> {
        private DataInputBlocksWriter() {
        }

        /**
         * Fills {@code dst} from {@code dstOff} to its end with bytes read from {@code src}.
         *
         * @throws IgniteCheckedException Wrapping any {@link IOException} raised by the read.
         */
        @Override
        protected void readData(DataInput src, byte[] dst, int dstOff) throws IgniteCheckedException {
            int toRead = dst.length - dstOff;
            try {
                src.readFully(dst, dstOff, toRead);
            }
            catch (IOException e) {
                throw new IgniteCheckedException(e);
            }
        }
    }

    /**
     * Blocks writer whose source is a {@link ByteBuffer}.
     */
    private class ByteBufferBlocksWriter
    extends BlocksWriter<ByteBuffer> {
        private ByteBufferBlocksWriter() {
        }

        /**
         * Fills {@code dst} from {@code dstOff} to its end with bytes taken from {@code src}.
         */
        @Override
        protected void readData(ByteBuffer src, byte[] dst, int dstOff) {
            int toRead = dst.length - dstOff;
            src.get(dst, dstOff, toRead);
        }
    }

    /**
     * Splits incoming data into file blocks and stores them. Subclasses supply
     * {@link #readData} to pull bytes from a concrete source type.
     *
     * @param <T> Type of the data source.
     */
    private abstract class BlocksWriter<T> {
        private BlocksWriter() {
        }

        /**
         * Stores {@code remainderLen} bytes of {@code remainder} followed by {@code srcLen} bytes of
         * {@code src} as file blocks. The data is treated as occupying the last
         * {@code remainderLen + srcLen} bytes of {@code reservedLen}.
         * <p>
         * Full blocks are accumulated per primary node and handed to {@code processBatch}; partial
         * blocks are written through {@code processPartialBlockWrite}. An incomplete tail block that
         * starts at a block boundary is not written when {@code flush} is {@code false}; it is
         * returned instead.
         *
         * @param fileInfo File info (ID and block size are read from it).
         * @param reservedLen Reserved length; the data ends at this offset.
         * @param remainder Bytes to be written before {@code src}, may be {@code null}.
         * @param remainderLen Number of bytes to take from {@code remainder}.
         * @param src Data source.
         * @param srcLen Number of bytes to take from {@code src}.
         * @param flush Whether an incomplete tail block must be written as well.
         * @param affinityRange Affinity range passed to {@code createBlockKey}, which may expand it.
         * @param batch Optional batch every written portion is also passed to.
         * @return The unwritten tail portion, or {@code null} if all data was written.
         * @throws IgniteCheckedException If the data exceeds {@code reservedLen}, the batch rejects a
         *      portion, or storing fails.
         */
        @Nullable
        public byte[] storeDataBlocks(IgfsEntryInfo fileInfo, long reservedLen, @Nullable byte[] remainder, int remainderLen, T src, int srcLen, boolean flush, IgfsFileAffinityRange affinityRange, @Nullable IgfsFileWorkerBatch batch) throws IgniteCheckedException {
            IgniteUuid id = fileInfo.id();
            int blockSize = fileInfo.blockSize();
            // Total number of bytes to store.
            int len = remainderLen + srcLen;
            if ((long)len > reservedLen) {
                throw new IgfsException("Not enough space reserved to store data [id=" + id + ", reservedLen=" + reservedLen + ", remainderLen=" + remainderLen + ", data.length=" + srcLen + ']');
            }
            // Offset of the first byte, index of the first block, and exclusive index of the last block.
            long start = reservedLen - (long)len;
            long first = start / (long)blockSize;
            long limit = (start + (long)len + (long)blockSize - 1L) / (long)blockSize;
            int written = 0;
            int remainderOff = 0;
            // Full blocks accumulated for the current primary node.
            LinkedHashMap<IgfsBlockKey, byte[]> nodeBlocks = U.newLinkedHashMap((int)(limit - first));
            ClusterNode node = null;
            int off = 0;
            for (long block = first; block < limit; ++block) {
                // Inclusive byte range covered inside this block.
                long blockStartOff = block == first ? start % (long)blockSize : 0L;
                long blockEndOff = block == limit - 1L ? (start + (long)len - 1L) % (long)blockSize : (long)(blockSize - 1);
                long size = blockEndOff - blockStartOff + 1L;
                assert (size > 0L && size <= (long)blockSize);
                assert (blockStartOff + size <= (long)blockSize);
                byte[] portion = new byte[(int)size];
                // Number of bytes this portion takes from the remainder.
                int portionOff = Math.min((int)size, remainderLen - remainderOff);
                if (remainderOff != remainderLen) {
                    U.arrayCopy(remainder, remainderOff, portion, 0, portionOff);
                    remainderOff += portionOff;
                }
                // The rest of the portion comes from the source.
                if ((long)portionOff < size) {
                    this.readData(src, portion, portionOff);
                }
                // May expand affinityRange.
                IgfsBlockKey key = IgfsDataManager.this.createBlockKey(block, fileInfo, affinityRange);
                ClusterNode primaryNode = IgfsDataManager.this.dataCachePrj.cache().affinity().mapKeyToNode(key);
                if (block == first) {
                    off = (int)blockStartOff;
                    node = primaryNode;
                }
                if (size == (long)blockSize) {
                    assert (blockStartOff == 0L) : "Cannot write the whole block not from start position [start=" + start + ", block=" + block + ", blockStartOff=" + blockStartOff + ", blockEndOff=" + blockEndOff + ", size=" + size + ", first=" + first + ", limit=" + limit + ", blockSize=" + blockSize + ']';
                } else if (blockStartOff == 0L && !flush) {
                    // Incomplete tail block without flush: send what is accumulated and hand the tail back.
                    // NOTE(review): presumably the caller passes it back as `remainder` on the next write - confirm.
                    assert (written + portion.length == len);
                    if (!nodeBlocks.isEmpty()) {
                        IgfsDataManager.this.processBatch(id, node, nodeBlocks);
                        IgfsDataManager.this.igfsCtx.metrics().addWriteBlocks(1, 0);
                    }
                    return portion;
                }
                int writtenSecondary = 0;
                if (batch != null) {
                    if (!batch.write(portion)) {
                        throw new IgniteCheckedException("Cannot write more data to the secondary file system output stream because it was marked as closed: " + batch.path());
                    }
                    writtenSecondary = 1;
                }
                assert (primaryNode != null);
                int writtenTotal = 0;
                // Primary node changed: send the blocks accumulated for the previous node and start a new batch.
                if (!primaryNode.id().equals(node.id())) {
                    if (!nodeBlocks.isEmpty()) {
                        IgfsDataManager.this.processBatch(id, node, nodeBlocks);
                    }
                    writtenTotal = nodeBlocks.size();
                    nodeBlocks = U.newLinkedHashMap((int)(limit - first));
                    node = primaryNode;
                }
                assert (size == (long)portion.length);
                if (size != (long)blockSize) {
                    // Partial block: written immediately; only the first block can start at a non-zero offset.
                    IgfsDataManager.this.processPartialBlockWrite(id, key, block == first ? off : 0, portion, blockSize);
                    ++writtenTotal;
                } else {
                    nodeBlocks.put(key, portion);
                }
                IgfsDataManager.this.igfsCtx.metrics().addWriteBlocks(writtenTotal, writtenSecondary);
                written += portion.length;
            }
            // Send whatever is still accumulated for the last node.
            if (!nodeBlocks.isEmpty()) {
                IgfsDataManager.this.processBatch(id, node, nodeBlocks);
                IgfsDataManager.this.igfsCtx.metrics().addWriteBlocks(nodeBlocks.size(), 0);
            }
            assert (written == len);
            return null;
        }

        /**
         * Reads bytes from the source into the destination array. Both implementations in this file
         * fill the array from the given offset up to its end.
         *
         * @param var1 Data source.
         * @param var2 Destination array.
         * @param var3 Offset in the destination array at which to start writing.
         * @throws IgniteCheckedException If reading fails.
         */
        protected abstract void readData(T var1, byte[] var2, int var3) throws IgniteCheckedException;
    }
}

