diff --git a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/Trigger.java b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/Trigger.java index b64ed0bc4..7d630c118 100644 --- a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/Trigger.java +++ b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/Trigger.java @@ -127,6 +127,10 @@ public interface Trigger extends Serializable, Cloneable, Comparable { */ public static final int DEFAULT_PRIORITY = 5; + String getAppointId(); + + void setAppointId(String appointId); + public TriggerKey getKey(); public JobKey getJobKey(); diff --git a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/TriggerBuilder.java b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/TriggerBuilder.java index db8ed1e7c..88de74055 100644 --- a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/TriggerBuilder.java +++ b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/TriggerBuilder.java @@ -71,6 +71,7 @@ public class TriggerBuilder { private String calendarName; private JobKey jobKey; private JobDataMap jobDataMap = new JobDataMap(); + private String appointId; private ScheduleBuilder scheduleBuilder = null; @@ -99,7 +100,8 @@ public class TriggerBuilder { if(scheduleBuilder == null) scheduleBuilder = SimpleScheduleBuilder.simpleSchedule(); MutableTrigger trig = scheduleBuilder.build(); - + + trig.setAppointId(appointId); trig.setCalendarName(calendarName); trig.setDescription(description); trig.setStartTime(startTime); @@ -117,6 +119,11 @@ public class TriggerBuilder { return (T) trig; } + public TriggerBuilder appointId(String appointId) { + this.appointId = appointId; + return this; + } + /** * Use a TriggerKey with the given name and default group to * identify the Trigger. diff --git a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/StdSchedulerFactory.java b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/StdSchedulerFactory.java index 9e3b1f626..062ed8867 100644 --- a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/StdSchedulerFactory.java +++ b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/StdSchedulerFactory.java @@ -193,6 +193,8 @@ public class StdSchedulerFactory implements SchedulerFactory { public static final String PROP_JOB_STORE_PREFIX = "com.fr.third.v2.org.quartz.jobStore"; + public static final String PROP_JOB_STORE_CURRENT_ID = "com.fr.third.v2.org.quartz.jobStore.currentId"; + public static final String PROP_JOB_STORE_LOCK_HANDLER_PREFIX = PROP_JOB_STORE_PREFIX + ".lockHandler"; public static final String PROP_JOB_STORE_LOCK_HANDLER_CLASS = PROP_JOB_STORE_LOCK_HANDLER_PREFIX + ".class"; @@ -860,8 +862,11 @@ public class StdSchedulerFactory implements SchedulerFactory { throw initException; } + String currentId = cfg.getStringProperty(PROP_JOB_STORE_CURRENT_ID); + try { js = (JobStore) loadHelper.loadClass(jsClass).newInstance(); + js.setCurrentId(currentId); } catch (Exception e) { initException = new SchedulerException("JobStore class '" + jsClass + "' could not be instantiated.", e); diff --git a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/Constants.java b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/Constants.java index 60290765c..252600ce7 100644 --- a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/Constants.java +++ b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/Constants.java @@ -105,6 +105,8 @@ public interface Constants { String ALIAS_COL_NEXT_FIRE_TIME = "ALIAS_NXT_FR_TM"; + String COL_APPOINT_ID= "APPOINT_ID"; + // TABLE_SIMPLE_TRIGGERS columns names String COL_REPEAT_COUNT = "REPEAT_COUNT"; diff --git a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/DriverDelegate.java b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/DriverDelegate.java index f95dbfb89..6368f2d6e 100644 --- a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/DriverDelegate.java +++ b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/DriverDelegate.java @@ -99,6 +99,8 @@ public interface DriverDelegate { String newState, String oldState1, String oldState2) throws SQLException; + int updateTriggerStatesFromOtherStates(Connection conn, String newState, String oldState1, String oldState2, String oldState3) throws SQLException; + /** *

* Get the names of all of the triggers that have misfired - according to @@ -968,6 +970,8 @@ public interface DriverDelegate { public List selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount) throws SQLException; + List selectAppointTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount, String appointId); + /** *

* Insert a fired trigger. diff --git a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java index 4f3a3d6b0..fa600441b 100644 --- a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java +++ b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java @@ -159,6 +159,8 @@ public abstract class JobStoreSupport implements JobStore, Constants { private volatile boolean schedulerRunning = false; private volatile boolean shutdown = false; + private volatile String currentId; + /* * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * @@ -167,6 +169,11 @@ public abstract class JobStoreSupport implements JobStore, Constants { * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ + @Override + public void setCurrentId(String currentId) { + this.currentId = currentId; + } + /** *

* Set the name of the DataSource that should be used for @@ -854,14 +861,14 @@ public abstract class JobStoreSupport implements JobStore, Constants { try { // update inconsistent job states int rows = getDelegate().updateTriggerStatesFromOtherStates(conn, - STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED); + STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED, STATE_ERROR); rows += getDelegate().updateTriggerStatesFromOtherStates(conn, STATE_PAUSED, STATE_PAUSED_BLOCKED, STATE_PAUSED_BLOCKED); getLog().info( "Freed " + rows - + " triggers from 'acquired' / 'blocked' state."); + + " triggers from 'acquired' / 'blocked' / 'error' state."); // clean up misfired jobs recoverMisfiredJobs(conn, true); @@ -2780,6 +2787,17 @@ public abstract class JobStoreSupport implements JobStore, Constants { }); } + private void mergeKeys(List allKeys, Set allKeyIds, List mergeKeys) { + if (mergeKeys != null) { + for (TriggerKey key : mergeKeys) { + if (!allKeyIds.contains(key.toString())) { + allKeys.add(key); + allKeyIds.add(key.toString()); + } + } + } + } + // FUTURE_TODO: this really ought to return something like a FiredTriggerBundle, // so that the fireInstanceId doesn't have to be on the trigger... protected List acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow) @@ -2795,15 +2813,22 @@ public abstract class JobStoreSupport implements JobStore, Constants { do { currentLoopCount++; try { - List keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount); + long misfireTime = getMisfireTime(); + List waitingKeys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, misfireTime, maxCount); + List appointKeys = getDelegate().selectAppointTriggerToAcquire(conn, noLaterThan + timeWindow, misfireTime, maxCount, this.currentId); + List allKeys = new ArrayList(); + Set keyIds = new HashSet(); + + mergeKeys(allKeys, keyIds, waitingKeys); + mergeKeys(allKeys, keyIds, appointKeys); // No trigger is ready to fire yet. - if (keys == null || keys.size() == 0) + if (keyIds.size() == 0) return acquiredTriggers; long batchEnd = noLaterThan; - for (TriggerKey triggerKey : keys) { + for (TriggerKey triggerKey : allKeys) { // If our trigger is no longer available, try a new one. OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey); if (nextTrigger == null) { diff --git a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/PointbaseDelegate.java b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/PointbaseDelegate.java index 5cf9398a0..01bc08c93 100644 --- a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/PointbaseDelegate.java +++ b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/PointbaseDelegate.java @@ -181,6 +181,7 @@ public class PointbaseDelegate extends StdJDBCDelegate { ps.setInt(13, trigger.getMisfireInstruction()); ps.setBinaryStream(14, bais, len); ps.setInt(15, trigger.getPriority()); + ps.setString(16, trigger.getAppointId()); insertResult = ps.executeUpdate(); diff --git a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCConstants.java b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCConstants.java index 6a20af625..a871a0b20 100644 --- a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCConstants.java +++ b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCConstants.java @@ -47,19 +47,34 @@ public interface StdJDBCConstants extends Constants { String SCHED_NAME_SUBST = "{1}"; // QUERIES - String UPDATE_TRIGGER_STATES_FROM_OTHER_STATES = "UPDATE " + String UPDATE_TRIGGER_STATES_FROM_OTHER_2_STATES = "UPDATE " + TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " SET " + COL_TRIGGER_STATE + " = ?" + " WHERE " - + COL_SCHEDULER_NAME + + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST + " AND (" + COL_TRIGGER_STATE + " = ? OR " + COL_TRIGGER_STATE + " = ?)"; + String UPDATE_TRIGGER_STATES_FROM_OTHER_3_STATES = "UPDATE " + + TABLE_PREFIX_SUBST + + TABLE_TRIGGERS + + " SET " + + COL_TRIGGER_STATE + + " = ?" + + " WHERE " + + COL_SCHEDULER_NAME + + " = " + SCHED_NAME_SUBST + " AND (" + + COL_TRIGGER_STATE + + " = ? OR " + + COL_TRIGGER_STATE + + " = ? OR " + + COL_TRIGGER_STATE + " = ?)"; + String SELECT_MISFIRED_TRIGGERS = "SELECT * FROM " + TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST @@ -212,8 +227,8 @@ public interface StdJDBCConstants extends Constants { + ", " + COL_NEXT_FIRE_TIME + ", " + COL_PREV_FIRE_TIME + ", " + COL_TRIGGER_STATE + ", " + COL_TRIGGER_TYPE + ", " + COL_START_TIME + ", " + COL_END_TIME + ", " + COL_CALENDAR_NAME - + ", " + COL_MISFIRE_INSTRUCTION + ", " + COL_JOB_DATAMAP + ", " + COL_PRIORITY + ") " - + " VALUES(" + SCHED_NAME_SUBST + ", ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + ", " + COL_MISFIRE_INSTRUCTION + ", " + COL_JOB_DATAMAP + ", " + COL_PRIORITY + ", " + COL_APPOINT_ID + ") " + + " VALUES(" + SCHED_NAME_SUBST + ", ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; String INSERT_SIMPLE_TRIGGER = "INSERT INTO " + TABLE_PREFIX_SUBST + TABLE_SIMPLE_TRIGGERS + " (" @@ -507,13 +522,24 @@ public interface StdJDBCConstants extends Constants { + " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " = ?"; String SELECT_NEXT_TRIGGER_TO_ACQUIRE = "SELECT " - + COL_TRIGGER_NAME + ", " + COL_TRIGGER_GROUP + ", " - + COL_NEXT_FIRE_TIME + ", " + COL_PRIORITY + " FROM " - + TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE " - + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST - + " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " <= ? " - + "AND (" + COL_MISFIRE_INSTRUCTION + " = -1 OR (" +COL_MISFIRE_INSTRUCTION+ " != -1 AND "+ COL_NEXT_FIRE_TIME + " >= ?)) " - + "ORDER BY "+ COL_NEXT_FIRE_TIME + " ASC, " + COL_PRIORITY + " DESC"; + + COL_TRIGGER_NAME + ", " + COL_TRIGGER_GROUP + ", " + + COL_NEXT_FIRE_TIME + ", " + COL_PRIORITY + " FROM " + + TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE " + + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST + + " AND " + COL_APPOINT_ID + " IS NULL" + + " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " <= ? " + + "AND (" + COL_MISFIRE_INSTRUCTION + " = -1 OR (" + COL_MISFIRE_INSTRUCTION + " != -1 AND " + COL_NEXT_FIRE_TIME + " >= ?)) " + + "ORDER BY " + COL_NEXT_FIRE_TIME + " ASC, " + COL_PRIORITY + " DESC"; + + String SELECT_NEXT_APPOINT_TRIGGER_TO_ACQUIRE = "SELECT " + + COL_TRIGGER_NAME + ", " + COL_TRIGGER_GROUP + ", " + + COL_NEXT_FIRE_TIME + ", " + COL_PRIORITY + " FROM " + + TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE " + + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST + + " AND " + COL_APPOINT_ID + " = ?" + + " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " <= ? " + + "AND (" + COL_MISFIRE_INSTRUCTION + " = -1 OR (" + COL_MISFIRE_INSTRUCTION + " != -1 AND " + COL_NEXT_FIRE_TIME + " >= ?)) " + + "ORDER BY " + COL_NEXT_FIRE_TIME + " ASC, " + COL_PRIORITY + " DESC"; String INSERT_FIRED_TRIGGER = "INSERT INTO " diff --git a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java index add26e57b..2a2cde569 100644 --- a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java +++ b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java @@ -228,7 +228,7 @@ public class StdJDBCDelegate implements DriverDelegate, StdJDBCConstants { try { ps = conn - .prepareStatement(rtp(UPDATE_TRIGGER_STATES_FROM_OTHER_STATES)); + .prepareStatement(rtp(UPDATE_TRIGGER_STATES_FROM_OTHER_2_STATES)); ps.setString(1, newState); ps.setString(2, oldState1); ps.setString(3, oldState2); @@ -238,6 +238,23 @@ public class StdJDBCDelegate implements DriverDelegate, StdJDBCConstants { } } + @Override + public int updateTriggerStatesFromOtherStates(Connection conn, String newState, String oldState1, String oldState2, String oldState3) throws SQLException { + + PreparedStatement ps = null; + try { + ps = conn.prepareStatement(rtp(UPDATE_TRIGGER_STATES_FROM_OTHER_3_STATES)); + ps.setString(1, newState); + ps.setString(2, oldState1); + ps.setString(3, oldState2); + ps.setString(4, oldState3); + + return ps.executeUpdate(); + } finally { + closeStatement(ps); + } + } + /** *

* Get the names of all of the triggers that have misfired. @@ -1088,6 +1105,7 @@ public class StdJDBCDelegate implements DriverDelegate, StdJDBCConstants { ps.setInt(13, trigger.getMisfireInstruction()); setBytes(ps, 14, baos); ps.setInt(15, trigger.getPriority()); + ps.setString(16, trigger.getAppointId()); insertResult = ps.executeUpdate(); @@ -2624,6 +2642,40 @@ public class StdJDBCDelegate implements DriverDelegate, StdJDBCConstants { } } + public List selectAppointTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount, String appointId) { + PreparedStatement ps = null; + ResultSet rs = null; + List nextTriggers = new LinkedList(); + try { + ps = conn.prepareStatement(rtp(SELECT_NEXT_APPOINT_TRIGGER_TO_ACQUIRE)); + if (maxCount < 1) + maxCount = 1; + + ps.setMaxRows(maxCount); + ps.setFetchSize(maxCount); + + ps.setString(1, appointId); + ps.setString(2, STATE_WAITING); + ps.setBigDecimal(3, new BigDecimal(String.valueOf(noLaterThan))); + ps.setBigDecimal(4, new BigDecimal(String.valueOf(noEarlierThan))); + rs = ps.executeQuery(); + + while (rs.next() && nextTriggers.size() <= maxCount) { + nextTriggers.add(TriggerKey.triggerKey( + rs.getString(COL_TRIGGER_NAME), + rs.getString(COL_TRIGGER_GROUP))); + } + + return nextTriggers; + } catch (Exception e) { + logger.error(e.getMessage(), e); + return null; + } finally { + closeResultSet(rs); + closeStatement(ps); + } + } + /** *

* Insert a fired trigger. diff --git a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/triggers/AbstractTrigger.java b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/triggers/AbstractTrigger.java index 1305a8412..572475908 100644 --- a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/triggers/AbstractTrigger.java +++ b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/triggers/AbstractTrigger.java @@ -75,6 +75,8 @@ public abstract class AbstractTrigger implements OperableTrig * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ + private String appointId; + private String name; private String group = Scheduler.DEFAULT_GROUP; @@ -251,6 +253,16 @@ public abstract class AbstractTrigger implements OperableTrig this.key = null; } + @Override + public String getAppointId() { + return this.appointId; + } + + @Override + public void setAppointId(String appointId) { + this.appointId = appointId; + } + public void setKey(TriggerKey key) { setName(key.getName()); setGroup(key.getGroup()); @@ -873,18 +885,19 @@ public abstract class AbstractTrigger implements OperableTrig } return copy; } - + public TriggerBuilder getTriggerBuilder() { return TriggerBuilder.newTrigger() - .forJob(getJobKey()) - .modifiedByCalendar(getCalendarName()) - .usingJobData(getJobDataMap()) - .withDescription(getDescription()) - .endAt(getEndTime()) - .withIdentity(getKey()) - .withPriority(getPriority()) - .startAt(getStartTime()) - .withSchedule(getScheduleBuilder()); + .appointId(getAppointId()) + .forJob(getJobKey()) + .modifiedByCalendar(getCalendarName()) + .usingJobData(getJobDataMap()) + .withDescription(getDescription()) + .endAt(getEndTime()) + .withIdentity(getKey()) + .withPriority(getPriority()) + .startAt(getStartTime()) + .withSchedule(getScheduleBuilder()); } public abstract ScheduleBuilder getScheduleBuilder(); diff --git a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/simpl/RAMJobStore.java b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/simpl/RAMJobStore.java index e0db3e4df..f049bdb0e 100644 --- a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/simpl/RAMJobStore.java +++ b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/simpl/RAMJobStore.java @@ -139,6 +139,11 @@ public class RAMJobStore implements JobStore { return log; } + @Override + public void setCurrentId(String currentId) { + //do nothing + } + /** *

* Called by the QuartzScheduler before the JobStore is diff --git a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/spi/JobStore.java b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/spi/JobStore.java index fe4026ce8..e95eb13ee 100644 --- a/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/spi/JobStore.java +++ b/fine-quartz/src/main/java/com/fr/third/v2/org/quartz/spi/JobStore.java @@ -68,6 +68,12 @@ public interface JobStore { * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ + /** + * 设置当前节点id + */ + void setCurrentId(String currentId); + + /** * Called by the QuartzScheduler before the JobStore is * used, in order to give the it a chance to initialize.