Browse Source

DEC-7376 定时调度任务表中存在脏数据的时候,不能影响整个scheduler

research/11.0
zed 6 years ago
parent
commit
75e4955233
  1. 117
      fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java

117
fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java

@ -17,38 +17,21 @@
package com.fr.third.v2.org.quartz.impl.jdbcjobstore; package com.fr.third.v2.org.quartz.impl.jdbcjobstore;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.fr.third.v2.org.quartz.Calendar; import com.fr.third.v2.org.quartz.Calendar;
import com.fr.third.v2.org.quartz.Job; import com.fr.third.v2.org.quartz.Job;
import com.fr.third.v2.org.quartz.JobDataMap;
import com.fr.third.v2.org.quartz.JobDetail; import com.fr.third.v2.org.quartz.JobDetail;
import com.fr.third.v2.org.quartz.JobKey;
import com.fr.third.v2.org.quartz.JobPersistenceException;
import com.fr.third.v2.org.quartz.ObjectAlreadyExistsException; import com.fr.third.v2.org.quartz.ObjectAlreadyExistsException;
import com.fr.third.v2.org.quartz.Scheduler; import com.fr.third.v2.org.quartz.Scheduler;
import com.fr.third.v2.org.quartz.SchedulerConfigException; import com.fr.third.v2.org.quartz.SchedulerConfigException;
import com.fr.third.v2.org.quartz.TriggerKey;
import com.fr.third.v2.org.quartz.spi.OperableTrigger;
import com.fr.third.v2.org.quartz.spi.ThreadExecutor;
import com.fr.third.v2.org.quartz.spi.TriggerFiredResult;
import com.fr.third.v2.org.quartz.JobDataMap;
import com.fr.third.v2.org.quartz.JobKey;
import com.fr.third.v2.org.quartz.JobPersistenceException;
import com.fr.third.v2.org.quartz.SchedulerException; import com.fr.third.v2.org.quartz.SchedulerException;
import com.fr.third.v2.org.quartz.SimpleTrigger; import com.fr.third.v2.org.quartz.SimpleTrigger;
import com.fr.third.v2.org.quartz.Trigger; import com.fr.third.v2.org.quartz.Trigger;
import com.fr.third.v2.org.quartz.Trigger.CompletedExecutionInstruction; import com.fr.third.v2.org.quartz.Trigger.CompletedExecutionInstruction;
import com.fr.third.v2.org.quartz.Trigger.TriggerState; import com.fr.third.v2.org.quartz.Trigger.TriggerState;
import com.fr.third.v2.org.quartz.TriggerKey;
import com.fr.third.v2.org.quartz.impl.DefaultThreadExecutor; import com.fr.third.v2.org.quartz.impl.DefaultThreadExecutor;
import com.fr.third.v2.org.quartz.impl.matchers.GroupMatcher; import com.fr.third.v2.org.quartz.impl.matchers.GroupMatcher;
import com.fr.third.v2.org.quartz.impl.matchers.StringMatcher; import com.fr.third.v2.org.quartz.impl.matchers.StringMatcher;
@ -56,12 +39,29 @@ import com.fr.third.v2.org.quartz.impl.matchers.StringMatcher.StringOperatorName
import com.fr.third.v2.org.quartz.impl.triggers.SimpleTriggerImpl; import com.fr.third.v2.org.quartz.impl.triggers.SimpleTriggerImpl;
import com.fr.third.v2.org.quartz.spi.ClassLoadHelper; import com.fr.third.v2.org.quartz.spi.ClassLoadHelper;
import com.fr.third.v2.org.quartz.spi.JobStore; import com.fr.third.v2.org.quartz.spi.JobStore;
import com.fr.third.v2.org.quartz.spi.OperableTrigger;
import com.fr.third.v2.org.quartz.spi.SchedulerSignaler; import com.fr.third.v2.org.quartz.spi.SchedulerSignaler;
import com.fr.third.v2.org.quartz.spi.ThreadExecutor;
import com.fr.third.v2.org.quartz.spi.TriggerFiredBundle; import com.fr.third.v2.org.quartz.spi.TriggerFiredBundle;
import com.fr.third.v2.org.quartz.spi.TriggerFiredResult;
import com.fr.third.v2.org.quartz.utils.DBConnectionManager; import com.fr.third.v2.org.quartz.utils.DBConnectionManager;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/** /**
* <p> * <p>
@ -354,6 +354,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
public long getDbRetryInterval() { public long getDbRetryInterval() {
return dbRetryInterval; return dbRetryInterval;
} }
/** /**
* @param dbRetryInterval The dbRetryInterval to set. * @param dbRetryInterval The dbRetryInterval to set.
*/ */
@ -473,7 +474,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* should be performed after obtaining an explicit DB lock. This is the * should be performed after obtaining an explicit DB lock. This is the
* behavior prior to Quartz 1.6.3, but is considered unnecessary for most * behavior prior to Quartz 1.6.3, but is considered unnecessary for most
* databases, and therefore a superfluous performance hit. * databases, and therefore a superfluous performance hit.
* * <p>
* However, if batch acquisition is used, it is important for this behavior * However, if batch acquisition is used, it is important for this behavior
* to be used for all dbs. * to be used for all dbs.
*/ */
@ -488,8 +489,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Set the JDBC driver delegate class. * Set the JDBC driver delegate class.
* </p> * </p>
* *
* @param delegateClassName * @param delegateClassName the delegate class name
* the delegate class name
*/ */
@SuppressWarnings("UnusedDeclaration") /* called reflectively */ @SuppressWarnings("UnusedDeclaration") /* called reflectively */
public void setDriverDelegateClass(String delegateClassName) public void setDriverDelegateClass(String delegateClassName)
@ -515,8 +515,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Set the JDBC driver delegate's initialization string. * Set the JDBC driver delegate's initialization string.
* </p> * </p>
* *
* @param delegateInitString * @param delegateInitString the delegate init string
* the delegate init string
*/ */
@SuppressWarnings("UnusedDeclaration") /* called reflectively */ @SuppressWarnings("UnusedDeclaration") /* called reflectively */
public void setDriverDelegateInitString(String delegateInitString) public void setDriverDelegateInitString(String delegateInitString)
@ -805,7 +804,10 @@ public abstract class JobStoreSupport implements JobStore, Constants {
} catch (SQLException sqle) { } catch (SQLException sqle) {
getLog().warn("Failed to override connection auto commit/transaction isolation.", sqle); getLog().warn("Failed to override connection auto commit/transaction isolation.", sqle);
} catch (Throwable e) { } catch (Throwable e) {
try { conn.close(); } catch(Throwable ignored) {} try {
conn.close();
} catch (Throwable ignored) {
}
throw new JobPersistenceException( throw new JobPersistenceException(
"Failure setting up connection.", e); "Failure setting up connection.", e);
@ -846,8 +848,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* appropriate. * appropriate.
* </p> * </p>
* *
* @throws JobPersistenceException * @throws JobPersistenceException if jobs could not be recovered
* if jobs could not be recovered
*/ */
protected void recoverJobs(Connection conn) throws JobPersistenceException { protected void recoverJobs(Connection conn) throws JobPersistenceException {
try { try {
@ -875,11 +876,15 @@ public abstract class JobStoreSupport implements JobStore, Constants {
+ " jobs that were in-progress at the time of the last shut-down."); + " jobs that were in-progress at the time of the last shut-down.");
for (OperableTrigger recoveringJobTrigger : recoveringJobTriggers) { for (OperableTrigger recoveringJobTrigger : recoveringJobTriggers) {
try {
if (jobExists(conn, recoveringJobTrigger.getJobKey())) { if (jobExists(conn, recoveringJobTrigger.getJobKey())) {
recoveringJobTrigger.computeFirstFireTime(null); recoveringJobTrigger.computeFirstFireTime(null);
storeTrigger(conn, recoveringJobTrigger, null, false, storeTrigger(conn, recoveringJobTrigger, null, false,
STATE_WAITING, false, true); STATE_WAITING, false, true);
} }
} catch (JobPersistenceException e) {
getLog().error(e.getMessage());
}
} }
getLog().info("Recovery complete."); getLog().info("Recovery complete.");
@ -933,9 +938,11 @@ public abstract class JobStoreSupport implements JobStore, Constants {
public boolean hasMoreMisfiredTriggers() { public boolean hasMoreMisfiredTriggers() {
return _hasMoreMisfiredTriggers; return _hasMoreMisfiredTriggers;
} }
public int getProcessedMisfiredTriggerCount() { public int getProcessedMisfiredTriggerCount() {
return _processedMisfiredTriggerCount; return _processedMisfiredTriggerCount;
} }
public long getEarliestNewTime() { public long getEarliestNewTime() {
return _earliestNewTime; return _earliestNewTime;
} }
@ -1044,12 +1051,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Store the given <code>{@link JobDetail}</code> and <code>{@link Trigger}</code>. * Store the given <code>{@link JobDetail}</code> and <code>{@link Trigger}</code>.
* </p> * </p>
* *
* @param newJob * @param newJob The <code>JobDetail</code> to be stored.
* The <code>JobDetail</code> to be stored. * @param newTrigger The <code>Trigger</code> to be stored.
* @param newTrigger * @throws ObjectAlreadyExistsException if a <code>Job</code> with the same name/group already
* The <code>Trigger</code> to be stored.
* @throws ObjectAlreadyExistsException
* if a <code>Job</code> with the same name/group already
* exists. * exists.
*/ */
public void storeJobAndTrigger(final JobDetail newJob, public void storeJobAndTrigger(final JobDetail newJob,
@ -1071,14 +1075,11 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Store the given <code>{@link JobDetail}</code>. * Store the given <code>{@link JobDetail}</code>.
* </p> * </p>
* *
* @param newJob * @param newJob The <code>JobDetail</code> to be stored.
* The <code>JobDetail</code> to be stored. * @param replaceExisting If <code>true</code>, any <code>Job</code> existing in the
* @param replaceExisting
* If <code>true</code>, any <code>Job</code> existing in the
* <code>JobStore</code> with the same name & group should be * <code>JobStore</code> with the same name & group should be
* over-written. * over-written.
* @throws ObjectAlreadyExistsException * @throws ObjectAlreadyExistsException if a <code>Job</code> with the same name/group already
* if a <code>Job</code> with the same name/group already
* exists, and replaceExisting is set to false. * exists, and replaceExisting is set to false.
*/ */
public void storeJob(final JobDetail newJob, public void storeJob(final JobDetail newJob,
@ -1140,14 +1141,11 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Store the given <code>{@link Trigger}</code>. * Store the given <code>{@link Trigger}</code>.
* </p> * </p>
* *
* @param newTrigger * @param newTrigger The <code>Trigger</code> to be stored.
* The <code>Trigger</code> to be stored. * @param replaceExisting If <code>true</code>, any <code>Trigger</code> existing in
* @param replaceExisting
* If <code>true</code>, any <code>Trigger</code> existing in
* the <code>JobStore</code> with the same name & group should * the <code>JobStore</code> with the same name & group should
* be over-written. * be over-written.
* @throws ObjectAlreadyExistsException * @throws ObjectAlreadyExistsException if a <code>Trigger</code> with the same name/group already
* if a <code>Trigger</code> with the same name/group already
* exists, and replaceExisting is set to false. * exists, and replaceExisting is set to false.
*/ */
public void storeTrigger(final OperableTrigger newTrigger, public void storeTrigger(final OperableTrigger newTrigger,
@ -1601,16 +1599,12 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Store the given <code>{@link Calendar}</code>. * Store the given <code>{@link Calendar}</code>.
* </p> * </p>
* *
* @param calName * @param calName The name of the calendar.
* The name of the calendar. * @param calendar The <code>Calendar</code> to be stored.
* @param calendar * @param replaceExisting If <code>true</code>, any <code>Calendar</code> existing
* The <code>Calendar</code> to be stored.
* @param replaceExisting
* If <code>true</code>, any <code>Calendar</code> existing
* in the <code>JobStore</code> with the same name & group * in the <code>JobStore</code> with the same name & group
* should be over-written. * should be over-written.
* @throws ObjectAlreadyExistsException * @throws ObjectAlreadyExistsException if a <code>Calendar</code> with the same name already
* if a <code>Calendar</code> with the same name already
* exists, and replaceExisting is set to false. * exists, and replaceExisting is set to false.
*/ */
public void storeCalendar(final String calName, public void storeCalendar(final String calName,
@ -1695,6 +1689,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* <code>Trigger</code>s pointing to non-existent calendars, then a * <code>Trigger</code>s pointing to non-existent calendars, then a
* <code>JobPersistenceException</code> will be thrown.</p> * <code>JobPersistenceException</code> will be thrown.</p>
* * * *
*
* @param calName The name of the <code>Calendar</code> to be removed. * @param calName The name of the <code>Calendar</code> to be removed.
* @return <code>true</code> if a <code>Calendar</code> with the given name * @return <code>true</code> if a <code>Calendar</code> with the given name
* was found and removed from the store. * was found and removed from the store.
@ -1735,8 +1730,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Retrieve the given <code>{@link Trigger}</code>. * Retrieve the given <code>{@link Trigger}</code>.
* </p> * </p>
* *
* @param calName * @param calName The name of the <code>Calendar</code> to be retrieved.
* The name of the <code>Calendar</code> to be retrieved.
* @return The desired <code>Calendar</code>, or null if there is no * @return The desired <code>Calendar</code>, or null if there is no
* match. * match.
*/ */
@ -2978,7 +2972,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
try { try {
job = retrieveJob(conn, trigger.getJobKey()); job = retrieveJob(conn, trigger.getJobKey());
if (job == null) { return null; } if (job == null) {
return null;
}
} catch (JobPersistenceException jpe) { } catch (JobPersistenceException jpe) {
try { try {
getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe); getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
@ -2992,7 +2988,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
if (trigger.getCalendarName() != null) { if (trigger.getCalendarName() != null) {
cal = retrieveCalendar(conn, trigger.getCalendarName()); cal = retrieveCalendar(conn, trigger.getCalendarName());
if (cal == null) { return null; } if (cal == null) {
return null;
}
} }
try { try {
@ -3225,6 +3223,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
} }
protected ThreadLocal<Long> sigChangeForTxCompletion = new ThreadLocal<Long>(); protected ThreadLocal<Long> sigChangeForTxCompletion = new ThreadLocal<Long>();
protected void signalSchedulingChangeOnTxCompletion(long candidateNewNextFireTime) { protected void signalSchedulingChangeOnTxCompletion(long candidateNewNextFireTime) {
Long sigTime = sigChangeForTxCompletion.get(); Long sigTime = sigChangeForTxCompletion.get();
if (sigTime == null && candidateNewNextFireTime >= 0L) if (sigTime == null && candidateNewNextFireTime >= 0L)

Loading…
Cancel
Save