diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java index 7ba044c6f6..b6d3d4475f 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java @@ -66,13 +66,15 @@ public class ExecutorController extends BaseController { @RequestParam(value = "receiversCc", required = false) String receiversCc, @RequestParam(value = "runMode", required = false) RunMode runMode, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority, + @RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId, @RequestParam(value = "timeout", required = false) Integer timeout) { try { logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {}, " + "failure policy: {}, node name: {}, node dep: {}, notify type: {}, " - + "notify group id: {},receivers:{},receiversCc:{}, run mode: {},process instance priority:{}, timeout: {}", + + "notify group id: {},receivers:{},receiversCc:{}, run mode: {},process instance priority:{}, workerGroupId: {}, timeout: {}", loginUser.getUserName(), projectName, processDefinitionId, scheduleTime, failureStrategy, - taskDependType, warningType, warningGroupId,receivers,receiversCc,runMode,processInstancePriority,timeout); + taskDependType, warningType, warningGroupId,receivers,receiversCc,runMode,processInstancePriority, + workerGroupId, timeout); if (timeout == null) { timeout = cn.escheduler.common.Constants.MAX_TASK_TIMEOUT; @@ -80,7 +82,7 @@ public class ExecutorController extends BaseController { Map result = execService.execProcessInstance(loginUser, projectName, processDefinitionId, scheduleTime, execType, failureStrategy, startNodeList, taskDependType, warningType, - warningGroupId,receivers,receiversCc, runMode,processInstancePriority,timeout); + warningGroupId,receivers,receiversCc, runMode,processInstancePriority, workerGroupId, timeout); return returnDataList(result); } catch (Exception e) { logger.error(START_PROCESS_INSTANCE_ERROR.getMsg(),e); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java index 3ef011d323..8449bd1d38 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java @@ -76,13 +76,15 @@ public class SchedulerController extends BaseController{ @RequestParam(value = "failureStrategy", required = false, defaultValue = DEFAULT_FAILURE_POLICY) FailureStrategy failureStrategy, @RequestParam(value = "receivers", required = false) String receivers, @RequestParam(value = "receiversCc", required = false) String receiversCc, + @RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) { logger.info("login user {}, project name: {}, process name: {}, create schedule: {}, warning type: {}, warning group id: {}," + - "failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {}", - loginUser.getUserName(), projectName, processDefinitionId, schedule, warningType, warningGroupId, failureStrategy,receivers,receiversCc,processInstancePriority); + "failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {}, workGroupId:{}", + loginUser.getUserName(), projectName, processDefinitionId, schedule, warningType, warningGroupId, + failureStrategy,receivers,receiversCc,processInstancePriority,workerGroupId); try { Map result = schedulerService.insertSchedule(loginUser, projectName, processDefinitionId, schedule, - warningType, warningGroupId, failureStrategy, receivers,receiversCc,processInstancePriority); + warningType, warningGroupId, failureStrategy, receivers,receiversCc,processInstancePriority,workerGroupId); return returnDataList(result); }catch (Exception e){ @@ -113,14 +115,16 @@ public class SchedulerController extends BaseController{ @RequestParam(value = "failureStrategy", required = false, defaultValue = "END") FailureStrategy failureStrategy, @RequestParam(value = "receivers", required = false) String receivers, @RequestParam(value = "receiversCc", required = false) String receiversCc, + @RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) { logger.info("login user {}, project name: {},id: {}, updateProcessInstance schedule: {}, notify type: {}, notify mails: {}, " + - "failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {}", - loginUser.getUserName(), projectName, id, schedule, warningType, warningGroupId, failureStrategy,receivers,receiversCc,processInstancePriority); + "failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {},workerGroupId:{}", + loginUser.getUserName(), projectName, id, schedule, warningType, warningGroupId, failureStrategy, + receivers,receiversCc,processInstancePriority,workerGroupId); try { Map result = schedulerService.updateSchedule(loginUser, projectName, id, schedule, - warningType, warningGroupId, failureStrategy, receivers,receiversCc,null,processInstancePriority); + warningType, warningGroupId, failureStrategy, receivers,receiversCc,null,processInstancePriority, workerGroupId); return returnDataList(result); }catch (Exception e){ diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/WorkerGroupController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/WorkerGroupController.java new file mode 100644 index 0000000000..4843a8083e --- /dev/null +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/WorkerGroupController.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.api.controller; + + +import cn.escheduler.api.enums.Status; +import cn.escheduler.api.service.WorkerGroupService; +import cn.escheduler.api.utils.Constants; +import cn.escheduler.api.utils.Result; +import cn.escheduler.dao.model.User; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.*; + +import java.util.Map; + +/** + * worker group controller + */ +@RestController +@RequestMapping("/worker-group") +public class WorkerGroupController extends BaseController{ + + private static final Logger logger = LoggerFactory.getLogger(WorkerGroupController.class); + + + @Autowired + WorkerGroupService workerGroupService; + + + /** + * create or update a worker group + * @param loginUser + * @param id + * @param name + * @param ipList + * @return + */ + @PostMapping(value = "/save") + @ResponseStatus(HttpStatus.OK) + public Result saveWorkerGroup(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "id", required = false, defaultValue = "0") int id, + @RequestParam(value = "name") String name, + @RequestParam(value = "ipList") String ipList + ) { + logger.info("save worker group: login user {}, id:{}, name: {}, ipList: {} ", + loginUser.getUserName(), id, name, ipList); + + try { + Map result = workerGroupService.saveWorkerGroup(id, name, ipList); + return returnDataList(result); + }catch (Exception e){ + logger.error(Status.SAVE_ERROR.getMsg(),e); + return error(Status.SAVE_ERROR.getCode(), Status.SAVE_ERROR.getMsg()); + } + } + + /** + * query worker groups paging + * @param loginUser + * @param pageNo + * @param searchVal + * @param pageSize + * @return + */ + @GetMapping(value = "/list-paging") + @ResponseStatus(HttpStatus.OK) + public Result queryAllWorkerGroupsPaging(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam("pageNo") Integer pageNo, + @RequestParam(value = "searchVal", required = false) String searchVal, + @RequestParam("pageSize") Integer pageSize + ) { + logger.info("query all worker group paging: login user {}, pageNo:{}, pageSize:{}, searchVal:{}", + loginUser.getUserName() , pageNo, pageSize, searchVal); + + try { + Map result = workerGroupService.queryAllGroupPaging(pageNo, pageSize, searchVal); + return returnDataListPaging(result); + }catch (Exception e){ + logger.error(Status.SAVE_ERROR.getMsg(),e); + return error(Status.SAVE_ERROR.getCode(), Status.SAVE_ERROR.getMsg()); + } + } + + /** + * query all worker groups + * @param loginUser + * @return + */ + @GetMapping(value = "/all-groups") + @ResponseStatus(HttpStatus.OK) + public Result queryAllWorkerGroups(@RequestAttribute(value = Constants.SESSION_USER) User loginUser + ) { + logger.info("query all worker group: login user {}", + loginUser.getUserName() ); + + try { + Map result = workerGroupService.queryAllGroup(); + return returnDataList(result); + }catch (Exception e){ + logger.error(Status.SAVE_ERROR.getMsg(),e); + return error(Status.SAVE_ERROR.getCode(), Status.SAVE_ERROR.getMsg()); + } + } + + /** + * delete worker group by id + * @param loginUser + * @param id + * @return + */ + @GetMapping(value = "/delete-by-id") + @ResponseStatus(HttpStatus.OK) + public Result deleteById(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam("id") Integer id + ) { + logger.info("delete worker group: login user {}, id:{} ", + loginUser.getUserName() , id); + + try { + Map result = workerGroupService.deleteWorkerGroupById(id); + return returnDataList(result); + }catch (Exception e){ + logger.error(Status.SAVE_ERROR.getMsg(),e); + return error(Status.SAVE_ERROR.getCode(), Status.SAVE_ERROR.getMsg()); + } + } +} diff --git a/escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java b/escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java index 45706c1140..f32d290ab6 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java @@ -156,6 +156,10 @@ public enum Status { UPDATE_QUEUE_ERROR(10131, "update queue error"), NEED_NOT_UPDATE_QUEUE(10132, "no content changes, no updates are required"), VERIFY_QUEUE_ERROR(10133,"verify queue error"), + NAME_NULL(10134,"name must be not null"), + NAME_EXIST(10135, "name {0} already exists"), + SAVE_ERROR(10136, "save error"), + UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found"), diff --git a/escheduler-api/src/main/java/cn/escheduler/api/quartz/ProcessScheduleJob.java b/escheduler-api/src/main/java/cn/escheduler/api/quartz/ProcessScheduleJob.java index 6fcaf1171d..96e283d7d5 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/quartz/ProcessScheduleJob.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/quartz/ProcessScheduleJob.java @@ -132,6 +132,7 @@ public class ProcessScheduleJob implements Job { command.setScheduleTime(scheduledFireTime); command.setStartTime(fireTime); command.setWarningGroupId(schedule.getWarningGroupId()); + command.setWorkerGroupId(schedule.getWorkerGroupId()); command.setWarningType(schedule.getWarningType()); command.setProcessInstancePriority(schedule.getProcessInstancePriority()); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java index de000233ec..ee57ff9c91 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java @@ -90,7 +90,7 @@ public class ExecutorService extends BaseService{ FailureStrategy failureStrategy, String startNodeList, TaskDependType taskDependType, WarningType warningType, int warningGroupId, String receivers, String receiversCc, RunMode runMode, - Priority processInstancePriority, Integer timeout) throws ParseException { + Priority processInstancePriority, int workerGroupId, Integer timeout) throws ParseException { Map result = new HashMap<>(5); // timeout is valid if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) { @@ -115,7 +115,7 @@ public class ExecutorService extends BaseService{ */ int create = this.createCommand(commandType, processDefinitionId, taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), - warningGroupId, runMode,processInstancePriority); + warningGroupId, runMode,processInstancePriority, workerGroupId); if(create > 0 ){ /** * according to the process definition ID updateProcessInstance and CC recipient @@ -405,7 +405,7 @@ public class ExecutorService extends BaseService{ TaskDependType nodeDep, FailureStrategy failureStrategy, String startNodeList, String schedule, WarningType warningType, int excutorId, int warningGroupId, - RunMode runMode,Priority processInstancePriority) throws ParseException { + RunMode runMode,Priority processInstancePriority, int workerGroupId) throws ParseException { /** * instantiate command schedule instance @@ -436,6 +436,7 @@ public class ExecutorService extends BaseService{ command.setExecutorId(excutorId); command.setWarningGroupId(warningGroupId); command.setProcessInstancePriority(processInstancePriority); + command.setWorkerGroupId(workerGroupId); Date start = null; Date end = null; diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java index df73181d7b..232b9d7b15 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java @@ -88,7 +88,7 @@ public class SchedulerService extends BaseService { @Transactional(value = "TransactionManager", rollbackFor = Exception.class) public Map insertSchedule(User loginUser, String projectName, Integer processDefineId, String schedule, WarningType warningType, int warningGroupId, FailureStrategy failureStrategy, - String receivers, String receiversCc,Priority processInstancePriority) throws IOException { + String receivers, String receiversCc,Priority processInstancePriority, int workerGroupId) throws IOException { Map result = new HashMap(5); @@ -133,6 +133,7 @@ public class SchedulerService extends BaseService { scheduleObj.setUserName(loginUser.getUserName()); scheduleObj.setReleaseState(ReleaseState.OFFLINE); scheduleObj.setProcessInstancePriority(processInstancePriority); + scheduleObj.setWorkerGroupId(workerGroupId); scheduleMapper.insert(scheduleObj); /** @@ -156,13 +157,14 @@ public class SchedulerService extends BaseService { * @param warningGroupId * @param failureStrategy * @param scheduleStatus + * @param workerGroupId * @return */ @Transactional(value = "TransactionManager", rollbackFor = Exception.class) public Map updateSchedule(User loginUser, String projectName, Integer id, String scheduleExpression, WarningType warningType, int warningGroupId, FailureStrategy failureStrategy, String receivers, String receiversCc, ReleaseState scheduleStatus, - Priority processInstancePriority) throws IOException { + Priority processInstancePriority, int workerGroupId) throws IOException { Map result = new HashMap(5); Project project = projectMapper.queryByName(projectName); @@ -221,6 +223,7 @@ public class SchedulerService extends BaseService { if (scheduleStatus != null) { schedule.setReleaseState(scheduleStatus); } + schedule.setWorkerGroupId(workerGroupId); schedule.setUpdateTime(now); schedule.setProcessInstancePriority(processInstancePriority); scheduleMapper.update(schedule); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java new file mode 100644 index 0000000000..1db1bcb48d --- /dev/null +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.api.service; + +import cn.escheduler.api.enums.Status; +import cn.escheduler.api.utils.Constants; +import cn.escheduler.api.utils.PageInfo; +import cn.escheduler.dao.mapper.WorkerGroupMapper; +import cn.escheduler.dao.model.User; +import cn.escheduler.dao.model.WorkerGroup; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * work group service + */ +@Service +public class WorkerGroupService extends BaseService { + + + @Autowired + WorkerGroupMapper workerGroupMapper; + + /** + * create or update a worker group + * @param id + * @param name + * @param ipList + * @return + */ + public Map saveWorkerGroup(int id, String name, String ipList){ + + Map result = new HashMap<>(5); + + if(StringUtils.isEmpty(name)){ + putMsg(result, Status.NAME_NULL); + return result; + } + Date now = new Date(); + WorkerGroup workerGroup = null; + if(id != 0){ + workerGroup = workerGroupMapper.queryById(id); + }else{ + workerGroup = new WorkerGroup(); + workerGroup.setCreateTime(now); + } + workerGroup.setName(name); + workerGroup.setIpList(ipList); + workerGroup.setUpdateTime(now); + + if(checkWorkerGroupNameExists(workerGroup)){ + putMsg(result, Status.NAME_EXIST, workerGroup.getName()); + return result; + } + if(workerGroup.getId() != 0 ){ + workerGroupMapper.update(workerGroup); + }else{ + workerGroupMapper.insert(workerGroup); + } + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * check worker group name exists + * @param workerGroup + * @return + */ + private boolean checkWorkerGroupNameExists(WorkerGroup workerGroup) { + + List workerGroupList = workerGroupMapper.queryWorkerGroupByName(workerGroup.getName()); + + if(workerGroupList.size() > 0 ){ + // new group has same name.. + if(workerGroup.getId() == 0){ + return true; + } + // update group... + for(WorkerGroup group : workerGroupList){ + if(group.getId() != workerGroup.getId()){ + return true; + } + } + } + return false; + } + + /** + * query worker group paging + * @param pageNo + * @param pageSize + * @param searchVal + * @return + */ + public Map queryAllGroupPaging(Integer pageNo, Integer pageSize, String searchVal) { + + Map result = new HashMap<>(5); + int count = workerGroupMapper.countPaging(searchVal); + + + PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); + List workerGroupList = workerGroupMapper.queryListPaging(pageInfo.getStart(), pageSize, searchVal); + pageInfo.setTotalCount(count); + pageInfo.setLists(workerGroupList); + result.put(Constants.DATA_LIST, pageInfo); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * delete worker group by id + * @param id + * @return + */ + public Map deleteWorkerGroupById(Integer id) { + + Map result = new HashMap<>(5); + + int delete = workerGroupMapper.deleteById(id); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * query all worker group + * @return + */ + public Map queryAllGroup() { + Map result = new HashMap<>(5); + List workerGroupList = workerGroupMapper.queryAllWorkerGroup(); + result.put(Constants.DATA_LIST, workerGroupList); + putMsg(result, Status.SUCCESS); + return result; + } +} diff --git a/escheduler-common/src/main/java/cn/escheduler/common/model/TaskNode.java b/escheduler-common/src/main/java/cn/escheduler/common/model/TaskNode.java index fb9ffe1145..e09775558c 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/model/TaskNode.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/model/TaskNode.java @@ -113,6 +113,12 @@ public class TaskNode { */ private Priority taskInstancePriority; + /** + * worker group id + */ + private int workerGroupId; + + /** * task time out */ @@ -224,6 +230,7 @@ public class TaskNode { Objects.equals(extras, taskNode.extras) && Objects.equals(runFlag, taskNode.runFlag) && Objects.equals(dependence, taskNode.dependence) && + Objects.equals(workerGroupId, taskNode.workerGroupId) && CollectionUtils.equalLists(depList, taskNode.depList); } @@ -303,6 +310,15 @@ public class TaskNode { ", dependence='" + dependence + '\'' + ", taskInstancePriority=" + taskInstancePriority + ", timeout='" + timeout + '\'' + + ", workerGroupId='" + workerGroupId + '\'' + '}'; } + + public int getWorkerGroupId() { + return workerGroupId; + } + + public void setWorkerGroupId(int workerGroupId) { + this.workerGroupId = workerGroupId; + } } diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java index 7c01bc4ec2..42629324a2 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java @@ -54,10 +54,17 @@ public interface ITaskQueue { * an element pops out of the queue * * @param key queue name + * @param remove where remove the element * @return */ - String poll(String key); + String poll(String key, boolean remove); + /** + * remove a element from queue + * @param key + * @param value + */ + void removeNode(String key, String value); /** * add an element to the set diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java index ec11f6dbe6..1b39cb1e8d 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java @@ -137,10 +137,11 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * * 流程实例优先级_流程实例id_任务优先级_任务id high <- low * @param key task queue name + * @param remove whether remove the element * @return the task id to be executed */ @Override - public String poll(String key) { + public String poll(String key, boolean remove) { try{ CuratorFramework zk = getZkClient(); String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH; @@ -181,18 +182,11 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { String[] vals = targetTaskKey.split(Constants.UNDERLINE); - try{ - zk.delete().forPath(taskIdPath); - -// String path = conf.getString(Constants.ZOOKEEPER_SCHEDULER_ROOT) + Constants.SINGLE_SLASH + Constants.SCHEDULER_TASKS_QUEUE + "_remove" + Constants.SINGLE_SLASH + targetTaskKey; -// getZkClient().create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, -// Bytes.toBytes(targetTaskKey)); - }catch(Exception e){ - logger.error(String.format("delete task:%s from zookeeper fail, task detail: %s exception" ,targetTaskKey, vals[vals.length - 1]) ,e); + if(remove){ + removeNode(key, targetTaskKey); } - logger.info("consume task: {},there still have {} tasks need to be executed", targetTaskKey, size - 1); - - return vals[vals.length - 1]; + logger.info("consume task: {},there still have {} tasks need to be executed", vals[vals.length - 1], size - 1); + return targetTaskKey; }else{ logger.error("should not go here, task queue poll error, please check!"); } @@ -204,6 +198,21 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { return null; } + @Override + public void removeNode(String key, String nodeValue){ + + CuratorFramework zk = getZkClient(); + String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH; + String taskIdPath = tasksQueuePath + nodeValue; + logger.info("consume task {}", taskIdPath); + try{ + zk.delete().forPath(taskIdPath); + }catch(Exception e){ + logger.error(String.format("delete task:%s from zookeeper fail, exception:" ,nodeValue) ,e); + } + + } + /** diff --git a/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java b/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java index 5bc8931839..03ba29a840 100644 --- a/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java +++ b/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java @@ -49,9 +49,9 @@ public class TaskQueueImplTest { tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"4"); //pop - String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE); + String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); assertEquals(node1,"1"); - String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE); + String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); assertEquals(node2,"2"); //sadd @@ -99,7 +99,7 @@ public class TaskQueueImplTest { } } - String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE); + String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); assertEquals(node1,"0"); //clear all data diff --git a/escheduler-dao/readme.txt b/escheduler-dao/readme.txt index 9c8471ad95..6e7acb635a 100644 --- a/escheduler-dao/readme.txt +++ b/escheduler-dao/readme.txt @@ -12,9 +12,9 @@ CREATE TABLE `t_escheduler_access_token` ( PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8; -CREATE TABLE `escheduler`.`t_escheduler_error_command` ( +CREATE TABLE `t_escheduler_error_command` ( `id` int(11) NOT NULL COMMENT '主键', - `command_type` tinyint(4) NULL DEFAULT NULL COMMENT '命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程 4 从失败节点开始执行', + `command_type` tinyint(4) NULL DEFAULT NULL COMMENT '命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程', `executor_id` int(11) NULL DEFAULT NULL COMMENT '命令执行者', `process_definition_id` int(11) NULL DEFAULT NULL COMMENT '流程定义id', `command_param` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '命令的参数(json格式)', @@ -30,3 +30,25 @@ CREATE TABLE `escheduler`.`t_escheduler_error_command` ( `message` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '执行信息', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; + + +CREATE TABLE `t_escheduler_worker_group` ( + `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'id', + `name` varchar(256) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL COMMENT '组名称', + `ip_list` varchar(256) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL COMMENT 'worker地址列表', + `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间', + `update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间', + PRIMARY KEY (`id`) USING BTREE +) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; + +ALTER TABLE `t_escheduler_task_instance` +ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' AFTER `task_instance_priority`; + +ALTER TABLE `t_escheduler_command` +ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`; + +ALTER TABLE `t_escheduler_error_command` +ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`; + +ALTER TABLE `t_escheduler_schedules` +ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`; \ No newline at end of file diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index ec458f98a9..bb48d8e523 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -88,6 +88,9 @@ public class ProcessDao extends AbstractBaseDao { @Autowired private ResourceMapper resourceMapper; + @Autowired + private WorkerGroupMapper workerGroupMapper; + @Autowired private ErrorCommandMapper errorCommandMapper; @@ -115,6 +118,7 @@ public class ProcessDao extends AbstractBaseDao { scheduleMapper = getMapper(ScheduleMapper.class); udfFuncMapper = getMapper(UdfFuncMapper.class); resourceMapper = getMapper(ResourceMapper.class); + workerGroupMapper = getMapper(WorkerGroupMapper.class); taskQueue = TaskQueueFactory.getTaskQueueInstance(); } @@ -477,6 +481,7 @@ public class ProcessDao extends AbstractBaseDao { processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson()); // set process instance priority processInstance.setProcessInstancePriority(command.getProcessInstancePriority()); + processInstance.setWorkerGroupId(command.getWorkerGroupId()); return processInstance; } @@ -1575,6 +1580,15 @@ public class ProcessDao extends AbstractBaseDao { return userMapper.queryQueueByProcessInstanceId(processInstanceId); } + /** + * query worker group by id + * @param workerGroupId + * @return + */ + public WorkerGroup queryWorkerGroupById(int workerGroupId){ + return workerGroupMapper.queryById(workerGroupId); + } + } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/CommandMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/CommandMapper.java index a577e99a75..c7e22bb02d 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/CommandMapper.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/CommandMapper.java @@ -79,6 +79,7 @@ public interface CommandMapper { @Result(property = "scheduleTime", column = "schedule_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE), @Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE), @Result(property = "startTime", column = "start_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = CommandMapperProvider.class, method = "queryOneCommand") @@ -101,6 +102,7 @@ public interface CommandMapper { @Result(property = "scheduleTime", column = "schedule_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE), @Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE), @Result(property = "startTime", column = "start_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = CommandMapperProvider.class, method = "queryAllCommand") diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/CommandMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/CommandMapperProvider.java index e32f047125..712afe0bf1 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/CommandMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/CommandMapperProvider.java @@ -51,6 +51,7 @@ public class CommandMapperProvider { VALUES("`warning_group_id`", "#{command.warningGroupId}"); VALUES("`schedule_time`", "#{command.scheduleTime}"); VALUES("`update_time`", "#{command.updateTime}"); + VALUES("`worker_group_id`", "#{command.workerGroupId}"); VALUES("`start_time`", "#{command.startTime}"); } @@ -95,6 +96,7 @@ public class CommandMapperProvider { SET("`warning_group_id`=#{command.warningGroupId}"); SET("`schedule_time`=#{command.scheduleTime}"); SET("`update_time`=#{command.updateTime}"); + SET("`worker_group_id`=#{command.workerGroupId}"); SET("`start_time`=#{command.startTime}"); WHERE("`id`=#{command.id}"); @@ -166,8 +168,4 @@ public class CommandMapperProvider { } }.toString(); } - - - - } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ErrorCommandMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ErrorCommandMapperProvider.java index 46cec52aa8..9ea05229ef 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ErrorCommandMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ErrorCommandMapperProvider.java @@ -22,6 +22,7 @@ public class ErrorCommandMapperProvider { return new SQL() { { INSERT_INTO(TABLE_NAME); + VALUES("`id`", "#{errorCommand.id}"); VALUES("`command_type`", EnumFieldUtil.genFieldStr("errorCommand.commandType", CommandType.class)); VALUES("`process_definition_id`", "#{errorCommand.processDefinitionId}"); VALUES("`executor_id`", "#{errorCommand.executorId}"); @@ -34,6 +35,7 @@ public class ErrorCommandMapperProvider { VALUES("`schedule_time`", "#{errorCommand.scheduleTime}"); VALUES("`update_time`", "#{errorCommand.updateTime}"); VALUES("`start_time`", "#{errorCommand.startTime}"); + VALUES("`worker_group_id`", "#{errorCommand.workerGroupId}"); VALUES("`message`", "#{errorCommand.message}"); } }.toString(); diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapper.java index 3eb77af92b..b3f9f10b8b 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapper.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapper.java @@ -94,6 +94,7 @@ public interface ProcessInstanceMapper { @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "tenantCode", column = "tenant_code", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryDetailById") @@ -131,6 +132,7 @@ public interface ProcessInstanceMapper { @Result(property = "connects", column = "connects", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryById") @@ -168,6 +170,7 @@ public interface ProcessInstanceMapper { @Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @@ -205,6 +208,7 @@ public interface ProcessInstanceMapper { @Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @@ -251,6 +255,7 @@ public interface ProcessInstanceMapper { @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @@ -346,6 +351,7 @@ public interface ProcessInstanceMapper { @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @@ -437,6 +443,7 @@ public interface ProcessInstanceMapper { @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @@ -480,6 +487,7 @@ public interface ProcessInstanceMapper { @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @@ -523,6 +531,7 @@ public interface ProcessInstanceMapper { @Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @@ -564,6 +573,7 @@ public interface ProcessInstanceMapper { @Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryLastRunningProcess") @@ -605,6 +615,7 @@ public interface ProcessInstanceMapper { @Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryLastManualProcess") diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java index 05e1538c8d..92df768732 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java @@ -67,6 +67,7 @@ public class ProcessInstanceMapperProvider { VALUES("`dependence_schedule_times`", "#{processInstance.dependenceScheduleTimes}"); VALUES("`is_sub_process`", EnumFieldUtil.genFieldStr("processInstance.isSubProcess", Flag.class)); VALUES("`executor_id`", "#{processInstance.executorId}"); + VALUES("`worker_group_id`", "#{processInstance.workerGroupId}"); VALUES("`process_instance_priority`", EnumFieldUtil.genFieldStr("processInstance.processInstancePriority", Priority.class)); } }.toString(); @@ -139,6 +140,7 @@ public class ProcessInstanceMapperProvider { SET("`dependence_schedule_times`=#{processInstance.dependenceScheduleTimes}"); SET("`is_sub_process`="+EnumFieldUtil.genFieldStr("processInstance.isSubProcess", Flag.class)); SET("`executor_id`=#{processInstance.executorId}"); + SET("`worker_group_id`=#{processInstance.workerGroupId}"); WHERE("`id`=#{processInstance.id}"); diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapper.java index 2e6f3fc40b..f1a48bde94 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapper.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapper.java @@ -75,6 +75,7 @@ public interface ScheduleMapper { @Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.INTEGER), @Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT), @Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = ScheduleMapperProvider.class, method = "queryByProcessDefineIdPaging") @@ -117,6 +118,7 @@ public interface ScheduleMapper { @Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.INTEGER), @Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT), @Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = ScheduleMapperProvider.class, method = "querySchedulerListByProjectName") @@ -141,6 +143,7 @@ public interface ScheduleMapper { @Result(property = "userId", column = "user_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT), @Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = ScheduleMapperProvider.class, method = "queryById") @@ -164,6 +167,7 @@ public interface ScheduleMapper { @Result(property = "userId", column = "user_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT), @Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER), @Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = ScheduleMapperProvider.class, method = "selectAllByProcessDefineArray") diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapperProvider.java index af82318da2..5674675a25 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapperProvider.java @@ -48,6 +48,7 @@ public class ScheduleMapperProvider { VALUES("`user_id`", "#{schedule.userId}"); VALUES("`release_state`", EnumFieldUtil.genFieldStr("schedule.releaseState", ReleaseState.class)); VALUES("`warning_group_id`", "#{schedule.warningGroupId}"); + VALUES("`worker_group_id`", "#{schedule.workerGroupId}"); VALUES("`process_instance_priority`", EnumFieldUtil.genFieldStr("schedule.processInstancePriority", Priority.class)); }}.toString(); } @@ -67,6 +68,7 @@ public class ScheduleMapperProvider { SET("`user_id`=#{schedule.userId}"); SET("`release_state`=" + EnumFieldUtil.genFieldStr("schedule.releaseState", ReleaseState.class)); SET("`warning_group_id`=#{schedule.warningGroupId}"); + SET("`worker_group_id`=#{schedule.workerGroupId}"); SET("`process_instance_priority`="+ EnumFieldUtil.genFieldStr("schedule.processInstancePriority", Priority.class)); WHERE("`id` = #{schedule.id}"); diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapper.java index c3d44440d2..7dfa2543a8 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapper.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapper.java @@ -88,6 +88,7 @@ public interface TaskInstanceMapper { @Result(property = "appLink", column = "app_link", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "taskInstancePriority", column = "task_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = TaskInstanceMapperProvider.class, method = "queryById") @@ -131,6 +132,7 @@ public interface TaskInstanceMapper { @Result(property = "appLink", column = "app_link", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "taskInstancePriority", column = "task_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = TaskInstanceMapperProvider.class, method = "findValidTaskListByProcessId") @@ -164,6 +166,7 @@ public interface TaskInstanceMapper { @Result(property = "appLink", column = "app_link", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "taskInstancePriority", column = "task_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = TaskInstanceMapperProvider.class, method = "queryByHostAndStatus") @@ -255,6 +258,7 @@ public interface TaskInstanceMapper { @Result(property = "appLink", column = "app_link", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "taskInstancePriority", column = "task_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = TaskInstanceMapperProvider.class, method = "queryTaskInstanceListPaging") @@ -299,6 +303,7 @@ public interface TaskInstanceMapper { @Result(property = "appLink", column = "app_link", javaType = String.class, jdbcType = JdbcType.VARCHAR), @Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT), + @Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), @Result(property = "taskInstancePriority", column = "task_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT) }) @SelectProvider(type = TaskInstanceMapperProvider.class, method = "queryByInstanceIdAndName") diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapperProvider.java index 141b152568..511b0970fe 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapperProvider.java @@ -62,6 +62,7 @@ public class TaskInstanceMapperProvider { VALUES("`max_retry_times`", "#{taskInstance.maxRetryTimes}"); VALUES("`retry_interval`", "#{taskInstance.retryInterval}"); VALUES("`app_link`", "#{taskInstance.appLink}"); + VALUES("`worker_group_id`", "#{taskInstance.workerGroupId}"); VALUES("`flag`", EnumFieldUtil.genFieldStr("taskInstance.flag", Flag.class)); VALUES("`task_instance_priority`", EnumFieldUtil.genFieldStr("taskInstance.taskInstancePriority", Priority.class)); @@ -114,6 +115,7 @@ public class TaskInstanceMapperProvider { SET("`max_retry_times`=#{taskInstance.maxRetryTimes}"); SET("`retry_interval`=#{taskInstance.retryInterval}"); SET("`app_link`=#{taskInstance.appLink}"); + SET("`worker_group_id`=#{taskInstance.workerGroupId}"); SET("`flag`="+ EnumFieldUtil.genFieldStr("taskInstance.flag", Flag.class)); SET("`task_instance_priority`="+ EnumFieldUtil.genFieldStr("taskInstance.taskInstancePriority", Priority.class)); diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerGroupMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerGroupMapper.java new file mode 100644 index 0000000000..df4b9126c9 --- /dev/null +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerGroupMapper.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.dao.mapper; + +import cn.escheduler.dao.model.WorkerGroup; +import org.apache.ibatis.annotations.*; +import org.apache.ibatis.type.JdbcType; + +import java.util.Date; +import java.util.List; + +/** + * worker group mapper + */ +public interface WorkerGroupMapper { + + /** + * query all worker group list + * + * @return + */ + @Results(value = { + @Result(property = "id", column = "id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "ipList", column = "ip_list", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "name", column = "name", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "createTime", column = "create_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP), + @Result(property = "updateTime", column = "update_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP), + }) + @SelectProvider(type = WorkerGroupMapperProvider.class, method = "queryAllWorkerGroup") + List queryAllWorkerGroup(); + + /** + * query worker group by name + * + * @return + */ + @Results(value = { + @Result(property = "id", column = "id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "ipList", column = "ip_list", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "name", column = "name", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "createTime", column = "create_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP), + @Result(property = "updateTime", column = "update_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP), + }) + @SelectProvider(type = WorkerGroupMapperProvider.class, method = "queryWorkerGroupByName") + List queryWorkerGroupByName(@Param("name") String name); + + /** + * query worker group paging by search value + * + * @return + */ + @Results(value = { + @Result(property = "id", column = "id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "ipList", column = "ip_list", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "name", column = "name", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "createTime", column = "create_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP), + @Result(property = "updateTime", column = "update_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP), + }) + @SelectProvider(type = WorkerGroupMapperProvider.class, method = "queryListPaging") + List queryListPaging(@Param("offset") int offset, + @Param("pageSize") int pageSize, + @Param("searchVal") String searchVal); + + /** + * count worker group by search value + * @param searchVal + * @return + */ + @SelectProvider(type = WorkerGroupMapperProvider.class, method = "countPaging") + int countPaging(@Param("searchVal") String searchVal); + + /** + * insert worker server + * + * @param workerGroup + * @return + */ + @InsertProvider(type = WorkerGroupMapperProvider.class, method = "insert") + @Options(useGeneratedKeys = true,keyProperty = "workerGroup.id") + @SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "workerGroup.id", before = false, resultType = int.class) + int insert(@Param("workerGroup") WorkerGroup workerGroup); + + /** + * update worker + * + * @param workerGroup + * @return + */ + @UpdateProvider(type = WorkerGroupMapperProvider.class, method = "update") + int update(@Param("workerGroup") WorkerGroup workerGroup); + + /** + * delete work group by id + * @param id + * @return + */ + @DeleteProvider(type = WorkerGroupMapperProvider.class, method = "deleteById") + int deleteById(@Param("id") int id); + + /** + * query work group by id + * @param id + * @return + */ + @Results(value = { + @Result(property = "id", column = "id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "ipList", column = "ip_list", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "name", column = "name", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "createTime", column = "create_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP), + @Result(property = "updateTime", column = "update_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP), + }) + @SelectProvider(type = WorkerGroupMapperProvider.class, method = "queryById") + WorkerGroup queryById(@Param("id") int id); + + + +} diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerGroupMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerGroupMapperProvider.java new file mode 100644 index 0000000000..a6c1947089 --- /dev/null +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerGroupMapperProvider.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.dao.mapper; + + +import org.apache.commons.lang3.StringUtils; +import org.apache.ibatis.jdbc.SQL; + +import java.util.Map; + +/** + * worker group mapper provider + */ +public class WorkerGroupMapperProvider { + + private static final String TABLE_NAME = "t_escheduler_worker_group"; + + /** + * query worker list + * @return + */ + public String queryAllWorkerGroup() { + return new SQL() {{ + SELECT("*"); + + FROM(TABLE_NAME); + + ORDER_BY("update_time desc"); + }}.toString(); + } + + /** + * insert worker server + * @param parameter + * @return + */ + public String insert(Map parameter) { + return new SQL() {{ + INSERT_INTO(TABLE_NAME); + + VALUES("id", "#{workerGroup.id}"); + VALUES("name", "#{workerGroup.name}"); + VALUES("ip_list", "#{workerGroup.ipList}"); + VALUES("create_time", "#{workerGroup.createTime}"); + VALUES("update_time", "#{workerGroup.updateTime}"); + }}.toString(); + } + + /** + * update worker group + * + * @param parameter + * @return + */ + public String update(Map parameter) { + return new SQL() {{ + UPDATE(TABLE_NAME); + + SET("name = #{workerGroup.name}"); + SET("ip_list = #{workerGroup.ipList}"); + SET("create_time = #{workerGroup.createTime}"); + SET("update_time = #{workerGroup.updateTime}"); + + WHERE("id = #{workerGroup.id}"); + }}.toString(); + } + + /** + * delete worker group by id + * @param parameter + * @return + */ + public String deleteById(Map parameter) { + return new SQL() {{ + DELETE_FROM(TABLE_NAME); + + WHERE("id = #{id}"); + }}.toString(); + } + + /** + * query worker group by name + * @param parameter + * @return + */ + public String queryWorkerGroupByName(Map parameter) { + return new SQL() {{ + + SELECT("*"); + FROM(TABLE_NAME); + + WHERE("name = #{name}"); + }}.toString(); + } + + /** + * query worker group by id + * @param parameter + * @return + */ + public String queryById(Map parameter) { + return new SQL() {{ + + SELECT("*"); + FROM(TABLE_NAME); + + WHERE("id = #{id}"); + }}.toString(); + } + + +/** + * query worker group by id + * @param parameter + * @return + */ + public String queryListPaging(Map parameter) { + return new SQL() {{ + + SELECT("*"); + FROM(TABLE_NAME); + + Object searchVal = parameter.get("searchVal"); + if(searchVal != null && StringUtils.isNotEmpty(searchVal.toString())){ + WHERE( " name like concat('%', #{searchVal}, '%') "); + } + ORDER_BY(" update_time desc limit #{offset},#{pageSize} "); + }}.toString(); + } + + /** + * count worker group number by search value + * @param parameter + * @return + */ + public String countPaging(Map parameter) { + return new SQL() {{ + SELECT("count(0)"); + FROM(TABLE_NAME); + Object searchVal = parameter.get("searchVal"); + if(searchVal != null && StringUtils.isNotEmpty(searchVal.toString())){ + WHERE( " name like concat('%', #{searchVal}, '%') "); + } + }}.toString(); + } +} diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/Command.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/Command.java index 399e378ae8..e2a31a6702 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/Command.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/Command.java @@ -91,6 +91,12 @@ public class Command { private Date updateTime; + /** + * + */ + private int workerGroupId; + + public Command(){ this.taskDependType = TaskDependType.TASK_POST; this.failureStrategy = FailureStrategy.CONTINUE; @@ -229,6 +235,15 @@ public class Command { this.updateTime = updateTime; } + + public int getWorkerGroupId() { + return workerGroupId; + } + + public void setWorkerGroupId(int workerGroupId) { + this.workerGroupId = workerGroupId; + } + @Override public String toString() { return "Command{" + @@ -245,6 +260,7 @@ public class Command { ", startTime=" + startTime + ", processInstancePriority=" + processInstancePriority + ", updateTime=" + updateTime + + ", workerGroupId=" + workerGroupId + '}'; } } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ErrorCommand.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ErrorCommand.java index 2128455703..3f839c5868 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ErrorCommand.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ErrorCommand.java @@ -95,6 +95,11 @@ public class ErrorCommand { */ private String message; + /** + * worker group id + */ + private int workerGroupId; + public ErrorCommand(Command command, String message){ this.commandType = command.getCommandType(); @@ -245,6 +250,14 @@ public class ErrorCommand { this.updateTime = updateTime; } + public int getWorkerGroupId() { + return workerGroupId; + } + + public void setWorkerGroupId(int workerGroupId) { + this.workerGroupId = workerGroupId; + } + @Override public String toString() { return "Command{" + @@ -272,4 +285,6 @@ public class ErrorCommand { public void setMessage(String message) { this.message = message; } + + } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java index 06656ebeba..b37450c330 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java @@ -177,6 +177,12 @@ public class ProcessInstance { */ private Priority processInstancePriority; + + /** + * worker group id + */ + private int workerGroupId; + public ProcessInstance(){ } @@ -481,6 +487,13 @@ public class ProcessInstance { this.duration = duration; } + public int getWorkerGroupId() { + return workerGroupId; + } + + public void setWorkerGroupId(int workerGroupId) { + this.workerGroupId = workerGroupId; + } @Override public String toString() { diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/Schedule.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/Schedule.java index 40579a53ac..39926947c1 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/Schedule.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/Schedule.java @@ -111,6 +111,11 @@ public class Schedule { */ private Priority processInstancePriority; + /** + * worker group id + */ + private int workerGroupId; + public int getWarningGroupId() { return warningGroupId; } @@ -256,6 +261,15 @@ public class Schedule { this.processInstancePriority = processInstancePriority; } + + public int getWorkerGroupId() { + return workerGroupId; + } + + public void setWorkerGroupId(int workerGroupId) { + this.workerGroupId = workerGroupId; + } + @Override public String toString() { return "Schedule{" + @@ -276,6 +290,8 @@ public class Schedule { ", releaseState=" + releaseState + ", warningGroupId=" + warningGroupId + ", processInstancePriority=" + processInstancePriority + + ", workerGroupId=" + workerGroupId + '}'; } + } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java index 9d63f7cc7e..f37b1e4349 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java @@ -182,6 +182,13 @@ public class TaskInstance { private String dependentResult; + /** + * worker group id + * @return + */ + private int workerGroupId; + + public ProcessInstance getProcessInstance() { return processInstance; } @@ -439,6 +446,14 @@ public class TaskInstance { this.processInstancePriority = processInstancePriority; } + public int getWorkerGroupId() { + return workerGroupId; + } + + public void setWorkerGroupId(int workerGroupId) { + this.workerGroupId = workerGroupId; + } + @Override public String toString() { return "TaskInstance{" + @@ -470,6 +485,7 @@ public class TaskInstance { ", retryInterval=" + retryInterval + ", taskInstancePriority=" + taskInstancePriority + ", processInstancePriority=" + processInstancePriority + + ", workGroupId=" + workerGroupId + '}'; } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/WorkerGroup.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/WorkerGroup.java new file mode 100644 index 0000000000..664fd93463 --- /dev/null +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/WorkerGroup.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.dao.model; + +import java.util.Date; + +/** + * worker group for task running + */ +public class WorkerGroup { + + private int id; + + private String name; + + private String ipList; + + private Date createTime; + + private Date updateTime; + + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getIpList() { + return ipList; + } + + public void setIpList(String ipList) { + this.ipList = ipList; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public Date getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(Date updateTime) { + this.updateTime = updateTime; + } + + @Override + public String toString() { + return "Worker group model{" + + "id= " + id + + ",name= " + name + + ",ipList= " + ipList + + ",createTime= " + createTime + + ",updateTime= " + updateTime + + + "}"; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } +} diff --git a/escheduler-dao/src/main/resources/dao/data_source.properties b/escheduler-dao/src/main/resources/dao/data_source.properties index cac3aa5e20..3c89dd1fd2 100644 --- a/escheduler-dao/src/main/resources/dao/data_source.properties +++ b/escheduler-dao/src/main/resources/dao/data_source.properties @@ -1,9 +1,9 @@ # base spring data source configuration spring.datasource.type=com.alibaba.druid.pool.DruidDataSource spring.datasource.driver-class-name=com.mysql.jdbc.Driver -spring.datasource.url=jdbc:mysql://192.168.xx.xx:3306/escheduler?characterEncoding=UTF-8 -spring.datasource.username=xx -spring.datasource.password=xx +spring.datasource.url=jdbc:mysql://192.168.220.188:3306/escheduler_new?characterEncoding=UTF-8 +spring.datasource.username=root +spring.datasource.password=root@123 # connection configuration spring.datasource.initialSize=5 diff --git a/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/WorkerGroupMapperTest.java b/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/WorkerGroupMapperTest.java new file mode 100644 index 0000000000..fa4bdeae20 --- /dev/null +++ b/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/WorkerGroupMapperTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.dao.mapper; + +import cn.escheduler.dao.datasource.ConnectionFactory; +import cn.escheduler.dao.model.WorkerGroup; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Date; +import java.util.List; + +/** + * worker group mapper test + */ +public class WorkerGroupMapperTest { + + WorkerGroupMapper workerGroupMapper; + + + @Before + public void before() { + workerGroupMapper = ConnectionFactory.getSqlSession().getMapper(WorkerGroupMapper.class); + } + + + @Test + public void test() { + WorkerGroup workerGroup = new WorkerGroup(); + + String name = "workerGroup3"; + workerGroup.setName(name); + workerGroup.setIpList("192.168.220.154,192.168.220.188"); + workerGroup.setCreateTime(new Date()); + workerGroup.setUpdateTime(new Date()); + workerGroupMapper.insert(workerGroup); + Assert.assertNotEquals(workerGroup.getId(), 0); + + List workerGroups2 = workerGroupMapper.queryWorkerGroupByName(name); + Assert.assertEquals(workerGroups2.size(), 1); + + workerGroup.setName("workerGroup11"); + workerGroupMapper.update(workerGroup); + + List workerGroups = workerGroupMapper.queryAllWorkerGroup(); + Assert.assertNotEquals(workerGroups.size(), 0); + + workerGroupMapper.deleteById(workerGroup.getId()); + + workerGroups = workerGroupMapper.queryAllWorkerGroup(); + Assert.assertEquals(workerGroups.size(), 0); + } + +} \ No newline at end of file diff --git a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java index 101e72e300..b5ed0053a2 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java @@ -404,6 +404,9 @@ public class MasterExecThread implements Runnable { taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority()); } + int workerGroupId = taskNode.getWorkerGroupId(); + taskInstance.setWorkerGroupId(workerGroupId); + } return taskInstance; } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java index 64e5b57498..f163364b06 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java @@ -26,6 +26,7 @@ import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.model.ProcessDefinition; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; +import cn.escheduler.dao.model.WorkerGroup; import cn.escheduler.server.zk.ZKWorkerClient; import com.cronutils.utils.StringUtils; import org.apache.commons.configuration.Configuration; @@ -33,7 +34,9 @@ import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Date; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -90,6 +93,42 @@ public class FetchTaskThread implements Runnable{ this.taskQueue = taskQueue; } + /** + * Check if the task runs on this worker + * @param taskInstance + * @param host + * @return + */ + private boolean checkWorkerGroup(TaskInstance taskInstance, String host){ + + int taskWorkerGroupId = taskInstance.getWorkerGroupId(); + ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId()); + if(processInstance == null){ + logger.error("cannot find the task:{} process instance", taskInstance.getId()); + return false; + } + int processWorkerGroupId = processInstance.getWorkerGroupId(); + + taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId); + + if(taskWorkerGroupId <= 0){ + return true; + } + WorkerGroup workerGroup = processDao.queryWorkerGroupById(taskWorkerGroupId); + if(workerGroup == null ){ + logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId()); + return true; + } + String ips = workerGroup.getIpList(); + if(ips == null){ + logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers", + taskInstance.getId(), workerGroup.getId()); + } + String[] ipArray = ips.split(","); + List ipList = Arrays.asList(ipArray); + return ipList.contains(host); + } + @Override public void run() { @@ -116,11 +155,13 @@ public class FetchTaskThread implements Runnable{ } // task instance id str - String taskInstIdStr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE); + String taskQueueStr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); - if (!StringUtils.isEmpty(taskInstIdStr)) { - Date now = new Date(); + if (!StringUtils.isEmpty(taskQueueStr )) { + String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE); + String taskInstIdStr = taskStringArray[taskStringArray.length - 1]; + Date now = new Date(); Integer taskId = Integer.parseInt(taskInstIdStr); // find task instance by task id @@ -136,10 +177,15 @@ public class FetchTaskThread implements Runnable{ retryTimes--; } - if (taskInstance == null) { + if (taskInstance == null ) { logger.error("task instance is null. task id : {} ", taskId); continue; } + if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){ + continue; + } + taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr); + logger.info("remove task:{} from queue", taskQueueStr); // set execute task worker host taskInstance.setHost(OSUtils.getHost()); diff --git a/escheduler-server/src/test/java/cn/escheduler/server/zk/ZKWorkerClientTest.java b/escheduler-server/src/test/java/cn/escheduler/server/zk/ZKWorkerClientTest.java index f18be54e0c..c8aa0930a2 100644 --- a/escheduler-server/src/test/java/cn/escheduler/server/zk/ZKWorkerClientTest.java +++ b/escheduler-server/src/test/java/cn/escheduler/server/zk/ZKWorkerClientTest.java @@ -3,6 +3,9 @@ package cn.escheduler.server.zk; import cn.escheduler.common.Constants; import org.junit.Test; +import java.util.Arrays; +import java.util.List; + import static org.junit.Assert.*; /** @@ -20,4 +23,12 @@ public class ZKWorkerClientTest { } + @Test + public void test(){ + String ips = ""; + + List ipList = Arrays.asList(ips.split(",")); + + System.out.println(ipList); + } } \ No newline at end of file diff --git a/escheduler-ui/.env b/escheduler-ui/.env index d4dcd9f473..52ce563e9b 100644 --- a/escheduler-ui/.env +++ b/escheduler-ui/.env @@ -1,6 +1,6 @@ # 后端接口地址 -API_BASE = http://192.168.220.154:12345 +API_BASE = http://192.168.220.247:12345 # 本地开发如需ip访问项目把"#"号去掉 #DEV_HOST = 192.168.xx.xx diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue index dcbf9928dc..5d85089720 100644 --- a/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue +++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue @@ -236,6 +236,12 @@ editor.setValue(this.sql) return editor + }, + _getReceiver () { + this.store.dispatch('dag/getReceiver', { processDefinitionId: this.item.id }).then(res => { + this.receivers = res.receivers && res.receivers.split(',') || [] + this.receiversCc = res.receiversCc && res.receiversCc.split(',') || [] + }) } }, watch: { @@ -280,6 +286,10 @@ this.receivers = o.params.receivers && o.params.receivers.split(',') || [] this.receiversCc = o.params.receiversCc && o.params.receiversCc.split(',') || [] } + // + if (this.router.history.current.name === 'definition-create') { + this._getReceiver() + } }, mounted () { setTimeout(() => { diff --git a/escheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/chartConfig.js b/escheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/chartConfig.js index 7ce8b8fb82..ef30ad9a0b 100644 --- a/escheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/chartConfig.js +++ b/escheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/chartConfig.js @@ -15,6 +15,9 @@ * limitations under the License. */ +import _ from 'lodash' +import { tasksState } from '@/conf/home/pages/dag/_source/config' + let pie = { series: [ { @@ -63,4 +66,33 @@ let bar = { }] } -export { pie, bar } +let simple = { + xAxis: { + splitLine: { + show: false + }, + axisLabel: { + interval: 0, + showMaxLabel: true, + formatter (v) { + return tasksState[v].desc + } + } + }, + tooltip: { + formatter (data) { + let str = '' + _.map(data, (v, i) => { + if (i === 0) { + str += `${tasksState[v.name].desc}
` + } + str += `
${v.seriesName} : ${v.data}
` + }) + return str + } + } + + +} + +export { pie, bar, simple } diff --git a/escheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/projectChart.vue b/escheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/projectChart.vue index b37e22fd63..4479a9a77e 100644 --- a/escheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/projectChart.vue +++ b/escheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/projectChart.vue @@ -73,34 +73,7 @@
-
- 命令状态统计 -
-
-
-
-
-
-
- - - - - - - - - - - -
{{$t('#')}}{{$t('Number')}}{{$t('State')}}
{{$index+1}} - - {{item.value}} - - {{item.key}}
-
-
-
+
@@ -108,19 +81,19 @@
-
+
- - + + - + - +
{{$t('#')}}{{$t('Number')}}{{$t('State')}}等待执行任务等待Kill任务
{{$index+1}}{{item.value}}{{item.value}} {{item.key}}
@@ -129,6 +102,16 @@
+
+
+
+ 命令状态统计 +
+
+
+
+
+
@@ -150,7 +133,7 @@ import _ from 'lodash' import dayjs from 'dayjs' import { mapActions } from 'vuex' - import { pie, bar } from './chartConfig' + import { pie, bar, simple } from './chartConfig' import Chart from '~/@analysys/ana-charts' import mSpin from '@/module/components/spin/spin' import mNoData from '@/module/components/noData/noData' @@ -166,6 +149,7 @@ processStateList: [], defineUserList: [], commandStateList: [], + queueList: [], searchParams: { projectId: this.id, startDate: '', @@ -203,7 +187,6 @@ } }) }, - _handleTaskCtatus (res) { let data = res.data.taskCountDtos this.taskCtatusList = _.map(data, v => { @@ -263,10 +246,32 @@ } }, _handleCommandState (res) { - + let data = [] + _.forEach(res.data, (v, i) => { + let key = _.keys(v) + if (key[0] === 'errorCount') { + data.push({ typeName: '错误指令数', key: v.commandState, value: v.errorCount }) + } + }) + _.forEach(res.data, (v, i) => { + let key = _.keys(v) + if (key[1] === 'normalCount') { + data.push({ typeName: '正常指令数', key: v.commandState, value: v.normalCount }) + } + }) + const myChart = Chart.bar('#command-state-bar', data, { + title: '' + }) + myChart.echart.setOption(simple) + }, + _handleQueue (res) { + _.forEach(res.data, (v, k) => this.queueList.push({ + key: k === 'taskQueue' ? '等待执行任务' : '等待kill任务', + value: v + })) + const myChart = Chart.pie('#queue-pie', this.queueList, { title: '' }) + myChart.echart.setOption(pie) }, - _handleQueue () {}, - _getData (is = true) { this.isLoading = true let ioList = [