Browse Source

add worker group id in scheduler;

change workerGroupId default value to -1.
pull/2/head
baoliang 6 years ago
parent
commit
e02bbc1215
  1. 2
      escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java
  2. 16
      escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java
  3. 1
      escheduler-api/src/main/java/cn/escheduler/api/quartz/ProcessScheduleJob.java
  4. 7
      escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java
  5. 9
      escheduler-dao/readme.txt
  6. 4
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapper.java
  7. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapperProvider.java
  8. 16
      escheduler-dao/src/main/java/cn/escheduler/dao/model/Schedule.java
  9. 2
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java

2
escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java

@ -66,7 +66,7 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "receiversCc", required = false) String receiversCc, @RequestParam(value = "receiversCc", required = false) String receiversCc,
@RequestParam(value = "runMode", required = false) RunMode runMode, @RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroupId", required = false, defaultValue = "0") int workerGroupId, @RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId,
@RequestParam(value = "timeout", required = false) Integer timeout) { @RequestParam(value = "timeout", required = false) Integer timeout) {
try { try {
logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {}, " logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {}, "

16
escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java

@ -76,13 +76,15 @@ public class SchedulerController extends BaseController{
@RequestParam(value = "failureStrategy", required = false, defaultValue = DEFAULT_FAILURE_POLICY) FailureStrategy failureStrategy, @RequestParam(value = "failureStrategy", required = false, defaultValue = DEFAULT_FAILURE_POLICY) FailureStrategy failureStrategy,
@RequestParam(value = "receivers", required = false) String receivers, @RequestParam(value = "receivers", required = false) String receivers,
@RequestParam(value = "receiversCc", required = false) String receiversCc, @RequestParam(value = "receiversCc", required = false) String receiversCc,
@RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) { @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) {
logger.info("login user {}, project name: {}, process name: {}, create schedule: {}, warning type: {}, warning group id: {}," + logger.info("login user {}, project name: {}, process name: {}, create schedule: {}, warning type: {}, warning group id: {}," +
"failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {}", "failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {}, workGroupId:{}",
loginUser.getUserName(), projectName, processDefinitionId, schedule, warningType, warningGroupId, failureStrategy,receivers,receiversCc,processInstancePriority); loginUser.getUserName(), projectName, processDefinitionId, schedule, warningType, warningGroupId,
failureStrategy,receivers,receiversCc,processInstancePriority,workerGroupId);
try { try {
Map<String, Object> result = schedulerService.insertSchedule(loginUser, projectName, processDefinitionId, schedule, Map<String, Object> result = schedulerService.insertSchedule(loginUser, projectName, processDefinitionId, schedule,
warningType, warningGroupId, failureStrategy, receivers,receiversCc,processInstancePriority); warningType, warningGroupId, failureStrategy, receivers,receiversCc,processInstancePriority,workerGroupId);
return returnDataList(result); return returnDataList(result);
}catch (Exception e){ }catch (Exception e){
@ -113,14 +115,16 @@ public class SchedulerController extends BaseController{
@RequestParam(value = "failureStrategy", required = false, defaultValue = "END") FailureStrategy failureStrategy, @RequestParam(value = "failureStrategy", required = false, defaultValue = "END") FailureStrategy failureStrategy,
@RequestParam(value = "receivers", required = false) String receivers, @RequestParam(value = "receivers", required = false) String receivers,
@RequestParam(value = "receiversCc", required = false) String receiversCc, @RequestParam(value = "receiversCc", required = false) String receiversCc,
@RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) { @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) {
logger.info("login user {}, project name: {},id: {}, updateProcessInstance schedule: {}, notify type: {}, notify mails: {}, " + logger.info("login user {}, project name: {},id: {}, updateProcessInstance schedule: {}, notify type: {}, notify mails: {}, " +
"failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {}", "failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {},workerGroupId:{}",
loginUser.getUserName(), projectName, id, schedule, warningType, warningGroupId, failureStrategy,receivers,receiversCc,processInstancePriority); loginUser.getUserName(), projectName, id, schedule, warningType, warningGroupId, failureStrategy,
receivers,receiversCc,processInstancePriority,workerGroupId);
try { try {
Map<String, Object> result = schedulerService.updateSchedule(loginUser, projectName, id, schedule, Map<String, Object> result = schedulerService.updateSchedule(loginUser, projectName, id, schedule,
warningType, warningGroupId, failureStrategy, receivers,receiversCc,null,processInstancePriority); warningType, warningGroupId, failureStrategy, receivers,receiversCc,null,processInstancePriority, workerGroupId);
return returnDataList(result); return returnDataList(result);
}catch (Exception e){ }catch (Exception e){

1
escheduler-api/src/main/java/cn/escheduler/api/quartz/ProcessScheduleJob.java

@ -132,6 +132,7 @@ public class ProcessScheduleJob implements Job {
command.setScheduleTime(scheduledFireTime); command.setScheduleTime(scheduledFireTime);
command.setStartTime(fireTime); command.setStartTime(fireTime);
command.setWarningGroupId(schedule.getWarningGroupId()); command.setWarningGroupId(schedule.getWarningGroupId());
command.setWorkerGroupId(schedule.getWorkerGroupId());
command.setWarningType(schedule.getWarningType()); command.setWarningType(schedule.getWarningType());
command.setProcessInstancePriority(schedule.getProcessInstancePriority()); command.setProcessInstancePriority(schedule.getProcessInstancePriority());

7
escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java

@ -88,7 +88,7 @@ public class SchedulerService extends BaseService {
@Transactional(value = "TransactionManager", rollbackFor = Exception.class) @Transactional(value = "TransactionManager", rollbackFor = Exception.class)
public Map<String, Object> insertSchedule(User loginUser, String projectName, Integer processDefineId, String schedule, WarningType warningType, public Map<String, Object> insertSchedule(User loginUser, String projectName, Integer processDefineId, String schedule, WarningType warningType,
int warningGroupId, FailureStrategy failureStrategy, int warningGroupId, FailureStrategy failureStrategy,
String receivers, String receiversCc,Priority processInstancePriority) throws IOException { String receivers, String receiversCc,Priority processInstancePriority, int workerGroupId) throws IOException {
Map<String, Object> result = new HashMap<String, Object>(5); Map<String, Object> result = new HashMap<String, Object>(5);
@ -133,6 +133,7 @@ public class SchedulerService extends BaseService {
scheduleObj.setUserName(loginUser.getUserName()); scheduleObj.setUserName(loginUser.getUserName());
scheduleObj.setReleaseState(ReleaseState.OFFLINE); scheduleObj.setReleaseState(ReleaseState.OFFLINE);
scheduleObj.setProcessInstancePriority(processInstancePriority); scheduleObj.setProcessInstancePriority(processInstancePriority);
scheduleObj.setWorkerGroupId(workerGroupId);
scheduleMapper.insert(scheduleObj); scheduleMapper.insert(scheduleObj);
/** /**
@ -156,13 +157,14 @@ public class SchedulerService extends BaseService {
* @param warningGroupId * @param warningGroupId
* @param failureStrategy * @param failureStrategy
* @param scheduleStatus * @param scheduleStatus
* @param workerGroupId
* @return * @return
*/ */
@Transactional(value = "TransactionManager", rollbackFor = Exception.class) @Transactional(value = "TransactionManager", rollbackFor = Exception.class)
public Map<String, Object> updateSchedule(User loginUser, String projectName, Integer id, String scheduleExpression, WarningType warningType, public Map<String, Object> updateSchedule(User loginUser, String projectName, Integer id, String scheduleExpression, WarningType warningType,
int warningGroupId, FailureStrategy failureStrategy, int warningGroupId, FailureStrategy failureStrategy,
String receivers, String receiversCc, ReleaseState scheduleStatus, String receivers, String receiversCc, ReleaseState scheduleStatus,
Priority processInstancePriority) throws IOException { Priority processInstancePriority, int workerGroupId) throws IOException {
Map<String, Object> result = new HashMap<String, Object>(5); Map<String, Object> result = new HashMap<String, Object>(5);
Project project = projectMapper.queryByName(projectName); Project project = projectMapper.queryByName(projectName);
@ -221,6 +223,7 @@ public class SchedulerService extends BaseService {
if (scheduleStatus != null) { if (scheduleStatus != null) {
schedule.setReleaseState(scheduleStatus); schedule.setReleaseState(scheduleStatus);
} }
schedule.setWorkerGroupId(workerGroupId);
schedule.setUpdateTime(now); schedule.setUpdateTime(now);
schedule.setProcessInstancePriority(processInstancePriority); schedule.setProcessInstancePriority(processInstancePriority);
scheduleMapper.update(schedule); scheduleMapper.update(schedule);

9
escheduler-dao/readme.txt

@ -42,10 +42,13 @@ CREATE TABLE `t_escheduler_worker_group` (
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
ALTER TABLE `t_escheduler_task_instance` ALTER TABLE `t_escheduler_task_instance`
ADD COLUMN `worker_group_id` int(11) ZEROFILL NULL COMMENT '任务指定运行的worker分组' AFTER `task_instance_priority`; ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' AFTER `task_instance_priority`;
ALTER TABLE `t_escheduler_command` ALTER TABLE `t_escheduler_command`
ADD COLUMN `worker_group_id` int(11) ZEROFILL NULL COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`; ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`;
ALTER TABLE `t_escheduler_error_command` ALTER TABLE `t_escheduler_error_command`
ADD COLUMN `worker_group_id` int(11) ZEROFILL NULL COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`; ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`;
ALTER TABLE `t_escheduler_schedules`
ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`;

4
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapper.java

@ -75,6 +75,7 @@ public interface ScheduleMapper {
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.INTEGER), @Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.INTEGER),
@Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT), @Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT),
@Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = ScheduleMapperProvider.class, method = "queryByProcessDefineIdPaging") @SelectProvider(type = ScheduleMapperProvider.class, method = "queryByProcessDefineIdPaging")
@ -117,6 +118,7 @@ public interface ScheduleMapper {
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.INTEGER), @Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.INTEGER),
@Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT), @Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT),
@Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = ScheduleMapperProvider.class, method = "querySchedulerListByProjectName") @SelectProvider(type = ScheduleMapperProvider.class, method = "querySchedulerListByProjectName")
@ -141,6 +143,7 @@ public interface ScheduleMapper {
@Result(property = "userId", column = "user_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "userId", column = "user_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT), @Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT),
@Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = ScheduleMapperProvider.class, method = "queryById") @SelectProvider(type = ScheduleMapperProvider.class, method = "queryById")
@ -164,6 +167,7 @@ public interface ScheduleMapper {
@Result(property = "userId", column = "user_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "userId", column = "user_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT), @Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT),
@Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = ScheduleMapperProvider.class, method = "selectAllByProcessDefineArray") @SelectProvider(type = ScheduleMapperProvider.class, method = "selectAllByProcessDefineArray")

2
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapperProvider.java

@ -48,6 +48,7 @@ public class ScheduleMapperProvider {
VALUES("`user_id`", "#{schedule.userId}"); VALUES("`user_id`", "#{schedule.userId}");
VALUES("`release_state`", EnumFieldUtil.genFieldStr("schedule.releaseState", ReleaseState.class)); VALUES("`release_state`", EnumFieldUtil.genFieldStr("schedule.releaseState", ReleaseState.class));
VALUES("`warning_group_id`", "#{schedule.warningGroupId}"); VALUES("`warning_group_id`", "#{schedule.warningGroupId}");
VALUES("`worker_group_id`", "#{schedule.workerGroupId}");
VALUES("`process_instance_priority`", EnumFieldUtil.genFieldStr("schedule.processInstancePriority", Priority.class)); VALUES("`process_instance_priority`", EnumFieldUtil.genFieldStr("schedule.processInstancePriority", Priority.class));
}}.toString(); }}.toString();
} }
@ -67,6 +68,7 @@ public class ScheduleMapperProvider {
SET("`user_id`=#{schedule.userId}"); SET("`user_id`=#{schedule.userId}");
SET("`release_state`=" + EnumFieldUtil.genFieldStr("schedule.releaseState", ReleaseState.class)); SET("`release_state`=" + EnumFieldUtil.genFieldStr("schedule.releaseState", ReleaseState.class));
SET("`warning_group_id`=#{schedule.warningGroupId}"); SET("`warning_group_id`=#{schedule.warningGroupId}");
SET("`worker_group_id`=#{schedule.workerGroupId}");
SET("`process_instance_priority`="+ EnumFieldUtil.genFieldStr("schedule.processInstancePriority", Priority.class)); SET("`process_instance_priority`="+ EnumFieldUtil.genFieldStr("schedule.processInstancePriority", Priority.class));
WHERE("`id` = #{schedule.id}"); WHERE("`id` = #{schedule.id}");

16
escheduler-dao/src/main/java/cn/escheduler/dao/model/Schedule.java

@ -111,6 +111,11 @@ public class Schedule {
*/ */
private Priority processInstancePriority; private Priority processInstancePriority;
/**
* worker group id
*/
private int workerGroupId;
public int getWarningGroupId() { public int getWarningGroupId() {
return warningGroupId; return warningGroupId;
} }
@ -256,6 +261,15 @@ public class Schedule {
this.processInstancePriority = processInstancePriority; this.processInstancePriority = processInstancePriority;
} }
public int getWorkerGroupId() {
return workerGroupId;
}
public void setWorkerGroupId(int workerGroupId) {
this.workerGroupId = workerGroupId;
}
@Override @Override
public String toString() { public String toString() {
return "Schedule{" + return "Schedule{" +
@ -276,6 +290,8 @@ public class Schedule {
", releaseState=" + releaseState + ", releaseState=" + releaseState +
", warningGroupId=" + warningGroupId + ", warningGroupId=" + warningGroupId +
", processInstancePriority=" + processInstancePriority + ", processInstancePriority=" + processInstancePriority +
", workerGroupId=" + workerGroupId +
'}'; '}';
} }
} }

2
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java

@ -109,7 +109,7 @@ public class FetchTaskThread implements Runnable{
} }
int processWorkerGroupId = processInstance.getWorkerGroupId(); int processWorkerGroupId = processInstance.getWorkerGroupId();
taskWorkerGroupId = (taskWorkerGroupId == 0 ? processWorkerGroupId : taskWorkerGroupId); taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId);
if(taskWorkerGroupId == 0){ if(taskWorkerGroupId == 0){
return true; return true;

Loading…
Cancel
Save