From 8439b5dc69f2a7dfc1043393bbbb21a8277715da Mon Sep 17 00:00:00 2001 From: jackfanwan <61672564+jackfanwan@users.noreply.github.com> Date: Sun, 15 Jan 2023 19:58:47 +0800 Subject: [PATCH] [fix-12721] Fix cannot modify the upstream task in task definition page. (#12722) --- .../impl/TaskDefinitionServiceImpl.java | 100 +++++++++++++++++- .../TaskDefinitionServiceImplTest.java | 82 ++++++++++++-- .../repository/ProcessTaskRelationLogDao.java | 10 ++ .../impl/ProcessTaskRelationLogDaoImpl.java | 5 + 4 files changed, 186 insertions(+), 11 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 7dd38bba05..e79507c6b5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -63,6 +63,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; @@ -82,6 +83,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -122,6 +124,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Autowired private ProcessTaskRelationMapper processTaskRelationMapper; + @Autowired + private ProcessTaskRelationLogDao processTaskRelationLogDao; + @Autowired private ProcessTaskRelationService processTaskRelationService; @@ -852,10 +857,15 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } else { queryUpStreamTaskCodeMap = new HashMap<>(); } - if (CollectionUtils.isNotEmpty(upstreamTaskCodes)) { + if (MapUtils.isNotEmpty(queryUpStreamTaskCodeMap)) { ProcessTaskRelation taskRelation = upstreamTaskRelations.get(0); List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, taskRelation.getProcessDefinitionCode()); + + // set upstream code list + updateUpstreamTask(new HashSet<>(queryUpStreamTaskCodeMap.keySet()), + taskCode, projectCode, taskRelation.getProcessDefinitionCode(), loginUser); + List processTaskRelationList = Lists.newArrayList(processTaskRelations); List relationList = Lists.newArrayList(); for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { @@ -879,8 +889,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe if (MapUtils.isEmpty(queryUpStreamTaskCodeMap) && CollectionUtils.isNotEmpty(processTaskRelationList)) { processTaskRelationList.add(processTaskRelationList.get(0)); } - updateDag(loginUser, taskRelation.getProcessDefinitionCode(), processTaskRelations, - Lists.newArrayList(taskDefinitionToUpdate)); } logger.info( "Update task with upstream tasks complete, projectCode:{}, taskDefinitionCode:{}, upstreamTaskCodes:{}.", @@ -890,6 +898,92 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } + private void updateUpstreamTask(Set allPreTaskCodeSet, long taskCode, long projectCode, + long processDefinitionCode, User loginUser) { + // query all process task relation + List hadProcessTaskRelationList = processTaskRelationMapper + .queryUpstreamByCode(projectCode, taskCode); + // remove pre + Set removePreTaskSet = new HashSet<>(); + List removePreTaskList = new ArrayList<>(); + // add pre + Set addPreTaskSet = new HashSet<>(); + List addPreTaskList = new ArrayList<>(); + + List processTaskRelationLogList = new ArrayList<>(); + + // filter all process task relation + if (CollectionUtils.isNotEmpty(hadProcessTaskRelationList)) { + for (ProcessTaskRelation processTaskRelation : hadProcessTaskRelationList) { + if (processTaskRelation.getPreTaskCode() == 0) { + continue; + } + // had + if (allPreTaskCodeSet.contains(processTaskRelation.getPreTaskCode())) { + allPreTaskCodeSet.remove(processTaskRelation.getPreTaskCode()); + } else { + // remove + removePreTaskSet.add(processTaskRelation.getPreTaskCode()); + processTaskRelation.setPreTaskCode(0); + processTaskRelation.setPreTaskVersion(0); + removePreTaskList.add(processTaskRelation); + processTaskRelationLogList.add(createProcessTaskRelationLog(loginUser, processTaskRelation)); + } + } + } + // add + if (allPreTaskCodeSet.size() != 0) { + addPreTaskSet.addAll(allPreTaskCodeSet); + } + // get add task code map + allPreTaskCodeSet.add(Long.valueOf(taskCode)); + List taskDefinitionList = taskDefinitionMapper.queryByCodeList(allPreTaskCodeSet); + Map taskCodeMap = taskDefinitionList.stream().collect(Collectors + .toMap(TaskDefinition::getCode, Function.identity(), (a, b) -> a)); + + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); + TaskDefinition taskDefinition = taskCodeMap.get(taskCode); + + for (Long preTaskCode : addPreTaskSet) { + TaskDefinition preTaskRelation = taskCodeMap.get(preTaskCode); + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation( + null, processDefinition.getVersion(), projectCode, processDefinition.getCode(), + preTaskRelation.getCode(), preTaskRelation.getVersion(), + taskDefinition.getCode(), taskDefinition.getVersion(), ConditionType.NONE, "{}"); + addPreTaskList.add(processTaskRelation); + processTaskRelationLogList.add(createProcessTaskRelationLog(loginUser, processTaskRelation)); + } + int insert = 0; + int remove = 0; + int log = 0; + // insert process task relation table data + if (CollectionUtils.isNotEmpty(addPreTaskList)) { + insert = processTaskRelationMapper.batchInsert(addPreTaskList); + } + if (CollectionUtils.isNotEmpty(removePreTaskList)) { + for (ProcessTaskRelation processTaskRelation : removePreTaskList) { + remove += processTaskRelationMapper.updateById(processTaskRelation); + } + } + if (CollectionUtils.isNotEmpty(processTaskRelationLogList)) { + log = processTaskRelationLogDao.batchInsert(processTaskRelationLogList); + } + if (insert + remove != log) { + throw new RuntimeException("updateUpstreamTask error"); + } + } + + private ProcessTaskRelationLog createProcessTaskRelationLog(User loginUser, + ProcessTaskRelation processTaskRelation) { + Date now = new Date(); + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); + processTaskRelationLog.setOperator(loginUser.getId()); + processTaskRelationLog.setOperateTime(now); + processTaskRelationLog.setCreateTime(now); + processTaskRelationLog.setUpdateTime(now); + return processTaskRelationLog; + } + /** * switch task definition * diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java index a066b81124..391769627f 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java @@ -56,12 +56,14 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessServiceImpl; import java.text.MessageFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -124,11 +126,15 @@ public class TaskDefinitionServiceImplTest { @Mock private ProcessDefinitionService processDefinitionService; + @Mock + private ProcessTaskRelationLogDao processTaskRelationLogDao; + private static final String TASK_PARAMETER = "{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";; private static final long PROJECT_CODE = 1L; private static final long PROCESS_DEFINITION_CODE = 2L; private static final long TASK_CODE = 3L; + private static final String UPSTREAM_CODE = "3,5"; private static final int VERSION = 1; private static final int RESOURCE_RATE = -1; protected User user; @@ -169,14 +175,7 @@ public class TaskDefinitionServiceImplTest { @Test public void updateTaskDefinition() { - String taskDefinitionJson = - "{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":" - + "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\"," - + "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":" - + "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"]," - + "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0," - + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0," - + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}"; + String taskDefinitionJson = getTaskDefinitionJson();; Project project = getProject(); Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); @@ -591,6 +590,52 @@ public class TaskDefinitionServiceImplTest { Assertions.assertDoesNotThrow(() -> taskDefinitionService.getTaskDefinition(user, TASK_CODE)); } + @Test + public void testUpdateTaskWithUpstream() { + + String taskDefinitionJson = getTaskDefinitionJson(); + TaskDefinition taskDefinition = getTaskDefinition(); + taskDefinition.setFlag(Flag.NO); + TaskDefinition taskDefinitionSecond = getTaskDefinition(); + taskDefinitionSecond.setCode(5); + + user.setUserType(UserType.ADMIN_USER); + Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject()); + Mockito.when(projectService.hasProjectAndWritePerm(user, getProject(), new HashMap<>())).thenReturn(true); + Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition); + Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true); + Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1); + Mockito.when(taskDefinitionMapper.updateById(Mockito.any())).thenReturn(1); + Mockito.when(taskDefinitionLogMapper.insert(Mockito.any())).thenReturn(1); + + Mockito.when(taskDefinitionMapper.queryByCodeList(Mockito.anySet())) + .thenReturn(Arrays.asList(taskDefinition, taskDefinitionSecond)); + + Mockito.when(processTaskRelationMapper.queryUpstreamByCode(PROJECT_CODE, TASK_CODE)) + .thenReturn(getProcessTaskRelationListV2()); + Mockito.when(processDefinitionMapper.queryByCode(PROCESS_DEFINITION_CODE)).thenReturn(getProcessDefinition()); + Mockito.when(processTaskRelationMapper.batchInsert(Mockito.anyList())).thenReturn(1); + Mockito.when(processTaskRelationMapper.updateById(Mockito.any())).thenReturn(1); + Mockito.when(processTaskRelationLogDao.batchInsert(Mockito.anyList())).thenReturn(2); + // success + Map successMap = taskDefinitionService.updateTaskWithUpstream(user, PROJECT_CODE, TASK_CODE, + taskDefinitionJson, UPSTREAM_CODE); + Assertions.assertEquals(Status.SUCCESS, successMap.get(Constants.STATUS)); + user.setUserType(UserType.GENERAL_USER); + } + + private String getTaskDefinitionJson() { + return "{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":" + + "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\"," + + "\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\"," + + "\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":\\\"echo ${datetime}\\\"," + + "\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"]," + + "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\"," + + "\"flag\":0,\"taskPriority\":0,\"workerGroup\":\"default\",\"failRetryTimes\":0," + + "\"failRetryInterval\":0,\"timeoutFlag\":0,\"timeoutNotifyStrategy\":0,\"timeout\":0," + + "\"delayTime\":0,\"resourceIds\":\"\"}"; + } + /** * create admin user */ @@ -663,6 +708,27 @@ public class TaskDefinitionServiceImplTest { return processTaskRelationList; } + private List getProcessTaskRelationListV2() { + List processTaskRelationList = new ArrayList<>(); + + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); + fillProcessTaskRelation(processTaskRelation); + + processTaskRelationList.add(processTaskRelation); + processTaskRelation = new ProcessTaskRelation(); + fillProcessTaskRelation(processTaskRelation); + processTaskRelation.setPreTaskCode(4L); + processTaskRelationList.add(processTaskRelation); + return processTaskRelationList; + } + + private void fillProcessTaskRelation(ProcessTaskRelation processTaskRelation) { + processTaskRelation.setProjectCode(PROJECT_CODE); + processTaskRelation.setProcessDefinitionCode(PROCESS_DEFINITION_CODE); + processTaskRelation.setPreTaskCode(TASK_CODE); + processTaskRelation.setPostTaskCode(TASK_CODE + 1L); + } + private List getProcessTaskRelationLogList() { List processTaskRelationLogList = new ArrayList<>(); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java index 1c1ba25783..d835cdd3ac 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.dao.repository; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; +import org.apache.ibatis.annotations.Param; + import java.util.List; public interface ProcessTaskRelationLogDao { @@ -26,4 +28,12 @@ public interface ProcessTaskRelationLogDao { List findByWorkflowDefinitionCode(long workflowDefinitionCode); void deleteByWorkflowDefinitionCode(long workflowDefinitionCode); + + /** + * batch insert process task relation + * + * @param taskRelationList taskRelationList + * @return int + */ + int batchInsert(@Param("taskRelationList") List taskRelationList); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java index d64f67ad7d..223fa5efbc 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java @@ -41,4 +41,9 @@ public class ProcessTaskRelationLogDaoImpl implements ProcessTaskRelationLogDao public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) { processTaskRelationLogMapper.deleteByWorkflowDefinitionCode(workflowDefinitionCode); } + + @Override + public int batchInsert(List taskRelationList) { + return processTaskRelationLogMapper.batchInsert(taskRelationList); + } }