/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ode.scheduler.simple;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.helpers.AbsoluteTimeDateFormat;
import org.apache.ode.bpel.iapi.ClusterAware;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.dao.scheduler.DatabaseException;
import org.apache.ode.dao.scheduler.JobDAO;
import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
import org.apache.ode.dao.scheduler.SchedulerDAOConnectionFactory;
import org.apache.ode.dao.scheduler.Task;
import org.apache.ode.scheduler.simple.JobDAOTask;
import org.apache.ode.scheduler.simple.JobNoLongerInDbException;
import org.apache.ode.scheduler.simple.SchedulerThread;
import org.apache.ode.scheduler.simple.TaskRunner;

public class SimpleScheduler
implements Scheduler,
TaskRunner,
ClusterAware {
    private static final Log __log = LogFactory.getLog(SimpleScheduler.class);
    long _immediateInterval = 30000L;
    long _nearFutureInterval = 600000L;
    long _staleInterval = 10000L;
    long _warningDelay = 300000L;
    int _tps = 100;
    TransactionManager _txm;
    ExecutorService _exec;
    String _nodeId;
    int _todoLimit = 10000;
    volatile Scheduler.JobProcessor _jobProcessor;
    volatile Scheduler.JobProcessor _polledRunnableProcessor;
    private SchedulerThread _todo;
    private SchedulerDAOConnectionFactory _dbcf;
    private ConcurrentHashMap<String, Long> _outstandingJobs = new ConcurrentHashMap();
    private ConcurrentHashMap<String, Long> _processedSinceLastLoadTask = new ConcurrentHashMap();
    private ConcurrentHashMap<String, Boolean> _retryJobList = new ConcurrentHashMap();
    private boolean _running;
    private AtomicLong _nextUpgrade = new AtomicLong();
    private Random _random = new Random();
    private long _pollIntervalForPolledRunnable = Long.getLong("org.apache.ode.polledRunnable.pollInterval", 600000L);
    private int _immediateTransactionRetryLimit = 3;
    private long _immediateTransactionRetryInterval = 1000L;
    private List<String> _defaultNodeList = new ArrayList<String>();
    private List<String> _nodeList = new ArrayList<String>();

    public SimpleScheduler(String nodeId, SchedulerDAOConnectionFactory dbcf, TransactionManager txm, Properties conf) {
        this._nodeId = nodeId;
        this._dbcf = dbcf;
        this._txm = txm;
        this._todoLimit = this.getIntProperty(conf, "ode.scheduler.queueLength", this._todoLimit);
        this._immediateInterval = this.getLongProperty(conf, "ode.scheduler.immediateInterval", this._immediateInterval);
        this._nearFutureInterval = this.getLongProperty(conf, "ode.scheduler.nearFutureInterval", this._nearFutureInterval);
        this._staleInterval = this.getLongProperty(conf, "ode.scheduler.staleInterval", this._staleInterval);
        this._tps = this.getIntProperty(conf, "ode.scheduler.transactionsPerSecond", this._tps);
        this._warningDelay = this.getLongProperty(conf, "ode.scheduler.warningDelay", this._warningDelay);
        this._immediateTransactionRetryLimit = this.getIntProperty(conf, "ode.scheduler.immediateTransactionRetryLimit", this._immediateTransactionRetryLimit);
        this._immediateTransactionRetryInterval = this.getLongProperty(conf, "ode.scheduler.immediateTransactionRetryInterval", this._immediateTransactionRetryInterval);
        this._todo = new SchedulerThread(this);
        this._defaultNodeList.add(nodeId);
    }

    public void setPollIntervalForPolledRunnable(long pollIntervalForPolledRunnable) {
        this._pollIntervalForPolledRunnable = pollIntervalForPolledRunnable;
    }

    private int getIntProperty(Properties props, String propName, int defaultValue) {
        String s = props.getProperty(propName);
        if (s != null) {
            return Integer.parseInt(s);
        }
        return defaultValue;
    }

    private long getLongProperty(Properties props, String propName, long defaultValue) {
        String s = props.getProperty(propName);
        if (s != null) {
            return Long.parseLong(s);
        }
        return defaultValue;
    }

    public void setNodeId(String nodeId) {
        this._nodeId = nodeId;
    }

    public void setStaleInterval(long staleInterval) {
        this._staleInterval = staleInterval;
    }

    public void setImmediateInterval(long immediateInterval) {
        this._immediateInterval = immediateInterval;
    }

    public void setNearFutureInterval(long nearFutureInterval) {
        this._nearFutureInterval = nearFutureInterval;
    }

    public void setTransactionsPerSecond(int tps) {
        this._tps = tps;
    }

    public void setTransactionManager(TransactionManager txm) {
        this._txm = txm;
    }

    public void setSchedulerDAOConnectionFactory(SchedulerDAOConnectionFactory dbcf) {
        this._dbcf = dbcf;
    }

    public void setExecutorService(ExecutorService executorService) {
        this._exec = executorService;
    }

    @Override
    public void setPolledRunnableProcesser(Scheduler.JobProcessor polledRunnableProcessor) {
        this._polledRunnableProcessor = polledRunnableProcessor;
    }

    @Override
    public void cancelJob(String jobId) throws ContextException {
        this._todo.dequeue(new JobDAOTask(jobId));
        this._outstandingJobs.remove(jobId);
        SchedulerDAOConnection conn = this._dbcf.getConnection();
        try {
            conn.deleteJob(jobId, this._nodeId);
        }
        catch (DatabaseException e) {
            __log.debug((Object)"Job removal failed.", (Throwable)e);
            throw new ContextException("Job removal failed.", e);
        }
    }

    @Override
    public <T> Future<T> execIsolatedTransaction(final Callable<T> transaction) throws Exception, ContextException {
        return this._exec.submit(new Callable<T>(){

            @Override
            public T call() throws Exception {
                try {
                    return SimpleScheduler.this.execTransaction(transaction);
                }
                catch (Exception e) {
                    __log.error((Object)"An exception occured while executing an isolated transaction, the transaction is going to be abandoned.", (Throwable)e);
                    return null;
                }
            }
        });
    }

    @Override
    public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException {
        return this.execTransaction(transaction, 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    @Override
    public <T> T execTransaction(Callable<T> transaction, int timeout) throws Exception, ContextException {
        Exception ex;
        block41: {
            if (this._txm == null) {
                throw new ContextException("Cannot locate the transaction manager; the server might be shutting down.");
            }
            if (timeout < 0) {
                throw new IllegalArgumentException("Timeout must be positive, received: " + timeout);
            }
            boolean existingTransaction = false;
            try {
                existingTransaction = this._txm.getTransaction() != null;
            }
            catch (Exception ex2) {
                String errmsg = "Internal Error, could not get current transaction.";
                throw new ContextException(errmsg, ex2);
            }
            if (existingTransaction) {
                return transaction.call();
            }
            ex = null;
            int immediateRetryCount = this._immediateTransactionRetryLimit;
            this._txm.setTransactionTimeout(timeout);
            if (__log.isDebugEnabled() && timeout != 0) {
                __log.debug((Object)("Custom transaction timeout: " + timeout));
            }
            while (true) {
                T e2222;
                block39: {
                    block38: {
                        try {
                            if (__log.isDebugEnabled()) {
                                __log.debug((Object)"Beginning a new transaction");
                            }
                            this._txm.begin();
                        }
                        catch (Exception e2222) {
                            String errmsg = "Internal Error, could not begin transaction.";
                            throw new ContextException(errmsg, e2222);
                        }
                        ex = null;
                        e2222 = transaction.call();
                        if (ex != null) break block38;
                        if (__log.isDebugEnabled()) {
                            __log.debug((Object)("Commiting on " + this._txm + "..."));
                        }
                        try {
                            this._txm.commit();
                            if (__log.isDebugEnabled()) {
                                __log.debug((Object)("committed on " + this._txm + " successfully."));
                            }
                            break block39;
                        }
                        catch (Exception e23) {
                            ex = e23;
                            __log.error((Object)"error in commiting transaction", (Throwable)e23);
                        }
                        break block39;
                    }
                    if (__log.isDebugEnabled()) {
                        __log.debug((Object)("Rollbacking on " + this._txm + "..."));
                    }
                    this._txm.rollback();
                }
                if (ex != null && immediateRetryCount > 0) {
                    if (__log.isDebugEnabled()) {
                        __log.debug((Object)("Will retry the transaction in " + this._immediateTransactionRetryInterval + " msecs on " + this._txm + " for error: "), (Throwable)ex);
                    }
                    Thread.sleep(this._immediateTransactionRetryInterval);
                }
                return e2222;
                catch (Exception e3) {
                    try {
                        ex = e3;
                        continue;
                    }
                    catch (Throwable throwable) {
                        throw throwable;
                    }
                    finally {
                        if (ex == null) {
                            if (__log.isDebugEnabled()) {
                                __log.debug((Object)("Commiting on " + this._txm + "..."));
                            }
                            try {
                                this._txm.commit();
                                if (__log.isDebugEnabled()) {
                                    __log.debug((Object)("committed on " + this._txm + " successfully."));
                                }
                            }
                            catch (Exception e24) {
                                ex = e24;
                                __log.error((Object)"error in commiting transaction", (Throwable)e24);
                            }
                        } else {
                            if (__log.isDebugEnabled()) {
                                __log.debug((Object)("Rollbacking on " + this._txm + "..."));
                            }
                            this._txm.rollback();
                        }
                        if (ex != null && immediateRetryCount > 0) {
                            if (__log.isDebugEnabled()) {
                                __log.debug((Object)("Will retry the transaction in " + this._immediateTransactionRetryInterval + " msecs on " + this._txm + " for error: "), (Throwable)ex);
                            }
                            Thread.sleep(this._immediateTransactionRetryInterval);
                        }
                    }
                    if (immediateRetryCount-- > 0) continue;
                    break block41;
                }
                break;
            }
            finally {
                if (this._txm != null) {
                    this._txm.setTransactionTimeout(0);
                }
            }
        }
        throw ex;
    }

    @Override
    public void setRollbackOnly() throws Exception {
        TransactionManager txm = this._txm;
        if (txm == null) {
            throw new ContextException("Cannot locate the transaction manager; the server might be shutting down.");
        }
        txm.setRollbackOnly();
    }

    @Override
    public void registerSynchronizer(final Scheduler.Synchronizer synch) throws ContextException {
        TransactionManager txm = this._txm;
        if (txm == null) {
            throw new ContextException("Cannot locate the transaction manager; the server might be shutting down.");
        }
        try {
            txm.getTransaction().registerSynchronization(new Synchronization(){

                public void beforeCompletion() {
                    synch.beforeCompletion();
                }

                public void afterCompletion(int status) {
                    synch.afterCompletion(status == 3);
                }
            });
        }
        catch (Exception e) {
            throw new ContextException("Unable to register synchronizer.", e);
        }
    }

    @Override
    public String schedulePersistedJob(Scheduler.JobDetails jobDetail, Date when) throws ContextException {
        long ctime = System.currentTimeMillis();
        if (when == null) {
            when = new Date(ctime);
        }
        if (__log.isDebugEnabled()) {
            __log.debug((Object)("scheduling " + jobDetail + " for " + when));
        }
        return this.schedulePersistedJob(jobDetail, true, when, ctime);
    }

    @Override
    public String scheduleMapSerializableRunnable(Scheduler.MapSerializableRunnable runnable, Date when) throws ContextException {
        long ctime = System.currentTimeMillis();
        if (when == null) {
            when = new Date(ctime);
        }
        Scheduler.JobDetails jobDetails = new Scheduler.JobDetails();
        jobDetails.getDetailsExt().put("runnable", runnable);
        runnable.storeToDetails(jobDetails);
        if (__log.isDebugEnabled()) {
            __log.debug((Object)("scheduling " + jobDetails + " for " + when));
        }
        return this.schedulePersistedJob(jobDetails, true, when, ctime);
    }

    private String schedulePersistedJob(Scheduler.JobDetails jobDetails, boolean transacted, Date when, long ctime) throws ContextException {
        JobDAO job;
        boolean immediate = when.getTime() <= ctime + this._immediateInterval;
        boolean nearfuture = !immediate && when.getTime() <= ctime + this._nearFutureInterval;
        try {
            if (immediate) {
                if (this._outstandingJobs.size() > this._todoLimit) {
                    __log.error((Object)"The execution queue is backed up, the engine can't keep up with the load. Either increase the queue size or regulate the flow.");
                    return null;
                }
                job = this.insertJob(transacted, jobDetails, when.getTime(), this._nodeId, true, true);
                __log.debug((Object)("scheduled immediate job: " + job.getJobId()));
            } else if (nearfuture) {
                job = this.insertJob(transacted, jobDetails, when.getTime(), this._nodeId, false, false);
                __log.debug((Object)("scheduled near-future job: " + job.getJobId()));
            } else {
                job = this.insertJob(transacted, jobDetails, when.getTime(), null, false, false);
                __log.debug((Object)("scheduled far-future job: " + job.getJobId()));
            }
        }
        catch (DatabaseException dbe) {
            __log.error((Object)"Database error.", (Throwable)dbe);
            throw new ContextException("Database error.", dbe);
        }
        return job.getJobId();
    }

    private JobDAO insertJob(boolean transacted, Scheduler.JobDetails jobDetails, long scheduledDate, String nodeID, boolean loaded, boolean enqueue) throws ContextException, DatabaseException {
        JobDAO job;
        SchedulerDAOConnection conn = this._dbcf.getConnection();
        if (!conn.insertJob(job = conn.createJob(transacted, jobDetails, true, scheduledDate), nodeID, loaded)) {
            String msg = String.format("Database insert failed. jobId %s nodeId %s", job.getJobId(), nodeID);
            __log.error((Object)msg);
            throw new ContextException(msg);
        }
        if (enqueue) {
            this.addTodoOnCommit(job);
        }
        return job;
    }

    @Override
    public String scheduleVolatileJob(boolean transacted, Scheduler.JobDetails jobDetail) throws ContextException {
        return this.scheduleVolatileJob(transacted, jobDetail, null);
    }

    @Override
    public String scheduleVolatileJob(boolean transacted, Scheduler.JobDetails jobDetail, Date when) throws ContextException {
        long ctime = System.currentTimeMillis();
        if (when == null) {
            when = new Date(ctime);
        }
        SchedulerDAOConnection conn = this._dbcf.getConnection();
        JobDAO job = conn.createJob(transacted, jobDetail, false, when.getTime());
        this.addTodoOnCommit(job);
        return job.toString();
    }

    @Override
    public void setJobProcessor(Scheduler.JobProcessor processor) throws ContextException {
        this._jobProcessor = processor;
    }

    public List<String> getNodeList() {
        if (this._nodeList == null || this._nodeList.size() == 0) {
            return this._defaultNodeList;
        }
        return this._nodeList;
    }

    @Override
    public void setNodeList(List<String> nodeList) {
        this._nodeList = nodeList;
    }

    @Override
    public void shutdown() {
        this.stop();
        this._jobProcessor = null;
        this._txm = null;
        this._todo = null;
    }

    @Override
    public synchronized void start() {
        if (this._running) {
            return;
        }
        if (this._exec == null) {
            this._exec = Executors.newCachedThreadPool();
        }
        this._todo.clearTasks(UpgradeJobsTask.class);
        this._todo.clearTasks(LoadImmediateTask.class);
        this._processedSinceLastLoadTask.clear();
        this._outstandingJobs.clear();
        this._retryJobList.clear();
        long now = System.currentTimeMillis();
        this._todo.enqueue(new LoadImmediateTask(now));
        this._todo.enqueue(new UpgradeJobsTask(now + this.randomMean(this._immediateInterval)));
        this._todo.start();
        this._running = true;
    }

    private long randomMean(long mean) {
        return (long)this._random.nextDouble() * mean + mean / 2L;
    }

    @Override
    public synchronized void stop() {
        if (!this._running) {
            return;
        }
        this._todo.stop();
        this._todo.clearTasks(UpgradeJobsTask.class);
        this._todo.clearTasks(LoadImmediateTask.class);
        this._processedSinceLastLoadTask.clear();
        this._outstandingJobs.clear();
        this._retryJobList.clear();
        this._running = false;
    }

    protected void runJob(JobDAO jobDao) {
        this._exec.submit(new RunJobCallable(jobDao, this._jobProcessor));
    }

    protected void runPolledRunnable(JobDAO jobDao) {
        this._exec.submit(new RunJobCallable(jobDao, this._polledRunnableProcessor));
    }

    private void addTodoOnCommit(final JobDAO job) {
        this.registerSynchronizer(new Scheduler.Synchronizer(){

            @Override
            public void afterCompletion(boolean success) {
                if (success) {
                    SimpleScheduler.this.enqueue(job);
                }
            }

            @Override
            public void beforeCompletion() {
            }
        });
    }

    @Override
    public boolean isTransacted() {
        TransactionManager txm = this._txm;
        if (txm == null) {
            throw new ContextException("Cannot locate the transaction manager; the server might be shutting down.");
        }
        try {
            Transaction tx = txm.getTransaction();
            return tx != null && tx.getStatus() != 6;
        }
        catch (SystemException e) {
            throw new ContextException("Internal Error: Could not obtain transaction status.");
        }
    }

    @Override
    public void runTask(final Task task) {
        if (task instanceof JobDAOTask) {
            JobDAOTask job = (JobDAOTask)task;
            if (job.getJobDAO().getDetails().getDetailsExt().get("runnable") != null) {
                this.runPolledRunnable(job.getJobDAO());
            } else {
                this.runJob(job.getJobDAO());
            }
        } else if (task instanceof SchedulerTask) {
            this._exec.submit(new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    try {
                        ((SchedulerTask)task).run();
                    }
                    catch (Exception ex) {
                        __log.error((Object)"Error during SchedulerTask execution", (Throwable)ex);
                    }
                    return null;
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean doLoadImmediate() {
        __log.debug((Object)"LOAD IMMEDIATE started");
        if (this._outstandingJobs.size() > this._todoLimit / 2) {
            return true;
        }
        try {
            final int batch = Math.min((int)(this._immediateInterval * (long)this._tps / 1000L), this._todoLimit - this._outstandingJobs.size());
            if (batch <= 0) {
                if (__log.isDebugEnabled()) {
                    __log.debug((Object)("Max capacity reached: " + this._outstandingJobs.size() + " jobs dispacthed i.e. queued or being executed"));
                }
                boolean bl = true;
                return bl;
            }
            if (__log.isDebugEnabled()) {
                __log.debug((Object)("loading " + batch + " jobs from db"));
            }
            List<JobDAO> jobs = this.execTransaction(new Callable<List<JobDAO>>(){

                @Override
                public List<JobDAO> call() throws ContextException, DatabaseException {
                    SchedulerDAOConnection conn = SimpleScheduler.this._dbcf.getConnection();
                    return conn.dequeueImmediate(SimpleScheduler.this._nodeId, System.currentTimeMillis() + SimpleScheduler.this._immediateInterval, batch);
                }
            });
            if (__log.isDebugEnabled()) {
                __log.debug((Object)("loaded " + jobs.size() + " jobs from db"));
            }
            long delayedTime = System.currentTimeMillis() - this._warningDelay;
            int delayedCount = 0;
            AbsoluteTimeDateFormat f = new AbsoluteTimeDateFormat();
            for (JobDAO j : jobs) {
                boolean runningLate;
                if (this._outstandingJobs.size() >= this._todoLimit) {
                    if (!__log.isDebugEnabled()) break;
                    __log.debug((Object)("Max capacity reached: " + this._outstandingJobs.size() + " jobs dispacthed i.e. queued or being executed"));
                    break;
                }
                boolean bl = runningLate = j.getScheduledDate() <= delayedTime;
                if (runningLate) {
                    ++delayedCount;
                }
                if (__log.isDebugEnabled()) {
                    __log.debug((Object)("todo.enqueue job from db: " + j.getJobId().trim() + " for " + j.getScheduledDate() + "(" + f.format((Object)j.getScheduledDate()) + ") " + (runningLate ? " delayed=true" : "")));
                }
                this.enqueue(j);
            }
            if (delayedCount > 0) {
                __log.warn((Object)("Dispatching jobs with more than " + this._warningDelay / 60000L + " minutes delay. Either the server was down for some time or the job load is greater than available capacity"));
            }
            this._processedSinceLastLoadTask.clear();
            this._retryJobList.clear();
            boolean bl = true;
            return bl;
        }
        catch (Exception ex) {
            __log.error((Object)"Error loading immediate jobs from database.", (Throwable)ex);
            boolean bl = false;
            return bl;
        }
        finally {
            __log.debug((Object)"LOAD IMMEDIATE complete");
        }
    }

    private void enqueue(JobDAO job) {
        if (this._processedSinceLastLoadTask.get(job.getJobId()) == null) {
            if (this._outstandingJobs.putIfAbsent(job.getJobId(), job.getScheduledDate()) == null) {
                if (job.getScheduledDate() <= System.currentTimeMillis()) {
                    this.runJob(job);
                } else {
                    this._todo.enqueue(new JobDAOTask(job));
                }
            } else if (__log.isDebugEnabled()) {
                __log.debug((Object)("Job " + job.getJobId() + " is being processed (outstanding job)"));
            }
        } else if (__log.isDebugEnabled()) {
            __log.debug((Object)("Job " + job.getJobId() + " is being processed (processed since last load)"));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean doUpgrade() {
        __log.debug((Object)"UPGRADE started");
        final long maxtime = System.currentTimeMillis() + this._nearFutureInterval;
        try {
            boolean bl = this.execTransaction(new Callable<Boolean>(){

                @Override
                public Boolean call() throws ContextException, DatabaseException {
                    SchedulerDAOConnection conn = SimpleScheduler.this._dbcf.getConnection();
                    int numNodes = SimpleScheduler.this.getNodeList().size();
                    for (int i = 0; i < numNodes; ++i) {
                        String node = SimpleScheduler.this.getNodeList().get(i);
                        conn.updateAssignToNode(node, i, numNodes, maxtime);
                    }
                    return true;
                }
            });
            return bl;
        }
        catch (Exception ex) {
            __log.error((Object)"Database error upgrading jobs.", (Throwable)ex);
            boolean bl = false;
            return bl;
        }
        finally {
            __log.debug((Object)"UPGRADE complete");
        }
    }

    @Override
    public boolean amICoordinator() {
        return true;
    }

    private class UpgradeJobsTask
    extends SchedulerTask {
        UpgradeJobsTask(long schedDate) {
            super(schedDate);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            long ctime = System.currentTimeMillis();
            long ntime = SimpleScheduler.this._nextUpgrade.get();
            __log.debug((Object)("UPGRADE task for " + this.getScheduledDate() + " fired at " + ctime));
            if (SimpleScheduler.this._nextUpgrade.get() > System.currentTimeMillis()) {
                __log.debug((Object)("UPGRADE skipped -- wait another " + (ntime - ctime) + "ms"));
                SimpleScheduler.this._todo.enqueue(new UpgradeJobsTask(ntime));
                return;
            }
            boolean success = false;
            try {
                success = SimpleScheduler.this.doUpgrade();
            }
            catch (Throwable throwable) {
                long future = System.currentTimeMillis() + (success ? (long)((double)SimpleScheduler.this._nearFutureInterval * 0.5) : 1000L);
                SimpleScheduler.this._nextUpgrade.set(future);
                SimpleScheduler.this._todo.enqueue(new UpgradeJobsTask(future));
                __log.debug((Object)("UPGRADE completed, success = " + success + "; next time in " + (future - ctime) + "ms"));
                throw throwable;
            }
            long future = System.currentTimeMillis() + (success ? (long)((double)SimpleScheduler.this._nearFutureInterval * 0.5) : 1000L);
            SimpleScheduler.this._nextUpgrade.set(future);
            SimpleScheduler.this._todo.enqueue(new UpgradeJobsTask(future));
            __log.debug((Object)("UPGRADE completed, success = " + success + "; next time in " + (future - ctime) + "ms"));
        }
    }

    private class LoadImmediateTask
    extends SchedulerTask {
        LoadImmediateTask(long schedDate) {
            super(schedDate);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            boolean success = false;
            try {
                success = SimpleScheduler.this.doLoadImmediate();
            }
            finally {
                if (success) {
                    SimpleScheduler.this._todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + (long)((double)SimpleScheduler.this._immediateInterval * 0.9)));
                } else {
                    SimpleScheduler.this._todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + 1000L));
                }
            }
        }
    }

    private abstract class SchedulerTask
    extends Task
    implements Runnable {
        SchedulerTask(long schedDate) {
            super(schedDate);
        }
    }

    private class RunJobCallable
    implements Callable<Void> {
        final Scheduler.JobProcessor processor;
        final JobDAO job;

        RunJobCallable(JobDAO jobDao, Scheduler.JobProcessor processor) {
            this.job = jobDao;
            this.processor = processor;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Void call() throws Exception {
            try {
                Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(this.job.getJobId(), this.job.getDetails(), this.job.getDetails().getRetryCount());
                if (this.job.isTransacted()) {
                    this.processInTransactionContext(jobInfo);
                } else {
                    this.processor.onScheduledJob(jobInfo);
                }
                Void void_ = null;
                return void_;
            }
            finally {
                if (SimpleScheduler.this._retryJobList.get(this.job.getJobId()) == null) {
                    SimpleScheduler.this._processedSinceLastLoadTask.put(this.job.getJobId(), this.job.getScheduledDate());
                } else {
                    SimpleScheduler.this._retryJobList.remove(this.job.getJobId());
                }
                SimpleScheduler.this._outstandingJobs.remove(this.job.getJobId());
            }
        }

        private void processInTransactionContext(final Scheduler.JobInfo jobInfo) throws Exception {
            block5: {
                final boolean[] needRetry = new boolean[]{true};
                try {
                    SimpleScheduler.this.execTransaction(new Callable<Void>(){

                        @Override
                        public Void call() throws ContextException, Exception {
                            SchedulerDAOConnection conn = SimpleScheduler.this._dbcf.getConnection();
                            if (RunJobCallable.this.job.isPersisted() && !conn.deleteJob(RunJobCallable.this.job.getJobId(), SimpleScheduler.this._nodeId)) {
                                throw new JobNoLongerInDbException(RunJobCallable.this.job.getJobId(), SimpleScheduler.this._nodeId);
                            }
                            try {
                                RunJobCallable.this.processor.onScheduledJob(jobInfo);
                                if (RunJobCallable.this.job.getDetails().getDetailsExt().get("runnable") != null && !"COMPLETED".equals(String.valueOf(jobInfo.jobDetail.getDetailsExt().get("runnable_status")))) {
                                    if (SimpleScheduler.this._pollIntervalForPolledRunnable < 0L) {
                                        if (__log.isWarnEnabled()) {
                                            __log.warn((Object)"The poll interval for polled runnables is negative; setting it to 1000ms");
                                        }
                                        SimpleScheduler.this._pollIntervalForPolledRunnable = 1000L;
                                    }
                                    long schedDate = System.currentTimeMillis() + SimpleScheduler.this._pollIntervalForPolledRunnable;
                                    RunJobCallable.this.job.setScheduledDate(schedDate);
                                    conn.insertJob(RunJobCallable.this.job, SimpleScheduler.this._nodeId, false);
                                }
                            }
                            catch (Scheduler.JobProcessorException jpe) {
                                if (!jpe.retry) {
                                    needRetry[0] = false;
                                }
                                throw jpe;
                            }
                            return null;
                        }
                    });
                }
                catch (JobNoLongerInDbException jde) {
                    __log.debug((Object)("job no longer in db forced rollback: " + this.job));
                }
                catch (Exception ex) {
                    __log.error((Object)("Error while processing a " + (this.job.isPersisted() ? "" : "non-") + "persisted job" + (needRetry[0] && this.job.isPersisted() ? ": " : ", no retry: ") + this.job), (Throwable)ex);
                    if (!this.job.isPersisted()) break block5;
                    try {
                        SimpleScheduler.this.execTransaction(new Callable<Void>(){

                            @Override
                            public Void call() throws Exception {
                                RunJobCallable.this.retryJob(needRetry);
                                return null;
                            }
                        });
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        private void retryJob(boolean[] needRetry) throws DatabaseException {
            SchedulerDAOConnection conn = SimpleScheduler.this._dbcf.getConnection();
            int retry = this.job.getDetails().getRetryCount() + 1;
            if (!needRetry[0] || retry > 10) {
                conn.deleteJob(this.job.getJobId(), SimpleScheduler.this._nodeId);
                if (retry > 10) {
                    __log.error((Object)("Error while processing job after 10 retries, no more retries:" + this.job));
                }
            } else {
                this.job.getDetails().setRetryCount(retry);
                long delay = (long)Math.pow(5.0, retry);
                long scheddate = System.currentTimeMillis() + delay * 1000L;
                this.job.setScheduled(false);
                this.job.setScheduledDate(scheddate);
                conn.updateJob(this.job);
                SimpleScheduler.this._retryJobList.put(this.job.getJobId(), new Boolean(true));
                __log.error((Object)("Error while processing job, retrying in " + delay + "s, the job is " + this.job));
            }
        }
    }
}

