/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.distributed.near;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxAbstractEnlistFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxEnlistFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote;
import org.apache.ignite.internal.processors.cache.distributed.dht.NearTxResultHandler;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxAbstractEnlistFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotWithoutTxs;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.EnlistOperation;
import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;

public class GridNearTxEnlistFuture
extends GridNearTxAbstractEnlistFuture<GridCacheReturn> {
    public static final int DFLT_BATCH_SIZE = 1024;
    private static final AtomicIntegerFieldUpdater<GridNearTxEnlistFuture> SKIP_UPD = AtomicIntegerFieldUpdater.newUpdater(GridNearTxEnlistFuture.class, "skipCntr");
    private static final Object FINISHED = new Object();
    @GridToStringExclude
    private final UpdateSourceIterator<?> it;
    private int batchSize;
    private AtomicInteger batchCntr = new AtomicInteger();
    @GridToStringExclude
    private volatile int skipCntr;
    @GridToStringExclude
    private volatile GridCacheReturn res;
    private final Map<UUID, Batch> batches = new ConcurrentHashMap<UUID, Batch>();
    private Object peek;
    private boolean topLocked;
    private final boolean sequential;
    private final CacheEntryPredicate filter;
    private final boolean needRes;

    public GridNearTxEnlistFuture(GridCacheContext<?, ?> cctx, GridNearTxLocal tx, long timeout, UpdateSourceIterator<?> it, int batchSize, boolean sequential, @Nullable CacheEntryPredicate filter, boolean needRes) {
        super(cctx, tx, timeout, null);
        this.it = it;
        this.batchSize = batchSize > 0 ? batchSize : 1024;
        this.sequential = sequential;
        this.filter = filter;
        this.needRes = needRes;
    }

    @Override
    protected void map(boolean topLocked) {
        this.topLocked = topLocked;
        this.sendNextBatches(null);
    }

    private void sendNextBatches(@Nullable UUID nodeId) {
        block5: {
            try {
                boolean first;
                Collection<Batch> next = this.continueLoop(nodeId);
                if (next == null) {
                    return;
                }
                boolean bl = first = nodeId != null;
                if (!this.topLocked && this.cctx.topology().holdsLock()) {
                    this.cctx.topology().readUnlock();
                }
                for (Batch batch : next) {
                    ClusterNode node = batch.node();
                    this.sendBatch(node, batch, first);
                    if (node.isLocal()) continue;
                    first = false;
                }
            }
            catch (Throwable e) {
                this.onDone(e);
                if (!(e instanceof Error)) break block5;
                throw (Error)e;
            }
        }
    }

    private Collection<Batch> continueLoop(@Nullable UUID nodeId) throws IgniteCheckedException {
        if (nodeId != null) {
            this.batches.remove(nodeId);
        }
        if (this.isDone() || SKIP_UPD.getAndIncrement(this) != 0) {
            return null;
        }
        ArrayList<Batch> res = null;
        Batch batch = null;
        boolean flush = false;
        EnlistOperation op = this.it.operation();
        while (true) {
            if (this.hasNext0()) {
                ClusterNode node;
                this.checkCompleted();
                Object cur = this.next0();
                KeyCacheObject key = this.cctx.toCacheKeyObject(op.isDeleteOrLock() ? cur : ((IgniteBiTuple)cur).getKey());
                List<ClusterNode> nodes = this.cctx.affinity().nodesByKey(key, this.topVer);
                if (F.isEmpty(nodes) || (node = nodes.get(0)) == null) {
                    throw new ClusterTopologyCheckedException("Failed to get primary node [topVer=" + this.topVer + ", key=" + key + ']');
                }
                this.tx.markQueryEnlisted(null);
                if (!this.sequential) {
                    batch = this.batches.get(node.id());
                } else if (batch != null && !batch.node().equals(node)) {
                    res = this.markReady(res, batch);
                }
                if (batch == null) {
                    batch = new Batch(node);
                    this.batches.put(node.id(), batch);
                }
                if (batch.ready()) {
                    batch = null;
                    this.peek = cur;
                    this.it.beforeDetach();
                    flush = true;
                } else {
                    batch.add(op.isDeleteOrLock() ? key : cur, op != EnlistOperation.LOCK && this.cctx.affinityNode() && (this.cctx.isReplicated() || nodes.indexOf(this.cctx.localNode()) > 0));
                    if (batch.size() != this.batchSize) continue;
                    res = this.markReady(res, batch);
                    continue;
                }
            }
            if (SKIP_UPD.decrementAndGet(this) == 0) break;
            this.skipCntr = 1;
        }
        if (flush) {
            return res;
        }
        for (Batch batch0 : this.batches.values()) {
            if (batch0.ready()) continue;
            if (res == null) {
                res = new ArrayList<Batch>();
            }
            batch0.ready(true);
            res.add(batch0);
        }
        if (this.batches.isEmpty()) {
            this.onDone(this.res);
        }
        return res;
    }

    private Object next0() {
        if (!this.hasNext0()) {
            throw new NoSuchElementException();
        }
        Object cur = this.peek;
        if (cur != null) {
            this.peek = null;
        } else {
            cur = this.it.next();
        }
        return cur;
    }

    private boolean hasNext0() {
        if (this.peek == null && !this.it.hasNext()) {
            this.peek = FINISHED;
        }
        return this.peek != FINISHED;
    }

    private ArrayList<Batch> markReady(ArrayList<Batch> batches, Batch batch) {
        if (!batch.ready()) {
            batch.ready(true);
            if (batches == null) {
                batches = new ArrayList();
            }
            batches.add(batch);
        }
        return batches;
    }

    private void processBatchLocalBackupKeys(UUID primaryId, List<Object> rows, GridCacheVersion dhtVer, IgniteUuid dhtFutId) {
        assert (dhtVer != null);
        assert (dhtFutId != null);
        EnlistOperation op = this.it.operation();
        assert (op != EnlistOperation.LOCK);
        boolean keysOnly = op.isDeleteOrLock();
        ArrayList<KeyCacheObject> keys = new ArrayList<KeyCacheObject>(rows.size());
        ArrayList<Message> vals = keysOnly ? null : new ArrayList<Message>(rows.size());
        for (Object row : rows) {
            if (keysOnly) {
                keys.add(this.cctx.toCacheKeyObject(row));
                continue;
            }
            keys.add(this.cctx.toCacheKeyObject(((IgniteBiTuple)row).getKey()));
            if (op.isInvoke()) {
                vals.add((Message)((IgniteBiTuple)row).getValue());
                continue;
            }
            vals.add(this.cctx.toCacheObject(((IgniteBiTuple)row).getValue()));
        }
        try {
            GridDhtTxRemote dhtTx = (GridDhtTxRemote)this.cctx.tm().tx(dhtVer);
            if (dhtTx == null) {
                dhtTx = new GridDhtTxRemote(this.cctx.shared(), this.cctx.localNodeId(), dhtFutId, primaryId, this.lockVer, this.topVer, dhtVer, null, this.cctx.systemTx(), this.cctx.ioPolicy(), TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ, false, this.tx.remainingTime(), -1, this.tx.subjectId(), this.tx.taskNameHash(), false);
                dhtTx.mvccSnapshot(new MvccSnapshotWithoutTxs(this.mvccSnapshot.coordinatorVersion(), this.mvccSnapshot.counter(), 0, this.mvccSnapshot.cleanupVersion()));
                dhtTx = this.cctx.tm().onCreated(null, dhtTx);
                if (dhtTx == null || !this.cctx.tm().onStarted(dhtTx)) {
                    throw new IgniteTxRollbackCheckedException("Failed to update backup (transaction has been completed): " + dhtVer);
                }
            }
            this.cctx.tm().txHandler().mvccEnlistBatch(dhtTx, this.cctx, this.it.operation(), keys, vals, this.mvccSnapshot.withoutActiveTransactions(), null, -1);
        }
        catch (IgniteCheckedException e) {
            this.onDone(e);
            return;
        }
        this.sendNextBatches(primaryId);
    }

    private void sendBatch(ClusterNode node, Batch batch, boolean first) throws IgniteCheckedException {
        this.updateMappings(node);
        boolean clientFirst = first && this.cctx.localNode().isClient() && !this.topLocked && !this.tx.hasRemoteLocks();
        int batchId = this.batchCntr.incrementAndGet();
        if (node.isLocal()) {
            this.enlistLocal(batchId, node.id(), batch);
        } else {
            this.sendBatch(batchId, node.id(), batch, clientFirst);
        }
    }

    private void sendBatch(int batchId, UUID nodeId, Batch batchFut, boolean clientFirst) throws IgniteCheckedException {
        assert (batchFut != null);
        GridNearTxEnlistRequest req = new GridNearTxEnlistRequest(this.cctx.cacheId(), this.threadId, this.futId, batchId, this.tx.subjectId(), this.topVer, this.lockVer, this.mvccSnapshot, clientFirst, this.remainingTime(), this.tx.remainingTime(), this.tx.taskNameHash(), batchFut.rows(), this.it.operation(), this.needRes, this.filter);
        this.sendRequest(req, nodeId);
    }

    private void sendRequest(final GridCacheMessage req, final UUID nodeId) throws IgniteCheckedException {
        IgniteInternalFuture<?> txSync = this.cctx.tm().awaitFinishAckAsync(nodeId, this.tx.threadId());
        if (txSync == null || txSync.isDone()) {
            this.cctx.io().send(nodeId, req, this.cctx.ioPolicy());
        } else {
            txSync.listen(new CI1<IgniteInternalFuture<?>>(){

                @Override
                public void apply(IgniteInternalFuture<?> future) {
                    try {
                        GridNearTxEnlistFuture.this.cctx.io().send(nodeId, req, GridNearTxEnlistFuture.this.cctx.ioPolicy());
                    }
                    catch (IgniteCheckedException e) {
                        GridNearTxEnlistFuture.this.onDone(e);
                    }
                }
            });
        }
    }

    private void enlistLocal(int batchId, final UUID nodeId, Batch batch) throws IgniteCheckedException {
        Collection<Object> rows = batch.rows();
        GridDhtTxEnlistFuture fut = new GridDhtTxEnlistFuture(nodeId, this.lockVer, this.mvccSnapshot, this.threadId, this.futId, batchId, this.tx, this.remainingTime(), this.cctx, rows, this.it.operation(), this.filter, this.needRes);
        this.updateLocalFuture(fut);
        fut.listen(new CI1<IgniteInternalFuture<GridCacheReturn>>(){

            @Override
            public void apply(IgniteInternalFuture<GridCacheReturn> fut) {
                try {
                    GridNearTxEnlistResponse res;
                    GridNearTxEnlistFuture.this.clearLocalFuture((GridDhtTxAbstractEnlistFuture)fut);
                    GridNearTxEnlistResponse gridNearTxEnlistResponse = res = fut.error() == null ? (GridNearTxEnlistResponse)NearTxResultHandler.createResponse(fut) : null;
                    if (GridNearTxEnlistFuture.this.checkResponse(nodeId, res, fut.error())) {
                        GridNearTxEnlistFuture.this.sendNextBatches(nodeId);
                    }
                }
                catch (IgniteCheckedException e) {
                    GridNearTxEnlistFuture.this.checkResponse(nodeId, null, e);
                }
                finally {
                    CU.unwindEvicts(GridNearTxEnlistFuture.this.cctx);
                }
            }
        });
        fut.init();
    }

    public void onResult(UUID nodeId, GridNearTxEnlistResponse res) {
        if (this.checkResponse(nodeId, res, res.error())) {
            Batch batch = this.batches.get(nodeId);
            if (batch != null && !F.isEmpty(batch.localBackupRows()) && res.dhtFutureId() != null) {
                this.processBatchLocalBackupKeys(nodeId, batch.localBackupRows(), res.dhtVersion(), res.dhtFutureId());
            } else {
                this.sendNextBatches(nodeId);
            }
        }
    }

    @Override
    public boolean onNodeLeft(UUID nodeId) {
        if (this.batches.keySet().contains(nodeId)) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Found unacknowledged batch for left node [nodeId=" + nodeId + ", fut=" + this + ']');
            }
            ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to enlist keys (primary node left grid, retry transaction if possible) [node=" + nodeId + ']');
            topEx.retryReadyFuture(this.cctx.shared().nextAffinityReadyFuture(this.topVer));
            this.processFailure(topEx, null);
            this.batches.remove(nodeId);
            if (this.batches.isEmpty()) {
                this.onDone();
            }
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Future does not have mapping for left node (ignoring) [nodeId=" + nodeId + ", fut=" + this + ']');
        }
        return false;
    }

    public boolean checkResponse(UUID nodeId, GridNearTxEnlistResponse res, Throwable err) {
        assert (res != null || err != null) : this;
        if (err == null && res.error() != null) {
            err = res.error();
        }
        if (res != null) {
            this.tx.mappings().get(nodeId).addBackups(res.newDhtNodes());
        }
        if (err != null) {
            this.processFailure(err, null);
        }
        if (this.ex != null) {
            this.batches.remove(nodeId);
            if (this.batches.isEmpty()) {
                this.onDone();
            }
            return false;
        }
        assert (res != null);
        if (res.result().invokeResult()) {
            if (this.res == null) {
                this.res = new GridCacheReturn(true, true);
            }
            this.res.success(this.res.success() && err == null && res.result().success());
            this.res.mergeEntryProcessResults(res.result());
        } else {
            this.res = res.result();
        }
        assert (this.res != null && (this.res.emptyResult() || this.needRes || this.res.invokeResult() || !this.res.success()));
        return true;
    }

    @Override
    public String toString() {
        return S.toString(GridNearTxEnlistFuture.class, this, super.toString());
    }

    private static class Batch {
        @GridToStringExclude
        private final ClusterNode node;
        private List<Object> rows = new ArrayList<Object>();
        private List<Object> locBkpRows;
        private boolean ready;

        private Batch(ClusterNode node) {
            this.node = node;
        }

        public ClusterNode node() {
            return this.node;
        }

        public void add(Object row, boolean localBackup) {
            this.rows.add(row);
            if (localBackup) {
                if (this.locBkpRows == null) {
                    this.locBkpRows = new ArrayList<Object>();
                }
                this.locBkpRows.add(row);
            }
        }

        public int size() {
            return this.rows.size();
        }

        public Collection<Object> rows() {
            return this.rows;
        }

        public List<Object> localBackupRows() {
            return this.locBkpRows;
        }

        public boolean ready() {
            return this.ready;
        }

        public void ready(boolean ready) {
            this.ready = ready;
        }
    }
}

