diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java index 3171f26dcd..0e05d944ae 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java @@ -32,10 +32,7 @@ import cn.escheduler.common.model.TaskNodeRelation; import cn.escheduler.common.process.Property; import cn.escheduler.common.queue.ITaskQueue; import cn.escheduler.common.queue.TaskQueueFactory; -import cn.escheduler.common.utils.CollectionUtils; -import cn.escheduler.common.utils.DateUtils; -import cn.escheduler.common.utils.JSONUtils; -import cn.escheduler.common.utils.ParameterUtils; +import cn.escheduler.common.utils.*; import cn.escheduler.common.utils.placeholder.BusinessTimeUtils; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.mapper.*; @@ -493,29 +490,64 @@ public class ProcessInstanceService extends BaseDAGService { return result; } - int delete = processDao.deleteWorkProcessInstanceById(processInstanceId); - processDao.deleteAllSubWorkProcessByParentId(processInstanceId); - processDao.deleteWorkProcessMapByParentId(processInstanceId); + // delete zk queue + if (CollectionUtils.isNotEmpty(taskInstanceList)){ + for (TaskInstance taskInstance : taskInstanceList){ + // task instance priority + int taskInstancePriority = taskInstance.getTaskInstancePriority().ordinal(); + + StringBuilder nodeValueSb = new StringBuilder(100); + nodeValueSb.append(processInstancePriority) + .append(UNDERLINE) + .append(processInstanceId) + .append(UNDERLINE) + .append(taskInstancePriority) + .append(UNDERLINE) + .append(taskInstance.getId()) + .append(UNDERLINE); + + int taskWorkerGroupId = processDao.getTaskWorkerGroupId(taskInstance); + WorkerGroup workerGroup = workerGroupMapper.queryById(taskWorkerGroupId); + + if(workerGroup == null){ + nodeValueSb.append(DEFAULT_WORKER_ID); + }else { + + String ips = workerGroup.getIpList(); + StringBuilder ipSb = new StringBuilder(100); + String[] ipArray = ips.split(COMMA); + + for (String ip : ipArray) { + long ipLong = IpUtils.ipToLong(ip); + ipSb.append(ipLong).append(COMMA); + } - if (delete > 0) { - if (CollectionUtils.isNotEmpty(taskInstanceList)){ - for (TaskInstance taskInstance : taskInstanceList){ - // task instance priority - int taskInstancePriority = taskInstance.getTaskInstancePriority().ordinal(); - String nodeValue=processInstancePriority + "_" + processInstanceId + "_" +taskInstancePriority + "_" + taskInstance.getId(); - try { - logger.info("delete task queue node : {}",nodeValue); - tasksQueue.removeNode(cn.escheduler.common.Constants.SCHEDULER_TASKS_QUEUE, nodeValue); - }catch (Exception e){ - logger.error("delete task queue node : {}", nodeValue); + if(ipSb.length() > 0) { + ipSb.deleteCharAt(ipSb.length() - 1); } + nodeValueSb.append(ipSb); + } + + try { + logger.info("delete task queue node : {}",nodeValueSb.toString()); + tasksQueue.removeNode(cn.escheduler.common.Constants.SCHEDULER_TASKS_QUEUE, nodeValueSb.toString()); + }catch (Exception e){ + logger.error("delete task queue node : {}", nodeValueSb.toString()); } } + } + // delete database cascade + int delete = processDao.deleteWorkProcessInstanceById(processInstanceId); + processDao.deleteAllSubWorkProcessByParentId(processInstanceId); + processDao.deleteWorkProcessMapByParentId(processInstanceId); + + if (delete > 0) { putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR); } + return result; }