Browse Source

KERNEL-4410 集群定时任务,支持切换执行节点

release/10.0
zed 5 years ago
parent
commit
137133d895
  1. 7
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/Scheduler.java
  2. 4
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/core/QuartzScheduler.java
  3. 5
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/RemoteMBeanScheduler.java
  4. 5
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/RemoteScheduler.java
  5. 5
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/StdScheduler.java
  6. 2
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/DriverDelegate.java
  7. 21
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java
  8. 10
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCConstants.java
  9. 18
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java
  10. 5
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/simpl/RAMJobStore.java
  11. 7
      fine-quartz/src/main/java/com/fr/third/v2/org/quartz/spi/JobStore.java

7
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/Scheduler.java

@ -198,6 +198,13 @@ public interface Scheduler {
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
/**
* 切换指定节点id
* @param oldId 旧id
* @param newId 新id
*/
void changeAppointId(String oldId, String newId);
/**
* Returns the name of the <code>Scheduler</code>.
*/

4
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/core/QuartzScheduler.java

@ -860,6 +860,10 @@ public class QuartzScheduler implements RemotableQuartzScheduler {
return ft;
}
public void changeAppointId(String oldId, String newId) {
resources.getJobStore().changeAppointId(oldId, newId);
}
/**
* <p>
* Schedule the given <code>{@link Trigger}</code> with the

5
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/RemoteMBeanScheduler.java

@ -165,6 +165,11 @@ public abstract class RemoteMBeanScheduler implements Scheduler {
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
@Override
public void changeAppointId(String oldId, String newId) {
// do nothing
}
/**
* <p>
* Returns the name of the <code>Scheduler</code>.

5
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/RemoteScheduler.java

@ -131,6 +131,11 @@ public class RemoteScheduler implements Scheduler {
return ex;
}
@Override
public void changeAppointId(String oldId, String newId) {
// do nothing
}
/**
* <p>
* Returns the name of the <code>Scheduler</code>.

5
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/StdScheduler.java

@ -89,6 +89,11 @@ public class StdScheduler implements Scheduler {
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
@Override
public void changeAppointId(String oldId, String newId) {
sched.changeAppointId(oldId, newId);
}
/**
* <p>
* Returns the name of the <code>Scheduler</code>.

2
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/DriverDelegate.java

@ -101,6 +101,8 @@ public interface DriverDelegate {
int updateTriggerStatesFromOtherStates(Connection conn, String newState, String oldState1, String oldState2, String oldState3) throws SQLException;
int updateTriggerAppointId(Connection conn, String oldId, String newId) throws SQLException;
/**
* <p>
* Get the names of all of the triggers that have misfired - according to

21
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java

@ -174,6 +174,27 @@ public abstract class JobStoreSupport implements JobStore, Constants {
this.currentId = currentId;
}
@Override
public void changeAppointId(String oldId, String newId) {
try {
executeInNonManagedTXLock(
LOCK_TRIGGER_ACCESS,
new VoidTransactionCallback() {
public void executeVoid(Connection conn) throws JobPersistenceException {
try {
int rows = getDelegate().updateTriggerAppointId(conn, oldId, newId);
getLog().info("update " + rows + " triggers appointId from " + oldId + " to " + newId);
} catch (SQLException e) {
throw new JobPersistenceException("Couldn't update appointId: " + e.getMessage(), e);
}
}
}, null);
} catch (JobPersistenceException e) {
getLog().error(e.getMessage());
}
}
/**
* <p>
* Set the name of the <code>DataSource</code> that should be used for

10
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCConstants.java

@ -75,6 +75,16 @@ public interface StdJDBCConstants extends Constants {
+ " = ? OR "
+ COL_TRIGGER_STATE + " = ?)";
String UPDATE_TRIGGER_APPOINT_ID = "UPDATE "
+ TABLE_PREFIX_SUBST
+ TABLE_TRIGGERS
+ " SET "
+ COL_APPOINT_ID
+ " = ?"
+ " WHERE "
+ COL_APPOINT_ID
+ " = ?";
String SELECT_MISFIRED_TRIGGERS = "SELECT * FROM "
+ TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST

18
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java

@ -227,8 +227,7 @@ public class StdJDBCDelegate implements DriverDelegate, StdJDBCConstants {
PreparedStatement ps = null;
try {
ps = conn
.prepareStatement(rtp(UPDATE_TRIGGER_STATES_FROM_OTHER_2_STATES));
ps = conn.prepareStatement(rtp(UPDATE_TRIGGER_STATES_FROM_OTHER_2_STATES));
ps.setString(1, newState);
ps.setString(2, oldState1);
ps.setString(3, oldState2);
@ -255,6 +254,21 @@ public class StdJDBCDelegate implements DriverDelegate, StdJDBCConstants {
}
}
@Override
public int updateTriggerAppointId(Connection conn, String oldId, String newId) throws SQLException {
PreparedStatement ps = null;
try {
ps = conn.prepareStatement(rtp(UPDATE_TRIGGER_APPOINT_ID));
ps.setString(1, newId);
ps.setString(2, oldId);
return ps.executeUpdate();
} finally {
closeStatement(ps);
}
}
/**
* <p>
* Get the names of all of the triggers that have misfired.

5
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/simpl/RAMJobStore.java

@ -144,6 +144,11 @@ public class RAMJobStore implements JobStore {
//do nothing
}
@Override
public void changeAppointId(String oldId, String newId) {
//do nothing
}
/**
* <p>
* Called by the QuartzScheduler before the <code>JobStore</code> is

7
fine-quartz/src/main/java/com/fr/third/v2/org/quartz/spi/JobStore.java

@ -73,6 +73,13 @@ public interface JobStore {
*/
void setCurrentId(String currentId);
/**
* 切换指定节点id
* @param oldId 旧id
* @param newId 新id
*/
void changeAppointId(String oldId, String newId);
/**
* Called by the QuartzScheduler before the <code>JobStore</code> is

Loading…
Cancel
Save