Browse Source

DEC-7376 定时调度任务表中存在脏数据的时候,不能影响整个scheduler

bugfix/10.0
zed 6 years ago
parent
commit
362aa2e8e7
  1. 265
      fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java

265
fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java

@ -17,38 +17,21 @@
package com.fr.third.v2.org.quartz.impl.jdbcjobstore;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.fr.third.v2.org.quartz.Calendar;
import com.fr.third.v2.org.quartz.Job;
import com.fr.third.v2.org.quartz.JobDataMap;
import com.fr.third.v2.org.quartz.JobDetail;
import com.fr.third.v2.org.quartz.JobKey;
import com.fr.third.v2.org.quartz.JobPersistenceException;
import com.fr.third.v2.org.quartz.ObjectAlreadyExistsException;
import com.fr.third.v2.org.quartz.Scheduler;
import com.fr.third.v2.org.quartz.SchedulerConfigException;
import com.fr.third.v2.org.quartz.TriggerKey;
import com.fr.third.v2.org.quartz.spi.OperableTrigger;
import com.fr.third.v2.org.quartz.spi.ThreadExecutor;
import com.fr.third.v2.org.quartz.spi.TriggerFiredResult;
import com.fr.third.v2.org.quartz.JobDataMap;
import com.fr.third.v2.org.quartz.JobKey;
import com.fr.third.v2.org.quartz.JobPersistenceException;
import com.fr.third.v2.org.quartz.SchedulerException;
import com.fr.third.v2.org.quartz.SimpleTrigger;
import com.fr.third.v2.org.quartz.Trigger;
import com.fr.third.v2.org.quartz.Trigger.CompletedExecutionInstruction;
import com.fr.third.v2.org.quartz.Trigger.TriggerState;
import com.fr.third.v2.org.quartz.TriggerKey;
import com.fr.third.v2.org.quartz.impl.DefaultThreadExecutor;
import com.fr.third.v2.org.quartz.impl.matchers.GroupMatcher;
import com.fr.third.v2.org.quartz.impl.matchers.StringMatcher;
@ -56,12 +39,29 @@ import com.fr.third.v2.org.quartz.impl.matchers.StringMatcher.StringOperatorName
import com.fr.third.v2.org.quartz.impl.triggers.SimpleTriggerImpl;
import com.fr.third.v2.org.quartz.spi.ClassLoadHelper;
import com.fr.third.v2.org.quartz.spi.JobStore;
import com.fr.third.v2.org.quartz.spi.OperableTrigger;
import com.fr.third.v2.org.quartz.spi.SchedulerSignaler;
import com.fr.third.v2.org.quartz.spi.ThreadExecutor;
import com.fr.third.v2.org.quartz.spi.TriggerFiredBundle;
import com.fr.third.v2.org.quartz.spi.TriggerFiredResult;
import com.fr.third.v2.org.quartz.utils.DBConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* <p>
@ -354,6 +354,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
public long getDbRetryInterval() {
return dbRetryInterval;
}
/**
* @param dbRetryInterval The dbRetryInterval to set.
*/
@ -473,7 +474,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* should be performed after obtaining an explicit DB lock. This is the
* behavior prior to Quartz 1.6.3, but is considered unnecessary for most
* databases, and therefore a superfluous performance hit.
*
* <p>
* However, if batch acquisition is used, it is important for this behavior
* to be used for all dbs.
*/
@ -488,13 +489,12 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Set the JDBC driver delegate class.
* </p>
*
* @param delegateClassName
* the delegate class name
* @param delegateClassName the delegate class name
*/
@SuppressWarnings("UnusedDeclaration") /* called reflectively */
public void setDriverDelegateClass(String delegateClassName)
throws InvalidConfigurationException {
synchronized(this) {
synchronized (this) {
this.delegateClassName = delegateClassName;
}
}
@ -515,8 +515,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Set the JDBC driver delegate's initialization string.
* </p>
*
* @param delegateInitString
* the delegate init string
* @param delegateInitString the delegate init string
*/
@SuppressWarnings("UnusedDeclaration") /* called reflectively */
public void setDriverDelegateInitString(String delegateInitString)
@ -639,7 +638,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
classLoadHelper = loadHelper;
if(isThreadsInheritInitializersClassLoadContext()) {
if (isThreadsInheritInitializersClassLoadContext()) {
log.info("JDBCJobStore threads will inherit ContextClassLoader of thread: " + Thread.currentThread().getName());
initializersLoader = Thread.currentThread().getContextClassLoader();
}
@ -657,8 +656,8 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
if (getUseDBLocks()) {
if(getDriverDelegateClass() != null && getDriverDelegateClass().equals(MSSQLDelegate.class.getName())) {
if(getSelectWithLockSQL() == null) {
if (getDriverDelegateClass() != null && getDriverDelegateClass().equals(MSSQLDelegate.class.getName())) {
if (getSelectWithLockSQL() == null) {
String msSqlDflt = "SELECT * FROM {0}LOCKS WITH (UPDLOCK,ROWLOCK) WHERE " + COL_SCHEDULER_NAME + " = {1} AND LOCK_NAME = ?";
getLog().info("Detected usage of MSSQLDelegate class - defaulting 'selectWithLockSQL' to '" + msSqlDflt + "'.");
setSelectWithLockSQL(msSqlDflt);
@ -682,7 +681,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
if (isClustered()) {
clusterManagementThread = new ClusterManager();
if(initializersLoader != null)
if (initializersLoader != null)
clusterManagementThread.setContextClassLoader(initializersLoader);
clusterManagementThread.initialize();
} else {
@ -695,7 +694,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
misfireHandler = new MisfireHandler();
if(initializersLoader != null)
if (initializersLoader != null)
misfireHandler.setContextClassLoader(initializersLoader);
misfireHandler.initialize();
schedulerRunning = true;
@ -763,9 +762,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* (and potentially restored to a pool).
*/
protected Connection getAttributeRestoringConnection(Connection conn) {
return (Connection)Proxy.newProxyInstance(
return (Connection) Proxy.newProxyInstance(
Thread.currentThread().getContextClassLoader(),
new Class[] { Connection.class },
new Class[]{Connection.class},
new AttributeRestoringConnectionInvocationHandler(conn));
}
@ -799,13 +798,16 @@ public abstract class JobStoreSupport implements JobStore, Constants {
conn.setAutoCommit(false);
}
if(isTxIsolationLevelSerializable()) {
if (isTxIsolationLevelSerializable()) {
conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
}
} catch (SQLException sqle) {
getLog().warn("Failed to override connection auto commit/transaction isolation.", sqle);
} catch (Throwable e) {
try { conn.close(); } catch(Throwable ignored) {}
try {
conn.close();
} catch (Throwable ignored) {
}
throw new JobPersistenceException(
"Failure setting up connection.", e);
@ -846,8 +848,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* appropriate.
* </p>
*
* @throws JobPersistenceException
* if jobs could not be recovered
* @throws JobPersistenceException if jobs could not be recovered
*/
protected void recoverJobs(Connection conn) throws JobPersistenceException {
try {
@ -874,18 +875,22 @@ public abstract class JobStoreSupport implements JobStore, Constants {
+ recoveringJobTriggers.size()
+ " jobs that were in-progress at the time of the last shut-down.");
for (OperableTrigger recoveringJobTrigger: recoveringJobTriggers) {
for (OperableTrigger recoveringJobTrigger : recoveringJobTriggers) {
try {
if (jobExists(conn, recoveringJobTrigger.getJobKey())) {
recoveringJobTrigger.computeFirstFireTime(null);
storeTrigger(conn, recoveringJobTrigger, null, false,
STATE_WAITING, false, true);
}
} catch (JobPersistenceException e) {
getLog().error(e.getMessage());
}
}
getLog().info("Recovery complete.");
// remove lingering 'complete' triggers...
List<TriggerKey> cts = getDelegate().selectTriggersInState(conn, STATE_COMPLETE);
for(TriggerKey ct: cts) {
for (TriggerKey ct : cts) {
removeTrigger(conn, ct);
}
getLog().info(
@ -933,9 +938,11 @@ public abstract class JobStoreSupport implements JobStore, Constants {
public boolean hasMoreMisfiredTriggers() {
return _hasMoreMisfiredTriggers;
}
public int getProcessedMisfiredTriggerCount() {
return _processedMisfiredTriggerCount;
}
public long getEarliestNewTime() {
return _earliestNewTime;
}
@ -974,7 +981,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
return RecoverMisfiredJobsResult.NO_OP;
}
for (TriggerKey triggerKey: misfiredTriggers) {
for (TriggerKey triggerKey : misfiredTriggers) {
OperableTrigger trig =
retrieveTrigger(conn, triggerKey);
@ -985,7 +992,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
doUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering);
if(trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime)
if (trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime)
earliestNewTime = trig.getNextFireTime().getTime();
}
@ -1044,12 +1051,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Store the given <code>{@link JobDetail}</code> and <code>{@link Trigger}</code>.
* </p>
*
* @param newJob
* The <code>JobDetail</code> to be stored.
* @param newTrigger
* The <code>Trigger</code> to be stored.
* @throws ObjectAlreadyExistsException
* if a <code>Job</code> with the same name/group already
* @param newJob The <code>JobDetail</code> to be stored.
* @param newTrigger The <code>Trigger</code> to be stored.
* @throws ObjectAlreadyExistsException if a <code>Job</code> with the same name/group already
* exists.
*/
public void storeJobAndTrigger(final JobDetail newJob,
@ -1071,14 +1075,11 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Store the given <code>{@link JobDetail}</code>.
* </p>
*
* @param newJob
* The <code>JobDetail</code> to be stored.
* @param replaceExisting
* If <code>true</code>, any <code>Job</code> existing in the
* @param newJob The <code>JobDetail</code> to be stored.
* @param replaceExisting If <code>true</code>, any <code>Job</code> existing in the
* <code>JobStore</code> with the same name & group should be
* over-written.
* @throws ObjectAlreadyExistsException
* if a <code>Job</code> with the same name/group already
* @throws ObjectAlreadyExistsException if a <code>Job</code> with the same name/group already
* exists, and replaceExisting is set to false.
*/
public void storeJob(final JobDetail newJob,
@ -1140,14 +1141,11 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Store the given <code>{@link Trigger}</code>.
* </p>
*
* @param newTrigger
* The <code>Trigger</code> to be stored.
* @param replaceExisting
* If <code>true</code>, any <code>Trigger</code> existing in
* @param newTrigger The <code>Trigger</code> to be stored.
* @param replaceExisting If <code>true</code>, any <code>Trigger</code> existing in
* the <code>JobStore</code> with the same name & group should
* be over-written.
* @throws ObjectAlreadyExistsException
* if a <code>Trigger</code> with the same name/group already
* @throws ObjectAlreadyExistsException if a <code>Trigger</code> with the same name/group already
* exists, and replaceExisting is set to false.
*/
public void storeTrigger(final OperableTrigger newTrigger,
@ -1187,7 +1185,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
shouldBepaused = getDelegate().isTriggerGroupPaused(
conn, newTrigger.getKey().getGroup());
if(!shouldBepaused) {
if (!shouldBepaused) {
shouldBepaused = getDelegate().isTriggerGroupPaused(conn,
ALL_GROUPS_PAUSED);
@ -1201,7 +1199,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
}
if(job == null) {
if (job == null) {
job = retrieveJob(conn, newTrigger.getJobKey());
}
if (job == null) {
@ -1271,7 +1269,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
try {
List<TriggerKey> jobTriggers = getDelegate().selectTriggerKeysForJob(conn, jobKey);
for (TriggerKey jobTrigger: jobTriggers) {
for (TriggerKey jobTrigger : jobTriggers) {
deleteTriggerAndChildren(conn, jobTrigger);
}
@ -1326,9 +1324,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
public void executeVoid(Connection conn) throws JobPersistenceException {
// FUTURE_TODO: make this more efficient with a true bulk operation...
for(JobDetail job: triggersAndJobs.keySet()) {
for (JobDetail job : triggersAndJobs.keySet()) {
storeJob(conn, job, replace);
for(Trigger trigger: triggersAndJobs.get(job)) {
for (Trigger trigger : triggersAndJobs.get(job)) {
storeTrigger(conn, (OperableTrigger) trigger, job, replace,
STATE_WAITING, false, false);
}
@ -1371,7 +1369,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* @return The desired <code>Job</code>, or null if there is no match.
*/
public JobDetail retrieveJob(final JobKey jobKey) throws JobPersistenceException {
return (JobDetail)executeWithoutLock( // no locks necessary for read...
return (JobDetail) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
return retrieveJob(conn, jobKey);
@ -1516,7 +1514,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* match.
*/
public OperableTrigger retrieveTrigger(final TriggerKey triggerKey) throws JobPersistenceException {
return (OperableTrigger)executeWithoutLock( // no locks necessary for read...
return (OperableTrigger) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
return retrieveTrigger(conn, triggerKey);
@ -1547,7 +1545,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* @see TriggerState#NONE
*/
public TriggerState getTriggerState(final TriggerKey triggerKey) throws JobPersistenceException {
return (TriggerState)executeWithoutLock( // no locks necessary for read...
return (TriggerState) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
return getTriggerState(conn, triggerKey);
@ -1601,16 +1599,12 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Store the given <code>{@link Calendar}</code>.
* </p>
*
* @param calName
* The name of the calendar.
* @param calendar
* The <code>Calendar</code> to be stored.
* @param replaceExisting
* If <code>true</code>, any <code>Calendar</code> existing
* @param calName The name of the calendar.
* @param calendar The <code>Calendar</code> to be stored.
* @param replaceExisting If <code>true</code>, any <code>Calendar</code> existing
* in the <code>JobStore</code> with the same name & group
* should be over-written.
* @throws ObjectAlreadyExistsException
* if a <code>Calendar</code> with the same name already
* @throws ObjectAlreadyExistsException if a <code>Calendar</code> with the same name already
* exists, and replaceExisting is set to false.
*/
public void storeCalendar(final String calName,
@ -1641,10 +1635,10 @@ public abstract class JobStoreSupport implements JobStore, Constants {
"Couldn't store calendar. Update failed.");
}
if(updateTriggers) {
if (updateTriggers) {
List<OperableTrigger> trigs = getDelegate().selectTriggersForCalendar(conn, calName);
for(OperableTrigger trigger: trigs) {
for (OperableTrigger trigger : trigs) {
trigger.updateWithNewCalendar(calendar, getMisfireThreshold());
storeTrigger(conn, trigger, null, true, STATE_WAITING, false, false);
}
@ -1667,7 +1661,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
} catch (ClassNotFoundException e) {
throw new JobPersistenceException("Couldn't store calendar: "
+ e.getMessage(), e);
}catch (SQLException e) {
} catch (SQLException e) {
throw new JobPersistenceException("Couldn't store calendar: "
+ e.getMessage(), e);
}
@ -1695,6 +1689,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* <code>Trigger</code>s pointing to non-existent calendars, then a
* <code>JobPersistenceException</code> will be thrown.</p>
* *
*
* @param calName The name of the <code>Calendar</code> to be removed.
* @return <code>true</code> if a <code>Calendar</code> with the given name
* was found and removed from the store.
@ -1735,14 +1730,13 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Retrieve the given <code>{@link Trigger}</code>.
* </p>
*
* @param calName
* The name of the <code>Calendar</code> to be retrieved.
* @param calName The name of the <code>Calendar</code> to be retrieved.
* @return The desired <code>Calendar</code>, or null if there is no
* match.
*/
public Calendar retrieveCalendar(final String calName)
throws JobPersistenceException {
return (Calendar)executeWithoutLock( // no locks necessary for read...
return (Calendar) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
return retrieveCalendar(conn, calName);
@ -1871,7 +1865,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
@SuppressWarnings("unchecked")
public Set<JobKey> getJobKeys(final GroupMatcher<JobKey> matcher)
throws JobPersistenceException {
return (Set<JobKey>)executeWithoutLock( // no locks necessary for read...
return (Set<JobKey>) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
return getJobNames(conn, matcher);
@ -1903,7 +1897,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* @throws JobPersistenceException
*/
public boolean checkExists(final JobKey jobKey) throws JobPersistenceException {
return (Boolean)executeWithoutLock( // no locks necessary for read...
return (Boolean) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
return checkExists(conn, jobKey);
@ -1929,7 +1923,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* @throws JobPersistenceException
*/
public boolean checkExists(final TriggerKey triggerKey) throws JobPersistenceException {
return (Boolean)executeWithoutLock( // no locks necessary for read...
return (Boolean) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
return checkExists(conn, triggerKey);
@ -1984,7 +1978,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
@SuppressWarnings("unchecked")
public Set<TriggerKey> getTriggerKeys(final GroupMatcher<TriggerKey> matcher)
throws JobPersistenceException {
return (Set<TriggerKey>)executeWithoutLock( // no locks necessary for read...
return (Set<TriggerKey>) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
return getTriggerNames(conn, matcher);
@ -2022,7 +2016,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
@SuppressWarnings("unchecked")
public List<String> getJobGroupNames()
throws JobPersistenceException {
return (List<String>)executeWithoutLock( // no locks necessary for read...
return (List<String>) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
return getJobGroupNames(conn);
@ -2059,7 +2053,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
@SuppressWarnings("unchecked")
public List<String> getTriggerGroupNames()
throws JobPersistenceException {
return (List<String>)executeWithoutLock( // no locks necessary for read...
return (List<String>) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
return getTriggerGroupNames(conn);
@ -2095,7 +2089,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
@SuppressWarnings("unchecked")
public List<String> getCalendarNames()
throws JobPersistenceException {
return (List<String>)executeWithoutLock( // no locks necessary for read...
return (List<String>) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
return getCalendarNames(conn);
@ -2124,7 +2118,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*/
@SuppressWarnings("unchecked")
public List<OperableTrigger> getTriggersForJob(final JobKey jobKey) throws JobPersistenceException {
return (List<OperableTrigger>)executeWithoutLock( // no locks necessary for read...
return (List<OperableTrigger>) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
return getTriggersForJob(conn, jobKey);
@ -2209,7 +2203,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
new VoidTransactionCallback() {
public void executeVoid(Connection conn) throws JobPersistenceException {
List<OperableTrigger> triggers = getTriggersForJob(conn, jobKey);
for (OperableTrigger trigger: triggers) {
for (OperableTrigger trigger : triggers) {
pauseTrigger(conn, trigger.getKey());
}
}
@ -2335,7 +2329,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
boolean blocked = false;
if(STATE_PAUSED_BLOCKED.equals(status.getStatus())) {
if (STATE_PAUSED_BLOCKED.equals(status.getStatus())) {
blocked = true;
}
@ -2348,8 +2342,8 @@ public abstract class JobStoreSupport implements JobStore, Constants {
newState, true);
}
if(!misfired) {
if(blocked) {
if (!misfired) {
if (blocked) {
getDelegate().updateTriggerStateFromOtherState(conn,
key, newState, STATE_PAUSED_BLOCKED);
} else {
@ -2384,7 +2378,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
new VoidTransactionCallback() {
public void executeVoid(Connection conn) throws JobPersistenceException {
List<OperableTrigger> triggers = getTriggersForJob(conn, jobKey);
for (OperableTrigger trigger: triggers) {
for (OperableTrigger trigger : triggers) {
resumeTrigger(conn, trigger.getKey());
}
}
@ -2415,9 +2409,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
Set<JobKey> jobKeys = getJobNames(conn, matcher);
Set<String> groupNames = new HashSet<String>();
for (JobKey jobKey: jobKeys) {
for (JobKey jobKey : jobKeys) {
List<OperableTrigger> triggers = getTriggersForJob(conn, jobKey);
for (OperableTrigger trigger: triggers) {
for (OperableTrigger trigger : triggers) {
resumeTrigger(conn, trigger.getKey());
}
groupNames.add(jobKey.getGroup());
@ -2492,7 +2486,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
@SuppressWarnings("unchecked")
public Set<String> getPausedTriggerGroups()
throws JobPersistenceException {
return (Set<String>)executeWithoutLock( // no locks necessary for read...
return (Set<String>) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
return getPausedTriggerGroups(conn);
@ -2569,7 +2563,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
Set<TriggerKey> keys = getDelegate().selectTriggersInGroup(conn,
matcher);
for (TriggerKey key: keys) {
for (TriggerKey key : keys) {
resumeTrigger(conn, key);
groups.add(key.getGroup());
}
@ -2658,7 +2652,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
List<String> names = getTriggerGroupNames(conn);
for (String name: names) {
for (String name : names) {
pauseTriggerGroup(conn, GroupMatcher.triggerGroupEquals(name));
}
@ -2717,7 +2711,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
List<String> names = getTriggerGroupNames(conn);
for (String name: names) {
for (String name : names) {
resumeTriggerGroup(conn, GroupMatcher.triggerGroupEquals(name));
}
@ -2748,7 +2742,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
throws JobPersistenceException {
String lockName;
if(isAcquireTriggersWithinLock() || maxCount > 1) {
if (isAcquireTriggersWithinLock() || maxCount > 1) {
lockName = LOCK_TRIGGER_ACCESS;
} else {
lockName = null;
@ -2793,7 +2787,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
final int MAX_DO_LOOP_RETRY = 3;
int currentLoopCount = 0;
do {
currentLoopCount ++;
currentLoopCount++;
try {
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
@ -2803,10 +2797,10 @@ public abstract class JobStoreSupport implements JobStore, Constants {
long batchEnd = noLaterThan;
for(TriggerKey triggerKey: keys) {
for (TriggerKey triggerKey : keys) {
// If our trigger is no longer available, try a new one.
OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
if(nextTrigger == null) {
if (nextTrigger == null) {
continue; // next trigger
}
@ -2846,7 +2840,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
if(acquiredTriggers.isEmpty()) {
if (acquiredTriggers.isEmpty()) {
batchEnd = Math.max(nextTrigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;
}
acquiredTriggers.add(nextTrigger);
@ -2854,7 +2848,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
// if we didn't end up with any trigger to fire from that first
// batch, try again for another batch. We allow with a max retry count.
if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
if (acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
continue;
}
@ -2925,7 +2919,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
result = new TriggerFiredResult(bundle);
} catch (JobPersistenceException jpe) {
result = new TriggerFiredResult(jpe);
} catch(RuntimeException re) {
} catch (RuntimeException re) {
result = new TriggerFiredResult(re);
}
results.add(result);
@ -2978,7 +2972,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
try {
job = retrieveJob(conn, trigger.getJobKey());
if (job == null) { return null; }
if (job == null) {
return null;
}
} catch (JobPersistenceException jpe) {
try {
getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
@ -2992,7 +2988,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
if (trigger.getCalendarName() != null) {
cal = retrieveCalendar(conn, trigger.getCalendarName());
if (cal == null) { return null; }
if (cal == null) {
return null;
}
}
try {
@ -3056,7 +3054,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
LOCK_TRIGGER_ACCESS,
new VoidTransactionCallback() {
public void executeVoid(Connection conn) throws JobPersistenceException {
triggeredJobComplete(conn, trigger, jobDetail,triggerInstCode);
triggeredJobComplete(conn, trigger, jobDetail, triggerInstCode);
}
});
}
@ -3066,15 +3064,15 @@ public abstract class JobStoreSupport implements JobStore, Constants {
CompletedExecutionInstruction triggerInstCode) throws JobPersistenceException {
try {
if (triggerInstCode == CompletedExecutionInstruction.DELETE_TRIGGER) {
if(trigger.getNextFireTime() == null) {
if (trigger.getNextFireTime() == null) {
// double check for possible reschedule within job
// execution, which would cancel the need to delete...
TriggerStatus stat = getDelegate().selectTriggerStatus(
conn, trigger.getKey());
if(stat != null && stat.getNextFireTime() == null) {
if (stat != null && stat.getNextFireTime() == null) {
removeTrigger(conn, trigger.getKey());
}
} else{
} else {
removeTrigger(conn, trigger.getKey());
signalSchedulingChangeOnTxCompletion(0L);
}
@ -3142,10 +3140,10 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* </p>
*/
protected DriverDelegate getDelegate() throws NoSuchDelegateException {
synchronized(this) {
if(null == delegate) {
synchronized (this) {
if (null == delegate) {
try {
if(delegateClassName != null) {
if (delegateClassName != null) {
delegateClass = getClassLoadHelper().loadClass(delegateClassName, DriverDelegate.class);
}
@ -3225,12 +3223,13 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
protected ThreadLocal<Long> sigChangeForTxCompletion = new ThreadLocal<Long>();
protected void signalSchedulingChangeOnTxCompletion(long candidateNewNextFireTime) {
Long sigTime = sigChangeForTxCompletion.get();
if(sigTime == null && candidateNewNextFireTime >= 0L)
if (sigTime == null && candidateNewNextFireTime >= 0L)
sigChangeForTxCompletion.set(candidateNewNextFireTime);
else {
if(sigTime == null || candidateNewNextFireTime < sigTime)
if (sigTime == null || candidateNewNextFireTime < sigTime)
sigChangeForTxCompletion.set(candidateNewNextFireTime);
}
}
@ -3322,7 +3321,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
List<SchedulerStateRecord> states = getDelegate().selectSchedulerStateRecords(conn, null);
for(SchedulerStateRecord rec: states) {
for (SchedulerStateRecord rec : states) {
// find own record...
if (rec.getSchedulerInstanceId().equals(getInstanceId())) {
@ -3376,12 +3375,12 @@ public abstract class JobStoreSupport implements JobStore, Constants {
Set<String> allFiredTriggerInstanceNames = getDelegate().selectFiredTriggerInstanceNames(conn);
if (!allFiredTriggerInstanceNames.isEmpty()) {
for (SchedulerStateRecord rec: schedulerStateRecords) {
for (SchedulerStateRecord rec : schedulerStateRecords) {
allFiredTriggerInstanceNames.remove(rec.getSchedulerInstanceId());
}
for (String inst: allFiredTriggerInstanceNames) {
for (String inst : allFiredTriggerInstanceNames) {
SchedulerStateRecord orphanedInstance = new SchedulerStateRecord();
orphanedInstance.setSchedulerInstanceId(inst);
@ -3413,7 +3412,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
// check in...
lastCheckin = System.currentTimeMillis();
if(getDelegate().updateSchedulerState(conn, getInstanceId(), lastCheckin) == 0) {
if (getDelegate().updateSchedulerState(conn, getInstanceId(), lastCheckin) == 0) {
getDelegate().insertSchedulerState(conn, getInstanceId(),
lastCheckin, getClusterCheckinInterval());
}
@ -3605,13 +3604,13 @@ public abstract class JobStoreSupport implements JobStore, Constants {
protected void cleanupConnection(Connection conn) {
if (conn != null) {
if (conn instanceof Proxy) {
Proxy connProxy = (Proxy)conn;
Proxy connProxy = (Proxy) conn;
InvocationHandler invocationHandler =
Proxy.getInvocationHandler(connProxy);
if (invocationHandler instanceof AttributeRestoringConnectionInvocationHandler) {
AttributeRestoringConnectionInvocationHandler connHandler =
(AttributeRestoringConnectionInvocationHandler)invocationHandler;
(AttributeRestoringConnectionInvocationHandler) invocationHandler;
connHandler.restoreOriginalAtributes();
closeConnection(connHandler.getWrappedConnection());
@ -3666,7 +3665,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
conn.rollback();
} catch (SQLException e) {
getLog().error(
"Couldn't rollback jdbc connection. "+e.getMessage(), e);
"Couldn't rollback jdbc connection. " + e.getMessage(), e);
}
}
}
@ -3686,7 +3685,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
conn.commit();
} catch (SQLException e) {
throw new JobPersistenceException(
"Couldn't commit jdbc connection. "+e.getMessage(), e);
"Couldn't commit jdbc connection. " + e.getMessage(), e);
}
}
}
@ -3757,7 +3756,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
try {
return executeInNonManagedTXLock(lockName, txCallback, null);
} catch (JobPersistenceException jpe) {
if(retry % 4 == 0) {
if (retry % 4 == 0) {
schedSignaler.notifySchedulerListenersError("An error occurred while " + txCallback, jpe);
}
} catch (RuntimeException e) {
@ -3816,7 +3815,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
if(sigTime != null && sigTime >= 0) {
if (sigTime != null && sigTime >= 0) {
signalSchedulingChangeImmediately(sigTime);
}
@ -3876,7 +3875,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
numFails = 0;
getLog().debug("ClusterManager: Check-in complete.");
} catch (Exception e) {
if(numFails % 4 == 0) {
if (numFails % 4 == 0) {
getLog().error(
"ClusterManager: Error managing cluster: "
+ e.getMessage(), e);
@ -3898,7 +3897,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
timeToSleep = 100L;
}
if(numFails > 0) {
if (numFails > 0) {
timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
}
@ -3952,7 +3951,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
numFails = 0;
return res;
} catch (Exception e) {
if(numFails % 4 == 0) {
if (numFails % 4 == 0) {
getLog().error(
"MisfireHandler: Error handling misfires: "
+ e.getMessage(), e);
@ -3983,7 +3982,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
timeToSleep = 50l;
}
if(numFails > 0) {
if (numFails > 0) {
timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
}
}

Loading…
Cancel
Save