From a6e2d1eb82043b6f9654d22e0ff841caaeb38405 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B9=94=E5=8D=A0=E5=8D=AB?= <825193156@qq.com> Date: Mon, 9 Sep 2019 14:34:35 +0800 Subject: [PATCH] the process instance is deleted, the task corresponding to the zk queue still exists, and the task is squeezed. #754 bug fix (#775) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * mission log disorder,bug #751 fix * the log path of the task and the log path of the task instance are different. The log cannot be viewed. #723 * the log path of the task and the log path of the task instance are different. The log cannot be viewed. #723 bug fix * after starting kerberos authentication, tgt expires after one day,bug #742 fix * log pattern modify * LoggerServer remove comment code and ShellCommandExecutor modify * PythonCommandExecutor modify * Concurrent task log bug #730 fix * remove invalid commit * The process instance is deleted, the task corresponding to the zk queue still exists, and the task is squeezed. #754 fix bug * The process instance is deleted, the task corresponding to the zk queue still exists, and the task is squeezed. #754 re fix * The process instance is deleted, the task corresponding to the zk queue still exists, and the task is squeezed. #754 bug fix --- .../api/service/ProcessInstanceService.java | 68 ++++++++++++++----- 1 file changed, 50 insertions(+), 18 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java index 3171f26dcd..0e05d944ae 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java @@ -32,10 +32,7 @@ import cn.escheduler.common.model.TaskNodeRelation; import cn.escheduler.common.process.Property; import cn.escheduler.common.queue.ITaskQueue; import cn.escheduler.common.queue.TaskQueueFactory; -import cn.escheduler.common.utils.CollectionUtils; -import cn.escheduler.common.utils.DateUtils; -import cn.escheduler.common.utils.JSONUtils; -import cn.escheduler.common.utils.ParameterUtils; +import cn.escheduler.common.utils.*; import cn.escheduler.common.utils.placeholder.BusinessTimeUtils; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.mapper.*; @@ -493,29 +490,64 @@ public class ProcessInstanceService extends BaseDAGService { return result; } - int delete = processDao.deleteWorkProcessInstanceById(processInstanceId); - processDao.deleteAllSubWorkProcessByParentId(processInstanceId); - processDao.deleteWorkProcessMapByParentId(processInstanceId); + // delete zk queue + if (CollectionUtils.isNotEmpty(taskInstanceList)){ + for (TaskInstance taskInstance : taskInstanceList){ + // task instance priority + int taskInstancePriority = taskInstance.getTaskInstancePriority().ordinal(); + + StringBuilder nodeValueSb = new StringBuilder(100); + nodeValueSb.append(processInstancePriority) + .append(UNDERLINE) + .append(processInstanceId) + .append(UNDERLINE) + .append(taskInstancePriority) + .append(UNDERLINE) + .append(taskInstance.getId()) + .append(UNDERLINE); + + int taskWorkerGroupId = processDao.getTaskWorkerGroupId(taskInstance); + WorkerGroup workerGroup = workerGroupMapper.queryById(taskWorkerGroupId); + + if(workerGroup == null){ + nodeValueSb.append(DEFAULT_WORKER_ID); + }else { + + String ips = workerGroup.getIpList(); + StringBuilder ipSb = new StringBuilder(100); + String[] ipArray = ips.split(COMMA); + + for (String ip : ipArray) { + long ipLong = IpUtils.ipToLong(ip); + ipSb.append(ipLong).append(COMMA); + } - if (delete > 0) { - if (CollectionUtils.isNotEmpty(taskInstanceList)){ - for (TaskInstance taskInstance : taskInstanceList){ - // task instance priority - int taskInstancePriority = taskInstance.getTaskInstancePriority().ordinal(); - String nodeValue=processInstancePriority + "_" + processInstanceId + "_" +taskInstancePriority + "_" + taskInstance.getId(); - try { - logger.info("delete task queue node : {}",nodeValue); - tasksQueue.removeNode(cn.escheduler.common.Constants.SCHEDULER_TASKS_QUEUE, nodeValue); - }catch (Exception e){ - logger.error("delete task queue node : {}", nodeValue); + if(ipSb.length() > 0) { + ipSb.deleteCharAt(ipSb.length() - 1); } + nodeValueSb.append(ipSb); + } + + try { + logger.info("delete task queue node : {}",nodeValueSb.toString()); + tasksQueue.removeNode(cn.escheduler.common.Constants.SCHEDULER_TASKS_QUEUE, nodeValueSb.toString()); + }catch (Exception e){ + logger.error("delete task queue node : {}", nodeValueSb.toString()); } } + } + // delete database cascade + int delete = processDao.deleteWorkProcessInstanceById(processInstanceId); + processDao.deleteAllSubWorkProcessByParentId(processInstanceId); + processDao.deleteWorkProcessMapByParentId(processInstanceId); + + if (delete > 0) { putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR); } + return result; }