Browse Source

Pull request #448: KERNEL-3899 集群下定时调度任务支持特定节点运行定时任务

Merge in CORE/base-third from ~ZED/base-third:release/10.0 to release/10.0

* commit 'd88b97f24712eed7a7aa387dbde11ca944af8d34':
  fix
  KERNEL-3899 集群下定时调度任务支持特定节点运行定时任务
release/10.0
zed 5 years ago
parent
commit
664190d959
  1. 4
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/Trigger.java
  2. 9
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/TriggerBuilder.java
  3. 5
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/StdSchedulerFactory.java
  4. 2
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/Constants.java
  5. 4
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/DriverDelegate.java
  6. 35
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java
  7. 1
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/PointbaseDelegate.java
  8. 48
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCConstants.java
  9. 54
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java
  10. 33
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/triggers/AbstractTrigger.java
  11. 5
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/simpl/RAMJobStore.java
  12. 6
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/spi/JobStore.java

4
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/Trigger.java

@ -127,6 +127,10 @@ public interface Trigger extends Serializable, Cloneable, Comparable<Trigger> {
*/
public static final int DEFAULT_PRIORITY = 5;
String getAppointId();
void setAppointId(String appointId);
public TriggerKey getKey();
public JobKey getJobKey();

9
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/TriggerBuilder.java

@ -71,6 +71,7 @@ public class TriggerBuilder<T extends Trigger> {
private String calendarName;
private JobKey jobKey;
private JobDataMap jobDataMap = new JobDataMap();
private String appointId;
private ScheduleBuilder<?> scheduleBuilder = null;
@ -99,7 +100,8 @@ public class TriggerBuilder<T extends Trigger> {
if(scheduleBuilder == null)
scheduleBuilder = SimpleScheduleBuilder.simpleSchedule();
MutableTrigger trig = scheduleBuilder.build();
trig.setAppointId(appointId);
trig.setCalendarName(calendarName);
trig.setDescription(description);
trig.setStartTime(startTime);
@ -117,6 +119,11 @@ public class TriggerBuilder<T extends Trigger> {
return (T) trig;
}
public TriggerBuilder<T> appointId(String appointId) {
this.appointId = appointId;
return this;
}
/**
* Use a <code>TriggerKey</code> with the given name and default group to
* identify the Trigger.

5
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/StdSchedulerFactory.java

@ -193,6 +193,8 @@ public class StdSchedulerFactory implements SchedulerFactory {
public static final String PROP_JOB_STORE_PREFIX = "com.fr.third.v2.org.quartz.jobStore";
public static final String PROP_JOB_STORE_CURRENT_ID = "com.fr.third.v2.org.quartz.jobStore.currentId";
public static final String PROP_JOB_STORE_LOCK_HANDLER_PREFIX = PROP_JOB_STORE_PREFIX + ".lockHandler";
public static final String PROP_JOB_STORE_LOCK_HANDLER_CLASS = PROP_JOB_STORE_LOCK_HANDLER_PREFIX + ".class";
@ -860,8 +862,11 @@ public class StdSchedulerFactory implements SchedulerFactory {
throw initException;
}
String currentId = cfg.getStringProperty(PROP_JOB_STORE_CURRENT_ID);
try {
js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
js.setCurrentId(currentId);
} catch (Exception e) {
initException = new SchedulerException("JobStore class '" + jsClass
+ "' could not be instantiated.", e);

2
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/Constants.java

@ -105,6 +105,8 @@ public interface Constants {
String ALIAS_COL_NEXT_FIRE_TIME = "ALIAS_NXT_FR_TM";
String COL_APPOINT_ID= "APPOINT_ID";
// TABLE_SIMPLE_TRIGGERS columns names
String COL_REPEAT_COUNT = "REPEAT_COUNT";

4
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/DriverDelegate.java

@ -99,6 +99,8 @@ public interface DriverDelegate {
String newState, String oldState1, String oldState2)
throws SQLException;
int updateTriggerStatesFromOtherStates(Connection conn, String newState, String oldState1, String oldState2, String oldState3) throws SQLException;
/**
* <p>
* Get the names of all of the triggers that have misfired - according to
@ -968,6 +970,8 @@ public interface DriverDelegate {
public List<TriggerKey> selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount)
throws SQLException;
List<TriggerKey> selectAppointTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount, String appointId);
/**
* <p>
* Insert a fired trigger.

35
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java

@ -159,6 +159,8 @@ public abstract class JobStoreSupport implements JobStore, Constants {
private volatile boolean schedulerRunning = false;
private volatile boolean shutdown = false;
private volatile String currentId;
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
@ -167,6 +169,11 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
@Override
public void setCurrentId(String currentId) {
this.currentId = currentId;
}
/**
* <p>
* Set the name of the <code>DataSource</code> that should be used for
@ -854,14 +861,14 @@ public abstract class JobStoreSupport implements JobStore, Constants {
try {
// update inconsistent job states
int rows = getDelegate().updateTriggerStatesFromOtherStates(conn,
STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED);
STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED, STATE_ERROR);
rows += getDelegate().updateTriggerStatesFromOtherStates(conn,
STATE_PAUSED, STATE_PAUSED_BLOCKED, STATE_PAUSED_BLOCKED);
getLog().info(
"Freed " + rows
+ " triggers from 'acquired' / 'blocked' state.");
+ " triggers from 'acquired' / 'blocked' / 'error' state.");
// clean up misfired jobs
recoverMisfiredJobs(conn, true);
@ -2780,6 +2787,17 @@ public abstract class JobStoreSupport implements JobStore, Constants {
});
}
private void mergeKeys(List<TriggerKey> allKeys, Set<String> allKeyIds, List<TriggerKey> mergeKeys) {
if (mergeKeys != null) {
for (TriggerKey key : mergeKeys) {
if (!allKeyIds.contains(key.toString())) {
allKeys.add(key);
allKeyIds.add(key.toString());
}
}
}
}
// FUTURE_TODO: this really ought to return something like a FiredTriggerBundle,
// so that the fireInstanceId doesn't have to be on the trigger...
protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
@ -2795,15 +2813,22 @@ public abstract class JobStoreSupport implements JobStore, Constants {
do {
currentLoopCount++;
try {
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
long misfireTime = getMisfireTime();
List<TriggerKey> waitingKeys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, misfireTime, maxCount);
List<TriggerKey> appointKeys = getDelegate().selectAppointTriggerToAcquire(conn, noLaterThan + timeWindow, misfireTime, maxCount, this.currentId);
List<TriggerKey> allKeys = new ArrayList<TriggerKey>();
Set<String> keyIds = new HashSet<String>();
mergeKeys(allKeys, keyIds, waitingKeys);
mergeKeys(allKeys, keyIds, appointKeys);
// No trigger is ready to fire yet.
if (keys == null || keys.size() == 0)
if (keyIds.size() == 0)
return acquiredTriggers;
long batchEnd = noLaterThan;
for (TriggerKey triggerKey : keys) {
for (TriggerKey triggerKey : allKeys) {
// If our trigger is no longer available, try a new one.
OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
if (nextTrigger == null) {

1
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/PointbaseDelegate.java

@ -181,6 +181,7 @@ public class PointbaseDelegate extends StdJDBCDelegate {
ps.setInt(13, trigger.getMisfireInstruction());
ps.setBinaryStream(14, bais, len);
ps.setInt(15, trigger.getPriority());
ps.setString(16, trigger.getAppointId());
insertResult = ps.executeUpdate();

48
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCConstants.java

@ -47,19 +47,34 @@ public interface StdJDBCConstants extends Constants {
String SCHED_NAME_SUBST = "{1}";
// QUERIES
String UPDATE_TRIGGER_STATES_FROM_OTHER_STATES = "UPDATE "
String UPDATE_TRIGGER_STATES_FROM_OTHER_2_STATES = "UPDATE "
+ TABLE_PREFIX_SUBST
+ TABLE_TRIGGERS
+ " SET "
+ COL_TRIGGER_STATE
+ " = ?"
+ " WHERE "
+ COL_SCHEDULER_NAME
+ COL_SCHEDULER_NAME
+ " = " + SCHED_NAME_SUBST + " AND ("
+ COL_TRIGGER_STATE
+ " = ? OR "
+ COL_TRIGGER_STATE + " = ?)";
String UPDATE_TRIGGER_STATES_FROM_OTHER_3_STATES = "UPDATE "
+ TABLE_PREFIX_SUBST
+ TABLE_TRIGGERS
+ " SET "
+ COL_TRIGGER_STATE
+ " = ?"
+ " WHERE "
+ COL_SCHEDULER_NAME
+ " = " + SCHED_NAME_SUBST + " AND ("
+ COL_TRIGGER_STATE
+ " = ? OR "
+ COL_TRIGGER_STATE
+ " = ? OR "
+ COL_TRIGGER_STATE + " = ?)";
String SELECT_MISFIRED_TRIGGERS = "SELECT * FROM "
+ TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
@ -212,8 +227,8 @@ public interface StdJDBCConstants extends Constants {
+ ", " + COL_NEXT_FIRE_TIME + ", " + COL_PREV_FIRE_TIME + ", "
+ COL_TRIGGER_STATE + ", " + COL_TRIGGER_TYPE + ", "
+ COL_START_TIME + ", " + COL_END_TIME + ", " + COL_CALENDAR_NAME
+ ", " + COL_MISFIRE_INSTRUCTION + ", " + COL_JOB_DATAMAP + ", " + COL_PRIORITY + ") "
+ " VALUES(" + SCHED_NAME_SUBST + ", ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ ", " + COL_MISFIRE_INSTRUCTION + ", " + COL_JOB_DATAMAP + ", " + COL_PRIORITY + ", " + COL_APPOINT_ID + ") "
+ " VALUES(" + SCHED_NAME_SUBST + ", ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
String INSERT_SIMPLE_TRIGGER = "INSERT INTO "
+ TABLE_PREFIX_SUBST + TABLE_SIMPLE_TRIGGERS + " ("
@ -507,13 +522,24 @@ public interface StdJDBCConstants extends Constants {
+ " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " = ?";
String SELECT_NEXT_TRIGGER_TO_ACQUIRE = "SELECT "
+ COL_TRIGGER_NAME + ", " + COL_TRIGGER_GROUP + ", "
+ COL_NEXT_FIRE_TIME + ", " + COL_PRIORITY + " FROM "
+ TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " <= ? "
+ "AND (" + COL_MISFIRE_INSTRUCTION + " = -1 OR (" +COL_MISFIRE_INSTRUCTION+ " != -1 AND "+ COL_NEXT_FIRE_TIME + " >= ?)) "
+ "ORDER BY "+ COL_NEXT_FIRE_TIME + " ASC, " + COL_PRIORITY + " DESC";
+ COL_TRIGGER_NAME + ", " + COL_TRIGGER_GROUP + ", "
+ COL_NEXT_FIRE_TIME + ", " + COL_PRIORITY + " FROM "
+ TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_APPOINT_ID + " IS NULL"
+ " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " <= ? "
+ "AND (" + COL_MISFIRE_INSTRUCTION + " = -1 OR (" + COL_MISFIRE_INSTRUCTION + " != -1 AND " + COL_NEXT_FIRE_TIME + " >= ?)) "
+ "ORDER BY " + COL_NEXT_FIRE_TIME + " ASC, " + COL_PRIORITY + " DESC";
String SELECT_NEXT_APPOINT_TRIGGER_TO_ACQUIRE = "SELECT "
+ COL_TRIGGER_NAME + ", " + COL_TRIGGER_GROUP + ", "
+ COL_NEXT_FIRE_TIME + ", " + COL_PRIORITY + " FROM "
+ TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_APPOINT_ID + " = ?"
+ " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " <= ? "
+ "AND (" + COL_MISFIRE_INSTRUCTION + " = -1 OR (" + COL_MISFIRE_INSTRUCTION + " != -1 AND " + COL_NEXT_FIRE_TIME + " >= ?)) "
+ "ORDER BY " + COL_NEXT_FIRE_TIME + " ASC, " + COL_PRIORITY + " DESC";
String INSERT_FIRED_TRIGGER = "INSERT INTO "

54
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java

@ -228,7 +228,7 @@ public class StdJDBCDelegate implements DriverDelegate, StdJDBCConstants {
try {
ps = conn
.prepareStatement(rtp(UPDATE_TRIGGER_STATES_FROM_OTHER_STATES));
.prepareStatement(rtp(UPDATE_TRIGGER_STATES_FROM_OTHER_2_STATES));
ps.setString(1, newState);
ps.setString(2, oldState1);
ps.setString(3, oldState2);
@ -238,6 +238,23 @@ public class StdJDBCDelegate implements DriverDelegate, StdJDBCConstants {
}
}
@Override
public int updateTriggerStatesFromOtherStates(Connection conn, String newState, String oldState1, String oldState2, String oldState3) throws SQLException {
PreparedStatement ps = null;
try {
ps = conn.prepareStatement(rtp(UPDATE_TRIGGER_STATES_FROM_OTHER_3_STATES));
ps.setString(1, newState);
ps.setString(2, oldState1);
ps.setString(3, oldState2);
ps.setString(4, oldState3);
return ps.executeUpdate();
} finally {
closeStatement(ps);
}
}
/**
* <p>
* Get the names of all of the triggers that have misfired.
@ -1088,6 +1105,7 @@ public class StdJDBCDelegate implements DriverDelegate, StdJDBCConstants {
ps.setInt(13, trigger.getMisfireInstruction());
setBytes(ps, 14, baos);
ps.setInt(15, trigger.getPriority());
ps.setString(16, trigger.getAppointId());
insertResult = ps.executeUpdate();
@ -2624,6 +2642,40 @@ public class StdJDBCDelegate implements DriverDelegate, StdJDBCConstants {
}
}
public List<TriggerKey> selectAppointTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount, String appointId) {
PreparedStatement ps = null;
ResultSet rs = null;
List<TriggerKey> nextTriggers = new LinkedList<TriggerKey>();
try {
ps = conn.prepareStatement(rtp(SELECT_NEXT_APPOINT_TRIGGER_TO_ACQUIRE));
if (maxCount < 1)
maxCount = 1;
ps.setMaxRows(maxCount);
ps.setFetchSize(maxCount);
ps.setString(1, appointId);
ps.setString(2, STATE_WAITING);
ps.setBigDecimal(3, new BigDecimal(String.valueOf(noLaterThan)));
ps.setBigDecimal(4, new BigDecimal(String.valueOf(noEarlierThan)));
rs = ps.executeQuery();
while (rs.next() && nextTriggers.size() <= maxCount) {
nextTriggers.add(TriggerKey.triggerKey(
rs.getString(COL_TRIGGER_NAME),
rs.getString(COL_TRIGGER_GROUP)));
}
return nextTriggers;
} catch (Exception e) {
logger.error(e.getMessage(), e);
return null;
} finally {
closeResultSet(rs);
closeStatement(ps);
}
}
/**
* <p>
* Insert a fired trigger.

33
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/triggers/AbstractTrigger.java

@ -75,6 +75,8 @@ public abstract class AbstractTrigger<T extends Trigger> implements OperableTrig
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
private String appointId;
private String name;
private String group = Scheduler.DEFAULT_GROUP;
@ -251,6 +253,16 @@ public abstract class AbstractTrigger<T extends Trigger> implements OperableTrig
this.key = null;
}
@Override
public String getAppointId() {
return this.appointId;
}
@Override
public void setAppointId(String appointId) {
this.appointId = appointId;
}
public void setKey(TriggerKey key) {
setName(key.getName());
setGroup(key.getGroup());
@ -873,18 +885,19 @@ public abstract class AbstractTrigger<T extends Trigger> implements OperableTrig
}
return copy;
}
public TriggerBuilder<T> getTriggerBuilder() {
return TriggerBuilder.newTrigger()
.forJob(getJobKey())
.modifiedByCalendar(getCalendarName())
.usingJobData(getJobDataMap())
.withDescription(getDescription())
.endAt(getEndTime())
.withIdentity(getKey())
.withPriority(getPriority())
.startAt(getStartTime())
.withSchedule(getScheduleBuilder());
.appointId(getAppointId())
.forJob(getJobKey())
.modifiedByCalendar(getCalendarName())
.usingJobData(getJobDataMap())
.withDescription(getDescription())
.endAt(getEndTime())
.withIdentity(getKey())
.withPriority(getPriority())
.startAt(getStartTime())
.withSchedule(getScheduleBuilder());
}
public abstract ScheduleBuilder<T> getScheduleBuilder();

5
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/simpl/RAMJobStore.java

@ -139,6 +139,11 @@ public class RAMJobStore implements JobStore {
return log;
}
@Override
public void setCurrentId(String currentId) {
//do nothing
}
/**
* <p>
* Called by the QuartzScheduler before the <code>JobStore</code> is

6
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/spi/JobStore.java

@ -68,6 +68,12 @@ public interface JobStore {
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
/**
* 设置当前节点id
*/
void setCurrentId(String currentId);
/**
* Called by the QuartzScheduler before the <code>JobStore</code> is
* used, in order to give the it a chance to initialize.

Loading…
Cancel
Save