diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java index 8f69933fb4..b6d3d4475f 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java @@ -66,7 +66,7 @@ public class ExecutorController extends BaseController { @RequestParam(value = "receiversCc", required = false) String receiversCc, @RequestParam(value = "runMode", required = false) RunMode runMode, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority, - @RequestParam(value = "workerGroupId", required = false, defaultValue = "0") int workerGroupId, + @RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId, @RequestParam(value = "timeout", required = false) Integer timeout) { try { logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {}, " diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java index 3ef011d323..8449bd1d38 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java @@ -76,13 +76,15 @@ public class SchedulerController extends BaseController{ @RequestParam(value = "failureStrategy", required = false, defaultValue = DEFAULT_FAILURE_POLICY) FailureStrategy failureStrategy, @RequestParam(value = "receivers", required = false) String receivers, @RequestParam(value = "receiversCc", required = false) String receiversCc, + @RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) { logger.info("login user {}, project name: {}, process name: {}, create schedule: {}, warning type: {}, warning group id: {}," + - "failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {}", - loginUser.getUserName(), projectName, processDefinitionId, schedule, warningType, warningGroupId, failureStrategy,receivers,receiversCc,processInstancePriority); + "failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {}, workGroupId:{}", + loginUser.getUserName(), projectName, processDefinitionId, schedule, warningType, warningGroupId, + failureStrategy,receivers,receiversCc,processInstancePriority,workerGroupId); try { Map result = schedulerService.insertSchedule(loginUser, projectName, processDefinitionId, schedule, - warningType, warningGroupId, failureStrategy, receivers,receiversCc,processInstancePriority); + warningType, warningGroupId, failureStrategy, receivers,receiversCc,processInstancePriority,workerGroupId); return returnDataList(result); }catch (Exception e){ @@ -113,14 +115,16 @@ public class SchedulerController extends BaseController{ @RequestParam(value = "failureStrategy", required = false, defaultValue = "END") FailureStrategy failureStrategy, @RequestParam(value = "receivers", required = false) String receivers, @RequestParam(value = "receiversCc", required = false) String receiversCc, + @RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) { logger.info("login user {}, project name: {},id: {}, updateProcessInstance schedule: {}, notify type: {}, notify mails: {}, " + - "failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {}", - loginUser.getUserName(), projectName, id, schedule, warningType, warningGroupId, failureStrategy,receivers,receiversCc,processInstancePriority); + "failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {},workerGroupId:{}", + loginUser.getUserName(), projectName, id, schedule, warningType, warningGroupId, failureStrategy, + receivers,receiversCc,processInstancePriority,workerGroupId); try { Map result = schedulerService.updateSchedule(loginUser, projectName, id, schedule, - warningType, warningGroupId, failureStrategy, receivers,receiversCc,null,processInstancePriority); + warningType, warningGroupId, failureStrategy, receivers,receiversCc,null,processInstancePriority, workerGroupId); return returnDataList(result); }catch (Exception e){ diff --git a/escheduler-api/src/main/java/cn/escheduler/api/quartz/ProcessScheduleJob.java b/escheduler-api/src/main/java/cn/escheduler/api/quartz/ProcessScheduleJob.java index 6fcaf1171d..96e283d7d5 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/quartz/ProcessScheduleJob.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/quartz/ProcessScheduleJob.java @@ -132,6 +132,7 @@ public class ProcessScheduleJob implements Job { command.setScheduleTime(scheduledFireTime); command.setStartTime(fireTime); command.setWarningGroupId(schedule.getWarningGroupId()); + command.setWorkerGroupId(schedule.getWorkerGroupId()); command.setWarningType(schedule.getWarningType()); command.setProcessInstancePriority(schedule.getProcessInstancePriority()); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java index df73181d7b..232b9d7b15 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java @@ -88,7 +88,7 @@ public class SchedulerService extends BaseService { @Transactional(value = "TransactionManager", rollbackFor = Exception.class) public Map insertSchedule(User loginUser, String projectName, Integer processDefineId, String schedule, WarningType warningType, int warningGroupId, FailureStrategy failureStrategy, - String receivers, String receiversCc,Priority processInstancePriority) throws IOException { + String receivers, String receiversCc,Priority processInstancePriority, int workerGroupId) throws IOException { Map result = new HashMap(5); @@ -133,6 +133,7 @@ public class SchedulerService extends BaseService { scheduleObj.setUserName(loginUser.getUserName()); scheduleObj.setReleaseState(ReleaseState.OFFLINE); scheduleObj.setProcessInstancePriority(processInstancePriority); + scheduleObj.setWorkerGroupId(workerGroupId); scheduleMapper.insert(scheduleObj); /** @@ -156,13 +157,14 @@ public class SchedulerService extends BaseService { * @param warningGroupId * @param failureStrategy * @param scheduleStatus + * @param workerGroupId * @return */ @Transactional(value = "TransactionManager", rollbackFor = Exception.class) public Map updateSchedule(User loginUser, String projectName, Integer id, String scheduleExpression, WarningType warningType, int warningGroupId, FailureStrategy failureStrategy, String receivers, String receiversCc, ReleaseState scheduleStatus, - Priority processInstancePriority) throws IOException { + Priority processInstancePriority, int workerGroupId) throws IOException { Map result = new HashMap(5); Project project = projectMapper.queryByName(projectName); @@ -221,6 +223,7 @@ public class SchedulerService extends BaseService { if (scheduleStatus != null) { schedule.setReleaseState(scheduleStatus); } + schedule.setWorkerGroupId(workerGroupId); schedule.setUpdateTime(now); schedule.setProcessInstancePriority(processInstancePriority); scheduleMapper.update(schedule); diff --git a/escheduler-dao/readme.txt b/escheduler-dao/readme.txt index e28c8a499a..7bc8e53bb4 100644 --- a/escheduler-dao/readme.txt +++ b/escheduler-dao/readme.txt @@ -42,10 +42,13 @@ CREATE TABLE `t_escheduler_worker_group` ( ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; ALTER TABLE `t_escheduler_task_instance` -ADD COLUMN `worker_group_id` int(11) ZEROFILL NULL COMMENT '任务指定运行的worker分组' AFTER `task_instance_priority`; +ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' AFTER `task_instance_priority`; ALTER TABLE `t_escheduler_command` -ADD COLUMN `worker_group_id` int(11) ZEROFILL NULL COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`; +ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`; ALTER TABLE `t_escheduler_error_command` -ADD COLUMN `worker_group_id` int(11) ZEROFILL NULL COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`; \ No newline at end of file +ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`; + +ALTER TABLE `t_escheduler_schedules` +ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`; \ No newline at end of file diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapper.java index 2e6f3fc40b..f1a48bde94 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapper.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapper.java @@ -75,6 +75,7 @@ public interface ScheduleMapper { @Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.INTEGER), @Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT), @Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = ScheduleMapperProvider.class, method = "queryByProcessDefineIdPaging") @@ -117,6 +118,7 @@ public interface ScheduleMapper { @Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.INTEGER), @Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT), @Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = ScheduleMapperProvider.class, method = "querySchedulerListByProjectName") @@ -141,6 +143,7 @@ public interface ScheduleMapper { @Result(property = "userId", column = "user_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT), @Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = ScheduleMapperProvider.class, method = "queryById") @@ -164,6 +167,7 @@ public interface ScheduleMapper { @Result(property = "userId", column = "user_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT), @Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = ScheduleMapperProvider.class, method = "selectAllByProcessDefineArray") diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapperProvider.java index af82318da2..5674675a25 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapperProvider.java @@ -48,6 +48,7 @@ public class ScheduleMapperProvider { VALUES("`user_id`", "#{schedule.userId}"); VALUES("`release_state`", EnumFieldUtil.genFieldStr("schedule.releaseState", ReleaseState.class)); VALUES("`warning_group_id`", "#{schedule.warningGroupId}"); + VALUES("`worker_group_id`", "#{schedule.workerGroupId}"); VALUES("`process_instance_priority`", EnumFieldUtil.genFieldStr("schedule.processInstancePriority", Priority.class)); }}.toString(); } @@ -67,6 +68,7 @@ public class ScheduleMapperProvider { SET("`user_id`=#{schedule.userId}"); SET("`release_state`=" + EnumFieldUtil.genFieldStr("schedule.releaseState", ReleaseState.class)); SET("`warning_group_id`=#{schedule.warningGroupId}"); + SET("`worker_group_id`=#{schedule.workerGroupId}"); SET("`process_instance_priority`="+ EnumFieldUtil.genFieldStr("schedule.processInstancePriority", Priority.class)); WHERE("`id` = #{schedule.id}"); diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/Schedule.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/Schedule.java index 40579a53ac..39926947c1 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/Schedule.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/Schedule.java @@ -111,6 +111,11 @@ public class Schedule { */ private Priority processInstancePriority; + /** + * worker group id + */ + private int workerGroupId; + public int getWarningGroupId() { return warningGroupId; } @@ -256,6 +261,15 @@ public class Schedule { this.processInstancePriority = processInstancePriority; } + + public int getWorkerGroupId() { + return workerGroupId; + } + + public void setWorkerGroupId(int workerGroupId) { + this.workerGroupId = workerGroupId; + } + @Override public String toString() { return "Schedule{" + @@ -276,6 +290,8 @@ public class Schedule { ", releaseState=" + releaseState + ", warningGroupId=" + warningGroupId + ", processInstancePriority=" + processInstancePriority + + ", workerGroupId=" + workerGroupId + '}'; } + } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java index e77f1adc5e..f3a18b5aa8 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java @@ -109,7 +109,7 @@ public class FetchTaskThread implements Runnable{ } int processWorkerGroupId = processInstance.getWorkerGroupId(); - taskWorkerGroupId = (taskWorkerGroupId == 0 ? processWorkerGroupId : taskWorkerGroupId); + taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId); if(taskWorkerGroupId == 0){ return true;