diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java index 299e43d235..cd9f5633dd 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java @@ -127,8 +127,18 @@ public class ProcessTaskRelationController extends BaseController { @RequestParam(name = "processDefinitionCode", required = true) long processDefinitionCode, @RequestParam(name = "targetProcessDefinitionCode", required = true) long targetProcessDefinitionCode, @RequestParam(name = "taskCode", required = true) long taskCode) { - return returnDataList(processTaskRelationService.moveTaskProcessRelation(loginUser, projectCode, processDefinitionCode, - targetProcessDefinitionCode, taskCode)); + Map result = new HashMap<>(); + if (processDefinitionCode == 0L) { + putMsg(result, DATA_IS_NOT_VALID, "processDefinitionCode"); + } else if (targetProcessDefinitionCode == 0L) { + putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode"); + } else if (taskCode == 0L) { + putMsg(result, DATA_IS_NOT_VALID, "taskCode"); + } else { + result = processTaskRelationService.moveTaskProcessRelation(loginUser, projectCode, processDefinitionCode, + targetProcessDefinitionCode, taskCode); + } + return returnDataList(result); } /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index dae4028177..47452a24ad 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -286,9 +286,10 @@ public enum Status { QUERY_TASK_PROCESS_RELATION_ERROR(50049, "query process task relation error", "查询工作流任务关系错误"), TASK_DEFINE_STATE_ONLINE(50050, "task definition {0} is already on line", "任务定义[{0}]已上线"), TASK_HAS_DOWNSTREAM(50051, "Task [{0}] exists downstream dependence", "任务[{0}]存在下游依赖"), - MAIN_TABLE_USING_VERSION(50052, "the version that the master table is using", "主表正在使用该版本"), - PROJECT_PROCESS_NOT_MATCH(50053, "the project and the process is not match", "项目和工作流不匹配"), - DELETE_EDGE_ERROR(50054, "delete edge error", "删除工作流任务连接线错误"), + TASK_HAS_UPSTREAM(50052, "Task [{0}] exists upstream dependence", "任务[{0}]存在上游依赖"), + MAIN_TABLE_USING_VERSION(50053, "the version that the master table is using", "主表正在使用该版本"), + PROJECT_PROCESS_NOT_MATCH(50054, "the project and the process is not match", "项目和工作流不匹配"), + DELETE_EDGE_ERROR(50055, "delete edge error", "删除工作流任务连接线错误"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java index 5f1d0a39e4..d6c606c27a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java @@ -17,12 +17,15 @@ package org.apache.dolphinscheduler.api.service.impl; +import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID; + import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; @@ -42,6 +45,7 @@ import org.apache.commons.collections.CollectionUtils; import java.util.ArrayList; import java.util.Date; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -52,6 +56,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; /** @@ -186,7 +192,85 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P */ @Override public Map moveTaskProcessRelation(User loginUser, long projectCode, long processDefinitionCode, long targetProcessDefinitionCode, long taskCode) { - return null; + Project project = projectMapper.queryByCode(projectCode); + //check user access for project + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); + if (result.get(Constants.STATUS) != Status.SUCCESS) { + return result; + } + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(targetProcessDefinitionCode); + if (processDefinition == null) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, targetProcessDefinitionCode); + return result; + } + if (processDefinition.getProjectCode() != projectCode) { + putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH); + return result; + } + List downstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L); + if (CollectionUtils.isNotEmpty(downstreamList)) { + Set postTaskCodes = downstreamList + .stream() + .map(ProcessTaskRelation::getPostTaskCode) + .collect(Collectors.toSet()); + putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(postTaskCodes, ",")); + return result; + } + List upstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, taskCode); + if (upstreamList.isEmpty()) { + putMsg(result, Status.PROCESS_TASK_RELATION_NOT_EXIST, "taskCode:" + taskCode); + return result; + } else { + Set preTaskCodes = upstreamList + .stream() + .map(ProcessTaskRelation::getPreTaskCode) + .collect(Collectors.toSet()); + if (preTaskCodes.size() > 1 || !preTaskCodes.contains(0L)) { + putMsg(result, Status.TASK_HAS_UPSTREAM, org.apache.commons.lang.StringUtils.join(preTaskCodes, ",")); + return result; + } + } + TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); + if (null == taskDefinition) { + putMsg(result, Status.DATA_IS_NULL, "taskDefinition"); + return result; + } + ObjectNode paramNode = JSONUtils.parseObject(taskDefinition.getTaskParams()); + if (TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())) { + Set depProcessDefinitionCodes = new HashSet<>(); + ObjectNode dependence = (ObjectNode) paramNode.get("dependence"); + ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList"))); + for (int i = 0; i < dependTaskList.size(); i++) { + ObjectNode dependTask = (ObjectNode) dependTaskList.path(i); + ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList"))); + for (int j = 0; j < dependItemList.size(); j++) { + ObjectNode dependItem = (ObjectNode) dependItemList.path(j); + long definitionCode = dependItem.get("definitionCode").asLong(); + depProcessDefinitionCodes.add(definitionCode); + } + } + if (depProcessDefinitionCodes.contains(targetProcessDefinitionCode)) { + putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode"); + return result; + } + } + if (TaskType.SUB_PROCESS.getDesc().equals(taskDefinition.getTaskType())) { + long subProcessDefinitionCode = paramNode.get("processDefinitionCode").asLong(); + if (targetProcessDefinitionCode == subProcessDefinitionCode) { + putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode"); + return result; + } + } + Date now = new Date(); + ProcessTaskRelation processTaskRelation = upstreamList.get(0); + processTaskRelation.setProcessDefinitionCode(processDefinition.getCode()); + processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion()); + processTaskRelation.setUpdateTime(now); + int update = processTaskRelationMapper.updateById(processTaskRelation); + if (update == 0) { + putMsg(result, Status.MOVE_PROCESS_TASK_RELATION_ERROR); + } + return result; } /** diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java index 3434fb3e0c..0de0c917c8 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java @@ -246,6 +246,7 @@ public class ProcessTaskRelationServiceTest { taskDefinition.setProjectCode(1L); taskDefinition.setCode(1L); taskDefinition.setVersion(1); + taskDefinition.setTaskType(TaskType.SHELL.getDesc()); return taskDefinition; } @@ -280,6 +281,40 @@ public class ProcessTaskRelationServiceTest { processTaskRelationList.add(processTaskRelationLog); Mockito.when(processTaskRelationMapper.batchInsert(processTaskRelationList)).thenReturn(1); Mockito.when(processTaskRelationLogMapper.batchInsert(processTaskRelationList)).thenReturn(1); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + } + + @Test + public void testMoveTaskProcessRelation() { + long projectCode = 1L; + long processDefinitionCode = 1L; + long taskCode = 1L; + + Project project = getProject(projectCode); + Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); + + User loginUser = new User(); + loginUser.setId(-1); + loginUser.setUserType(UserType.GENERAL_USER); + + Map result = new HashMap<>(); + putMsg(result, Status.SUCCESS, projectCode); + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); + Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition()); + Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L)).thenReturn(Lists.newArrayList()); + Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition()); + List processTaskRelationList = Lists.newArrayList(); + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); + processTaskRelation.setProjectCode(projectCode); + processTaskRelation.setProcessDefinitionCode(processDefinitionCode); + processTaskRelation.setPreTaskCode(0L); + processTaskRelation.setPreTaskVersion(0); + processTaskRelation.setPostTaskCode(taskCode); + processTaskRelation.setPostTaskVersion(1); + processTaskRelationList.add(processTaskRelation); + Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, taskCode)).thenReturn(processTaskRelationList); + Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } @Test