Browse Source

Merge pull request #120 from lenboo/dev-20190415

tasks run on the specified worker
pull/2/head
bao liang 6 years ago committed by GitHub
parent
commit
4a6912e4c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java
  2. 16
      escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java
  3. 144
      escheduler-api/src/main/java/cn/escheduler/api/controller/WorkerGroupController.java
  4. 4
      escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java
  5. 1
      escheduler-api/src/main/java/cn/escheduler/api/quartz/ProcessScheduleJob.java
  6. 7
      escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java
  7. 7
      escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java
  8. 155
      escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java
  9. 16
      escheduler-common/src/main/java/cn/escheduler/common/model/TaskNode.java
  10. 9
      escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java
  11. 33
      escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java
  12. 6
      escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java
  13. 26
      escheduler-dao/readme.txt
  14. 14
      escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
  15. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/CommandMapper.java
  16. 6
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/CommandMapperProvider.java
  17. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ErrorCommandMapperProvider.java
  18. 11
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapper.java
  19. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java
  20. 4
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapper.java
  21. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapperProvider.java
  22. 5
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapper.java
  23. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapperProvider.java
  24. 131
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerGroupMapper.java
  25. 160
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerGroupMapperProvider.java
  26. 16
      escheduler-dao/src/main/java/cn/escheduler/dao/model/Command.java
  27. 15
      escheduler-dao/src/main/java/cn/escheduler/dao/model/ErrorCommand.java
  28. 13
      escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java
  29. 16
      escheduler-dao/src/main/java/cn/escheduler/dao/model/Schedule.java
  30. 16
      escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java
  31. 88
      escheduler-dao/src/main/java/cn/escheduler/dao/model/WorkerGroup.java
  32. 6
      escheduler-dao/src/main/resources/dao/data_source.properties
  33. 69
      escheduler-dao/src/test/java/cn/escheduler/dao/mapper/WorkerGroupMapperTest.java
  34. 3
      escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java
  35. 54
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
  36. 11
      escheduler-server/src/test/java/cn/escheduler/server/zk/ZKWorkerClientTest.java

8
escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java

@ -66,13 +66,15 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "receiversCc", required = false) String receiversCc, @RequestParam(value = "receiversCc", required = false) String receiversCc,
@RequestParam(value = "runMode", required = false) RunMode runMode, @RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId,
@RequestParam(value = "timeout", required = false) Integer timeout) { @RequestParam(value = "timeout", required = false) Integer timeout) {
try { try {
logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {}, " logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {}, "
+ "failure policy: {}, node name: {}, node dep: {}, notify type: {}, " + "failure policy: {}, node name: {}, node dep: {}, notify type: {}, "
+ "notify group id: {},receivers:{},receiversCc:{}, run mode: {},process instance priority:{}, timeout: {}", + "notify group id: {},receivers:{},receiversCc:{}, run mode: {},process instance priority:{}, workerGroupId: {}, timeout: {}",
loginUser.getUserName(), projectName, processDefinitionId, scheduleTime, failureStrategy, loginUser.getUserName(), projectName, processDefinitionId, scheduleTime, failureStrategy,
taskDependType, warningType, warningGroupId,receivers,receiversCc,runMode,processInstancePriority,timeout); taskDependType, warningType, warningGroupId,receivers,receiversCc,runMode,processInstancePriority,
workerGroupId, timeout);
if (timeout == null) { if (timeout == null) {
timeout = cn.escheduler.common.Constants.MAX_TASK_TIMEOUT; timeout = cn.escheduler.common.Constants.MAX_TASK_TIMEOUT;
@ -80,7 +82,7 @@ public class ExecutorController extends BaseController {
Map<String, Object> result = execService.execProcessInstance(loginUser, projectName, processDefinitionId, scheduleTime, execType, failureStrategy, Map<String, Object> result = execService.execProcessInstance(loginUser, projectName, processDefinitionId, scheduleTime, execType, failureStrategy,
startNodeList, taskDependType, warningType, startNodeList, taskDependType, warningType,
warningGroupId,receivers,receiversCc, runMode,processInstancePriority,timeout); warningGroupId,receivers,receiversCc, runMode,processInstancePriority, workerGroupId, timeout);
return returnDataList(result); return returnDataList(result);
} catch (Exception e) { } catch (Exception e) {
logger.error(START_PROCESS_INSTANCE_ERROR.getMsg(),e); logger.error(START_PROCESS_INSTANCE_ERROR.getMsg(),e);

16
escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java

@ -76,13 +76,15 @@ public class SchedulerController extends BaseController{
@RequestParam(value = "failureStrategy", required = false, defaultValue = DEFAULT_FAILURE_POLICY) FailureStrategy failureStrategy, @RequestParam(value = "failureStrategy", required = false, defaultValue = DEFAULT_FAILURE_POLICY) FailureStrategy failureStrategy,
@RequestParam(value = "receivers", required = false) String receivers, @RequestParam(value = "receivers", required = false) String receivers,
@RequestParam(value = "receiversCc", required = false) String receiversCc, @RequestParam(value = "receiversCc", required = false) String receiversCc,
@RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) { @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) {
logger.info("login user {}, project name: {}, process name: {}, create schedule: {}, warning type: {}, warning group id: {}," + logger.info("login user {}, project name: {}, process name: {}, create schedule: {}, warning type: {}, warning group id: {}," +
"failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {}", "failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {}, workGroupId:{}",
loginUser.getUserName(), projectName, processDefinitionId, schedule, warningType, warningGroupId, failureStrategy,receivers,receiversCc,processInstancePriority); loginUser.getUserName(), projectName, processDefinitionId, schedule, warningType, warningGroupId,
failureStrategy,receivers,receiversCc,processInstancePriority,workerGroupId);
try { try {
Map<String, Object> result = schedulerService.insertSchedule(loginUser, projectName, processDefinitionId, schedule, Map<String, Object> result = schedulerService.insertSchedule(loginUser, projectName, processDefinitionId, schedule,
warningType, warningGroupId, failureStrategy, receivers,receiversCc,processInstancePriority); warningType, warningGroupId, failureStrategy, receivers,receiversCc,processInstancePriority,workerGroupId);
return returnDataList(result); return returnDataList(result);
}catch (Exception e){ }catch (Exception e){
@ -113,14 +115,16 @@ public class SchedulerController extends BaseController{
@RequestParam(value = "failureStrategy", required = false, defaultValue = "END") FailureStrategy failureStrategy, @RequestParam(value = "failureStrategy", required = false, defaultValue = "END") FailureStrategy failureStrategy,
@RequestParam(value = "receivers", required = false) String receivers, @RequestParam(value = "receivers", required = false) String receivers,
@RequestParam(value = "receiversCc", required = false) String receiversCc, @RequestParam(value = "receiversCc", required = false) String receiversCc,
@RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) { @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) {
logger.info("login user {}, project name: {},id: {}, updateProcessInstance schedule: {}, notify type: {}, notify mails: {}, " + logger.info("login user {}, project name: {},id: {}, updateProcessInstance schedule: {}, notify type: {}, notify mails: {}, " +
"failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {}", "failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {},workerGroupId:{}",
loginUser.getUserName(), projectName, id, schedule, warningType, warningGroupId, failureStrategy,receivers,receiversCc,processInstancePriority); loginUser.getUserName(), projectName, id, schedule, warningType, warningGroupId, failureStrategy,
receivers,receiversCc,processInstancePriority,workerGroupId);
try { try {
Map<String, Object> result = schedulerService.updateSchedule(loginUser, projectName, id, schedule, Map<String, Object> result = schedulerService.updateSchedule(loginUser, projectName, id, schedule,
warningType, warningGroupId, failureStrategy, receivers,receiversCc,null,processInstancePriority); warningType, warningGroupId, failureStrategy, receivers,receiversCc,null,processInstancePriority, workerGroupId);
return returnDataList(result); return returnDataList(result);
}catch (Exception e){ }catch (Exception e){

144
escheduler-api/src/main/java/cn/escheduler/api/controller/WorkerGroupController.java

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.api.controller;
import cn.escheduler.api.enums.Status;
import cn.escheduler.api.service.WorkerGroupService;
import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.Result;
import cn.escheduler.dao.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
/**
* worker group controller
*/
@RestController
@RequestMapping("/worker-group")
public class WorkerGroupController extends BaseController{
private static final Logger logger = LoggerFactory.getLogger(WorkerGroupController.class);
@Autowired
WorkerGroupService workerGroupService;
/**
* create or update a worker group
* @param loginUser
* @param id
* @param name
* @param ipList
* @return
*/
@PostMapping(value = "/save")
@ResponseStatus(HttpStatus.OK)
public Result saveWorkerGroup(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "id", required = false, defaultValue = "0") int id,
@RequestParam(value = "name") String name,
@RequestParam(value = "ipList") String ipList
) {
logger.info("save worker group: login user {}, id:{}, name: {}, ipList: {} ",
loginUser.getUserName(), id, name, ipList);
try {
Map<String, Object> result = workerGroupService.saveWorkerGroup(id, name, ipList);
return returnDataList(result);
}catch (Exception e){
logger.error(Status.SAVE_ERROR.getMsg(),e);
return error(Status.SAVE_ERROR.getCode(), Status.SAVE_ERROR.getMsg());
}
}
/**
* query worker groups paging
* @param loginUser
* @param pageNo
* @param searchVal
* @param pageSize
* @return
*/
@GetMapping(value = "/list-paging")
@ResponseStatus(HttpStatus.OK)
public Result queryAllWorkerGroupsPaging(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("pageNo") Integer pageNo,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam("pageSize") Integer pageSize
) {
logger.info("query all worker group paging: login user {}, pageNo:{}, pageSize:{}, searchVal:{}",
loginUser.getUserName() , pageNo, pageSize, searchVal);
try {
Map<String, Object> result = workerGroupService.queryAllGroupPaging(pageNo, pageSize, searchVal);
return returnDataListPaging(result);
}catch (Exception e){
logger.error(Status.SAVE_ERROR.getMsg(),e);
return error(Status.SAVE_ERROR.getCode(), Status.SAVE_ERROR.getMsg());
}
}
/**
* query all worker groups
* @param loginUser
* @return
*/
@GetMapping(value = "/all-groups")
@ResponseStatus(HttpStatus.OK)
public Result queryAllWorkerGroups(@RequestAttribute(value = Constants.SESSION_USER) User loginUser
) {
logger.info("query all worker group: login user {}",
loginUser.getUserName() );
try {
Map<String, Object> result = workerGroupService.queryAllGroup();
return returnDataList(result);
}catch (Exception e){
logger.error(Status.SAVE_ERROR.getMsg(),e);
return error(Status.SAVE_ERROR.getCode(), Status.SAVE_ERROR.getMsg());
}
}
/**
* delete worker group by id
* @param loginUser
* @param id
* @return
*/
@GetMapping(value = "/delete-by-id")
@ResponseStatus(HttpStatus.OK)
public Result deleteById(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("id") Integer id
) {
logger.info("delete worker group: login user {}, id:{} ",
loginUser.getUserName() , id);
try {
Map<String, Object> result = workerGroupService.deleteWorkerGroupById(id);
return returnDataList(result);
}catch (Exception e){
logger.error(Status.SAVE_ERROR.getMsg(),e);
return error(Status.SAVE_ERROR.getCode(), Status.SAVE_ERROR.getMsg());
}
}
}

4
escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java

@ -156,6 +156,10 @@ public enum Status {
UPDATE_QUEUE_ERROR(10131, "update queue error"), UPDATE_QUEUE_ERROR(10131, "update queue error"),
NEED_NOT_UPDATE_QUEUE(10132, "no content changes, no updates are required"), NEED_NOT_UPDATE_QUEUE(10132, "no content changes, no updates are required"),
VERIFY_QUEUE_ERROR(10133,"verify queue error"), VERIFY_QUEUE_ERROR(10133,"verify queue error"),
NAME_NULL(10134,"name must be not null"),
NAME_EXIST(10135, "name {0} already exists"),
SAVE_ERROR(10136, "save error"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found"), UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found"),

1
escheduler-api/src/main/java/cn/escheduler/api/quartz/ProcessScheduleJob.java

@ -132,6 +132,7 @@ public class ProcessScheduleJob implements Job {
command.setScheduleTime(scheduledFireTime); command.setScheduleTime(scheduledFireTime);
command.setStartTime(fireTime); command.setStartTime(fireTime);
command.setWarningGroupId(schedule.getWarningGroupId()); command.setWarningGroupId(schedule.getWarningGroupId());
command.setWorkerGroupId(schedule.getWorkerGroupId());
command.setWarningType(schedule.getWarningType()); command.setWarningType(schedule.getWarningType());
command.setProcessInstancePriority(schedule.getProcessInstancePriority()); command.setProcessInstancePriority(schedule.getProcessInstancePriority());

7
escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java

@ -90,7 +90,7 @@ public class ExecutorService extends BaseService{
FailureStrategy failureStrategy, String startNodeList, FailureStrategy failureStrategy, String startNodeList,
TaskDependType taskDependType, WarningType warningType, int warningGroupId, TaskDependType taskDependType, WarningType warningType, int warningGroupId,
String receivers, String receiversCc, RunMode runMode, String receivers, String receiversCc, RunMode runMode,
Priority processInstancePriority, Integer timeout) throws ParseException { Priority processInstancePriority, int workerGroupId, Integer timeout) throws ParseException {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>(5);
// timeout is valid // timeout is valid
if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) { if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) {
@ -115,7 +115,7 @@ public class ExecutorService extends BaseService{
*/ */
int create = this.createCommand(commandType, processDefinitionId, int create = this.createCommand(commandType, processDefinitionId,
taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(),
warningGroupId, runMode,processInstancePriority); warningGroupId, runMode,processInstancePriority, workerGroupId);
if(create > 0 ){ if(create > 0 ){
/** /**
* according to the process definition ID updateProcessInstance and CC recipient * according to the process definition ID updateProcessInstance and CC recipient
@ -405,7 +405,7 @@ public class ExecutorService extends BaseService{
TaskDependType nodeDep, FailureStrategy failureStrategy, TaskDependType nodeDep, FailureStrategy failureStrategy,
String startNodeList, String schedule, WarningType warningType, String startNodeList, String schedule, WarningType warningType,
int excutorId, int warningGroupId, int excutorId, int warningGroupId,
RunMode runMode,Priority processInstancePriority) throws ParseException { RunMode runMode,Priority processInstancePriority, int workerGroupId) throws ParseException {
/** /**
* instantiate command schedule instance * instantiate command schedule instance
@ -436,6 +436,7 @@ public class ExecutorService extends BaseService{
command.setExecutorId(excutorId); command.setExecutorId(excutorId);
command.setWarningGroupId(warningGroupId); command.setWarningGroupId(warningGroupId);
command.setProcessInstancePriority(processInstancePriority); command.setProcessInstancePriority(processInstancePriority);
command.setWorkerGroupId(workerGroupId);
Date start = null; Date start = null;
Date end = null; Date end = null;

7
escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java

@ -88,7 +88,7 @@ public class SchedulerService extends BaseService {
@Transactional(value = "TransactionManager", rollbackFor = Exception.class) @Transactional(value = "TransactionManager", rollbackFor = Exception.class)
public Map<String, Object> insertSchedule(User loginUser, String projectName, Integer processDefineId, String schedule, WarningType warningType, public Map<String, Object> insertSchedule(User loginUser, String projectName, Integer processDefineId, String schedule, WarningType warningType,
int warningGroupId, FailureStrategy failureStrategy, int warningGroupId, FailureStrategy failureStrategy,
String receivers, String receiversCc,Priority processInstancePriority) throws IOException { String receivers, String receiversCc,Priority processInstancePriority, int workerGroupId) throws IOException {
Map<String, Object> result = new HashMap<String, Object>(5); Map<String, Object> result = new HashMap<String, Object>(5);
@ -133,6 +133,7 @@ public class SchedulerService extends BaseService {
scheduleObj.setUserName(loginUser.getUserName()); scheduleObj.setUserName(loginUser.getUserName());
scheduleObj.setReleaseState(ReleaseState.OFFLINE); scheduleObj.setReleaseState(ReleaseState.OFFLINE);
scheduleObj.setProcessInstancePriority(processInstancePriority); scheduleObj.setProcessInstancePriority(processInstancePriority);
scheduleObj.setWorkerGroupId(workerGroupId);
scheduleMapper.insert(scheduleObj); scheduleMapper.insert(scheduleObj);
/** /**
@ -156,13 +157,14 @@ public class SchedulerService extends BaseService {
* @param warningGroupId * @param warningGroupId
* @param failureStrategy * @param failureStrategy
* @param scheduleStatus * @param scheduleStatus
* @param workerGroupId
* @return * @return
*/ */
@Transactional(value = "TransactionManager", rollbackFor = Exception.class) @Transactional(value = "TransactionManager", rollbackFor = Exception.class)
public Map<String, Object> updateSchedule(User loginUser, String projectName, Integer id, String scheduleExpression, WarningType warningType, public Map<String, Object> updateSchedule(User loginUser, String projectName, Integer id, String scheduleExpression, WarningType warningType,
int warningGroupId, FailureStrategy failureStrategy, int warningGroupId, FailureStrategy failureStrategy,
String receivers, String receiversCc, ReleaseState scheduleStatus, String receivers, String receiversCc, ReleaseState scheduleStatus,
Priority processInstancePriority) throws IOException { Priority processInstancePriority, int workerGroupId) throws IOException {
Map<String, Object> result = new HashMap<String, Object>(5); Map<String, Object> result = new HashMap<String, Object>(5);
Project project = projectMapper.queryByName(projectName); Project project = projectMapper.queryByName(projectName);
@ -221,6 +223,7 @@ public class SchedulerService extends BaseService {
if (scheduleStatus != null) { if (scheduleStatus != null) {
schedule.setReleaseState(scheduleStatus); schedule.setReleaseState(scheduleStatus);
} }
schedule.setWorkerGroupId(workerGroupId);
schedule.setUpdateTime(now); schedule.setUpdateTime(now);
schedule.setProcessInstancePriority(processInstancePriority); schedule.setProcessInstancePriority(processInstancePriority);
scheduleMapper.update(schedule); scheduleMapper.update(schedule);

155
escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.api.service;
import cn.escheduler.api.enums.Status;
import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.PageInfo;
import cn.escheduler.dao.mapper.WorkerGroupMapper;
import cn.escheduler.dao.model.User;
import cn.escheduler.dao.model.WorkerGroup;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* work group service
*/
@Service
public class WorkerGroupService extends BaseService {
@Autowired
WorkerGroupMapper workerGroupMapper;
/**
* create or update a worker group
* @param id
* @param name
* @param ipList
* @return
*/
public Map<String, Object> saveWorkerGroup(int id, String name, String ipList){
Map<String, Object> result = new HashMap<>(5);
if(StringUtils.isEmpty(name)){
putMsg(result, Status.NAME_NULL);
return result;
}
Date now = new Date();
WorkerGroup workerGroup = null;
if(id != 0){
workerGroup = workerGroupMapper.queryById(id);
}else{
workerGroup = new WorkerGroup();
workerGroup.setCreateTime(now);
}
workerGroup.setName(name);
workerGroup.setIpList(ipList);
workerGroup.setUpdateTime(now);
if(checkWorkerGroupNameExists(workerGroup)){
putMsg(result, Status.NAME_EXIST, workerGroup.getName());
return result;
}
if(workerGroup.getId() != 0 ){
workerGroupMapper.update(workerGroup);
}else{
workerGroupMapper.insert(workerGroup);
}
putMsg(result, Status.SUCCESS);
return result;
}
/**
* check worker group name exists
* @param workerGroup
* @return
*/
private boolean checkWorkerGroupNameExists(WorkerGroup workerGroup) {
List<WorkerGroup> workerGroupList = workerGroupMapper.queryWorkerGroupByName(workerGroup.getName());
if(workerGroupList.size() > 0 ){
// new group has same name..
if(workerGroup.getId() == 0){
return true;
}
// update group...
for(WorkerGroup group : workerGroupList){
if(group.getId() != workerGroup.getId()){
return true;
}
}
}
return false;
}
/**
* query worker group paging
* @param pageNo
* @param pageSize
* @param searchVal
* @return
*/
public Map<String,Object> queryAllGroupPaging(Integer pageNo, Integer pageSize, String searchVal) {
Map<String, Object> result = new HashMap<>(5);
int count = workerGroupMapper.countPaging(searchVal);
PageInfo<WorkerGroup> pageInfo = new PageInfo<>(pageNo, pageSize);
List<WorkerGroup> workerGroupList = workerGroupMapper.queryListPaging(pageInfo.getStart(), pageSize, searchVal);
pageInfo.setTotalCount(count);
pageInfo.setLists(workerGroupList);
result.put(Constants.DATA_LIST, pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* delete worker group by id
* @param id
* @return
*/
public Map<String,Object> deleteWorkerGroupById(Integer id) {
Map<String, Object> result = new HashMap<>(5);
int delete = workerGroupMapper.deleteById(id);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query all worker group
* @return
*/
public Map<String,Object> queryAllGroup() {
Map<String, Object> result = new HashMap<>(5);
List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup();
result.put(Constants.DATA_LIST, workerGroupList);
putMsg(result, Status.SUCCESS);
return result;
}
}

16
escheduler-common/src/main/java/cn/escheduler/common/model/TaskNode.java

@ -113,6 +113,12 @@ public class TaskNode {
*/ */
private Priority taskInstancePriority; private Priority taskInstancePriority;
/**
* worker group id
*/
private int workerGroupId;
/** /**
* task time out * task time out
*/ */
@ -224,6 +230,7 @@ public class TaskNode {
Objects.equals(extras, taskNode.extras) && Objects.equals(extras, taskNode.extras) &&
Objects.equals(runFlag, taskNode.runFlag) && Objects.equals(runFlag, taskNode.runFlag) &&
Objects.equals(dependence, taskNode.dependence) && Objects.equals(dependence, taskNode.dependence) &&
Objects.equals(workerGroupId, taskNode.workerGroupId) &&
CollectionUtils.equalLists(depList, taskNode.depList); CollectionUtils.equalLists(depList, taskNode.depList);
} }
@ -303,6 +310,15 @@ public class TaskNode {
", dependence='" + dependence + '\'' + ", dependence='" + dependence + '\'' +
", taskInstancePriority=" + taskInstancePriority + ", taskInstancePriority=" + taskInstancePriority +
", timeout='" + timeout + '\'' + ", timeout='" + timeout + '\'' +
", workerGroupId='" + workerGroupId + '\'' +
'}'; '}';
} }
public int getWorkerGroupId() {
return workerGroupId;
}
public void setWorkerGroupId(int workerGroupId) {
this.workerGroupId = workerGroupId;
}
} }

9
escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java

@ -54,10 +54,17 @@ public interface ITaskQueue {
* an element pops out of the queue * an element pops out of the queue
* *
* @param key queue name * @param key queue name
* @param remove where remove the element
* @return * @return
*/ */
String poll(String key); String poll(String key, boolean remove);
/**
* remove a element from queue
* @param key
* @param value
*/
void removeNode(String key, String value);
/** /**
* add an element to the set * add an element to the set

33
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java

@ -137,10 +137,11 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* *
* 流程实例优先级_流程实例id_任务优先级_任务id high <- low * 流程实例优先级_流程实例id_任务优先级_任务id high <- low
* @param key task queue name * @param key task queue name
* @param remove whether remove the element
* @return the task id to be executed * @return the task id to be executed
*/ */
@Override @Override
public String poll(String key) { public String poll(String key, boolean remove) {
try{ try{
CuratorFramework zk = getZkClient(); CuratorFramework zk = getZkClient();
String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH; String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
@ -181,18 +182,11 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
String[] vals = targetTaskKey.split(Constants.UNDERLINE); String[] vals = targetTaskKey.split(Constants.UNDERLINE);
try{ if(remove){
zk.delete().forPath(taskIdPath); removeNode(key, targetTaskKey);
// String path = conf.getString(Constants.ZOOKEEPER_SCHEDULER_ROOT) + Constants.SINGLE_SLASH + Constants.SCHEDULER_TASKS_QUEUE + "_remove" + Constants.SINGLE_SLASH + targetTaskKey;
// getZkClient().create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,
// Bytes.toBytes(targetTaskKey));
}catch(Exception e){
logger.error(String.format("delete task:%s from zookeeper fail, task detail: %s exception" ,targetTaskKey, vals[vals.length - 1]) ,e);
} }
logger.info("consume task: {},there still have {} tasks need to be executed", targetTaskKey, size - 1); logger.info("consume task: {},there still have {} tasks need to be executed", vals[vals.length - 1], size - 1);
return targetTaskKey;
return vals[vals.length - 1];
}else{ }else{
logger.error("should not go here, task queue poll error, please check!"); logger.error("should not go here, task queue poll error, please check!");
} }
@ -204,6 +198,21 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
return null; return null;
} }
@Override
public void removeNode(String key, String nodeValue){
CuratorFramework zk = getZkClient();
String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
String taskIdPath = tasksQueuePath + nodeValue;
logger.info("consume task {}", taskIdPath);
try{
zk.delete().forPath(taskIdPath);
}catch(Exception e){
logger.error(String.format("delete task:%s from zookeeper fail, exception:" ,nodeValue) ,e);
}
}
/** /**

6
escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java

@ -49,9 +49,9 @@ public class TaskQueueImplTest {
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"4"); tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"4");
//pop //pop
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE); String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
assertEquals(node1,"1"); assertEquals(node1,"1");
String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE); String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
assertEquals(node2,"2"); assertEquals(node2,"2");
//sadd //sadd
@ -99,7 +99,7 @@ public class TaskQueueImplTest {
} }
} }
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE); String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
assertEquals(node1,"0"); assertEquals(node1,"0");
//clear all data //clear all data

26
escheduler-dao/readme.txt

@ -12,9 +12,9 @@ CREATE TABLE `t_escheduler_access_token` (
PRIMARY KEY (`id`) PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8; ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
CREATE TABLE `escheduler`.`t_escheduler_error_command` ( CREATE TABLE `t_escheduler_error_command` (
`id` int(11) NOT NULL COMMENT '主键', `id` int(11) NOT NULL COMMENT '主键',
`command_type` tinyint(4) NULL DEFAULT NULL COMMENT '命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程 4 从失败节点开始执行', `command_type` tinyint(4) NULL DEFAULT NULL COMMENT '命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程',
`executor_id` int(11) NULL DEFAULT NULL COMMENT '命令执行者', `executor_id` int(11) NULL DEFAULT NULL COMMENT '命令执行者',
`process_definition_id` int(11) NULL DEFAULT NULL COMMENT '流程定义id', `process_definition_id` int(11) NULL DEFAULT NULL COMMENT '流程定义id',
`command_param` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '命令的参数(json格式)', `command_param` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '命令的参数(json格式)',
@ -30,3 +30,25 @@ CREATE TABLE `escheduler`.`t_escheduler_error_command` (
`message` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '执行信息', `message` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '执行信息',
PRIMARY KEY (`id`) USING BTREE PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
CREATE TABLE `t_escheduler_worker_group` (
`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
`name` varchar(256) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL COMMENT '组名称',
`ip_list` varchar(256) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL COMMENT 'worker地址列表',
`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
`update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
ALTER TABLE `t_escheduler_task_instance`
ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' AFTER `task_instance_priority`;
ALTER TABLE `t_escheduler_command`
ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`;
ALTER TABLE `t_escheduler_error_command`
ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`;
ALTER TABLE `t_escheduler_schedules`
ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`;

14
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java

@ -88,6 +88,9 @@ public class ProcessDao extends AbstractBaseDao {
@Autowired @Autowired
private ResourceMapper resourceMapper; private ResourceMapper resourceMapper;
@Autowired
private WorkerGroupMapper workerGroupMapper;
@Autowired @Autowired
private ErrorCommandMapper errorCommandMapper; private ErrorCommandMapper errorCommandMapper;
@ -115,6 +118,7 @@ public class ProcessDao extends AbstractBaseDao {
scheduleMapper = getMapper(ScheduleMapper.class); scheduleMapper = getMapper(ScheduleMapper.class);
udfFuncMapper = getMapper(UdfFuncMapper.class); udfFuncMapper = getMapper(UdfFuncMapper.class);
resourceMapper = getMapper(ResourceMapper.class); resourceMapper = getMapper(ResourceMapper.class);
workerGroupMapper = getMapper(WorkerGroupMapper.class);
taskQueue = TaskQueueFactory.getTaskQueueInstance(); taskQueue = TaskQueueFactory.getTaskQueueInstance();
} }
@ -477,6 +481,7 @@ public class ProcessDao extends AbstractBaseDao {
processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson()); processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson());
// set process instance priority // set process instance priority
processInstance.setProcessInstancePriority(command.getProcessInstancePriority()); processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
processInstance.setWorkerGroupId(command.getWorkerGroupId());
return processInstance; return processInstance;
} }
@ -1575,6 +1580,15 @@ public class ProcessDao extends AbstractBaseDao {
return userMapper.queryQueueByProcessInstanceId(processInstanceId); return userMapper.queryQueueByProcessInstanceId(processInstanceId);
} }
/**
* query worker group by id
* @param workerGroupId
* @return
*/
public WorkerGroup queryWorkerGroupById(int workerGroupId){
return workerGroupMapper.queryById(workerGroupId);
}
} }

2
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/CommandMapper.java

@ -79,6 +79,7 @@ public interface CommandMapper {
@Result(property = "scheduleTime", column = "schedule_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE), @Result(property = "scheduleTime", column = "schedule_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
@Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE), @Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
@Result(property = "startTime", column = "start_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE), @Result(property = "startTime", column = "start_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = CommandMapperProvider.class, method = "queryOneCommand") @SelectProvider(type = CommandMapperProvider.class, method = "queryOneCommand")
@ -101,6 +102,7 @@ public interface CommandMapper {
@Result(property = "scheduleTime", column = "schedule_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE), @Result(property = "scheduleTime", column = "schedule_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
@Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE), @Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
@Result(property = "startTime", column = "start_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE), @Result(property = "startTime", column = "start_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = CommandMapperProvider.class, method = "queryAllCommand") @SelectProvider(type = CommandMapperProvider.class, method = "queryAllCommand")

6
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/CommandMapperProvider.java

@ -51,6 +51,7 @@ public class CommandMapperProvider {
VALUES("`warning_group_id`", "#{command.warningGroupId}"); VALUES("`warning_group_id`", "#{command.warningGroupId}");
VALUES("`schedule_time`", "#{command.scheduleTime}"); VALUES("`schedule_time`", "#{command.scheduleTime}");
VALUES("`update_time`", "#{command.updateTime}"); VALUES("`update_time`", "#{command.updateTime}");
VALUES("`worker_group_id`", "#{command.workerGroupId}");
VALUES("`start_time`", "#{command.startTime}"); VALUES("`start_time`", "#{command.startTime}");
} }
@ -95,6 +96,7 @@ public class CommandMapperProvider {
SET("`warning_group_id`=#{command.warningGroupId}"); SET("`warning_group_id`=#{command.warningGroupId}");
SET("`schedule_time`=#{command.scheduleTime}"); SET("`schedule_time`=#{command.scheduleTime}");
SET("`update_time`=#{command.updateTime}"); SET("`update_time`=#{command.updateTime}");
SET("`worker_group_id`=#{command.workerGroupId}");
SET("`start_time`=#{command.startTime}"); SET("`start_time`=#{command.startTime}");
WHERE("`id`=#{command.id}"); WHERE("`id`=#{command.id}");
@ -166,8 +168,4 @@ public class CommandMapperProvider {
} }
}.toString(); }.toString();
} }
} }

2
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ErrorCommandMapperProvider.java

@ -22,6 +22,7 @@ public class ErrorCommandMapperProvider {
return new SQL() { return new SQL() {
{ {
INSERT_INTO(TABLE_NAME); INSERT_INTO(TABLE_NAME);
VALUES("`id`", "#{errorCommand.id}");
VALUES("`command_type`", EnumFieldUtil.genFieldStr("errorCommand.commandType", CommandType.class)); VALUES("`command_type`", EnumFieldUtil.genFieldStr("errorCommand.commandType", CommandType.class));
VALUES("`process_definition_id`", "#{errorCommand.processDefinitionId}"); VALUES("`process_definition_id`", "#{errorCommand.processDefinitionId}");
VALUES("`executor_id`", "#{errorCommand.executorId}"); VALUES("`executor_id`", "#{errorCommand.executorId}");
@ -34,6 +35,7 @@ public class ErrorCommandMapperProvider {
VALUES("`schedule_time`", "#{errorCommand.scheduleTime}"); VALUES("`schedule_time`", "#{errorCommand.scheduleTime}");
VALUES("`update_time`", "#{errorCommand.updateTime}"); VALUES("`update_time`", "#{errorCommand.updateTime}");
VALUES("`start_time`", "#{errorCommand.startTime}"); VALUES("`start_time`", "#{errorCommand.startTime}");
VALUES("`worker_group_id`", "#{errorCommand.workerGroupId}");
VALUES("`message`", "#{errorCommand.message}"); VALUES("`message`", "#{errorCommand.message}");
} }
}.toString(); }.toString();

11
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapper.java

@ -94,6 +94,7 @@ public interface ProcessInstanceMapper {
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "tenantCode", column = "tenant_code", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "tenantCode", column = "tenant_code", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryDetailById") @SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryDetailById")
@ -131,6 +132,7 @@ public interface ProcessInstanceMapper {
@Result(property = "connects", column = "connects", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "connects", column = "connects", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryById") @SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryById")
@ -168,6 +170,7 @@ public interface ProcessInstanceMapper {
@Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@ -205,6 +208,7 @@ public interface ProcessInstanceMapper {
@Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@ -251,6 +255,7 @@ public interface ProcessInstanceMapper {
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@ -346,6 +351,7 @@ public interface ProcessInstanceMapper {
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@ -437,6 +443,7 @@ public interface ProcessInstanceMapper {
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@ -480,6 +487,7 @@ public interface ProcessInstanceMapper {
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@ -523,6 +531,7 @@ public interface ProcessInstanceMapper {
@Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@ -564,6 +573,7 @@ public interface ProcessInstanceMapper {
@Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryLastRunningProcess") @SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryLastRunningProcess")
@ -605,6 +615,7 @@ public interface ProcessInstanceMapper {
@Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryLastManualProcess") @SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryLastManualProcess")

2
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java

@ -67,6 +67,7 @@ public class ProcessInstanceMapperProvider {
VALUES("`dependence_schedule_times`", "#{processInstance.dependenceScheduleTimes}"); VALUES("`dependence_schedule_times`", "#{processInstance.dependenceScheduleTimes}");
VALUES("`is_sub_process`", EnumFieldUtil.genFieldStr("processInstance.isSubProcess", Flag.class)); VALUES("`is_sub_process`", EnumFieldUtil.genFieldStr("processInstance.isSubProcess", Flag.class));
VALUES("`executor_id`", "#{processInstance.executorId}"); VALUES("`executor_id`", "#{processInstance.executorId}");
VALUES("`worker_group_id`", "#{processInstance.workerGroupId}");
VALUES("`process_instance_priority`", EnumFieldUtil.genFieldStr("processInstance.processInstancePriority", Priority.class)); VALUES("`process_instance_priority`", EnumFieldUtil.genFieldStr("processInstance.processInstancePriority", Priority.class));
} }
}.toString(); }.toString();
@ -139,6 +140,7 @@ public class ProcessInstanceMapperProvider {
SET("`dependence_schedule_times`=#{processInstance.dependenceScheduleTimes}"); SET("`dependence_schedule_times`=#{processInstance.dependenceScheduleTimes}");
SET("`is_sub_process`="+EnumFieldUtil.genFieldStr("processInstance.isSubProcess", Flag.class)); SET("`is_sub_process`="+EnumFieldUtil.genFieldStr("processInstance.isSubProcess", Flag.class));
SET("`executor_id`=#{processInstance.executorId}"); SET("`executor_id`=#{processInstance.executorId}");
SET("`worker_group_id`=#{processInstance.workerGroupId}");
WHERE("`id`=#{processInstance.id}"); WHERE("`id`=#{processInstance.id}");

4
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapper.java

@ -75,6 +75,7 @@ public interface ScheduleMapper {
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.INTEGER), @Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.INTEGER),
@Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT), @Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT),
@Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = ScheduleMapperProvider.class, method = "queryByProcessDefineIdPaging") @SelectProvider(type = ScheduleMapperProvider.class, method = "queryByProcessDefineIdPaging")
@ -117,6 +118,7 @@ public interface ScheduleMapper {
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.INTEGER), @Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.INTEGER),
@Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT), @Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT),
@Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = ScheduleMapperProvider.class, method = "querySchedulerListByProjectName") @SelectProvider(type = ScheduleMapperProvider.class, method = "querySchedulerListByProjectName")
@ -141,6 +143,7 @@ public interface ScheduleMapper {
@Result(property = "userId", column = "user_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "userId", column = "user_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT), @Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT),
@Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = ScheduleMapperProvider.class, method = "queryById") @SelectProvider(type = ScheduleMapperProvider.class, method = "queryById")
@ -164,6 +167,7 @@ public interface ScheduleMapper {
@Result(property = "userId", column = "user_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "userId", column = "user_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT), @Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT),
@Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = ScheduleMapperProvider.class, method = "selectAllByProcessDefineArray") @SelectProvider(type = ScheduleMapperProvider.class, method = "selectAllByProcessDefineArray")

2
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapperProvider.java

@ -48,6 +48,7 @@ public class ScheduleMapperProvider {
VALUES("`user_id`", "#{schedule.userId}"); VALUES("`user_id`", "#{schedule.userId}");
VALUES("`release_state`", EnumFieldUtil.genFieldStr("schedule.releaseState", ReleaseState.class)); VALUES("`release_state`", EnumFieldUtil.genFieldStr("schedule.releaseState", ReleaseState.class));
VALUES("`warning_group_id`", "#{schedule.warningGroupId}"); VALUES("`warning_group_id`", "#{schedule.warningGroupId}");
VALUES("`worker_group_id`", "#{schedule.workerGroupId}");
VALUES("`process_instance_priority`", EnumFieldUtil.genFieldStr("schedule.processInstancePriority", Priority.class)); VALUES("`process_instance_priority`", EnumFieldUtil.genFieldStr("schedule.processInstancePriority", Priority.class));
}}.toString(); }}.toString();
} }
@ -67,6 +68,7 @@ public class ScheduleMapperProvider {
SET("`user_id`=#{schedule.userId}"); SET("`user_id`=#{schedule.userId}");
SET("`release_state`=" + EnumFieldUtil.genFieldStr("schedule.releaseState", ReleaseState.class)); SET("`release_state`=" + EnumFieldUtil.genFieldStr("schedule.releaseState", ReleaseState.class));
SET("`warning_group_id`=#{schedule.warningGroupId}"); SET("`warning_group_id`=#{schedule.warningGroupId}");
SET("`worker_group_id`=#{schedule.workerGroupId}");
SET("`process_instance_priority`="+ EnumFieldUtil.genFieldStr("schedule.processInstancePriority", Priority.class)); SET("`process_instance_priority`="+ EnumFieldUtil.genFieldStr("schedule.processInstancePriority", Priority.class));
WHERE("`id` = #{schedule.id}"); WHERE("`id` = #{schedule.id}");

5
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapper.java

@ -88,6 +88,7 @@ public interface TaskInstanceMapper {
@Result(property = "appLink", column = "app_link", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "appLink", column = "app_link", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT), @Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "taskInstancePriority", column = "task_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "taskInstancePriority", column = "task_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = TaskInstanceMapperProvider.class, method = "queryById") @SelectProvider(type = TaskInstanceMapperProvider.class, method = "queryById")
@ -131,6 +132,7 @@ public interface TaskInstanceMapper {
@Result(property = "appLink", column = "app_link", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "appLink", column = "app_link", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT), @Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "taskInstancePriority", column = "task_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "taskInstancePriority", column = "task_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = TaskInstanceMapperProvider.class, method = "findValidTaskListByProcessId") @SelectProvider(type = TaskInstanceMapperProvider.class, method = "findValidTaskListByProcessId")
@ -164,6 +166,7 @@ public interface TaskInstanceMapper {
@Result(property = "appLink", column = "app_link", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "appLink", column = "app_link", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT), @Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "taskInstancePriority", column = "task_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "taskInstancePriority", column = "task_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = TaskInstanceMapperProvider.class, method = "queryByHostAndStatus") @SelectProvider(type = TaskInstanceMapperProvider.class, method = "queryByHostAndStatus")
@ -255,6 +258,7 @@ public interface TaskInstanceMapper {
@Result(property = "appLink", column = "app_link", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "appLink", column = "app_link", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT), @Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "taskInstancePriority", column = "task_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "taskInstancePriority", column = "task_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = TaskInstanceMapperProvider.class, method = "queryTaskInstanceListPaging") @SelectProvider(type = TaskInstanceMapperProvider.class, method = "queryTaskInstanceListPaging")
@ -299,6 +303,7 @@ public interface TaskInstanceMapper {
@Result(property = "appLink", column = "app_link", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "appLink", column = "app_link", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT), @Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "taskInstancePriority", column = "task_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) @Result(property = "taskInstancePriority", column = "task_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
}) })
@SelectProvider(type = TaskInstanceMapperProvider.class, method = "queryByInstanceIdAndName") @SelectProvider(type = TaskInstanceMapperProvider.class, method = "queryByInstanceIdAndName")

2
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapperProvider.java

@ -62,6 +62,7 @@ public class TaskInstanceMapperProvider {
VALUES("`max_retry_times`", "#{taskInstance.maxRetryTimes}"); VALUES("`max_retry_times`", "#{taskInstance.maxRetryTimes}");
VALUES("`retry_interval`", "#{taskInstance.retryInterval}"); VALUES("`retry_interval`", "#{taskInstance.retryInterval}");
VALUES("`app_link`", "#{taskInstance.appLink}"); VALUES("`app_link`", "#{taskInstance.appLink}");
VALUES("`worker_group_id`", "#{taskInstance.workerGroupId}");
VALUES("`flag`", EnumFieldUtil.genFieldStr("taskInstance.flag", Flag.class)); VALUES("`flag`", EnumFieldUtil.genFieldStr("taskInstance.flag", Flag.class));
VALUES("`task_instance_priority`", EnumFieldUtil.genFieldStr("taskInstance.taskInstancePriority", Priority.class)); VALUES("`task_instance_priority`", EnumFieldUtil.genFieldStr("taskInstance.taskInstancePriority", Priority.class));
@ -114,6 +115,7 @@ public class TaskInstanceMapperProvider {
SET("`max_retry_times`=#{taskInstance.maxRetryTimes}"); SET("`max_retry_times`=#{taskInstance.maxRetryTimes}");
SET("`retry_interval`=#{taskInstance.retryInterval}"); SET("`retry_interval`=#{taskInstance.retryInterval}");
SET("`app_link`=#{taskInstance.appLink}"); SET("`app_link`=#{taskInstance.appLink}");
SET("`worker_group_id`=#{taskInstance.workerGroupId}");
SET("`flag`="+ EnumFieldUtil.genFieldStr("taskInstance.flag", Flag.class)); SET("`flag`="+ EnumFieldUtil.genFieldStr("taskInstance.flag", Flag.class));
SET("`task_instance_priority`="+ EnumFieldUtil.genFieldStr("taskInstance.taskInstancePriority", Priority.class)); SET("`task_instance_priority`="+ EnumFieldUtil.genFieldStr("taskInstance.taskInstancePriority", Priority.class));

131
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerGroupMapper.java

@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.dao.mapper;
import cn.escheduler.dao.model.WorkerGroup;
import org.apache.ibatis.annotations.*;
import org.apache.ibatis.type.JdbcType;
import java.util.Date;
import java.util.List;
/**
* worker group mapper
*/
public interface WorkerGroupMapper {
/**
* query all worker group list
*
* @return
*/
@Results(value = {
@Result(property = "id", column = "id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "ipList", column = "ip_list", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "name", column = "name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "createTime", column = "create_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
@Result(property = "updateTime", column = "update_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
})
@SelectProvider(type = WorkerGroupMapperProvider.class, method = "queryAllWorkerGroup")
List<WorkerGroup> queryAllWorkerGroup();
/**
* query worker group by name
*
* @return
*/
@Results(value = {
@Result(property = "id", column = "id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "ipList", column = "ip_list", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "name", column = "name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "createTime", column = "create_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
@Result(property = "updateTime", column = "update_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
})
@SelectProvider(type = WorkerGroupMapperProvider.class, method = "queryWorkerGroupByName")
List<WorkerGroup> queryWorkerGroupByName(@Param("name") String name);
/**
* query worker group paging by search value
*
* @return
*/
@Results(value = {
@Result(property = "id", column = "id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "ipList", column = "ip_list", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "name", column = "name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "createTime", column = "create_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
@Result(property = "updateTime", column = "update_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
})
@SelectProvider(type = WorkerGroupMapperProvider.class, method = "queryListPaging")
List<WorkerGroup> queryListPaging(@Param("offset") int offset,
@Param("pageSize") int pageSize,
@Param("searchVal") String searchVal);
/**
* count worker group by search value
* @param searchVal
* @return
*/
@SelectProvider(type = WorkerGroupMapperProvider.class, method = "countPaging")
int countPaging(@Param("searchVal") String searchVal);
/**
* insert worker server
*
* @param workerGroup
* @return
*/
@InsertProvider(type = WorkerGroupMapperProvider.class, method = "insert")
@Options(useGeneratedKeys = true,keyProperty = "workerGroup.id")
@SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "workerGroup.id", before = false, resultType = int.class)
int insert(@Param("workerGroup") WorkerGroup workerGroup);
/**
* update worker
*
* @param workerGroup
* @return
*/
@UpdateProvider(type = WorkerGroupMapperProvider.class, method = "update")
int update(@Param("workerGroup") WorkerGroup workerGroup);
/**
* delete work group by id
* @param id
* @return
*/
@DeleteProvider(type = WorkerGroupMapperProvider.class, method = "deleteById")
int deleteById(@Param("id") int id);
/**
* query work group by id
* @param id
* @return
*/
@Results(value = {
@Result(property = "id", column = "id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "ipList", column = "ip_list", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "name", column = "name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "createTime", column = "create_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
@Result(property = "updateTime", column = "update_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
})
@SelectProvider(type = WorkerGroupMapperProvider.class, method = "queryById")
WorkerGroup queryById(@Param("id") int id);
}

160
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerGroupMapperProvider.java

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.dao.mapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.jdbc.SQL;
import java.util.Map;
/**
* worker group mapper provider
*/
public class WorkerGroupMapperProvider {
private static final String TABLE_NAME = "t_escheduler_worker_group";
/**
* query worker list
* @return
*/
public String queryAllWorkerGroup() {
return new SQL() {{
SELECT("*");
FROM(TABLE_NAME);
ORDER_BY("update_time desc");
}}.toString();
}
/**
* insert worker server
* @param parameter
* @return
*/
public String insert(Map<String, Object> parameter) {
return new SQL() {{
INSERT_INTO(TABLE_NAME);
VALUES("id", "#{workerGroup.id}");
VALUES("name", "#{workerGroup.name}");
VALUES("ip_list", "#{workerGroup.ipList}");
VALUES("create_time", "#{workerGroup.createTime}");
VALUES("update_time", "#{workerGroup.updateTime}");
}}.toString();
}
/**
* update worker group
*
* @param parameter
* @return
*/
public String update(Map<String, Object> parameter) {
return new SQL() {{
UPDATE(TABLE_NAME);
SET("name = #{workerGroup.name}");
SET("ip_list = #{workerGroup.ipList}");
SET("create_time = #{workerGroup.createTime}");
SET("update_time = #{workerGroup.updateTime}");
WHERE("id = #{workerGroup.id}");
}}.toString();
}
/**
* delete worker group by id
* @param parameter
* @return
*/
public String deleteById(Map<String, Object> parameter) {
return new SQL() {{
DELETE_FROM(TABLE_NAME);
WHERE("id = #{id}");
}}.toString();
}
/**
* query worker group by name
* @param parameter
* @return
*/
public String queryWorkerGroupByName(Map<String, Object> parameter) {
return new SQL() {{
SELECT("*");
FROM(TABLE_NAME);
WHERE("name = #{name}");
}}.toString();
}
/**
* query worker group by id
* @param parameter
* @return
*/
public String queryById(Map<String, Object> parameter) {
return new SQL() {{
SELECT("*");
FROM(TABLE_NAME);
WHERE("id = #{id}");
}}.toString();
}
/**
* query worker group by id
* @param parameter
* @return
*/
public String queryListPaging(Map<String, Object> parameter) {
return new SQL() {{
SELECT("*");
FROM(TABLE_NAME);
Object searchVal = parameter.get("searchVal");
if(searchVal != null && StringUtils.isNotEmpty(searchVal.toString())){
WHERE( " name like concat('%', #{searchVal}, '%') ");
}
ORDER_BY(" update_time desc limit #{offset},#{pageSize} ");
}}.toString();
}
/**
* count worker group number by search value
* @param parameter
* @return
*/
public String countPaging(Map<String, Object> parameter) {
return new SQL() {{
SELECT("count(0)");
FROM(TABLE_NAME);
Object searchVal = parameter.get("searchVal");
if(searchVal != null && StringUtils.isNotEmpty(searchVal.toString())){
WHERE( " name like concat('%', #{searchVal}, '%') ");
}
}}.toString();
}
}

16
escheduler-dao/src/main/java/cn/escheduler/dao/model/Command.java

@ -91,6 +91,12 @@ public class Command {
private Date updateTime; private Date updateTime;
/**
*
*/
private int workerGroupId;
public Command(){ public Command(){
this.taskDependType = TaskDependType.TASK_POST; this.taskDependType = TaskDependType.TASK_POST;
this.failureStrategy = FailureStrategy.CONTINUE; this.failureStrategy = FailureStrategy.CONTINUE;
@ -229,6 +235,15 @@ public class Command {
this.updateTime = updateTime; this.updateTime = updateTime;
} }
public int getWorkerGroupId() {
return workerGroupId;
}
public void setWorkerGroupId(int workerGroupId) {
this.workerGroupId = workerGroupId;
}
@Override @Override
public String toString() { public String toString() {
return "Command{" + return "Command{" +
@ -245,6 +260,7 @@ public class Command {
", startTime=" + startTime + ", startTime=" + startTime +
", processInstancePriority=" + processInstancePriority + ", processInstancePriority=" + processInstancePriority +
", updateTime=" + updateTime + ", updateTime=" + updateTime +
", workerGroupId=" + workerGroupId +
'}'; '}';
} }
} }

15
escheduler-dao/src/main/java/cn/escheduler/dao/model/ErrorCommand.java

@ -95,6 +95,11 @@ public class ErrorCommand {
*/ */
private String message; private String message;
/**
* worker group id
*/
private int workerGroupId;
public ErrorCommand(Command command, String message){ public ErrorCommand(Command command, String message){
this.commandType = command.getCommandType(); this.commandType = command.getCommandType();
@ -245,6 +250,14 @@ public class ErrorCommand {
this.updateTime = updateTime; this.updateTime = updateTime;
} }
public int getWorkerGroupId() {
return workerGroupId;
}
public void setWorkerGroupId(int workerGroupId) {
this.workerGroupId = workerGroupId;
}
@Override @Override
public String toString() { public String toString() {
return "Command{" + return "Command{" +
@ -272,4 +285,6 @@ public class ErrorCommand {
public void setMessage(String message) { public void setMessage(String message) {
this.message = message; this.message = message;
} }
} }

13
escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java

@ -177,6 +177,12 @@ public class ProcessInstance {
*/ */
private Priority processInstancePriority; private Priority processInstancePriority;
/**
* worker group id
*/
private int workerGroupId;
public ProcessInstance(){ public ProcessInstance(){
} }
@ -481,6 +487,13 @@ public class ProcessInstance {
this.duration = duration; this.duration = duration;
} }
public int getWorkerGroupId() {
return workerGroupId;
}
public void setWorkerGroupId(int workerGroupId) {
this.workerGroupId = workerGroupId;
}
@Override @Override
public String toString() { public String toString() {

16
escheduler-dao/src/main/java/cn/escheduler/dao/model/Schedule.java

@ -111,6 +111,11 @@ public class Schedule {
*/ */
private Priority processInstancePriority; private Priority processInstancePriority;
/**
* worker group id
*/
private int workerGroupId;
public int getWarningGroupId() { public int getWarningGroupId() {
return warningGroupId; return warningGroupId;
} }
@ -256,6 +261,15 @@ public class Schedule {
this.processInstancePriority = processInstancePriority; this.processInstancePriority = processInstancePriority;
} }
public int getWorkerGroupId() {
return workerGroupId;
}
public void setWorkerGroupId(int workerGroupId) {
this.workerGroupId = workerGroupId;
}
@Override @Override
public String toString() { public String toString() {
return "Schedule{" + return "Schedule{" +
@ -276,6 +290,8 @@ public class Schedule {
", releaseState=" + releaseState + ", releaseState=" + releaseState +
", warningGroupId=" + warningGroupId + ", warningGroupId=" + warningGroupId +
", processInstancePriority=" + processInstancePriority + ", processInstancePriority=" + processInstancePriority +
", workerGroupId=" + workerGroupId +
'}'; '}';
} }
} }

16
escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java

@ -182,6 +182,13 @@ public class TaskInstance {
private String dependentResult; private String dependentResult;
/**
* worker group id
* @return
*/
private int workerGroupId;
public ProcessInstance getProcessInstance() { public ProcessInstance getProcessInstance() {
return processInstance; return processInstance;
} }
@ -439,6 +446,14 @@ public class TaskInstance {
this.processInstancePriority = processInstancePriority; this.processInstancePriority = processInstancePriority;
} }
public int getWorkerGroupId() {
return workerGroupId;
}
public void setWorkerGroupId(int workerGroupId) {
this.workerGroupId = workerGroupId;
}
@Override @Override
public String toString() { public String toString() {
return "TaskInstance{" + return "TaskInstance{" +
@ -470,6 +485,7 @@ public class TaskInstance {
", retryInterval=" + retryInterval + ", retryInterval=" + retryInterval +
", taskInstancePriority=" + taskInstancePriority + ", taskInstancePriority=" + taskInstancePriority +
", processInstancePriority=" + processInstancePriority + ", processInstancePriority=" + processInstancePriority +
", workGroupId=" + workerGroupId +
'}'; '}';
} }

88
escheduler-dao/src/main/java/cn/escheduler/dao/model/WorkerGroup.java

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.dao.model;
import java.util.Date;
/**
* worker group for task running
*/
public class WorkerGroup {
private int id;
private String name;
private String ipList;
private Date createTime;
private Date updateTime;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getIpList() {
return ipList;
}
public void setIpList(String ipList) {
this.ipList = ipList;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
@Override
public String toString() {
return "Worker group model{" +
"id= " + id +
",name= " + name +
",ipList= " + ipList +
",createTime= " + createTime +
",updateTime= " + updateTime +
"}";
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}

6
escheduler-dao/src/main/resources/dao/data_source.properties

@ -1,9 +1,9 @@
# base spring data source configuration # base spring data source configuration
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.xx.xx:3306/escheduler?characterEncoding=UTF-8 spring.datasource.url=jdbc:mysql://192.168.220.188:3306/escheduler_new?characterEncoding=UTF-8
spring.datasource.username=xx spring.datasource.username=root
spring.datasource.password=xx spring.datasource.password=root@123
# connection configuration # connection configuration
spring.datasource.initialSize=5 spring.datasource.initialSize=5

69
escheduler-dao/src/test/java/cn/escheduler/dao/mapper/WorkerGroupMapperTest.java

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.dao.mapper;
import cn.escheduler.dao.datasource.ConnectionFactory;
import cn.escheduler.dao.model.WorkerGroup;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Date;
import java.util.List;
/**
* worker group mapper test
*/
public class WorkerGroupMapperTest {
WorkerGroupMapper workerGroupMapper;
@Before
public void before() {
workerGroupMapper = ConnectionFactory.getSqlSession().getMapper(WorkerGroupMapper.class);
}
@Test
public void test() {
WorkerGroup workerGroup = new WorkerGroup();
String name = "workerGroup3";
workerGroup.setName(name);
workerGroup.setIpList("192.168.220.154,192.168.220.188");
workerGroup.setCreateTime(new Date());
workerGroup.setUpdateTime(new Date());
workerGroupMapper.insert(workerGroup);
Assert.assertNotEquals(workerGroup.getId(), 0);
List<WorkerGroup> workerGroups2 = workerGroupMapper.queryWorkerGroupByName(name);
Assert.assertEquals(workerGroups2.size(), 1);
workerGroup.setName("workerGroup11");
workerGroupMapper.update(workerGroup);
List<WorkerGroup> workerGroups = workerGroupMapper.queryAllWorkerGroup();
Assert.assertNotEquals(workerGroups.size(), 0);
workerGroupMapper.deleteById(workerGroup.getId());
workerGroups = workerGroupMapper.queryAllWorkerGroup();
Assert.assertEquals(workerGroups.size(), 0);
}
}

3
escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java

@ -404,6 +404,9 @@ public class MasterExecThread implements Runnable {
taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority()); taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority());
} }
int workerGroupId = taskNode.getWorkerGroupId();
taskInstance.setWorkerGroupId(workerGroupId);
} }
return taskInstance; return taskInstance;
} }

54
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java

@ -26,6 +26,7 @@ import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.ProcessDefinition; import cn.escheduler.dao.model.ProcessDefinition;
import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.dao.model.WorkerGroup;
import cn.escheduler.server.zk.ZKWorkerClient; import cn.escheduler.server.zk.ZKWorkerClient;
import com.cronutils.utils.StringUtils; import com.cronutils.utils.StringUtils;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
@ -33,7 +34,9 @@ import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -90,6 +93,42 @@ public class FetchTaskThread implements Runnable{
this.taskQueue = taskQueue; this.taskQueue = taskQueue;
} }
/**
* Check if the task runs on this worker
* @param taskInstance
* @param host
* @return
*/
private boolean checkWorkerGroup(TaskInstance taskInstance, String host){
int taskWorkerGroupId = taskInstance.getWorkerGroupId();
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId());
if(processInstance == null){
logger.error("cannot find the task:{} process instance", taskInstance.getId());
return false;
}
int processWorkerGroupId = processInstance.getWorkerGroupId();
taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId);
if(taskWorkerGroupId <= 0){
return true;
}
WorkerGroup workerGroup = processDao.queryWorkerGroupById(taskWorkerGroupId);
if(workerGroup == null ){
logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId());
return true;
}
String ips = workerGroup.getIpList();
if(ips == null){
logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers",
taskInstance.getId(), workerGroup.getId());
}
String[] ipArray = ips.split(",");
List<String> ipList = Arrays.asList(ipArray);
return ipList.contains(host);
}
@Override @Override
public void run() { public void run() {
@ -116,11 +155,13 @@ public class FetchTaskThread implements Runnable{
} }
// task instance id str // task instance id str
String taskInstIdStr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE); String taskQueueStr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
if (!StringUtils.isEmpty(taskInstIdStr)) { if (!StringUtils.isEmpty(taskQueueStr )) {
Date now = new Date();
String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE);
String taskInstIdStr = taskStringArray[taskStringArray.length - 1];
Date now = new Date();
Integer taskId = Integer.parseInt(taskInstIdStr); Integer taskId = Integer.parseInt(taskInstIdStr);
// find task instance by task id // find task instance by task id
@ -136,10 +177,15 @@ public class FetchTaskThread implements Runnable{
retryTimes--; retryTimes--;
} }
if (taskInstance == null) { if (taskInstance == null ) {
logger.error("task instance is null. task id : {} ", taskId); logger.error("task instance is null. task id : {} ", taskId);
continue; continue;
} }
if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
continue;
}
taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
logger.info("remove task:{} from queue", taskQueueStr);
// set execute task worker host // set execute task worker host
taskInstance.setHost(OSUtils.getHost()); taskInstance.setHost(OSUtils.getHost());

11
escheduler-server/src/test/java/cn/escheduler/server/zk/ZKWorkerClientTest.java

@ -3,6 +3,9 @@ package cn.escheduler.server.zk;
import cn.escheduler.common.Constants; import cn.escheduler.common.Constants;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.*; import static org.junit.Assert.*;
/** /**
@ -20,4 +23,12 @@ public class ZKWorkerClientTest {
} }
@Test
public void test(){
String ips = "";
List<String> ipList = Arrays.asList(ips.split(","));
System.out.println(ipList);
}
} }
Loading…
Cancel
Save