From 17e4170f97bde02a97c17dbb51f8259265ebc10c Mon Sep 17 00:00:00 2001 From: Zed Date: Sat, 9 May 2020 14:45:54 +0800 Subject: [PATCH] =?UTF-8?q?KERNEL-3899=20=E9=9B=86=E7=BE=A4=E4=B8=8B?= =?UTF-8?q?=E5=AE=9A=E6=97=B6=E8=B0=83=E5=BA=A6=E4=BB=BB=E5=8A=A1=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E7=89=B9=E5=AE=9A=E8=8A=82=E7=82=B9=E8=BF=90=E8=A1=8C?= =?UTF-8?q?=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/fr/third/v2/org/quartz/Trigger.java | 4 +++ .../third/v2/org/quartz/TriggerBuilder.java | 9 ++++- .../org/quartz/impl/StdSchedulerFactory.java | 5 +++ .../quartz/impl/jdbcjobstore/Constants.java | 2 ++ .../impl/jdbcjobstore/DriverDelegate.java | 2 ++ .../impl/jdbcjobstore/JobStoreSupport.java | 31 ++++++++++++++-- .../impl/jdbcjobstore/PointbaseDelegate.java | 1 + .../impl/jdbcjobstore/StdJDBCConstants.java | 29 ++++++++++----- .../impl/jdbcjobstore/StdJDBCDelegate.java | 35 +++++++++++++++++++ .../quartz/impl/triggers/AbstractTrigger.java | 33 +++++++++++------ .../v2/org/quartz/simpl/RAMJobStore.java | 5 +++ .../fr/third/v2/org/quartz/spi/JobStore.java | 6 ++++ 12 files changed, 139 insertions(+), 23 deletions(-) diff --git a/fine-quartz/src/com/fr/third/v2/org/quartz/Trigger.java b/fine-quartz/src/com/fr/third/v2/org/quartz/Trigger.java index b64ed0bc4..7d630c118 100644 --- a/fine-quartz/src/com/fr/third/v2/org/quartz/Trigger.java +++ b/fine-quartz/src/com/fr/third/v2/org/quartz/Trigger.java @@ -127,6 +127,10 @@ public interface Trigger extends Serializable, Cloneable, Comparable { */ public static final int DEFAULT_PRIORITY = 5; + String getAppointId(); + + void setAppointId(String appointId); + public TriggerKey getKey(); public JobKey getJobKey(); diff --git a/fine-quartz/src/com/fr/third/v2/org/quartz/TriggerBuilder.java b/fine-quartz/src/com/fr/third/v2/org/quartz/TriggerBuilder.java index db8ed1e7c..88de74055 100644 --- a/fine-quartz/src/com/fr/third/v2/org/quartz/TriggerBuilder.java +++ b/fine-quartz/src/com/fr/third/v2/org/quartz/TriggerBuilder.java @@ -71,6 +71,7 @@ public class TriggerBuilder { private String calendarName; private JobKey jobKey; private JobDataMap jobDataMap = new JobDataMap(); + private String appointId; private ScheduleBuilder scheduleBuilder = null; @@ -99,7 +100,8 @@ public class TriggerBuilder { if(scheduleBuilder == null) scheduleBuilder = SimpleScheduleBuilder.simpleSchedule(); MutableTrigger trig = scheduleBuilder.build(); - + + trig.setAppointId(appointId); trig.setCalendarName(calendarName); trig.setDescription(description); trig.setStartTime(startTime); @@ -117,6 +119,11 @@ public class TriggerBuilder { return (T) trig; } + public TriggerBuilder appointId(String appointId) { + this.appointId = appointId; + return this; + } + /** * Use a TriggerKey with the given name and default group to * identify the Trigger. diff --git a/fine-quartz/src/com/fr/third/v2/org/quartz/impl/StdSchedulerFactory.java b/fine-quartz/src/com/fr/third/v2/org/quartz/impl/StdSchedulerFactory.java index 9e3b1f626..062ed8867 100644 --- a/fine-quartz/src/com/fr/third/v2/org/quartz/impl/StdSchedulerFactory.java +++ b/fine-quartz/src/com/fr/third/v2/org/quartz/impl/StdSchedulerFactory.java @@ -193,6 +193,8 @@ public class StdSchedulerFactory implements SchedulerFactory { public static final String PROP_JOB_STORE_PREFIX = "com.fr.third.v2.org.quartz.jobStore"; + public static final String PROP_JOB_STORE_CURRENT_ID = "com.fr.third.v2.org.quartz.jobStore.currentId"; + public static final String PROP_JOB_STORE_LOCK_HANDLER_PREFIX = PROP_JOB_STORE_PREFIX + ".lockHandler"; public static final String PROP_JOB_STORE_LOCK_HANDLER_CLASS = PROP_JOB_STORE_LOCK_HANDLER_PREFIX + ".class"; @@ -860,8 +862,11 @@ public class StdSchedulerFactory implements SchedulerFactory { throw initException; } + String currentId = cfg.getStringProperty(PROP_JOB_STORE_CURRENT_ID); + try { js = (JobStore) loadHelper.loadClass(jsClass).newInstance(); + js.setCurrentId(currentId); } catch (Exception e) { initException = new SchedulerException("JobStore class '" + jsClass + "' could not be instantiated.", e); diff --git a/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/Constants.java b/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/Constants.java index 60290765c..252600ce7 100644 --- a/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/Constants.java +++ b/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/Constants.java @@ -105,6 +105,8 @@ public interface Constants { String ALIAS_COL_NEXT_FIRE_TIME = "ALIAS_NXT_FR_TM"; + String COL_APPOINT_ID= "APPOINT_ID"; + // TABLE_SIMPLE_TRIGGERS columns names String COL_REPEAT_COUNT = "REPEAT_COUNT"; diff --git a/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/DriverDelegate.java b/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/DriverDelegate.java index f95dbfb89..9c5e6fe69 100644 --- a/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/DriverDelegate.java +++ b/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/DriverDelegate.java @@ -968,6 +968,8 @@ public interface DriverDelegate { public List selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount) throws SQLException; + List selectAppointTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount, String appointId); + /** *

* Insert a fired trigger. diff --git a/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java b/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java index 4f3a3d6b0..64fa45585 100644 --- a/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java +++ b/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java @@ -159,6 +159,8 @@ public abstract class JobStoreSupport implements JobStore, Constants { private volatile boolean schedulerRunning = false; private volatile boolean shutdown = false; + private volatile String currentId; + /* * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * @@ -167,6 +169,11 @@ public abstract class JobStoreSupport implements JobStore, Constants { * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ + @Override + public void setCurrentId(String currentId) { + this.currentId = currentId; + } + /** *

* Set the name of the DataSource that should be used for @@ -2780,6 +2787,17 @@ public abstract class JobStoreSupport implements JobStore, Constants { }); } + private void mergeKeys(List allKeys, Set allKeyIds, List mergeKeys) { + if (mergeKeys != null) { + for (TriggerKey key : mergeKeys) { + if (!allKeyIds.contains(key.toString())) { + allKeys.add(key); + allKeyIds.add(key.toString()); + } + } + } + } + // FUTURE_TODO: this really ought to return something like a FiredTriggerBundle, // so that the fireInstanceId doesn't have to be on the trigger... protected List acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow) @@ -2795,15 +2813,22 @@ public abstract class JobStoreSupport implements JobStore, Constants { do { currentLoopCount++; try { - List keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount); + long misfireTime = getMisfireTime(); + List waitingKeys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, misfireTime, maxCount); + List appointKeys = getDelegate().selectAppointTriggerToAcquire(conn, noLaterThan + timeWindow, misfireTime, maxCount, this.currentId); + List allKeys = new ArrayList(); + Set keyIds = new HashSet(); + + mergeKeys(allKeys, keyIds, waitingKeys); + mergeKeys(allKeys, keyIds, appointKeys); // No trigger is ready to fire yet. - if (keys == null || keys.size() == 0) + if (keyIds.size() == 0) return acquiredTriggers; long batchEnd = noLaterThan; - for (TriggerKey triggerKey : keys) { + for (TriggerKey triggerKey : allKeys) { // If our trigger is no longer available, try a new one. OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey); if (nextTrigger == null) { diff --git a/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/PointbaseDelegate.java b/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/PointbaseDelegate.java index 5cf9398a0..01bc08c93 100644 --- a/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/PointbaseDelegate.java +++ b/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/PointbaseDelegate.java @@ -181,6 +181,7 @@ public class PointbaseDelegate extends StdJDBCDelegate { ps.setInt(13, trigger.getMisfireInstruction()); ps.setBinaryStream(14, bais, len); ps.setInt(15, trigger.getPriority()); + ps.setString(16, trigger.getAppointId()); insertResult = ps.executeUpdate(); diff --git a/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCConstants.java b/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCConstants.java index 6a20af625..5b094d208 100644 --- a/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCConstants.java +++ b/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCConstants.java @@ -212,8 +212,8 @@ public interface StdJDBCConstants extends Constants { + ", " + COL_NEXT_FIRE_TIME + ", " + COL_PREV_FIRE_TIME + ", " + COL_TRIGGER_STATE + ", " + COL_TRIGGER_TYPE + ", " + COL_START_TIME + ", " + COL_END_TIME + ", " + COL_CALENDAR_NAME - + ", " + COL_MISFIRE_INSTRUCTION + ", " + COL_JOB_DATAMAP + ", " + COL_PRIORITY + ") " - + " VALUES(" + SCHED_NAME_SUBST + ", ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + ", " + COL_MISFIRE_INSTRUCTION + ", " + COL_JOB_DATAMAP + ", " + COL_PRIORITY + ", " + COL_APPOINT_ID + ") " + + " VALUES(" + SCHED_NAME_SUBST + ", ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; String INSERT_SIMPLE_TRIGGER = "INSERT INTO " + TABLE_PREFIX_SUBST + TABLE_SIMPLE_TRIGGERS + " (" @@ -507,13 +507,24 @@ public interface StdJDBCConstants extends Constants { + " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " = ?"; String SELECT_NEXT_TRIGGER_TO_ACQUIRE = "SELECT " - + COL_TRIGGER_NAME + ", " + COL_TRIGGER_GROUP + ", " - + COL_NEXT_FIRE_TIME + ", " + COL_PRIORITY + " FROM " - + TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE " - + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST - + " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " <= ? " - + "AND (" + COL_MISFIRE_INSTRUCTION + " = -1 OR (" +COL_MISFIRE_INSTRUCTION+ " != -1 AND "+ COL_NEXT_FIRE_TIME + " >= ?)) " - + "ORDER BY "+ COL_NEXT_FIRE_TIME + " ASC, " + COL_PRIORITY + " DESC"; + + COL_TRIGGER_NAME + ", " + COL_TRIGGER_GROUP + ", " + + COL_NEXT_FIRE_TIME + ", " + COL_PRIORITY + " FROM " + + TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE " + + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST + + " AND " + COL_APPOINT_ID + " IS NULL" + + " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " <= ? " + + "AND (" + COL_MISFIRE_INSTRUCTION + " = -1 OR (" + COL_MISFIRE_INSTRUCTION + " != -1 AND " + COL_NEXT_FIRE_TIME + " >= ?)) " + + "ORDER BY " + COL_NEXT_FIRE_TIME + " ASC, " + COL_PRIORITY + " DESC"; + + String SELECT_NEXT_APPOINT_TRIGGER_TO_ACQUIRE = "SELECT " + + COL_TRIGGER_NAME + ", " + COL_TRIGGER_GROUP + ", " + + COL_NEXT_FIRE_TIME + ", " + COL_PRIORITY + " FROM " + + TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE " + + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST + + " AND " + COL_APPOINT_ID + " = ?" + + " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " <= ? " + + "AND (" + COL_MISFIRE_INSTRUCTION + " = -1 OR (" + COL_MISFIRE_INSTRUCTION + " != -1 AND " + COL_NEXT_FIRE_TIME + " >= ?)) " + + "ORDER BY " + COL_NEXT_FIRE_TIME + " ASC, " + COL_PRIORITY + " DESC"; String INSERT_FIRED_TRIGGER = "INSERT INTO " diff --git a/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java b/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java index add26e57b..b354e11dd 100644 --- a/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java +++ b/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java @@ -1088,6 +1088,7 @@ public class StdJDBCDelegate implements DriverDelegate, StdJDBCConstants { ps.setInt(13, trigger.getMisfireInstruction()); setBytes(ps, 14, baos); ps.setInt(15, trigger.getPriority()); + ps.setString(16, trigger.getAppointId()); insertResult = ps.executeUpdate(); @@ -2624,6 +2625,40 @@ public class StdJDBCDelegate implements DriverDelegate, StdJDBCConstants { } } + public List selectAppointTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount, String appointId) { + PreparedStatement ps = null; + ResultSet rs = null; + List nextTriggers = new LinkedList(); + try { + ps = conn.prepareStatement(rtp(SELECT_NEXT_APPOINT_TRIGGER_TO_ACQUIRE)); + if (maxCount < 1) + maxCount = 1; + + ps.setMaxRows(maxCount); + ps.setFetchSize(maxCount); + + ps.setString(1, appointId); + ps.setString(2, STATE_WAITING); + ps.setBigDecimal(3, new BigDecimal(String.valueOf(noLaterThan))); + ps.setBigDecimal(4, new BigDecimal(String.valueOf(noEarlierThan))); + rs = ps.executeQuery(); + + while (rs.next() && nextTriggers.size() <= maxCount) { + nextTriggers.add(TriggerKey.triggerKey( + rs.getString(COL_TRIGGER_NAME), + rs.getString(COL_TRIGGER_GROUP))); + } + + return nextTriggers; + } catch (Exception e) { + logger.error(e.getMessage(), e); + return null; + } finally { + closeResultSet(rs); + closeStatement(ps); + } + } + /** *

* Insert a fired trigger. diff --git a/fine-quartz/src/com/fr/third/v2/org/quartz/impl/triggers/AbstractTrigger.java b/fine-quartz/src/com/fr/third/v2/org/quartz/impl/triggers/AbstractTrigger.java index 1305a8412..572475908 100644 --- a/fine-quartz/src/com/fr/third/v2/org/quartz/impl/triggers/AbstractTrigger.java +++ b/fine-quartz/src/com/fr/third/v2/org/quartz/impl/triggers/AbstractTrigger.java @@ -75,6 +75,8 @@ public abstract class AbstractTrigger implements OperableTrig * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ + private String appointId; + private String name; private String group = Scheduler.DEFAULT_GROUP; @@ -251,6 +253,16 @@ public abstract class AbstractTrigger implements OperableTrig this.key = null; } + @Override + public String getAppointId() { + return this.appointId; + } + + @Override + public void setAppointId(String appointId) { + this.appointId = appointId; + } + public void setKey(TriggerKey key) { setName(key.getName()); setGroup(key.getGroup()); @@ -873,18 +885,19 @@ public abstract class AbstractTrigger implements OperableTrig } return copy; } - + public TriggerBuilder getTriggerBuilder() { return TriggerBuilder.newTrigger() - .forJob(getJobKey()) - .modifiedByCalendar(getCalendarName()) - .usingJobData(getJobDataMap()) - .withDescription(getDescription()) - .endAt(getEndTime()) - .withIdentity(getKey()) - .withPriority(getPriority()) - .startAt(getStartTime()) - .withSchedule(getScheduleBuilder()); + .appointId(getAppointId()) + .forJob(getJobKey()) + .modifiedByCalendar(getCalendarName()) + .usingJobData(getJobDataMap()) + .withDescription(getDescription()) + .endAt(getEndTime()) + .withIdentity(getKey()) + .withPriority(getPriority()) + .startAt(getStartTime()) + .withSchedule(getScheduleBuilder()); } public abstract ScheduleBuilder getScheduleBuilder(); diff --git a/fine-quartz/src/com/fr/third/v2/org/quartz/simpl/RAMJobStore.java b/fine-quartz/src/com/fr/third/v2/org/quartz/simpl/RAMJobStore.java index e0db3e4df..f049bdb0e 100644 --- a/fine-quartz/src/com/fr/third/v2/org/quartz/simpl/RAMJobStore.java +++ b/fine-quartz/src/com/fr/third/v2/org/quartz/simpl/RAMJobStore.java @@ -139,6 +139,11 @@ public class RAMJobStore implements JobStore { return log; } + @Override + public void setCurrentId(String currentId) { + //do nothing + } + /** *

* Called by the QuartzScheduler before the JobStore is diff --git a/fine-quartz/src/com/fr/third/v2/org/quartz/spi/JobStore.java b/fine-quartz/src/com/fr/third/v2/org/quartz/spi/JobStore.java index fe4026ce8..e95eb13ee 100644 --- a/fine-quartz/src/com/fr/third/v2/org/quartz/spi/JobStore.java +++ b/fine-quartz/src/com/fr/third/v2/org/quartz/spi/JobStore.java @@ -68,6 +68,12 @@ public interface JobStore { * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ + /** + * 设置当前节点id + */ + void setCurrentId(String currentId); + + /** * Called by the QuartzScheduler before the JobStore is * used, in order to give the it a chance to initialize.