From 493e4ac912cb7f94fa57ed01f0589ab8b5fe5eda Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Wed, 15 May 2019 16:05:10 +0800 Subject: [PATCH 1/6] Delete the process instance and delete the corresponding queue information. --- .../controller/ProcessInstanceController.java | 6 ++- .../api/service/ProcessInstanceService.java | 39 ++++++++++++++----- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/ProcessInstanceController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/ProcessInstanceController.java index b05aa656f1..cea47be2c6 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/ProcessInstanceController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/ProcessInstanceController.java @@ -22,6 +22,8 @@ import cn.escheduler.api.utils.Constants; import cn.escheduler.api.utils.Result; import cn.escheduler.common.enums.ExecutionStatus; import cn.escheduler.common.enums.Flag; +import cn.escheduler.common.queue.ITaskQueue; +import cn.escheduler.common.queue.TaskQueueFactory; import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.dao.model.User; import org.slf4j.Logger; @@ -189,7 +191,9 @@ public class ProcessInstanceController extends BaseController{ try{ logger.info("delete process instance by id, login user:{}, project name:{}, process instance id:{}", loginUser.getUserName(), projectName, processInstanceId); - Map result = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId); + // task queue + ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); + Map result = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue); return returnDataList(result); }catch (Exception e){ logger.error(DELETE_PROCESS_INSTANCE_BY_ID_ERROR.getMsg(),e); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java index 70abaf07eb..53d83536a0 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java @@ -30,6 +30,8 @@ import cn.escheduler.common.graph.DAG; import cn.escheduler.common.model.TaskNode; import cn.escheduler.common.model.TaskNodeRelation; import cn.escheduler.common.process.Property; +import cn.escheduler.common.queue.ITaskQueue; +import cn.escheduler.common.queue.TaskQueueFactory; import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.JSONUtils; @@ -446,13 +448,13 @@ public class ProcessInstanceService extends BaseDAGService { /** * delete process instance by id, at the same time,delete task instance and their mapping relation data - * * @param loginUser * @param projectName - * @param workflowId + * @param processInstanceId + * @param tasksQueue * @return */ - public Map deleteProcessInstanceById(User loginUser, String projectName, Integer workflowId) { + public Map deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId,ITaskQueue tasksQueue) { Map result = new HashMap<>(5); Project project = projectMapper.queryByName(projectName); @@ -462,17 +464,34 @@ public class ProcessInstanceService extends BaseDAGService { if (resultEnum != Status.SUCCESS) { return checkResult; } - ProcessInstance processInstance = processDao.findProcessInstanceDetailById(workflowId); + ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processInstanceId); + //process instance priority + int processInstancePriority = processInstance.getProcessInstancePriority().ordinal(); if (processInstance == null) { - putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, workflowId); + putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); return result; } - int delete = processDao.deleteWorkProcessInstanceById(workflowId); - processDao.deleteAllSubWorkProcessByParentId(workflowId); - processDao.deleteWorkProcessMapByParentId(workflowId); + int delete = processDao.deleteWorkProcessInstanceById(processInstanceId); + processDao.deleteAllSubWorkProcessByParentId(processInstanceId); + processDao.deleteWorkProcessMapByParentId(processInstanceId); if (delete > 0) { + List taskInstanceList = processDao.findValidTaskListByProcessId(processInstanceId); + if (CollectionUtils.isNotEmpty(taskInstanceList)){ + for (TaskInstance taskInstance : taskInstanceList){ + // task instance priority + int taskInstancePriority = taskInstance.getTaskInstancePriority().ordinal(); + String nodeValue=processInstancePriority + "_" + processInstanceId + "_" +taskInstancePriority + "_" + taskInstance.getId(); + try { + logger.info("delete task queue node : {}",nodeValue); + tasksQueue.removeNode(cn.escheduler.common.Constants.SCHEDULER_TASKS_QUEUE, nodeValue); + }catch (Exception e){ + logger.error("delete task queue node : {}",nodeValue); + } + } + } + putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR); @@ -489,6 +508,8 @@ public class ProcessInstanceService extends BaseDAGService { * @return */ public Map batchDeleteProcessInstanceByIds(User loginUser, String projectName, String processInstanceIds) { + // task queue + ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); Map result = new HashMap<>(5); List deleteFailedIdList = new ArrayList(); @@ -507,7 +528,7 @@ public class ProcessInstanceService extends BaseDAGService { for (String strProcessInstanceId:processInstanceIdArray) { int processInstanceId = Integer.parseInt(strProcessInstanceId); try { - deleteProcessInstanceById(loginUser, projectName, processInstanceId); + deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue); } catch (Exception e) { deleteFailedIdList.add(processInstanceId); } From 702a64c43f356e968484c36a677c9e51f0ed71db Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Wed, 15 May 2019 17:00:58 +0800 Subject: [PATCH 2/6] =?UTF-8?q?Delete=20the=20process=20instance=20and=20d?= =?UTF-8?q?elete=20the=20corresponding=20queue=20information=EF=BC=8Cupdat?= =?UTF-8?q?e?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cn/escheduler/api/service/ProcessInstanceService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java index 53d83536a0..6475b42cee 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java @@ -465,6 +465,7 @@ public class ProcessInstanceService extends BaseDAGService { return checkResult; } ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processInstanceId); + List taskInstanceList = processDao.findValidTaskListByProcessId(processInstanceId); //process instance priority int processInstancePriority = processInstance.getProcessInstancePriority().ordinal(); if (processInstance == null) { @@ -477,7 +478,6 @@ public class ProcessInstanceService extends BaseDAGService { processDao.deleteWorkProcessMapByParentId(processInstanceId); if (delete > 0) { - List taskInstanceList = processDao.findValidTaskListByProcessId(processInstanceId); if (CollectionUtils.isNotEmpty(taskInstanceList)){ for (TaskInstance taskInstance : taskInstanceList){ // task instance priority @@ -487,7 +487,7 @@ public class ProcessInstanceService extends BaseDAGService { logger.info("delete task queue node : {}",nodeValue); tasksQueue.removeNode(cn.escheduler.common.Constants.SCHEDULER_TASKS_QUEUE, nodeValue); }catch (Exception e){ - logger.error("delete task queue node : {}",nodeValue); + logger.error("delete task queue node : {}", nodeValue); } } } From bd387218011f412d117d1bb9d2b76b53668c8e2f Mon Sep 17 00:00:00 2001 From: gongzijian Date: Wed, 15 May 2019 17:39:02 +0800 Subject: [PATCH 3/6] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=8F=98=E9=87=8F?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- escheduler-ui/src/js/conf/home/store/dag/actions.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/escheduler-ui/src/js/conf/home/store/dag/actions.js b/escheduler-ui/src/js/conf/home/store/dag/actions.js index 0872249d9c..929c4f6df5 100644 --- a/escheduler-ui/src/js/conf/home/store/dag/actions.js +++ b/escheduler-ui/src/js/conf/home/store/dag/actions.js @@ -538,7 +538,7 @@ export default { */ getReceiver ({ state }, payload) { return new Promise((resolve, reject) => { - io.get(`projects/{projectName}/executors/get-receiver-cc`, payload, res => { + io.get(`projects/${state.projectName}/executors/get-receiver-cc`, payload, res => { resolve(res.data) }).catch(e => { reject(e) @@ -547,7 +547,7 @@ export default { }, getTaskListDefIdAll ({ state }, payload) { return new Promise((resolve, reject) => { - io.get(`projects/{projectName}/process/get-task-list`, payload, res => { + io.get(`projects/${state.projectName}/process/get-task-list`, payload, res => { resolve(res.data) }).catch(e => { reject(e) From ad18251c7a606a8551d3b1deb55070453e4ae8a3 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Thu, 16 May 2019 19:16:13 +0800 Subject: [PATCH 4/6] work tolerance update --- .../src/main/java/cn/escheduler/server/utils/AlertManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java b/escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java index fc62bcf73d..147f538bca 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java @@ -175,7 +175,7 @@ public class AlertManager { alert.setContent(content); alert.setAlertType(AlertType.EMAIL); alert.setCreateTime(new Date()); - alert.setAlertGroupId(processInstance.getWarningGroupId()); + alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1:processInstance.getWarningGroupId()); alert.setReceivers(processInstance.getProcessDefinition().getReceivers()); alert.setReceiversCc(processInstance.getProcessDefinition().getReceiversCc()); From 0d5b4abcbfe01d64ff22a70674a844cb67145562 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Fri, 17 May 2019 15:09:27 +0800 Subject: [PATCH 5/6] only one master restart need failure and tolerant task restart scheduler --- .../src/main/java/cn/escheduler/dao/ProcessDao.java | 8 ++++++++ .../main/java/cn/escheduler/server/zk/ZKMasterClient.java | 4 ++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index b18fb5e974..1c7fc969ae 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -1526,6 +1526,14 @@ public class ProcessDao extends AbstractBaseDao { } + public void selfFaultTolerant(int ... states){ + List processInstanceList = processInstanceMapper.listByStatus(states); + for (ProcessInstance processInstance:processInstanceList){ + selfFaultTolerant(processInstance); + } + + } + @Transactional(value = "TransactionManager",rollbackFor = Exception.class) public void selfFaultTolerant(ProcessInstance processInstance){ diff --git a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java index 624d0193be..fe3360484c 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java @@ -123,9 +123,9 @@ public class ZKMasterClient extends AbstractZKClient { // register master this.registMaster(); - // check if fault tolerance is required + // check if fault tolerance is required,failure and tolerance if (getActiveMasterNum() == 1) { - processDao.selfFaultTolerant(ExecutionStatus.RUNNING_EXEUTION.ordinal()); + processDao.selfFaultTolerant(ExecutionStatus.RUNNING_EXEUTION.ordinal(),ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal()); } } From d1490155c28df232500f0f7e3fe546abb538b0f5 Mon Sep 17 00:00:00 2001 From: huyuanming Date: Mon, 20 May 2019 10:09:05 +0800 Subject: [PATCH 6/6] env --- escheduler-ui/.env | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/escheduler-ui/.env b/escheduler-ui/.env index 5cd5ba736d..fc1b619f69 100644 --- a/escheduler-ui/.env +++ b/escheduler-ui/.env @@ -1,5 +1,5 @@ -# 后端接口地址 +# 后端接口地址11 API_BASE = http://192.168.xx.xx:12345 # 本地开发如需ip访问项目把"#"号去掉