|
|
|
@ -17,38 +17,21 @@
|
|
|
|
|
|
|
|
|
|
package com.fr.third.v2.org.quartz.impl.jdbcjobstore; |
|
|
|
|
|
|
|
|
|
import java.io.IOException; |
|
|
|
|
import java.lang.reflect.InvocationHandler; |
|
|
|
|
import java.lang.reflect.Proxy; |
|
|
|
|
import java.sql.Connection; |
|
|
|
|
import java.sql.SQLException; |
|
|
|
|
import java.util.ArrayList; |
|
|
|
|
import java.util.Date; |
|
|
|
|
import java.util.HashMap; |
|
|
|
|
import java.util.HashSet; |
|
|
|
|
import java.util.LinkedList; |
|
|
|
|
import java.util.List; |
|
|
|
|
import java.util.Map; |
|
|
|
|
import java.util.Set; |
|
|
|
|
|
|
|
|
|
import com.fr.third.v2.org.quartz.Calendar; |
|
|
|
|
import com.fr.third.v2.org.quartz.Job; |
|
|
|
|
import com.fr.third.v2.org.quartz.JobDataMap; |
|
|
|
|
import com.fr.third.v2.org.quartz.JobDetail; |
|
|
|
|
import com.fr.third.v2.org.quartz.JobKey; |
|
|
|
|
import com.fr.third.v2.org.quartz.JobPersistenceException; |
|
|
|
|
import com.fr.third.v2.org.quartz.ObjectAlreadyExistsException; |
|
|
|
|
import com.fr.third.v2.org.quartz.Scheduler; |
|
|
|
|
import com.fr.third.v2.org.quartz.SchedulerConfigException; |
|
|
|
|
import com.fr.third.v2.org.quartz.TriggerKey; |
|
|
|
|
import com.fr.third.v2.org.quartz.spi.OperableTrigger; |
|
|
|
|
import com.fr.third.v2.org.quartz.spi.ThreadExecutor; |
|
|
|
|
import com.fr.third.v2.org.quartz.spi.TriggerFiredResult; |
|
|
|
|
import com.fr.third.v2.org.quartz.JobDataMap; |
|
|
|
|
import com.fr.third.v2.org.quartz.JobKey; |
|
|
|
|
import com.fr.third.v2.org.quartz.JobPersistenceException; |
|
|
|
|
import com.fr.third.v2.org.quartz.SchedulerException; |
|
|
|
|
import com.fr.third.v2.org.quartz.SimpleTrigger; |
|
|
|
|
import com.fr.third.v2.org.quartz.Trigger; |
|
|
|
|
import com.fr.third.v2.org.quartz.Trigger.CompletedExecutionInstruction; |
|
|
|
|
import com.fr.third.v2.org.quartz.Trigger.TriggerState; |
|
|
|
|
import com.fr.third.v2.org.quartz.TriggerKey; |
|
|
|
|
import com.fr.third.v2.org.quartz.impl.DefaultThreadExecutor; |
|
|
|
|
import com.fr.third.v2.org.quartz.impl.matchers.GroupMatcher; |
|
|
|
|
import com.fr.third.v2.org.quartz.impl.matchers.StringMatcher; |
|
|
|
@ -56,12 +39,29 @@ import com.fr.third.v2.org.quartz.impl.matchers.StringMatcher.StringOperatorName
|
|
|
|
|
import com.fr.third.v2.org.quartz.impl.triggers.SimpleTriggerImpl; |
|
|
|
|
import com.fr.third.v2.org.quartz.spi.ClassLoadHelper; |
|
|
|
|
import com.fr.third.v2.org.quartz.spi.JobStore; |
|
|
|
|
import com.fr.third.v2.org.quartz.spi.OperableTrigger; |
|
|
|
|
import com.fr.third.v2.org.quartz.spi.SchedulerSignaler; |
|
|
|
|
import com.fr.third.v2.org.quartz.spi.ThreadExecutor; |
|
|
|
|
import com.fr.third.v2.org.quartz.spi.TriggerFiredBundle; |
|
|
|
|
import com.fr.third.v2.org.quartz.spi.TriggerFiredResult; |
|
|
|
|
import com.fr.third.v2.org.quartz.utils.DBConnectionManager; |
|
|
|
|
import org.slf4j.Logger; |
|
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
|
|
|
|
|
|
import java.io.IOException; |
|
|
|
|
import java.lang.reflect.InvocationHandler; |
|
|
|
|
import java.lang.reflect.Proxy; |
|
|
|
|
import java.sql.Connection; |
|
|
|
|
import java.sql.SQLException; |
|
|
|
|
import java.util.ArrayList; |
|
|
|
|
import java.util.Date; |
|
|
|
|
import java.util.HashMap; |
|
|
|
|
import java.util.HashSet; |
|
|
|
|
import java.util.LinkedList; |
|
|
|
|
import java.util.List; |
|
|
|
|
import java.util.Map; |
|
|
|
|
import java.util.Set; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* <p> |
|
|
|
@ -354,6 +354,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
public long getDbRetryInterval() { |
|
|
|
|
return dbRetryInterval; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* @param dbRetryInterval The dbRetryInterval to set. |
|
|
|
|
*/ |
|
|
|
@ -473,7 +474,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
* should be performed after obtaining an explicit DB lock. This is the |
|
|
|
|
* behavior prior to Quartz 1.6.3, but is considered unnecessary for most |
|
|
|
|
* databases, and therefore a superfluous performance hit. |
|
|
|
|
* |
|
|
|
|
* <p> |
|
|
|
|
* However, if batch acquisition is used, it is important for this behavior |
|
|
|
|
* to be used for all dbs. |
|
|
|
|
*/ |
|
|
|
@ -488,13 +489,12 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
* Set the JDBC driver delegate class. |
|
|
|
|
* </p> |
|
|
|
|
* |
|
|
|
|
* @param delegateClassName |
|
|
|
|
* the delegate class name |
|
|
|
|
* @param delegateClassName the delegate class name |
|
|
|
|
*/ |
|
|
|
|
@SuppressWarnings("UnusedDeclaration") /* called reflectively */ |
|
|
|
|
public void setDriverDelegateClass(String delegateClassName) |
|
|
|
|
throws InvalidConfigurationException { |
|
|
|
|
synchronized(this) { |
|
|
|
|
synchronized (this) { |
|
|
|
|
this.delegateClassName = delegateClassName; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -515,8 +515,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
* Set the JDBC driver delegate's initialization string. |
|
|
|
|
* </p> |
|
|
|
|
* |
|
|
|
|
* @param delegateInitString |
|
|
|
|
* the delegate init string |
|
|
|
|
* @param delegateInitString the delegate init string |
|
|
|
|
*/ |
|
|
|
|
@SuppressWarnings("UnusedDeclaration") /* called reflectively */ |
|
|
|
|
public void setDriverDelegateInitString(String delegateInitString) |
|
|
|
@ -639,7 +638,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
classLoadHelper = loadHelper; |
|
|
|
|
if(isThreadsInheritInitializersClassLoadContext()) { |
|
|
|
|
if (isThreadsInheritInitializersClassLoadContext()) { |
|
|
|
|
log.info("JDBCJobStore threads will inherit ContextClassLoader of thread: " + Thread.currentThread().getName()); |
|
|
|
|
initializersLoader = Thread.currentThread().getContextClassLoader(); |
|
|
|
|
} |
|
|
|
@ -657,8 +656,8 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (getUseDBLocks()) { |
|
|
|
|
if(getDriverDelegateClass() != null && getDriverDelegateClass().equals(MSSQLDelegate.class.getName())) { |
|
|
|
|
if(getSelectWithLockSQL() == null) { |
|
|
|
|
if (getDriverDelegateClass() != null && getDriverDelegateClass().equals(MSSQLDelegate.class.getName())) { |
|
|
|
|
if (getSelectWithLockSQL() == null) { |
|
|
|
|
String msSqlDflt = "SELECT * FROM {0}LOCKS WITH (UPDLOCK,ROWLOCK) WHERE " + COL_SCHEDULER_NAME + " = {1} AND LOCK_NAME = ?"; |
|
|
|
|
getLog().info("Detected usage of MSSQLDelegate class - defaulting 'selectWithLockSQL' to '" + msSqlDflt + "'."); |
|
|
|
|
setSelectWithLockSQL(msSqlDflt); |
|
|
|
@ -682,7 +681,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
|
|
|
|
|
if (isClustered()) { |
|
|
|
|
clusterManagementThread = new ClusterManager(); |
|
|
|
|
if(initializersLoader != null) |
|
|
|
|
if (initializersLoader != null) |
|
|
|
|
clusterManagementThread.setContextClassLoader(initializersLoader); |
|
|
|
|
clusterManagementThread.initialize(); |
|
|
|
|
} else { |
|
|
|
@ -695,7 +694,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
misfireHandler = new MisfireHandler(); |
|
|
|
|
if(initializersLoader != null) |
|
|
|
|
if (initializersLoader != null) |
|
|
|
|
misfireHandler.setContextClassLoader(initializersLoader); |
|
|
|
|
misfireHandler.initialize(); |
|
|
|
|
schedulerRunning = true; |
|
|
|
@ -763,9 +762,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
* (and potentially restored to a pool). |
|
|
|
|
*/ |
|
|
|
|
protected Connection getAttributeRestoringConnection(Connection conn) { |
|
|
|
|
return (Connection)Proxy.newProxyInstance( |
|
|
|
|
return (Connection) Proxy.newProxyInstance( |
|
|
|
|
Thread.currentThread().getContextClassLoader(), |
|
|
|
|
new Class[] { Connection.class }, |
|
|
|
|
new Class[]{Connection.class}, |
|
|
|
|
new AttributeRestoringConnectionInvocationHandler(conn)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -799,13 +798,16 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
conn.setAutoCommit(false); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if(isTxIsolationLevelSerializable()) { |
|
|
|
|
if (isTxIsolationLevelSerializable()) { |
|
|
|
|
conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); |
|
|
|
|
} |
|
|
|
|
} catch (SQLException sqle) { |
|
|
|
|
getLog().warn("Failed to override connection auto commit/transaction isolation.", sqle); |
|
|
|
|
} catch (Throwable e) { |
|
|
|
|
try { conn.close(); } catch(Throwable ignored) {} |
|
|
|
|
try { |
|
|
|
|
conn.close(); |
|
|
|
|
} catch (Throwable ignored) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
throw new JobPersistenceException( |
|
|
|
|
"Failure setting up connection.", e); |
|
|
|
@ -846,8 +848,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
* appropriate. |
|
|
|
|
* </p> |
|
|
|
|
* |
|
|
|
|
* @throws JobPersistenceException |
|
|
|
|
* if jobs could not be recovered |
|
|
|
|
* @throws JobPersistenceException if jobs could not be recovered |
|
|
|
|
*/ |
|
|
|
|
protected void recoverJobs(Connection conn) throws JobPersistenceException { |
|
|
|
|
try { |
|
|
|
@ -874,18 +875,22 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
+ recoveringJobTriggers.size() |
|
|
|
|
+ " jobs that were in-progress at the time of the last shut-down."); |
|
|
|
|
|
|
|
|
|
for (OperableTrigger recoveringJobTrigger: recoveringJobTriggers) { |
|
|
|
|
for (OperableTrigger recoveringJobTrigger : recoveringJobTriggers) { |
|
|
|
|
try { |
|
|
|
|
if (jobExists(conn, recoveringJobTrigger.getJobKey())) { |
|
|
|
|
recoveringJobTrigger.computeFirstFireTime(null); |
|
|
|
|
storeTrigger(conn, recoveringJobTrigger, null, false, |
|
|
|
|
STATE_WAITING, false, true); |
|
|
|
|
} |
|
|
|
|
} catch (JobPersistenceException e) { |
|
|
|
|
getLog().error(e.getMessage()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
getLog().info("Recovery complete."); |
|
|
|
|
|
|
|
|
|
// remove lingering 'complete' triggers...
|
|
|
|
|
List<TriggerKey> cts = getDelegate().selectTriggersInState(conn, STATE_COMPLETE); |
|
|
|
|
for(TriggerKey ct: cts) { |
|
|
|
|
for (TriggerKey ct : cts) { |
|
|
|
|
removeTrigger(conn, ct); |
|
|
|
|
} |
|
|
|
|
getLog().info( |
|
|
|
@ -933,9 +938,11 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
public boolean hasMoreMisfiredTriggers() { |
|
|
|
|
return _hasMoreMisfiredTriggers; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public int getProcessedMisfiredTriggerCount() { |
|
|
|
|
return _processedMisfiredTriggerCount; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public long getEarliestNewTime() { |
|
|
|
|
return _earliestNewTime; |
|
|
|
|
} |
|
|
|
@ -974,7 +981,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
return RecoverMisfiredJobsResult.NO_OP; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for (TriggerKey triggerKey: misfiredTriggers) { |
|
|
|
|
for (TriggerKey triggerKey : misfiredTriggers) { |
|
|
|
|
|
|
|
|
|
OperableTrigger trig = |
|
|
|
|
retrieveTrigger(conn, triggerKey); |
|
|
|
@ -985,7 +992,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
|
|
|
|
|
doUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering); |
|
|
|
|
|
|
|
|
|
if(trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime) |
|
|
|
|
if (trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime) |
|
|
|
|
earliestNewTime = trig.getNextFireTime().getTime(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1044,12 +1051,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
* Store the given <code>{@link JobDetail}</code> and <code>{@link Trigger}</code>. |
|
|
|
|
* </p> |
|
|
|
|
* |
|
|
|
|
* @param newJob |
|
|
|
|
* The <code>JobDetail</code> to be stored. |
|
|
|
|
* @param newTrigger |
|
|
|
|
* The <code>Trigger</code> to be stored. |
|
|
|
|
* @throws ObjectAlreadyExistsException |
|
|
|
|
* if a <code>Job</code> with the same name/group already |
|
|
|
|
* @param newJob The <code>JobDetail</code> to be stored. |
|
|
|
|
* @param newTrigger The <code>Trigger</code> to be stored. |
|
|
|
|
* @throws ObjectAlreadyExistsException if a <code>Job</code> with the same name/group already |
|
|
|
|
* exists. |
|
|
|
|
*/ |
|
|
|
|
public void storeJobAndTrigger(final JobDetail newJob, |
|
|
|
@ -1071,14 +1075,11 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
* Store the given <code>{@link JobDetail}</code>. |
|
|
|
|
* </p> |
|
|
|
|
* |
|
|
|
|
* @param newJob |
|
|
|
|
* The <code>JobDetail</code> to be stored. |
|
|
|
|
* @param replaceExisting |
|
|
|
|
* If <code>true</code>, any <code>Job</code> existing in the |
|
|
|
|
* @param newJob The <code>JobDetail</code> to be stored. |
|
|
|
|
* @param replaceExisting If <code>true</code>, any <code>Job</code> existing in the |
|
|
|
|
* <code>JobStore</code> with the same name & group should be |
|
|
|
|
* over-written. |
|
|
|
|
* @throws ObjectAlreadyExistsException |
|
|
|
|
* if a <code>Job</code> with the same name/group already |
|
|
|
|
* @throws ObjectAlreadyExistsException if a <code>Job</code> with the same name/group already |
|
|
|
|
* exists, and replaceExisting is set to false. |
|
|
|
|
*/ |
|
|
|
|
public void storeJob(final JobDetail newJob, |
|
|
|
@ -1140,14 +1141,11 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
* Store the given <code>{@link Trigger}</code>. |
|
|
|
|
* </p> |
|
|
|
|
* |
|
|
|
|
* @param newTrigger |
|
|
|
|
* The <code>Trigger</code> to be stored. |
|
|
|
|
* @param replaceExisting |
|
|
|
|
* If <code>true</code>, any <code>Trigger</code> existing in |
|
|
|
|
* @param newTrigger The <code>Trigger</code> to be stored. |
|
|
|
|
* @param replaceExisting If <code>true</code>, any <code>Trigger</code> existing in |
|
|
|
|
* the <code>JobStore</code> with the same name & group should |
|
|
|
|
* be over-written. |
|
|
|
|
* @throws ObjectAlreadyExistsException |
|
|
|
|
* if a <code>Trigger</code> with the same name/group already |
|
|
|
|
* @throws ObjectAlreadyExistsException if a <code>Trigger</code> with the same name/group already |
|
|
|
|
* exists, and replaceExisting is set to false. |
|
|
|
|
*/ |
|
|
|
|
public void storeTrigger(final OperableTrigger newTrigger, |
|
|
|
@ -1187,7 +1185,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
shouldBepaused = getDelegate().isTriggerGroupPaused( |
|
|
|
|
conn, newTrigger.getKey().getGroup()); |
|
|
|
|
|
|
|
|
|
if(!shouldBepaused) { |
|
|
|
|
if (!shouldBepaused) { |
|
|
|
|
shouldBepaused = getDelegate().isTriggerGroupPaused(conn, |
|
|
|
|
ALL_GROUPS_PAUSED); |
|
|
|
|
|
|
|
|
@ -1201,7 +1199,8 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if(job == null) { |
|
|
|
|
try { |
|
|
|
|
if (job == null) { |
|
|
|
|
job = retrieveJob(conn, newTrigger.getJobKey()); |
|
|
|
|
} |
|
|
|
|
if (job == null) { |
|
|
|
@ -1209,7 +1208,6 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
+ newTrigger.getJobKey() |
|
|
|
|
+ ") referenced by the trigger does not exist."); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (job.isConcurrentExectionDisallowed() && !recovering) { |
|
|
|
|
state = checkBlockedState(conn, job.getKey(), state); |
|
|
|
|
} |
|
|
|
@ -1219,6 +1217,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
} else { |
|
|
|
|
getDelegate().insertTrigger(conn, newTrigger, state, job); |
|
|
|
|
} |
|
|
|
|
} catch (JobPersistenceException e) { |
|
|
|
|
getLog().error(e.getMessage()); |
|
|
|
|
} |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
throw new JobPersistenceException("Couldn't store trigger '" + newTrigger.getKey() + "' for '" |
|
|
|
|
+ newTrigger.getJobKey() + "' job:" + e.getMessage(), e); |
|
|
|
@ -1271,7 +1272,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
List<TriggerKey> jobTriggers = getDelegate().selectTriggerKeysForJob(conn, jobKey); |
|
|
|
|
for (TriggerKey jobTrigger: jobTriggers) { |
|
|
|
|
for (TriggerKey jobTrigger : jobTriggers) { |
|
|
|
|
deleteTriggerAndChildren(conn, jobTrigger); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1326,9 +1327,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
public void executeVoid(Connection conn) throws JobPersistenceException { |
|
|
|
|
|
|
|
|
|
// FUTURE_TODO: make this more efficient with a true bulk operation...
|
|
|
|
|
for(JobDetail job: triggersAndJobs.keySet()) { |
|
|
|
|
for (JobDetail job : triggersAndJobs.keySet()) { |
|
|
|
|
storeJob(conn, job, replace); |
|
|
|
|
for(Trigger trigger: triggersAndJobs.get(job)) { |
|
|
|
|
for (Trigger trigger : triggersAndJobs.get(job)) { |
|
|
|
|
storeTrigger(conn, (OperableTrigger) trigger, job, replace, |
|
|
|
|
STATE_WAITING, false, false); |
|
|
|
|
} |
|
|
|
@ -1371,7 +1372,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
* @return The desired <code>Job</code>, or null if there is no match. |
|
|
|
|
*/ |
|
|
|
|
public JobDetail retrieveJob(final JobKey jobKey) throws JobPersistenceException { |
|
|
|
|
return (JobDetail)executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
return (JobDetail) executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
new TransactionCallback() { |
|
|
|
|
public Object execute(Connection conn) throws JobPersistenceException { |
|
|
|
|
return retrieveJob(conn, jobKey); |
|
|
|
@ -1516,7 +1517,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
* match. |
|
|
|
|
*/ |
|
|
|
|
public OperableTrigger retrieveTrigger(final TriggerKey triggerKey) throws JobPersistenceException { |
|
|
|
|
return (OperableTrigger)executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
return (OperableTrigger) executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
new TransactionCallback() { |
|
|
|
|
public Object execute(Connection conn) throws JobPersistenceException { |
|
|
|
|
return retrieveTrigger(conn, triggerKey); |
|
|
|
@ -1547,7 +1548,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
* @see TriggerState#NONE |
|
|
|
|
*/ |
|
|
|
|
public TriggerState getTriggerState(final TriggerKey triggerKey) throws JobPersistenceException { |
|
|
|
|
return (TriggerState)executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
return (TriggerState) executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
new TransactionCallback() { |
|
|
|
|
public Object execute(Connection conn) throws JobPersistenceException { |
|
|
|
|
return getTriggerState(conn, triggerKey); |
|
|
|
@ -1601,16 +1602,12 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
* Store the given <code>{@link Calendar}</code>. |
|
|
|
|
* </p> |
|
|
|
|
* |
|
|
|
|
* @param calName |
|
|
|
|
* The name of the calendar. |
|
|
|
|
* @param calendar |
|
|
|
|
* The <code>Calendar</code> to be stored. |
|
|
|
|
* @param replaceExisting |
|
|
|
|
* If <code>true</code>, any <code>Calendar</code> existing |
|
|
|
|
* @param calName The name of the calendar. |
|
|
|
|
* @param calendar The <code>Calendar</code> to be stored. |
|
|
|
|
* @param replaceExisting If <code>true</code>, any <code>Calendar</code> existing |
|
|
|
|
* in the <code>JobStore</code> with the same name & group |
|
|
|
|
* should be over-written. |
|
|
|
|
* @throws ObjectAlreadyExistsException |
|
|
|
|
* if a <code>Calendar</code> with the same name already |
|
|
|
|
* @throws ObjectAlreadyExistsException if a <code>Calendar</code> with the same name already |
|
|
|
|
* exists, and replaceExisting is set to false. |
|
|
|
|
*/ |
|
|
|
|
public void storeCalendar(final String calName, |
|
|
|
@ -1641,10 +1638,10 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
"Couldn't store calendar. Update failed."); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if(updateTriggers) { |
|
|
|
|
if (updateTriggers) { |
|
|
|
|
List<OperableTrigger> trigs = getDelegate().selectTriggersForCalendar(conn, calName); |
|
|
|
|
|
|
|
|
|
for(OperableTrigger trigger: trigs) { |
|
|
|
|
for (OperableTrigger trigger : trigs) { |
|
|
|
|
trigger.updateWithNewCalendar(calendar, getMisfireThreshold()); |
|
|
|
|
storeTrigger(conn, trigger, null, true, STATE_WAITING, false, false); |
|
|
|
|
} |
|
|
|
@ -1667,7 +1664,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
} catch (ClassNotFoundException e) { |
|
|
|
|
throw new JobPersistenceException("Couldn't store calendar: " |
|
|
|
|
+ e.getMessage(), e); |
|
|
|
|
}catch (SQLException e) { |
|
|
|
|
} catch (SQLException e) { |
|
|
|
|
throw new JobPersistenceException("Couldn't store calendar: " |
|
|
|
|
+ e.getMessage(), e); |
|
|
|
|
} |
|
|
|
@ -1695,6 +1692,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
* <code>Trigger</code>s pointing to non-existent calendars, then a |
|
|
|
|
* <code>JobPersistenceException</code> will be thrown.</p> |
|
|
|
|
* * |
|
|
|
|
* |
|
|
|
|
* @param calName The name of the <code>Calendar</code> to be removed. |
|
|
|
|
* @return <code>true</code> if a <code>Calendar</code> with the given name |
|
|
|
|
* was found and removed from the store. |
|
|
|
@ -1735,14 +1733,13 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
* Retrieve the given <code>{@link Trigger}</code>. |
|
|
|
|
* </p> |
|
|
|
|
* |
|
|
|
|
* @param calName |
|
|
|
|
* The name of the <code>Calendar</code> to be retrieved. |
|
|
|
|
* @param calName The name of the <code>Calendar</code> to be retrieved. |
|
|
|
|
* @return The desired <code>Calendar</code>, or null if there is no |
|
|
|
|
* match. |
|
|
|
|
*/ |
|
|
|
|
public Calendar retrieveCalendar(final String calName) |
|
|
|
|
throws JobPersistenceException { |
|
|
|
|
return (Calendar)executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
return (Calendar) executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
new TransactionCallback() { |
|
|
|
|
public Object execute(Connection conn) throws JobPersistenceException { |
|
|
|
|
return retrieveCalendar(conn, calName); |
|
|
|
@ -1871,7 +1868,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
public Set<JobKey> getJobKeys(final GroupMatcher<JobKey> matcher) |
|
|
|
|
throws JobPersistenceException { |
|
|
|
|
return (Set<JobKey>)executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
return (Set<JobKey>) executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
new TransactionCallback() { |
|
|
|
|
public Object execute(Connection conn) throws JobPersistenceException { |
|
|
|
|
return getJobNames(conn, matcher); |
|
|
|
@ -1903,7 +1900,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
* @throws JobPersistenceException |
|
|
|
|
*/ |
|
|
|
|
public boolean checkExists(final JobKey jobKey) throws JobPersistenceException { |
|
|
|
|
return (Boolean)executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
return (Boolean) executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
new TransactionCallback() { |
|
|
|
|
public Object execute(Connection conn) throws JobPersistenceException { |
|
|
|
|
return checkExists(conn, jobKey); |
|
|
|
@ -1929,7 +1926,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
* @throws JobPersistenceException |
|
|
|
|
*/ |
|
|
|
|
public boolean checkExists(final TriggerKey triggerKey) throws JobPersistenceException { |
|
|
|
|
return (Boolean)executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
return (Boolean) executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
new TransactionCallback() { |
|
|
|
|
public Object execute(Connection conn) throws JobPersistenceException { |
|
|
|
|
return checkExists(conn, triggerKey); |
|
|
|
@ -1984,7 +1981,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
public Set<TriggerKey> getTriggerKeys(final GroupMatcher<TriggerKey> matcher) |
|
|
|
|
throws JobPersistenceException { |
|
|
|
|
return (Set<TriggerKey>)executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
return (Set<TriggerKey>) executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
new TransactionCallback() { |
|
|
|
|
public Object execute(Connection conn) throws JobPersistenceException { |
|
|
|
|
return getTriggerNames(conn, matcher); |
|
|
|
@ -2022,7 +2019,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
public List<String> getJobGroupNames() |
|
|
|
|
throws JobPersistenceException { |
|
|
|
|
return (List<String>)executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
return (List<String>) executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
new TransactionCallback() { |
|
|
|
|
public Object execute(Connection conn) throws JobPersistenceException { |
|
|
|
|
return getJobGroupNames(conn); |
|
|
|
@ -2059,7 +2056,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
public List<String> getTriggerGroupNames() |
|
|
|
|
throws JobPersistenceException { |
|
|
|
|
return (List<String>)executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
return (List<String>) executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
new TransactionCallback() { |
|
|
|
|
public Object execute(Connection conn) throws JobPersistenceException { |
|
|
|
|
return getTriggerGroupNames(conn); |
|
|
|
@ -2095,7 +2092,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
public List<String> getCalendarNames() |
|
|
|
|
throws JobPersistenceException { |
|
|
|
|
return (List<String>)executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
return (List<String>) executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
new TransactionCallback() { |
|
|
|
|
public Object execute(Connection conn) throws JobPersistenceException { |
|
|
|
|
return getCalendarNames(conn); |
|
|
|
@ -2124,7 +2121,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
*/ |
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
public List<OperableTrigger> getTriggersForJob(final JobKey jobKey) throws JobPersistenceException { |
|
|
|
|
return (List<OperableTrigger>)executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
return (List<OperableTrigger>) executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
new TransactionCallback() { |
|
|
|
|
public Object execute(Connection conn) throws JobPersistenceException { |
|
|
|
|
return getTriggersForJob(conn, jobKey); |
|
|
|
@ -2209,7 +2206,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
new VoidTransactionCallback() { |
|
|
|
|
public void executeVoid(Connection conn) throws JobPersistenceException { |
|
|
|
|
List<OperableTrigger> triggers = getTriggersForJob(conn, jobKey); |
|
|
|
|
for (OperableTrigger trigger: triggers) { |
|
|
|
|
for (OperableTrigger trigger : triggers) { |
|
|
|
|
pauseTrigger(conn, trigger.getKey()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -2335,7 +2332,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
boolean blocked = false; |
|
|
|
|
if(STATE_PAUSED_BLOCKED.equals(status.getStatus())) { |
|
|
|
|
if (STATE_PAUSED_BLOCKED.equals(status.getStatus())) { |
|
|
|
|
blocked = true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -2348,8 +2345,8 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
newState, true); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if(!misfired) { |
|
|
|
|
if(blocked) { |
|
|
|
|
if (!misfired) { |
|
|
|
|
if (blocked) { |
|
|
|
|
getDelegate().updateTriggerStateFromOtherState(conn, |
|
|
|
|
key, newState, STATE_PAUSED_BLOCKED); |
|
|
|
|
} else { |
|
|
|
@ -2384,7 +2381,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
new VoidTransactionCallback() { |
|
|
|
|
public void executeVoid(Connection conn) throws JobPersistenceException { |
|
|
|
|
List<OperableTrigger> triggers = getTriggersForJob(conn, jobKey); |
|
|
|
|
for (OperableTrigger trigger: triggers) { |
|
|
|
|
for (OperableTrigger trigger : triggers) { |
|
|
|
|
resumeTrigger(conn, trigger.getKey()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -2415,9 +2412,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
Set<JobKey> jobKeys = getJobNames(conn, matcher); |
|
|
|
|
Set<String> groupNames = new HashSet<String>(); |
|
|
|
|
|
|
|
|
|
for (JobKey jobKey: jobKeys) { |
|
|
|
|
for (JobKey jobKey : jobKeys) { |
|
|
|
|
List<OperableTrigger> triggers = getTriggersForJob(conn, jobKey); |
|
|
|
|
for (OperableTrigger trigger: triggers) { |
|
|
|
|
for (OperableTrigger trigger : triggers) { |
|
|
|
|
resumeTrigger(conn, trigger.getKey()); |
|
|
|
|
} |
|
|
|
|
groupNames.add(jobKey.getGroup()); |
|
|
|
@ -2492,7 +2489,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
public Set<String> getPausedTriggerGroups() |
|
|
|
|
throws JobPersistenceException { |
|
|
|
|
return (Set<String>)executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
return (Set<String>) executeWithoutLock( // no locks necessary for read...
|
|
|
|
|
new TransactionCallback() { |
|
|
|
|
public Object execute(Connection conn) throws JobPersistenceException { |
|
|
|
|
return getPausedTriggerGroups(conn); |
|
|
|
@ -2569,7 +2566,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
Set<TriggerKey> keys = getDelegate().selectTriggersInGroup(conn, |
|
|
|
|
matcher); |
|
|
|
|
|
|
|
|
|
for (TriggerKey key: keys) { |
|
|
|
|
for (TriggerKey key : keys) { |
|
|
|
|
resumeTrigger(conn, key); |
|
|
|
|
groups.add(key.getGroup()); |
|
|
|
|
} |
|
|
|
@ -2658,7 +2655,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
|
|
|
|
|
List<String> names = getTriggerGroupNames(conn); |
|
|
|
|
|
|
|
|
|
for (String name: names) { |
|
|
|
|
for (String name : names) { |
|
|
|
|
pauseTriggerGroup(conn, GroupMatcher.triggerGroupEquals(name)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -2717,7 +2714,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
|
|
|
|
|
List<String> names = getTriggerGroupNames(conn); |
|
|
|
|
|
|
|
|
|
for (String name: names) { |
|
|
|
|
for (String name : names) { |
|
|
|
|
resumeTriggerGroup(conn, GroupMatcher.triggerGroupEquals(name)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -2748,7 +2745,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
throws JobPersistenceException { |
|
|
|
|
|
|
|
|
|
String lockName; |
|
|
|
|
if(isAcquireTriggersWithinLock() || maxCount > 1) { |
|
|
|
|
if (isAcquireTriggersWithinLock() || maxCount > 1) { |
|
|
|
|
lockName = LOCK_TRIGGER_ACCESS; |
|
|
|
|
} else { |
|
|
|
|
lockName = null; |
|
|
|
@ -2793,7 +2790,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
final int MAX_DO_LOOP_RETRY = 3; |
|
|
|
|
int currentLoopCount = 0; |
|
|
|
|
do { |
|
|
|
|
currentLoopCount ++; |
|
|
|
|
currentLoopCount++; |
|
|
|
|
try { |
|
|
|
|
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount); |
|
|
|
|
|
|
|
|
@ -2803,10 +2800,10 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
|
|
|
|
|
long batchEnd = noLaterThan; |
|
|
|
|
|
|
|
|
|
for(TriggerKey triggerKey: keys) { |
|
|
|
|
for (TriggerKey triggerKey : keys) { |
|
|
|
|
// If our trigger is no longer available, try a new one.
|
|
|
|
|
OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey); |
|
|
|
|
if(nextTrigger == null) { |
|
|
|
|
if (nextTrigger == null) { |
|
|
|
|
continue; // next trigger
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -2846,7 +2843,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
nextTrigger.setFireInstanceId(getFiredTriggerRecordId()); |
|
|
|
|
getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null); |
|
|
|
|
|
|
|
|
|
if(acquiredTriggers.isEmpty()) { |
|
|
|
|
if (acquiredTriggers.isEmpty()) { |
|
|
|
|
batchEnd = Math.max(nextTrigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow; |
|
|
|
|
} |
|
|
|
|
acquiredTriggers.add(nextTrigger); |
|
|
|
@ -2854,7 +2851,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
|
|
|
|
|
// if we didn't end up with any trigger to fire from that first
|
|
|
|
|
// batch, try again for another batch. We allow with a max retry count.
|
|
|
|
|
if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) { |
|
|
|
|
if (acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) { |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -2925,7 +2922,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
result = new TriggerFiredResult(bundle); |
|
|
|
|
} catch (JobPersistenceException jpe) { |
|
|
|
|
result = new TriggerFiredResult(jpe); |
|
|
|
|
} catch(RuntimeException re) { |
|
|
|
|
} catch (RuntimeException re) { |
|
|
|
|
result = new TriggerFiredResult(re); |
|
|
|
|
} |
|
|
|
|
results.add(result); |
|
|
|
@ -2978,7 +2975,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
job = retrieveJob(conn, trigger.getJobKey()); |
|
|
|
|
if (job == null) { return null; } |
|
|
|
|
if (job == null) { |
|
|
|
|
return null; |
|
|
|
|
} |
|
|
|
|
} catch (JobPersistenceException jpe) { |
|
|
|
|
try { |
|
|
|
|
getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe); |
|
|
|
@ -2992,7 +2991,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
|
|
|
|
|
if (trigger.getCalendarName() != null) { |
|
|
|
|
cal = retrieveCalendar(conn, trigger.getCalendarName()); |
|
|
|
|
if (cal == null) { return null; } |
|
|
|
|
if (cal == null) { |
|
|
|
|
return null; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
try { |
|
|
|
@ -3056,7 +3057,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
LOCK_TRIGGER_ACCESS, |
|
|
|
|
new VoidTransactionCallback() { |
|
|
|
|
public void executeVoid(Connection conn) throws JobPersistenceException { |
|
|
|
|
triggeredJobComplete(conn, trigger, jobDetail,triggerInstCode); |
|
|
|
|
triggeredJobComplete(conn, trigger, jobDetail, triggerInstCode); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
@ -3066,15 +3067,15 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
CompletedExecutionInstruction triggerInstCode) throws JobPersistenceException { |
|
|
|
|
try { |
|
|
|
|
if (triggerInstCode == CompletedExecutionInstruction.DELETE_TRIGGER) { |
|
|
|
|
if(trigger.getNextFireTime() == null) { |
|
|
|
|
if (trigger.getNextFireTime() == null) { |
|
|
|
|
// double check for possible reschedule within job
|
|
|
|
|
// execution, which would cancel the need to delete...
|
|
|
|
|
TriggerStatus stat = getDelegate().selectTriggerStatus( |
|
|
|
|
conn, trigger.getKey()); |
|
|
|
|
if(stat != null && stat.getNextFireTime() == null) { |
|
|
|
|
if (stat != null && stat.getNextFireTime() == null) { |
|
|
|
|
removeTrigger(conn, trigger.getKey()); |
|
|
|
|
} |
|
|
|
|
} else{ |
|
|
|
|
} else { |
|
|
|
|
removeTrigger(conn, trigger.getKey()); |
|
|
|
|
signalSchedulingChangeOnTxCompletion(0L); |
|
|
|
|
} |
|
|
|
@ -3142,10 +3143,10 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
* </p> |
|
|
|
|
*/ |
|
|
|
|
protected DriverDelegate getDelegate() throws NoSuchDelegateException { |
|
|
|
|
synchronized(this) { |
|
|
|
|
if(null == delegate) { |
|
|
|
|
synchronized (this) { |
|
|
|
|
if (null == delegate) { |
|
|
|
|
try { |
|
|
|
|
if(delegateClassName != null) { |
|
|
|
|
if (delegateClassName != null) { |
|
|
|
|
delegateClass = getClassLoadHelper().loadClass(delegateClassName, DriverDelegate.class); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -3225,12 +3226,13 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
protected ThreadLocal<Long> sigChangeForTxCompletion = new ThreadLocal<Long>(); |
|
|
|
|
|
|
|
|
|
protected void signalSchedulingChangeOnTxCompletion(long candidateNewNextFireTime) { |
|
|
|
|
Long sigTime = sigChangeForTxCompletion.get(); |
|
|
|
|
if(sigTime == null && candidateNewNextFireTime >= 0L) |
|
|
|
|
if (sigTime == null && candidateNewNextFireTime >= 0L) |
|
|
|
|
sigChangeForTxCompletion.set(candidateNewNextFireTime); |
|
|
|
|
else { |
|
|
|
|
if(sigTime == null || candidateNewNextFireTime < sigTime) |
|
|
|
|
if (sigTime == null || candidateNewNextFireTime < sigTime) |
|
|
|
|
sigChangeForTxCompletion.set(candidateNewNextFireTime); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -3322,7 +3324,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
|
|
|
|
|
List<SchedulerStateRecord> states = getDelegate().selectSchedulerStateRecords(conn, null); |
|
|
|
|
|
|
|
|
|
for(SchedulerStateRecord rec: states) { |
|
|
|
|
for (SchedulerStateRecord rec : states) { |
|
|
|
|
|
|
|
|
|
// find own record...
|
|
|
|
|
if (rec.getSchedulerInstanceId().equals(getInstanceId())) { |
|
|
|
@ -3376,12 +3378,12 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
|
|
|
|
|
Set<String> allFiredTriggerInstanceNames = getDelegate().selectFiredTriggerInstanceNames(conn); |
|
|
|
|
if (!allFiredTriggerInstanceNames.isEmpty()) { |
|
|
|
|
for (SchedulerStateRecord rec: schedulerStateRecords) { |
|
|
|
|
for (SchedulerStateRecord rec : schedulerStateRecords) { |
|
|
|
|
|
|
|
|
|
allFiredTriggerInstanceNames.remove(rec.getSchedulerInstanceId()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for (String inst: allFiredTriggerInstanceNames) { |
|
|
|
|
for (String inst : allFiredTriggerInstanceNames) { |
|
|
|
|
|
|
|
|
|
SchedulerStateRecord orphanedInstance = new SchedulerStateRecord(); |
|
|
|
|
orphanedInstance.setSchedulerInstanceId(inst); |
|
|
|
@ -3413,7 +3415,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
|
|
|
|
|
// check in...
|
|
|
|
|
lastCheckin = System.currentTimeMillis(); |
|
|
|
|
if(getDelegate().updateSchedulerState(conn, getInstanceId(), lastCheckin) == 0) { |
|
|
|
|
if (getDelegate().updateSchedulerState(conn, getInstanceId(), lastCheckin) == 0) { |
|
|
|
|
getDelegate().insertSchedulerState(conn, getInstanceId(), |
|
|
|
|
lastCheckin, getClusterCheckinInterval()); |
|
|
|
|
} |
|
|
|
@ -3605,13 +3607,13 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
protected void cleanupConnection(Connection conn) { |
|
|
|
|
if (conn != null) { |
|
|
|
|
if (conn instanceof Proxy) { |
|
|
|
|
Proxy connProxy = (Proxy)conn; |
|
|
|
|
Proxy connProxy = (Proxy) conn; |
|
|
|
|
|
|
|
|
|
InvocationHandler invocationHandler = |
|
|
|
|
Proxy.getInvocationHandler(connProxy); |
|
|
|
|
if (invocationHandler instanceof AttributeRestoringConnectionInvocationHandler) { |
|
|
|
|
AttributeRestoringConnectionInvocationHandler connHandler = |
|
|
|
|
(AttributeRestoringConnectionInvocationHandler)invocationHandler; |
|
|
|
|
(AttributeRestoringConnectionInvocationHandler) invocationHandler; |
|
|
|
|
|
|
|
|
|
connHandler.restoreOriginalAtributes(); |
|
|
|
|
closeConnection(connHandler.getWrappedConnection()); |
|
|
|
@ -3666,7 +3668,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
conn.rollback(); |
|
|
|
|
} catch (SQLException e) { |
|
|
|
|
getLog().error( |
|
|
|
|
"Couldn't rollback jdbc connection. "+e.getMessage(), e); |
|
|
|
|
"Couldn't rollback jdbc connection. " + e.getMessage(), e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -3686,7 +3688,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
conn.commit(); |
|
|
|
|
} catch (SQLException e) { |
|
|
|
|
throw new JobPersistenceException( |
|
|
|
|
"Couldn't commit jdbc connection. "+e.getMessage(), e); |
|
|
|
|
"Couldn't commit jdbc connection. " + e.getMessage(), e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -3757,7 +3759,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
try { |
|
|
|
|
return executeInNonManagedTXLock(lockName, txCallback, null); |
|
|
|
|
} catch (JobPersistenceException jpe) { |
|
|
|
|
if(retry % 4 == 0) { |
|
|
|
|
if (retry % 4 == 0) { |
|
|
|
|
schedSignaler.notifySchedulerListenersError("An error occurred while " + txCallback, jpe); |
|
|
|
|
} |
|
|
|
|
} catch (RuntimeException e) { |
|
|
|
@ -3816,7 +3818,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion(); |
|
|
|
|
if(sigTime != null && sigTime >= 0) { |
|
|
|
|
if (sigTime != null && sigTime >= 0) { |
|
|
|
|
signalSchedulingChangeImmediately(sigTime); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -3876,7 +3878,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
numFails = 0; |
|
|
|
|
getLog().debug("ClusterManager: Check-in complete."); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
if(numFails % 4 == 0) { |
|
|
|
|
if (numFails % 4 == 0) { |
|
|
|
|
getLog().error( |
|
|
|
|
"ClusterManager: Error managing cluster: " |
|
|
|
|
+ e.getMessage(), e); |
|
|
|
@ -3898,7 +3900,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
timeToSleep = 100L; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if(numFails > 0) { |
|
|
|
|
if (numFails > 0) { |
|
|
|
|
timeToSleep = Math.max(getDbRetryInterval(), timeToSleep); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -3952,7 +3954,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
numFails = 0; |
|
|
|
|
return res; |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
if(numFails % 4 == 0) { |
|
|
|
|
if (numFails % 4 == 0) { |
|
|
|
|
getLog().error( |
|
|
|
|
"MisfireHandler: Error handling misfires: " |
|
|
|
|
+ e.getMessage(), e); |
|
|
|
@ -3983,7 +3985,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
|
|
|
|
|
timeToSleep = 50l; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if(numFails > 0) { |
|
|
|
|
if (numFails > 0) { |
|
|
|
|
timeToSleep = Math.max(getDbRetryInterval(), timeToSleep); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|