/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.job;

import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDeploymentException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeExecutionRejectedException;
import org.apache.ignite.compute.ComputeJobSibling;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.JobEvent;
import org.apache.ignite.events.TaskEvent;
import org.apache.ignite.internal.GridJobCancelRequest;
import org.apache.ignite.internal.GridJobContextImpl;
import org.apache.ignite.internal.GridJobExecuteRequest;
import org.apache.ignite.internal.GridJobExecuteResponse;
import org.apache.ignite.internal.GridJobSessionImpl;
import org.apache.ignite.internal.GridJobSiblingsRequest;
import org.apache.ignite.internal.GridJobSiblingsResponse;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTaskSessionImpl;
import org.apache.ignite.internal.GridTaskSessionRequest;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.SkipDaemon;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.collision.GridCollisionJobContextAdapter;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.managers.systemview.walker.ComputeJobViewWalker;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.job.GridJobEventListener;
import org.apache.ignite.internal.processors.job.GridJobHoldListener;
import org.apache.ignite.internal.processors.job.GridJobWorker;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsSnapshot;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
import org.apache.ignite.internal.processors.security.SecurityUtils;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.metric.DoubleMetric;
import org.apache.ignite.spi.systemview.view.ComputeJobView;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;

@SkipDaemon
public class GridJobProcessor
extends GridProcessorAdapter {
    public static final String JOBS_VIEW = "jobs";
    public static final String JOBS_VIEW_DESC = "Running compute jobs, part of compute task started on remote host.";
    public static final int DFLT_JOBS_HISTORY_SIZE = 10240;
    private static final int FINISHED_JOBS_COUNT = Integer.getInteger("IGNITE_JOBS_HISTORY_SIZE", 10240);
    public static final String JOBS_METRICS = MetricUtils.metricName("compute", "jobs");
    public static final String STARTED = "Started";
    public static final String ACTIVE = "Active";
    public static final String WAITING = "Waiting";
    public static final String CANCELED = "Canceled";
    public static final String REJECTED = "Rejected";
    public static final String FINISHED = "Finished";
    public static final String EXECUTION_TIME = "ExecutionTime";
    public static final String WAITING_TIME = "WaitingTime";
    private final Marshaller marsh;
    private final boolean jobAlwaysActivate;
    private volatile ConcurrentMap<IgniteUuid, GridJobWorker> activeJobs;
    private final ConcurrentMap<IgniteUuid, GridJobWorker> passiveJobs;
    private final ConcurrentMap<IgniteUuid, GridJobWorker> cancelledJobs = new ConcurrentHashMap<IgniteUuid, GridJobWorker>();
    private final Collection<IgniteUuid> heldJobs = new GridConcurrentHashSet<IgniteUuid>();
    private volatile GridBoundedConcurrentLinkedHashMap<IgniteUuid, Boolean> cancelReqs = new GridBoundedConcurrentLinkedHashMap(FINISHED_JOBS_COUNT, FINISHED_JOBS_COUNT < 128 ? FINISHED_JOBS_COUNT : 128, 0.75f, 16);
    private final GridBoundedConcurrentLinkedHashSet<IgniteUuid> finishedJobs = new GridBoundedConcurrentLinkedHashSet(FINISHED_JOBS_COUNT, FINISHED_JOBS_COUNT < 128 ? FINISHED_JOBS_COUNT : 128, 0.75f, 256, ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q);
    private final GridJobEventListener evtLsnr;
    private final GridMessageListener cancelLsnr;
    private final GridMessageListener jobExecLsnr;
    private final GridLocalEventListener discoLsnr;
    @Deprecated
    private final LongAdder canceledJobsCnt = new LongAdder();
    @Deprecated
    private final LongAdder finishedJobsCnt = new LongAdder();
    @Deprecated
    private final LongAdder startedJobsCnt = new LongAdder();
    @Deprecated
    private final LongAdder rejectedJobsCnt = new LongAdder();
    @Deprecated
    private final LongAdder finishedJobsTime = new LongAdder();
    private final DoubleMetric cpuLoadMetric;
    @Deprecated
    private final GridAtomicLong maxFinishedJobsTime = new GridAtomicLong();
    @Deprecated
    private final AtomicLong metricsLastUpdateTstamp = new AtomicLong();
    final AtomicLongMetric startedJobsMetric;
    final AtomicLongMetric activeJobsMetric;
    final AtomicLongMetric waitingJobsMetric;
    final AtomicLongMetric canceledJobsMetric;
    final AtomicLongMetric rejectedJobsMetric;
    final AtomicLongMetric finishedJobsMetric;
    final AtomicLongMetric totalExecutionTimeMetric;
    final AtomicLongMetric totalWaitTimeMetric;
    private boolean stopping;
    private boolean cancelOnStop;
    @Deprecated
    private final long metricsUpdateFreq;
    private final GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock();
    private final AtomicLong topicIdGen = new AtomicLong();
    private final GridJobHoldListener holdLsnr = new JobHoldListener();
    private final ThreadLocal<Boolean> handlingCollision = new ThreadLocal<Boolean>(){

        @Override
        protected Boolean initialValue() {
            return false;
        }
    };
    private final ThreadLocal<Boolean> internal = new ThreadLocal<Boolean>(){

        @Override
        protected Boolean initialValue() {
            return false;
        }
    };
    private final ThreadLocal<GridJobSessionImpl> currSess = new ThreadLocal();

    public GridJobProcessor(GridKernalContext ctx) {
        super(ctx);
        this.marsh = ctx.config().getMarshaller();
        this.jobAlwaysActivate = !ctx.collision().enabled();
        this.metricsUpdateFreq = ctx.config().getMetricsUpdateFrequency();
        this.activeJobs = this.initJobsMap(this.jobAlwaysActivate);
        this.passiveJobs = this.jobAlwaysActivate ? null : new JobsMap(1024, 0.75f, 256);
        this.evtLsnr = new JobEventListener();
        this.cancelLsnr = new JobCancelListener();
        this.jobExecLsnr = new JobExecutionListener();
        this.discoLsnr = new JobDiscoveryListener();
        this.cpuLoadMetric = (DoubleMetric)ctx.metric().registry("sys").findMetric("CpuLoad");
        MetricRegistry mreg = ctx.metric().registry(JOBS_METRICS);
        this.startedJobsMetric = mreg.longMetric(STARTED, "Number of started jobs.");
        this.activeJobsMetric = mreg.longMetric(ACTIVE, "Number of active jobs currently executing.");
        this.waitingJobsMetric = mreg.longMetric(WAITING, "Number of currently queued jobs waiting to be executed.");
        this.canceledJobsMetric = mreg.longMetric(CANCELED, "Number of cancelled jobs that are still running.");
        this.rejectedJobsMetric = mreg.longMetric(REJECTED, "Number of jobs rejected after more recent collision resolution operation.");
        this.finishedJobsMetric = mreg.longMetric(FINISHED, "Number of finished jobs.");
        this.totalExecutionTimeMetric = mreg.longMetric(EXECUTION_TIME, "Total execution time of jobs.");
        this.totalWaitTimeMetric = mreg.longMetric(WAITING_TIME, "Total time jobs spent on waiting queue.");
        ctx.systemView().registerInnerCollectionView(JOBS_VIEW, JOBS_VIEW_DESC, new ComputeJobViewWalker(), this.passiveJobs == null ? Arrays.asList(this.activeJobs, this.cancelledJobs) : Arrays.asList(this.activeJobs, this.passiveJobs, this.cancelledJobs), Map::entrySet, (map, e) -> {
            ComputeJobView.ComputeJobState state = map == this.activeJobs ? ComputeJobView.ComputeJobState.ACTIVE : (map == this.passiveJobs ? ComputeJobView.ComputeJobState.PASSIVE : ComputeJobView.ComputeJobState.CANCELED);
            return new ComputeJobView((IgniteUuid)e.getKey(), (GridJobWorker)e.getValue(), state);
        });
    }

    @Override
    public void start() throws IgniteCheckedException {
        if (this.metricsUpdateFreq < -1L) {
            throw new IgniteCheckedException("Invalid value for 'metricsUpdateFrequency' configuration property (should be greater than or equals to -1): " + this.metricsUpdateFreq);
        }
        if (this.metricsUpdateFreq == -1L) {
            U.warn(this.log, "Job metrics are disabled (use with caution).");
        }
        if (!this.jobAlwaysActivate) {
            this.ctx.collision().setCollisionExternalListener(new CollisionExternalListener());
        }
        GridIoManager ioMgr = this.ctx.io();
        ioMgr.addMessageListener(GridTopic.TOPIC_JOB_CANCEL, this.cancelLsnr);
        ioMgr.addMessageListener(GridTopic.TOPIC_JOB, this.jobExecLsnr);
        this.ctx.event().addLocalEventListener(this.discoLsnr, 12, 11, 13);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Job processor started.");
        }
    }

    @Override
    public void stop(boolean cancel) {
        this.activeJobs = this.initJobsMap(this.jobAlwaysActivate);
        this.activeJobsMetric.reset();
        this.cancelledJobs.clear();
        this.cancelReqs = new GridBoundedConcurrentLinkedHashMap(FINISHED_JOBS_COUNT, FINISHED_JOBS_COUNT < 128 ? FINISHED_JOBS_COUNT : 128, 0.75f, 16);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Job processor stopped.");
        }
    }

    @Override
    public void onKernalStop(boolean cancel) {
        GridIoManager commMgr = this.ctx.io();
        commMgr.removeMessageListener(GridTopic.TOPIC_JOB, this.jobExecLsnr);
        commMgr.removeMessageListener(GridTopic.TOPIC_JOB_CANCEL, this.cancelLsnr);
        if (!this.jobAlwaysActivate) {
            this.ctx.collision().unsetCollisionExternalListener();
        }
        this.rwLock.writeLock();
        try {
            this.stopping = true;
            this.cancelOnStop = cancel;
        }
        finally {
            this.rwLock.writeUnlock();
        }
        if (!this.jobAlwaysActivate) {
            for (GridJobWorker job : this.passiveJobs.values()) {
                if (!this.removeFromPassive(job)) continue;
                this.rejectJob(job, false);
            }
        }
        if (cancel) {
            for (GridJobWorker job : this.activeJobs.values()) {
                job.onStopping();
                this.cancelJob(job, false);
            }
        }
        U.join(this.activeJobs.values(), this.log);
        U.join(this.cancelledJobs.values(), this.log);
        this.ctx.event().removeLocalEventListener(this.discoLsnr, new int[0]);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Finished executing job processor onKernalStop() callback.");
        }
    }

    @Nullable
    public GridJobWorker activeJob(IgniteUuid jobId) {
        assert (jobId != null);
        return (GridJobWorker)this.activeJobs.get(jobId);
    }

    public boolean internal() {
        return this.internal.get();
    }

    void internal(boolean internal) {
        this.internal.set(internal);
    }

    private void rejectJob(GridJobWorker job, boolean sndReply) {
        ComputeExecutionRejectedException e = new ComputeExecutionRejectedException("Job was cancelled before execution [taskSesId=" + job.getSession().getId() + ", jobId=" + job.getJobId() + ", job=" + job.getJob() + ']');
        job.finishJob(null, e, sndReply);
    }

    private void cancelJob(GridJobWorker job, boolean sysCancel) {
        boolean isCancelled = job.isCancelled();
        if (!job.isInternal() && !isCancelled) {
            this.canceledJobsCnt.increment();
            this.canceledJobsMetric.increment();
        }
        job.cancel(sysCancel);
    }

    private void release(GridDeployment dep) {
        dep.release();
        if (dep.obsolete()) {
            this.ctx.resource().onUndeployed(dep);
        }
    }

    private ConcurrentMap<IgniteUuid, GridJobWorker> initJobsMap(boolean collisionsDisabled) {
        return collisionsDisabled ? new ConcurrentHashMap() : new JobsMap(1024, 0.75f, 256);
    }

    public void setAttributes(GridJobSessionImpl ses, Map<?, ?> attrs) throws IgniteCheckedException {
        ClusterNode taskNode;
        assert (ses.isFullSupport());
        long timeout = ses.getEndTime() - U.currentTimeMillis();
        if (timeout <= 0L) {
            U.warn(this.log, "Task execution timed out (remote session attributes won't be set): " + ses);
            return;
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Setting session attribute(s) from job: " + ses);
        }
        if ((taskNode = this.ctx.discovery().node(ses.getTaskNodeId())) == null) {
            throw new IgniteCheckedException("Node that originated task execution has left grid: " + ses.getTaskNodeId());
        }
        boolean loc = this.ctx.localNodeId().equals(taskNode.id()) && !this.ctx.config().isMarshalLocalJobs();
        GridTaskSessionRequest req = new GridTaskSessionRequest(ses.getId(), ses.getJobId(), loc ? null : U.marshal(this.marsh, attrs), attrs);
        Object topic = GridTopic.TOPIC_TASK.topic(ses.getJobId(), this.ctx.discovery().localNode().id());
        this.ctx.io().sendOrderedMessage(taskNode, topic, req, (byte)2, timeout, false);
    }

    /*
     * Loose catch block
     */
    public Collection<ComputeJobSibling> requestJobSiblings(ComputeTaskSession ses) throws IgniteCheckedException {
        assert (ses != null);
        final UUID taskNodeId = ses.getTaskNodeId();
        ClusterNode taskNode = this.ctx.discovery().node(taskNodeId);
        if (taskNode == null) {
            throw new IgniteCheckedException("Node that originated task execution has left grid: " + taskNodeId);
        }
        final IgniteBiTuple t = new IgniteBiTuple();
        final ReentrantLock lock = new ReentrantLock();
        final Condition cond = lock.newCondition();
        GridMessageListener msgLsnr = new GridMessageListener(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onMessage(UUID nodeId, Object msg, byte plc) {
                String err = null;
                GridJobSiblingsResponse res = null;
                if (!(msg instanceof GridJobSiblingsResponse)) {
                    err = "Received unexpected message: " + msg;
                } else if (!nodeId.equals(taskNodeId)) {
                    err = "Received job siblings response from unexpected node [taskNodeId=" + taskNodeId + ", nodeId=" + nodeId + ']';
                } else {
                    res = (GridJobSiblingsResponse)msg;
                    if (res.jobSiblings() == null) {
                        try {
                            res.unmarshalSiblings(GridJobProcessor.this.marsh);
                        }
                        catch (IgniteCheckedException e) {
                            U.error(GridJobProcessor.this.log, "Failed to unmarshal job siblings.", e);
                            err = e.getMessage();
                        }
                    }
                }
                lock.lock();
                try {
                    if (t.isEmpty()) {
                        t.set(err, res);
                        cond.signalAll();
                    }
                }
                finally {
                    lock.unlock();
                }
            }
        };
        GridLocalEventListener discoLsnr = new GridLocalEventListener(){

            @Override
            public void onEvent(Event evt) {
                assert (evt instanceof DiscoveryEvent && (evt.type() == 12 || evt.type() == 11)) : "Unexpected event: " + evt;
                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
                if (taskNodeId.equals(discoEvt.eventNode().id())) {
                    lock.lock();
                    try {
                        if (t.isEmpty()) {
                            t.set("Node that originated task execution has left grid: " + taskNodeId, null);
                            cond.signalAll();
                        }
                    }
                    finally {
                        lock.unlock();
                    }
                }
            }
        };
        boolean loc = this.ctx.localNodeId().equals(taskNodeId);
        Object topic = GridTopic.TOPIC_JOB_SIBLINGS.topic(ses.getId(), this.topicIdGen.getAndIncrement());
        try {
            this.ctx.io().addMessageListener(topic, msgLsnr);
            this.ctx.io().sendToGridTopic(taskNode, GridTopic.TOPIC_JOB_SIBLINGS, (Message)new GridJobSiblingsRequest(ses.getId(), loc ? topic : null, loc ? null : U.marshal(this.marsh, topic)), (byte)2);
            this.ctx.event().addLocalEventListener(discoLsnr, 12, 11);
            taskNode = this.ctx.discovery().node(taskNodeId);
            if (taskNode == null) {
                throw new IgniteCheckedException("Node that originated task execution has left grid: " + taskNodeId);
            }
            lock.lock();
            try {
                long netTimeout = this.ctx.config().getNetworkTimeout();
                if (t.isEmpty()) {
                    cond.await(netTimeout, TimeUnit.MILLISECONDS);
                }
                if (t.isEmpty()) {
                    throw new IgniteCheckedException("Timed out waiting for job siblings (consider increasing'networkTimeout' configuration property) [ses=" + ses + ", netTimeout=" + netTimeout + ']');
                }
                if (t.get1() != null) {
                    throw new IgniteCheckedException((String)t.get1());
                }
                Collection<ComputeJobSibling> collection = ((GridJobSiblingsResponse)t.get2()).jobSiblings();
                return collection;
            }
            catch (InterruptedException e) {
                throw new IgniteCheckedException("Interrupted while waiting for job siblings response: " + ses, e);
            }
            finally {
                lock.unlock();
            }
            {
                catch (Throwable throwable) {
                    throw throwable;
                }
            }
        }
        finally {
            this.ctx.io().removeMessageListener(topic, msgLsnr);
            this.ctx.event().removeLocalEventListener(discoLsnr, new int[0]);
        }
    }

    public void masterLeaveLocal(IgniteUuid sesId) {
        assert (sesId != null);
        for (GridJobWorker job : this.activeJobs.values()) {
            if (!job.getSession().getId().equals(sesId)) continue;
            job.onMasterNodeLeft();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancelJob(final @Nullable IgniteUuid sesId, final @Nullable IgniteUuid jobId, boolean sys) {
        assert (sesId != null || jobId != null);
        this.rwLock.readLock();
        try {
            if (this.stopping && this.cancelOnStop) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Received job cancellation request while stopping grid with cancellation (will ignore) [sesId=" + sesId + ", jobId=" + jobId + ", sys=" + sys + ']');
                }
                return;
            }
            this.cancelReqs.putIfAbsent(jobId != null ? jobId : sesId, sys);
            P1<GridJobWorker> idsMatch = new P1<GridJobWorker>(){

                @Override
                public boolean apply(GridJobWorker e) {
                    return sesId != null ? (jobId != null ? e.getSession().getId().equals(sesId) && e.getJobId().equals(jobId) : e.getSession().getId().equals(sesId)) : e.getJobId().equals(jobId);
                }
            };
            if (jobId == null) {
                if (!this.jobAlwaysActivate) {
                    for (GridJobWorker job : this.passiveJobs.values()) {
                        if (!idsMatch.apply(job)) continue;
                        this.cancelPassiveJob(job);
                    }
                }
                for (GridJobWorker job : this.activeJobs.values()) {
                    if (!idsMatch.apply(job)) continue;
                    this.cancelActiveJob(job, sys);
                }
            } else {
                GridJobWorker passiveJob;
                if (!this.jobAlwaysActivate && (passiveJob = (GridJobWorker)this.passiveJobs.get(jobId)) != null && idsMatch.apply(passiveJob) && this.cancelPassiveJob(passiveJob)) {
                    return;
                }
                GridJobWorker activeJob = (GridJobWorker)this.activeJobs.get(jobId);
                if (activeJob != null && idsMatch.apply(activeJob)) {
                    this.cancelActiveJob(activeJob, sys);
                }
            }
        }
        finally {
            this.rwLock.readUnlock();
        }
    }

    private boolean cancelPassiveJob(GridJobWorker job) {
        assert (!this.jobAlwaysActivate);
        if (this.removeFromPassive(job)) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Job has been cancelled before activation: " + job);
            }
            this.canceledJobsCnt.increment();
            this.canceledJobsMetric.increment();
            return true;
        }
        return false;
    }

    private void cancelActiveJob(GridJobWorker job, boolean sys) {
        if (this.removeFromActive(job)) {
            this.cancelledJobs.put(job.getJobId(), job);
            if (this.finishedJobs.contains(job.getJobId())) {
                this.cancelledJobs.remove(job.getJobId(), job);
            } else {
                this.cancelJob(job, sys);
            }
        }
    }

    private boolean removeFromActive(GridJobWorker job) {
        boolean res = this.activeJobs.remove(job.getJobId(), job);
        if (res) {
            this.activeJobsMetric.decrement();
        }
        return res;
    }

    private boolean removeFromPassive(GridJobWorker job) {
        boolean res = this.passiveJobs.remove(job.getJobId(), job);
        if (res) {
            this.waitingJobsMetric.decrement();
            if (!this.jobAlwaysActivate) {
                this.totalWaitTimeMetric.add(job.getQueuedTime());
            }
        }
        return res;
    }

    private void handleCollisions() {
        assert (!this.jobAlwaysActivate);
        if (this.handlingCollision.get().booleanValue()) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Skipping recursive collision handling.");
            }
            return;
        }
        this.handlingCollision.set(Boolean.TRUE);
        try {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Before handling collisions.");
            }
            this.ctx.collision().onCollision((Collection<org.apache.ignite.spi.collision.CollisionJobContext>)new AbstractCollection<org.apache.ignite.spi.collision.CollisionJobContext>(){

                @Override
                @NotNull
                public Iterator<org.apache.ignite.spi.collision.CollisionJobContext> iterator() {
                    final Iterator iter = GridJobProcessor.this.passiveJobs.values().iterator();
                    return new Iterator<org.apache.ignite.spi.collision.CollisionJobContext>(){

                        @Override
                        public boolean hasNext() {
                            return iter.hasNext();
                        }

                        @Override
                        public org.apache.ignite.spi.collision.CollisionJobContext next() {
                            return new CollisionJobContext((GridJobWorker)iter.next(), true);
                        }

                        @Override
                        public void remove() {
                            throw new UnsupportedOperationException();
                        }
                    };
                }

                @Override
                public int size() {
                    return GridJobProcessor.this.passiveJobs.size();
                }
            }, (Collection<org.apache.ignite.spi.collision.CollisionJobContext>)new AbstractCollection<org.apache.ignite.spi.collision.CollisionJobContext>(){

                @Override
                @NotNull
                public Iterator<org.apache.ignite.spi.collision.CollisionJobContext> iterator() {
                    final Iterator iter = GridJobProcessor.this.activeJobs.values().iterator();
                    return new Iterator<org.apache.ignite.spi.collision.CollisionJobContext>(){
                        private GridJobWorker w;
                        {
                            this.advance();
                        }

                        void advance() {
                            assert (this.w == null);
                            while (iter.hasNext()) {
                                GridJobWorker w0 = (GridJobWorker)iter.next();
                                assert (!w0.isInternal());
                                if (w0.held()) continue;
                                this.w = w0;
                                break;
                            }
                        }

                        @Override
                        public boolean hasNext() {
                            return this.w != null;
                        }

                        @Override
                        public org.apache.ignite.spi.collision.CollisionJobContext next() {
                            if (this.w == null) {
                                throw new NoSuchElementException();
                            }
                            CollisionJobContext ret = new CollisionJobContext(this.w, false);
                            this.w = null;
                            this.advance();
                            return ret;
                        }

                        @Override
                        public void remove() {
                            throw new UnsupportedOperationException();
                        }
                    };
                }

                @Override
                public int size() {
                    int ret = GridJobProcessor.this.activeJobs.size() - GridJobProcessor.this.heldJobs.size();
                    return ret > 0 ? ret : 0;
                }
            }, (Collection<org.apache.ignite.spi.collision.CollisionJobContext>)new AbstractCollection<org.apache.ignite.spi.collision.CollisionJobContext>(){

                @Override
                @NotNull
                public Iterator<org.apache.ignite.spi.collision.CollisionJobContext> iterator() {
                    final Iterator iter = GridJobProcessor.this.activeJobs.values().iterator();
                    return new Iterator<org.apache.ignite.spi.collision.CollisionJobContext>(){
                        private GridJobWorker w;
                        {
                            this.advance();
                        }

                        void advance() {
                            assert (this.w == null);
                            while (iter.hasNext()) {
                                GridJobWorker w0 = (GridJobWorker)iter.next();
                                assert (!w0.isInternal());
                                if (!w0.held()) continue;
                                this.w = w0;
                                break;
                            }
                        }

                        @Override
                        public boolean hasNext() {
                            return this.w != null;
                        }

                        @Override
                        public org.apache.ignite.spi.collision.CollisionJobContext next() {
                            if (this.w == null) {
                                throw new NoSuchElementException();
                            }
                            CollisionJobContext ret = new CollisionJobContext(this.w, false);
                            this.w = null;
                            this.advance();
                            return ret;
                        }

                        @Override
                        public void remove() {
                            throw new UnsupportedOperationException();
                        }
                    };
                }

                @Override
                public int size() {
                    return GridJobProcessor.this.heldJobs.size();
                }
            });
            this.updateJobMetrics();
        }
        finally {
            this.handlingCollision.set(Boolean.FALSE);
        }
    }

    @Deprecated
    private void updateJobMetrics() {
        long lastUpdate;
        assert (this.metricsUpdateFreq > 0L);
        long now = U.currentTimeMillis();
        if (now - (lastUpdate = this.metricsLastUpdateTstamp.get()) > this.metricsUpdateFreq && this.metricsLastUpdateTstamp.compareAndSet(lastUpdate, now)) {
            this.updateJobMetrics0();
        }
    }

    @Deprecated
    private void updateJobMetrics0() {
        assert (this.metricsUpdateFreq > 0L);
        GridJobMetricsSnapshot m = new GridJobMetricsSnapshot();
        m.setRejectJobs((int)this.rejectedJobsCnt.sumThenReset());
        m.setStartedJobs((int)this.startedJobsCnt.sumThenReset());
        int cnt = 0;
        for (GridJobWorker jobWorker : this.activeJobs.values()) {
            long execTime;
            assert (!jobWorker.isInternal());
            ++cnt;
            if (jobWorker.held() || (execTime = jobWorker.getExecuteTime()) <= m.getMaximumExecutionTime()) continue;
            m.setMaximumExecutionTime(execTime);
        }
        m.setActiveJobs(cnt);
        cnt = 0;
        if (!this.jobAlwaysActivate) {
            for (GridJobWorker jobWorker : this.passiveJobs.values()) {
                assert (!jobWorker.isInternal());
                ++cnt;
                long queuedTime = jobWorker.getQueuedTime();
                if (queuedTime > m.getMaximumWaitTime()) {
                    m.setMaximumWaitTime(queuedTime);
                }
                m.setWaitTime(m.getWaitTime() + jobWorker.getQueuedTime());
            }
            m.setPassiveJobs(cnt);
        }
        m.setFinishedJobs((int)this.finishedJobsCnt.sumThenReset());
        m.setExecutionTime(this.finishedJobsTime.sumThenReset());
        m.setCancelJobs((int)this.canceledJobsCnt.sumThenReset());
        long maxFinishedTime = this.maxFinishedJobsTime.getAndSet(0L);
        if (maxFinishedTime > m.getMaximumExecutionTime()) {
            m.setMaximumExecutionTime(maxFinishedTime);
        }
        m.setCpuLoad(this.cpuLoadMetric.value());
        this.ctx.jobMetric().addSnapshot(m);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void processJobExecuteRequest(ClusterNode node, GridJobExecuteRequest req) {
        GridWorker job;
        block45: {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Received job request message [req=" + req + ", nodeId=" + node.id() + ']');
            }
            PartitionsReservation partsReservation = null;
            if (req.getCacheIds() != null) {
                assert (req.getPartition() >= 0) : req;
                assert (!F.isEmpty(req.getCacheIds())) : req;
                partsReservation = new PartitionsReservation(req.getCacheIds(), req.getPartition(), req.getTopVer());
            }
            job = null;
            if (!this.rwLock.tryReadLock()) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Received job execution request while stopping this node (will ignore): " + req);
                }
                return;
            }
            try {
                GridDeployment tmpDep;
                long endTime = req.getCreateTime() + req.getTimeout();
                if (endTime < 0L) {
                    endTime = Long.MAX_VALUE;
                }
                GridDeployment gridDeployment = tmpDep = req.isForceLocalDeployment() ? this.ctx.deploy().getLocalDeployment(req.getTaskClassName()) : this.ctx.deploy().getGlobalDeployment(req.getDeploymentMode(), req.getTaskName(), req.getTaskClassName(), req.getUserVersion(), node.id(), req.getClassLoaderId(), req.getLoaderParticipants(), null);
                if (tmpDep == null) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Checking local tasks...");
                    }
                    for (Map.Entry<String, GridDeployment> d : this.ctx.task().getUsedDeploymentMap().entrySet()) {
                        if (!d.getValue().classLoaderId().equals(req.getClassLoaderId())) continue;
                        assert (d.getValue().local());
                        tmpDep = d.getValue();
                        break;
                    }
                }
                GridDeployment dep = tmpDep;
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Deployment: " + dep);
                }
                boolean releaseDep = true;
                try {
                    if (dep != null && dep.acquire()) {
                        GridJobContextImpl jobCtx;
                        GridJobSessionImpl jobSes;
                        try {
                            IgnitePredicate topologyPred;
                            ArrayList<ComputeJobSibling> siblings = null;
                            if (!req.isDynamicSiblings()) {
                                Collection siblings0 = req.getSiblings();
                                if (siblings0 == null) {
                                    assert (req.getSiblingsBytes() != null);
                                    siblings0 = (Collection)U.unmarshal(this.marsh, req.getSiblingsBytes(), U.resolveClassLoader(this.ctx.config()));
                                }
                                siblings = new ArrayList<ComputeJobSibling>(siblings0);
                            }
                            Map sesAttrs = null;
                            if (req.isSessionFullSupport() && (sesAttrs = req.getSessionAttributes()) == null) {
                                sesAttrs = (Map)U.unmarshal(this.marsh, req.getSessionAttributesBytes(), U.resolveClassLoader(dep.classLoader(), this.ctx.config()));
                            }
                            if ((topologyPred = req.getTopologyPredicate()) == null && req.getTopologyPredicateBytes() != null) {
                                topologyPred = (IgnitePredicate)U.unmarshal(this.marsh, req.getTopologyPredicateBytes(), U.resolveClassLoader(dep.classLoader(), this.ctx.config()));
                            }
                            GridTaskSessionImpl taskSes = this.ctx.session().createTaskSession(req.getSessionId(), node.id(), req.getTaskName(), dep, req.getTaskClassName(), req.topology(), topologyPred, req.getStartTaskTime(), endTime, siblings, sesAttrs, req.isSessionFullSupport(), req.isInternal(), req.executorName());
                            taskSes.setCheckpointSpi(req.getCheckpointSpi());
                            taskSes.setClassLoader(dep.classLoader());
                            jobSes = new GridJobSessionImpl(this.ctx, taskSes, req.getJobId());
                            Map jobAttrs = req.getJobAttributes();
                            if (jobAttrs == null) {
                                jobAttrs = (Map)U.unmarshal(this.marsh, req.getJobAttributesBytes(), U.resolveClassLoader(dep.classLoader(), this.ctx.config()));
                            }
                            jobCtx = new GridJobContextImpl(this.ctx, req.getJobId(), jobAttrs);
                        }
                        catch (IgniteCheckedException e) {
                            IgniteException ex = new IgniteException("Failed to deserialize task attributes [taskName=" + req.getTaskName() + ", taskClsName=" + req.getTaskClassName() + ", codeVer=" + req.getUserVersion() + ", taskClsLdr=" + dep.classLoader() + ']', e);
                            U.error(this.log, ex.getMessage(), e);
                            this.handleException(node, req, ex, endTime);
                            if (dep != null && releaseDep) {
                                this.release(dep);
                            }
                            this.rwLock.readUnlock();
                            return;
                        }
                        job = new GridJobWorker(this.ctx, dep, req.getCreateTime(), jobSes, jobCtx, req.getJobBytes(), req.getJob(), node, req.isInternal(), this.evtLsnr, this.holdLsnr, partsReservation, req.getTopVer(), req.executorName());
                        jobCtx.job((GridJobWorker)job);
                        releaseDep = false;
                        if (((GridJobWorker)job).initialize(dep, dep.deployedClass(req.getTaskClassName(), new String[0]).get1())) {
                            if (((GridJobWorker)job).isInternal()) {
                                job.run();
                                job = null;
                            } else if (this.jobAlwaysActivate) {
                                if (this.onBeforeActivateJob((GridJobWorker)job)) {
                                    if (this.ctx.localNodeId().equals(node.id())) {
                                        this.executeAsync((GridJobWorker)job);
                                        job = null;
                                    } else {
                                        if (this.metricsUpdateFreq > -1L) {
                                            this.startedJobsCnt.increment();
                                        }
                                        this.startedJobsMetric.increment();
                                    }
                                } else {
                                    job = null;
                                }
                            } else {
                                GridJobWorker old = this.passiveJobs.putIfAbsent(((GridJobWorker)job).getJobId(), (GridJobWorker)job);
                                if (old == null) {
                                    this.waitingJobsMetric.increment();
                                    this.handleCollisions();
                                } else {
                                    U.error(this.log, "Received computation request with duplicate job ID (could be network malfunction, source node may hang if task timeout was not set) [srcNode=" + node.id() + ", jobId=" + req.getJobId() + ", sesId=" + req.getSessionId() + ", locNodeId=" + this.ctx.localNodeId() + ']');
                                }
                                job = null;
                            }
                        } else {
                            job = null;
                        }
                        break block45;
                    }
                    IgniteDeploymentException ex = new IgniteDeploymentException("Task was not deployed or was redeployed since task execution [taskName=" + req.getTaskName() + ", taskClsName=" + req.getTaskClassName() + ", codeVer=" + req.getUserVersion() + ", clsLdrId=" + req.getClassLoaderId() + ", seqNum=" + req.getClassLoaderId().localId() + ", depMode=" + (Object)((Object)req.getDeploymentMode()) + ", dep=" + dep + ']');
                    U.error(this.log, ex.getMessage(), ex);
                    this.handleException(node, req, ex, endTime);
                }
                finally {
                    if (dep != null && releaseDep) {
                        this.release(dep);
                    }
                }
            }
            finally {
                this.rwLock.readUnlock();
            }
        }
        if (job != null) {
            job.run();
        }
    }

    public void currentTaskSession(GridJobSessionImpl ses) {
        this.currSess.set(ses);
    }

    public int currentTaskNameHash() {
        String name = this.currentTaskName();
        return name == null ? 0 : name.hashCode();
    }

    public String currentTaskName() {
        if (!this.ctx.security().enabled()) {
            return null;
        }
        ComputeTaskSession ses = this.currSess.get();
        if (ses == null) {
            return null;
        }
        return ses.getTaskName();
    }

    public GridDeployment currentDeployment() {
        GridJobSessionImpl session = this.currSess.get();
        if (session == null || session.deployment() == null) {
            return null;
        }
        return session.deployment();
    }

    private boolean onBeforeActivateJob(GridJobWorker jobWorker) {
        assert (jobWorker != null);
        this.activeJobs.put(jobWorker.getJobId(), jobWorker);
        this.activeJobsMetric.increment();
        Boolean sysCancelled = (Boolean)this.cancelReqs.get(jobWorker.getSession().getId());
        if (sysCancelled == null) {
            sysCancelled = (Boolean)this.cancelReqs.get(jobWorker.getJobId());
        }
        if (sysCancelled != null) {
            this.removeFromActive(jobWorker);
            ComputeExecutionRejectedException e2 = new ComputeExecutionRejectedException("Job was cancelled before execution [jobSes=" + jobWorker.getSession() + ", job=" + jobWorker.getJob() + ']');
            jobWorker.finishJob(null, e2, sysCancelled == false);
            return false;
        }
        if (this.ctx.discovery().node(jobWorker.getTaskNode().id()) == null && this.removeFromActive(jobWorker)) {
            this.cancelledJobs.put(jobWorker.getJobId(), jobWorker);
            if (!jobWorker.onMasterNodeLeft()) {
                U.warn(this.log, "Job is being cancelled because master task node left grid (as there is no one waiting for results, job will not be failed over): " + jobWorker.getJobId());
                this.cancelJob(jobWorker, true);
            }
        }
        return true;
    }

    private boolean executeAsync(GridJobWorker jobWorker) {
        try {
            if (jobWorker.executorName() != null) {
                Executor customExec = this.ctx.pools().customExecutor(jobWorker.executorName());
                if (customExec != null) {
                    customExec.execute(jobWorker);
                } else {
                    LT.warn(this.log, "Custom executor doesn't exist (local job will be processed in default thread pool): " + jobWorker.executorName());
                    this.ctx.pools().getExecutorService().execute(jobWorker);
                }
            } else {
                this.ctx.pools().getExecutorService().execute(jobWorker);
            }
            if (this.metricsUpdateFreq > -1L) {
                this.startedJobsCnt.increment();
            }
            this.startedJobsMetric.increment();
            return true;
        }
        catch (RejectedExecutionException e) {
            this.removeFromActive(jobWorker);
            ComputeExecutionRejectedException e2 = new ComputeExecutionRejectedException("Job has been rejected [jobSes=" + jobWorker.getSession() + ", job=" + jobWorker.getJob() + ']', e);
            if (this.metricsUpdateFreq > -1L) {
                this.rejectedJobsCnt.increment();
            }
            this.rejectedJobsMetric.increment();
            jobWorker.finishJob(null, e2, true);
            return false;
        }
    }

    private void handleException(ClusterNode node, GridJobExecuteRequest req, IgniteException ex, long endTime) {
        block12: {
            UUID locNodeId = this.ctx.localNodeId();
            ClusterNode sndNode = this.ctx.discovery().node(node.id());
            if (sndNode == null) {
                U.warn(this.log, "Failed to reply to sender node because it left grid [nodeId=" + node.id() + ", jobId=" + req.getJobId() + ']');
                if (this.ctx.event().isRecordable(48)) {
                    JobEvent evt = new JobEvent();
                    evt.jobId(req.getJobId());
                    evt.message("Job reply failed (original task node left grid): " + req.getJobId());
                    evt.node(this.ctx.discovery().localNode());
                    evt.taskName(req.getTaskName());
                    evt.taskClassName(req.getTaskClassName());
                    evt.taskSessionId(req.getSessionId());
                    evt.type(48);
                    evt.taskNode(node);
                    evt.taskSubjectId(SecurityUtils.securitySubjectId(this.ctx));
                    this.ctx.event().record(evt);
                }
                return;
            }
            try {
                boolean loc = this.ctx.localNodeId().equals(sndNode.id()) && !this.ctx.config().isMarshalLocalJobs();
                GridJobExecuteResponse jobRes = new GridJobExecuteResponse(locNodeId, req.getSessionId(), req.getJobId(), loc ? null : U.marshal(this.marsh, (Object)ex), ex, loc ? null : U.marshal(this.marsh, null), null, loc ? null : U.marshal(this.marsh, null), null, false, null);
                if (req.isSessionFullSupport()) {
                    Object topic = GridTopic.TOPIC_TASK.topic(req.getJobId(), locNodeId);
                    long timeout = endTime - U.currentTimeMillis();
                    if (timeout <= 0L) {
                        timeout = 1L;
                    }
                    this.ctx.io().sendOrderedMessage(sndNode, topic, jobRes, req.isInternal() ? (byte)3 : 2, timeout, false);
                } else if (this.ctx.localNodeId().equals(sndNode.id())) {
                    this.ctx.task().processJobExecuteResponse(this.ctx.localNodeId(), jobRes);
                } else {
                    this.ctx.io().sendToGridTopic(sndNode, GridTopic.TOPIC_TASK, (Message)jobRes, req.isInternal() ? (byte)3 : 2);
                }
            }
            catch (IgniteCheckedException e) {
                if (e instanceof ClusterTopologyCheckedException || this.isDeadNode(node.id())) {
                    U.error(this.log, "Failed to reply to sender node because it left grid [nodeId=" + node.id() + ", jobId=" + req.getJobId() + ']');
                } else {
                    assert (sndNode != null);
                    U.error(this.log, "Error sending reply for job [nodeId=" + sndNode.id() + ", jobId=" + req.getJobId() + ']', e);
                }
                if (!this.ctx.event().isRecordable(48)) break block12;
                JobEvent evt = new JobEvent();
                evt.jobId(req.getJobId());
                evt.message("Failed to send reply for job: " + req.getJobId());
                evt.node(this.ctx.discovery().localNode());
                evt.taskName(req.getTaskName());
                evt.taskClassName(req.getTaskClassName());
                evt.taskSessionId(req.getSessionId());
                evt.type(48);
                evt.taskNode(node);
                evt.taskSubjectId(SecurityUtils.securitySubjectId(this.ctx));
                this.ctx.event().record(evt);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processTaskSessionRequest(UUID nodeId, GridTaskSessionRequest req) {
        if (!this.rwLock.tryReadLock()) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Received job session request while stopping grid (will ignore): " + req);
            }
            return;
        }
        try {
            Map attrs;
            GridTaskSessionImpl ses = this.ctx.session().getSession(req.getSessionId());
            if (ses == null) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Received job session request for non-existing session: " + req);
                }
                return;
            }
            boolean loc = this.ctx.localNodeId().equals(nodeId) && !this.ctx.config().isMarshalLocalJobs();
            Map map = attrs = loc ? req.getAttributes() : (Map)U.unmarshal(this.marsh, req.getAttributesBytes(), U.resolveClassLoader(ses.getClassLoader(), this.ctx.config()));
            if (this.ctx.event().isRecordable(24)) {
                TaskEvent evt = new TaskEvent(this.ctx.discovery().localNode(), "Changed attributes: " + attrs, 24, ses.getId(), ses.getTaskName(), ses.getTaskClassName(), false, null);
                this.ctx.event().record(evt);
            }
            GridTaskSessionImpl gridTaskSessionImpl = ses;
            synchronized (gridTaskSessionImpl) {
                ses.setInternal(attrs);
            }
        }
        catch (IgniteCheckedException e) {
            U.error(this.log, "Failed to deserialize session attributes.", e);
        }
        finally {
            this.rwLock.readUnlock();
        }
    }

    private boolean isDeadNode(UUID uid) {
        return this.ctx.discovery().node(uid) == null || !this.ctx.discovery().pingNodeNoError(uid);
    }

    @Override
    public void printMemoryStats() {
        X.println(">>>", new Object[0]);
        X.println(">>> Job processor memory stats [igniteInstanceName=" + this.ctx.igniteInstanceName() + ']', new Object[0]);
        X.println(">>>   activeJobsSize: " + this.activeJobs.size(), new Object[0]);
        X.println(">>>   passiveJobsSize: " + (this.jobAlwaysActivate ? "n/a" : Integer.valueOf(this.passiveJobs.size())), new Object[0]);
        X.println(">>>   cancelledJobsSize: " + this.cancelledJobs.size(), new Object[0]);
        X.println(">>>   cancelReqsSize: " + this.cancelReqs.sizex(), new Object[0]);
        X.println(">>>   finishedJobsSize: " + this.finishedJobs.sizex(), new Object[0]);
    }

    private class JobsMap
    extends ConcurrentLinkedHashMap<IgniteUuid, GridJobWorker> {
        private JobsMap(int initCap, float loadFactor, int concurLvl) {
            super(initCap, loadFactor, concurLvl);
        }

        @Override
        public GridJobWorker put(IgniteUuid key, GridJobWorker val) {
            assert (!val.isInternal());
            GridJobWorker old = super.put(key, val);
            if (old != null) {
                U.warn(GridJobProcessor.this.log, "Jobs map already contains mapping for key [key=" + key + ", val=" + val + ", old=" + old + ']');
            }
            return old;
        }

        @Override
        public GridJobWorker putIfAbsent(IgniteUuid key, GridJobWorker val) {
            assert (!val.isInternal());
            GridJobWorker old = super.putIfAbsent(key, val);
            if (old != null) {
                U.warn(GridJobProcessor.this.log, "Jobs map already contains mapping for key [key=" + key + ", val=" + val + ", old=" + old + ']');
            }
            return old;
        }

        @Override
        public int size() {
            return this.sizex();
        }
    }

    private class JobDiscoveryListener
    implements GridLocalEventListener {
        private int metricsUpdateCntr;

        private JobDiscoveryListener() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onEvent(Event evt) {
            assert (evt instanceof DiscoveryEvent);
            boolean handleCollisions = false;
            UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
            switch (evt.type()) {
                case 11: 
                case 12: {
                    if (!GridJobProcessor.this.jobAlwaysActivate) {
                        for (GridJobWorker job : GridJobProcessor.this.passiveJobs.values()) {
                            if (!job.getTaskNode().id().equals(nodeId) || !GridJobProcessor.this.removeFromPassive(job)) continue;
                            U.warn(GridJobProcessor.this.log, "Task node left grid (job will not be activated) [nodeId=" + nodeId + ", jobSes=" + job.getSession() + ", job=" + job + ']');
                        }
                    }
                    for (GridJobWorker job : GridJobProcessor.this.activeJobs.values()) {
                        if (!job.getTaskNode().id().equals(nodeId) || job.isFinishing() || !GridJobProcessor.this.removeFromActive(job)) continue;
                        GridJobProcessor.this.cancelledJobs.put(job.getJobId(), job);
                        if (GridJobProcessor.this.finishedJobs.contains(job.getJobId())) {
                            GridJobProcessor.this.cancelledJobs.remove(job.getJobId(), job);
                            continue;
                        }
                        if (job.onMasterNodeLeft()) continue;
                        U.warn(GridJobProcessor.this.log, "Job is being cancelled because master task node left grid (as there is no one waiting for results, job will not be failed over): " + job.getJobId());
                        GridJobProcessor.this.cancelJob(job, true);
                    }
                    handleCollisions = true;
                    break;
                }
                case 13: {
                    if (GridJobProcessor.this.ctx.discovery().allNodes().size() > ++this.metricsUpdateCntr) break;
                    this.metricsUpdateCntr = 0;
                    handleCollisions = true;
                    break;
                }
                default: {
                    assert (false);
                    break;
                }
            }
            if (handleCollisions) {
                if (!GridJobProcessor.this.rwLock.tryReadLock()) {
                    if (GridJobProcessor.this.log.isDebugEnabled()) {
                        GridJobProcessor.this.log.debug("Skipped collision handling on discovery event (node is stopping): " + evt);
                    }
                    return;
                }
                try {
                    if (!GridJobProcessor.this.jobAlwaysActivate) {
                        GridJobProcessor.this.handleCollisions();
                    } else if (GridJobProcessor.this.metricsUpdateFreq > -1L) {
                        GridJobProcessor.this.updateJobMetrics();
                    }
                }
                finally {
                    GridJobProcessor.this.rwLock.readUnlock();
                }
            }
        }
    }

    private class JobExecutionListener
    implements GridMessageListener {
        private JobExecutionListener() {
        }

        @Override
        public void onMessage(UUID nodeId, Object msg, byte plc) {
            assert (nodeId != null);
            assert (msg != null);
            ClusterNode node = GridJobProcessor.this.ctx.discovery().node(nodeId);
            if (!GridJobProcessor.this.ctx.discovery().alive(nodeId)) {
                U.warn(GridJobProcessor.this.log, "Received job request message from unknown node (ignoring) [msg=" + msg + ", nodeId=" + nodeId + ']');
                return;
            }
            assert (node != null);
            GridJobProcessor.this.processJobExecuteRequest(node, (GridJobExecuteRequest)msg);
        }
    }

    private class JobCancelListener
    implements GridMessageListener {
        private JobCancelListener() {
        }

        @Override
        public void onMessage(UUID nodeId, Object msg, byte plc) {
            assert (nodeId != null);
            assert (msg != null);
            GridJobCancelRequest cancelMsg = (GridJobCancelRequest)msg;
            if (GridJobProcessor.this.log.isDebugEnabled()) {
                GridJobProcessor.this.log.debug("Received job cancel request [cancelMsg=" + cancelMsg + ", nodeId=" + nodeId + ']');
            }
            GridJobProcessor.this.cancelJob(cancelMsg.sessionId(), cancelMsg.jobId(), cancelMsg.system());
        }
    }

    private class JobSessionListener
    implements GridMessageListener {
        private JobSessionListener() {
        }

        @Override
        public void onMessage(UUID nodeId, Object msg, byte plc) {
            assert (nodeId != null);
            assert (msg != null);
            if (GridJobProcessor.this.log.isDebugEnabled()) {
                GridJobProcessor.this.log.debug("Received session attribute request message [msg=" + msg + ", nodeId=" + nodeId + ']');
            }
            GridJobProcessor.this.processTaskSessionRequest(nodeId, (GridTaskSessionRequest)msg);
        }
    }

    private class JobHoldListener
    implements GridJobHoldListener {
        private JobHoldListener() {
        }

        @Override
        public boolean onHeld(GridJobWorker worker) {
            if (GridJobProcessor.this.log.isDebugEnabled()) {
                GridJobProcessor.this.log.debug("Received onHeld() callback [worker=" + worker + ']');
            }
            if (worker.isInternal()) {
                return true;
            }
            boolean res = false;
            if (GridJobProcessor.this.activeJobs.containsKey(worker.getJobId())) {
                res = GridJobProcessor.this.heldJobs.add(worker.getJobId());
                if (!GridJobProcessor.this.activeJobs.containsKey(worker.getJobId())) {
                    GridJobProcessor.this.heldJobs.remove(worker.getJobId());
                    res = false;
                }
            }
            return res;
        }

        @Override
        public boolean onUnheld(GridJobWorker worker) {
            if (GridJobProcessor.this.log.isDebugEnabled()) {
                GridJobProcessor.this.log.debug("Received onUnheld() callback [worker=" + worker + ", active=" + GridJobProcessor.this.activeJobs + ", held=" + GridJobProcessor.this.heldJobs + ']');
            }
            if (worker.isInternal()) {
                return true;
            }
            return GridJobProcessor.this.heldJobs.remove(worker.getJobId());
        }
    }

    private class JobEventListener
    implements GridJobEventListener {
        private final GridMessageListener sesLsnr;

        private JobEventListener() {
            this.sesLsnr = new JobSessionListener();
        }

        @Override
        public void onJobStarted(GridJobWorker worker) {
            if (GridJobProcessor.this.log.isDebugEnabled()) {
                GridJobProcessor.this.log.debug("Received onJobStarted() callback: " + worker);
            }
            if (GridJobProcessor.this.metricsUpdateFreq > -1L) {
                GridJobProcessor.this.updateJobMetrics();
            }
            if (worker.endTime() < Long.MAX_VALUE) {
                GridJobProcessor.this.ctx.timeout().addTimeoutObject(worker);
            }
            if (worker.getSession().isFullSupport()) {
                GridJobProcessor.this.ctx.io().addMessageListener(worker.getJobTopic(), this.sesLsnr);
            }
        }

        @Override
        public void onBeforeJobResponseSent(GridJobWorker worker) {
            if (GridJobProcessor.this.log.isDebugEnabled()) {
                GridJobProcessor.this.log.debug("Received onBeforeJobResponseSent() callback: " + worker);
            }
            assert (GridJobProcessor.this.jobAlwaysActivate || !GridJobProcessor.this.passiveJobs.containsKey(worker.getJobId()));
            if (worker.getSession().isFullSupport()) {
                GridJobProcessor.this.ctx.io().removeMessageListener(worker.getJobTopic());
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onJobFinished(GridJobWorker worker) {
            GridJobSessionImpl ses;
            if (GridJobProcessor.this.log.isDebugEnabled()) {
                GridJobProcessor.this.log.debug("Received onJobFinished() callback: " + worker);
            }
            if ((ses = worker.getSession()).isFullSupport() && GridJobProcessor.this.ctx.session().removeSession(ses.getId())) {
                ses.onClosed();
                GridJobProcessor.this.ctx.checkpoint().onSessionEnd(ses, true);
            }
            if (worker.endTime() < Long.MAX_VALUE) {
                GridJobProcessor.this.ctx.timeout().removeTimeoutObject(worker);
            }
            GridJobProcessor.this.release(worker.getDeployment());
            GridJobProcessor.this.finishedJobs.add(worker.getJobId());
            if (!worker.isInternal()) {
                GridJobProcessor.this.finishedJobsCnt.increment();
                GridJobProcessor.this.finishedJobsMetric.increment();
                long execTime = worker.getExecuteTime();
                GridJobProcessor.this.finishedJobsTime.add(execTime);
                GridJobProcessor.this.totalExecutionTimeMetric.add(execTime);
                GridJobProcessor.this.maxFinishedJobsTime.setIfGreater(execTime);
                if (GridJobProcessor.this.jobAlwaysActivate) {
                    if (GridJobProcessor.this.metricsUpdateFreq > -1L) {
                        GridJobProcessor.this.updateJobMetrics();
                    }
                    if (!GridJobProcessor.this.removeFromActive(worker)) {
                        GridJobProcessor.this.cancelledJobs.remove(worker.getJobId(), worker);
                    }
                    GridJobProcessor.this.heldJobs.remove(worker.getJobId());
                } else {
                    if (!GridJobProcessor.this.rwLock.tryReadLock()) {
                        if (GridJobProcessor.this.log.isDebugEnabled()) {
                            GridJobProcessor.this.log.debug("Skipping collision handling on job finish (node is stopping).");
                        }
                        return;
                    }
                    if (!GridJobProcessor.this.removeFromActive(worker)) {
                        GridJobProcessor.this.cancelledJobs.remove(worker.getJobId(), worker);
                    }
                    GridJobProcessor.this.heldJobs.remove(worker.getJobId());
                    try {
                        GridJobProcessor.this.handleCollisions();
                    }
                    finally {
                        GridJobProcessor.this.rwLock.readUnlock();
                    }
                }
            }
            if (GridJobProcessor.this.ctx.performanceStatistics().enabled()) {
                GridJobProcessor.this.ctx.performanceStatistics().job(ses.getId(), worker.getQueuedTime(), worker.getStartTime(), worker.getExecuteTime(), worker.isTimedOut());
            }
        }
    }

    private class CollisionExternalListener
    implements org.apache.ignite.spi.collision.CollisionExternalListener {
        private CollisionExternalListener() {
        }

        @Override
        public void onExternalCollision() {
            assert (!GridJobProcessor.this.jobAlwaysActivate);
            if (GridJobProcessor.this.log.isDebugEnabled()) {
                GridJobProcessor.this.log.debug("Received external collision event.");
            }
            if (!GridJobProcessor.this.rwLock.tryReadLock()) {
                if (GridJobProcessor.this.log.isDebugEnabled()) {
                    GridJobProcessor.this.log.debug("Received external collision notification while stopping grid (will ignore).");
                }
                return;
            }
            try {
                GridJobProcessor.this.handleCollisions();
            }
            finally {
                GridJobProcessor.this.rwLock.readUnlock();
            }
        }
    }

    private class CollisionJobContext
    extends GridCollisionJobContextAdapter {
        private final boolean passive;

        CollisionJobContext(GridJobWorker jobWorker, boolean passive) {
            super(jobWorker);
            assert (!jobWorker.isInternal());
            assert (!GridJobProcessor.this.jobAlwaysActivate);
            this.passive = passive;
        }

        @Override
        public boolean activate() {
            GridJobWorker jobWorker = this.getJobWorker();
            return GridJobProcessor.this.removeFromPassive(jobWorker) && GridJobProcessor.this.onBeforeActivateJob(jobWorker) && GridJobProcessor.this.executeAsync(jobWorker);
        }

        @Override
        public boolean cancel() {
            GridJobWorker jobWorker = this.getJobWorker();
            GridJobProcessor.this.cancelReqs.putIfAbsent(jobWorker.getJobId(), false);
            boolean ret = false;
            if (this.passive) {
                if (GridJobProcessor.this.removeFromPassive(jobWorker)) {
                    GridJobProcessor.this.rejectJob(jobWorker, true);
                    if (GridJobProcessor.this.metricsUpdateFreq > -1L) {
                        GridJobProcessor.this.rejectedJobsCnt.increment();
                    }
                    GridJobProcessor.this.rejectedJobsMetric.increment();
                    ret = true;
                }
            } else if (GridJobProcessor.this.removeFromActive(jobWorker)) {
                GridJobProcessor.this.cancelledJobs.put(jobWorker.getJobId(), jobWorker);
                if (GridJobProcessor.this.finishedJobs.contains(jobWorker.getJobId())) {
                    GridJobProcessor.this.cancelledJobs.remove(jobWorker.getJobId(), jobWorker);
                } else {
                    GridJobProcessor.this.cancelJob(jobWorker, false);
                }
                ret = true;
            }
            return ret;
        }

        public String toString() {
            return S.toString(CollisionJobContext.class, this);
        }
    }

    public class PartitionsReservation
    implements GridReservable {
        private final int[] cacheIds;
        private final int partId;
        private final AffinityTopologyVersion topVer;
        private GridDhtLocalPartition[] partititons;

        public PartitionsReservation(int[] cacheIds, int partId, AffinityTopologyVersion topVer) {
            this.cacheIds = cacheIds;
            this.partId = partId;
            this.topVer = topVer;
            this.partititons = new GridDhtLocalPartition[cacheIds.length];
        }

        public int[] getCacheIds() {
            return this.cacheIds;
        }

        public int getPartId() {
            return this.partId;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public boolean reserve() {
            boolean reserved = false;
            try {
                for (int i = 0; i < this.cacheIds.length; ++i) {
                    boolean bl;
                    GridCacheContext cctx = GridJobProcessor.this.ctx.cache().context().cacheContext(this.cacheIds[i]);
                    if (cctx == null) {
                        bl = reserved;
                        return bl;
                    }
                    if (!cctx.started()) {
                        bl = reserved;
                        return bl;
                    }
                    if (cctx.isLocal() || !cctx.rebalanceEnabled()) continue;
                    boolean checkPartMapping = false;
                    try {
                        GridDhtLocalPartition part;
                        if (cctx.isReplicated() && ((part = cctx.topology().localPartition(this.partId, this.topVer, false)) == null || part.state() != GridDhtPartitionState.OWNING)) {
                            checkPartMapping = true;
                            boolean bl2 = reserved;
                            return bl2;
                        }
                        part = cctx.topology().localPartition(this.partId, this.topVer, false);
                        if (part == null || part.state() != GridDhtPartitionState.OWNING || !part.reserve()) {
                            checkPartMapping = true;
                            boolean bl3 = reserved;
                            return bl3;
                        }
                        this.partititons[i] = part;
                        if (part.state() == GridDhtPartitionState.OWNING) continue;
                        checkPartMapping = true;
                        boolean bl4 = reserved;
                        return bl4;
                    }
                    finally {
                        if (checkPartMapping && !cctx.affinity().primaryByPartition(this.partId, this.topVer).id().equals(GridJobProcessor.this.ctx.localNodeId())) {
                            throw new IgniteException("Failed partition reservation. Partition is not primary on the node. [partition=" + this.partId + ", cacheName=" + cctx.name() + ", nodeId=" + GridJobProcessor.this.ctx.localNodeId() + ", topology=" + this.topVer + ']');
                        }
                    }
                }
                reserved = true;
            }
            finally {
                if (!reserved) {
                    this.release();
                }
            }
            return true;
        }

        @Override
        public void release() {
            for (int i = 0; i < this.partititons.length && this.partititons[i] != null; ++i) {
                this.partititons[i].release();
                this.partititons[i] = null;
            }
        }
    }
}

