From 0b53adeb07df72a9b888566092642a77dfccf65d Mon Sep 17 00:00:00 2001 From: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com> Date: Thu, 23 Sep 2021 15:06:49 +0800 Subject: [PATCH] [Fix-6302] [API] Add a new task to an existing workflow, and the task is lost (#6307) * fix processDefinition save/update/delete/move/import transaction * fix ut * fix processDefinnition release/processInstance update/taskDefinition transaction * fix processDefinnition release/processInstance update/taskDefinition transaction * fix 6302 update processDefinition,the task lost * fix ut Co-authored-by: JinyLeeChina <297062848@qq.com> --- .../service/process/ProcessService.java | 14 +++---- .../service/process/ProcessServiceTest.java | 37 +++++++++++++++++++ 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index c8619b9785..c652a511a2 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -2188,23 +2188,23 @@ public class ProcessService { } newTaskDefinitionLogs.add(taskDefinitionLog); } + int insertResult = 0; + int updateResult = 0; for (TaskDefinitionLog taskDefinitionToUpdate : updateTaskDefinitionLogs) { TaskDefinition task = taskDefinitionMapper.queryByCode(taskDefinitionToUpdate.getCode()); if (task == null) { newTaskDefinitionLogs.add(taskDefinitionToUpdate); } else { - int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate); + insertResult += taskDefinitionLogMapper.insert(taskDefinitionToUpdate); taskDefinitionToUpdate.setId(task.getId()); - int update = taskDefinitionMapper.updateById(taskDefinitionToUpdate); - return update & insert; + updateResult += taskDefinitionMapper.updateById(taskDefinitionToUpdate); } } if (!newTaskDefinitionLogs.isEmpty()) { - int insert = taskDefinitionMapper.batchInsert(newTaskDefinitionLogs); - int logInsert = taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs); - return logInsert & insert; + updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs); + insertResult += taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs); } - return Constants.EXIT_CODE_SUCCESS; + return (insertResult & updateResult) > 0 ? 1 : Constants.EXIT_CODE_SUCCESS; } /** diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index dee4c0cb4d..1bbbca3104 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -51,6 +52,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest; @@ -72,6 +74,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; /** * process service test @@ -100,6 +103,8 @@ public class ProcessServiceTest { @Mock private TaskDefinitionLogMapper taskDefinitionLogMapper; @Mock + private TaskDefinitionMapper taskDefinitionMapper; + @Mock private ProcessTaskRelationMapper processTaskRelationMapper; @Mock private ProcessDefinitionLogMapper processDefineLogMapper; @@ -360,6 +365,38 @@ public class ProcessServiceTest { Assert.assertEquals(0, processService.switchVersion(processDefinition, processDefinitionLog)); } + @Test + public void testSaveTaskDefine() { + User operator = new User(); + operator.setId(-1); + operator.setUserType(UserType.GENERAL_USER); + long projectCode = 751485690568704L; + String taskJson = "[{\"code\":751500437479424,\"name\":\"aa\",\"version\":1,\"description\":\"\",\"delayTime\":0," + + "\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"sleep 1s\\necho 11\"," + + "\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"waitStartTimeout\":{}}," + + "\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"yarn\",\"failRetryTimes\":0,\"failRetryInterval\":1," + + "\"timeoutFlag\":\"OPEN\",\"timeoutNotifyStrategy\":\"FAILED\",\"timeout\":1,\"environmentCode\":751496815697920}," + + "{\"code\":751516889636864,\"name\":\"bb\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[]," + + "\"localParams\":[],\"rawScript\":\"echo 22\",\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]}," + + "\"waitStartTimeout\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\"," + + "\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":0,\"delayTime\":\"0\",\"environmentCode\":-1}]"; + List taskDefinitionLogs = JSONUtils.toList(taskJson, TaskDefinitionLog.class); + TaskDefinitionLog taskDefinition = new TaskDefinitionLog(); + taskDefinition.setCode(751500437479424L); + taskDefinition.setName("aa"); + taskDefinition.setProjectCode(751485690568704L); + taskDefinition.setTaskType(TaskType.SHELL.getDesc()); + taskDefinition.setUserId(-1); + taskDefinition.setVersion(1); + taskDefinition.setCreateTime(new Date()); + taskDefinition.setUpdateTime(new Date()); + Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskDefinition.getCode(), taskDefinition.getVersion())).thenReturn(taskDefinition); + Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinition.getCode())).thenReturn(1); + Mockito.when(taskDefinitionMapper.queryByCode(taskDefinition.getCode())).thenReturn(taskDefinition); + int result = processService.saveTaskDefine(operator, projectCode, taskDefinitionLogs); + Assert.assertEquals(0, result); + } + @Test public void testGenDagGraph() { ProcessDefinition processDefinition = new ProcessDefinition();