Browse Source

[Improvement][API][num-9] add ProcessTaskRelationServiceImpl.moveTaskProcessRelation (#7053)

* add processDefinition releaseWorkflowAndSchedule

* add ProcessTaskRelationServiceImpl.moveTaskProcessRelation

* add dependency and subprocess judgement
3.0.0/version-upgrade
JinYong Li 3 years ago committed by GitHub
parent
commit
6e55394783
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java
  2. 7
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  3. 86
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
  4. 35
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java

14
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java

@ -127,8 +127,18 @@ public class ProcessTaskRelationController extends BaseController {
@RequestParam(name = "processDefinitionCode", required = true) long processDefinitionCode,
@RequestParam(name = "targetProcessDefinitionCode", required = true) long targetProcessDefinitionCode,
@RequestParam(name = "taskCode", required = true) long taskCode) {
return returnDataList(processTaskRelationService.moveTaskProcessRelation(loginUser, projectCode, processDefinitionCode,
targetProcessDefinitionCode, taskCode));
Map<String, Object> result = new HashMap<>();
if (processDefinitionCode == 0L) {
putMsg(result, DATA_IS_NOT_VALID, "processDefinitionCode");
} else if (targetProcessDefinitionCode == 0L) {
putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode");
} else if (taskCode == 0L) {
putMsg(result, DATA_IS_NOT_VALID, "taskCode");
} else {
result = processTaskRelationService.moveTaskProcessRelation(loginUser, projectCode, processDefinitionCode,
targetProcessDefinitionCode, taskCode);
}
return returnDataList(result);
}
/**

7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -286,9 +286,10 @@ public enum Status {
QUERY_TASK_PROCESS_RELATION_ERROR(50049, "query process task relation error", "查询工作流任务关系错误"),
TASK_DEFINE_STATE_ONLINE(50050, "task definition {0} is already on line", "任务定义[{0}]已上线"),
TASK_HAS_DOWNSTREAM(50051, "Task [{0}] exists downstream dependence", "任务[{0}]存在下游依赖"),
MAIN_TABLE_USING_VERSION(50052, "the version that the master table is using", "主表正在使用该版本"),
PROJECT_PROCESS_NOT_MATCH(50053, "the project and the process is not match", "项目和工作流不匹配"),
DELETE_EDGE_ERROR(50054, "delete edge error", "删除工作流任务连接线错误"),
TASK_HAS_UPSTREAM(50052, "Task [{0}] exists upstream dependence", "任务[{0}]存在上游依赖"),
MAIN_TABLE_USING_VERSION(50053, "the version that the master table is using", "主表正在使用该版本"),
PROJECT_PROCESS_NOT_MATCH(50054, "the project and the process is not match", "项目和工作流不匹配"),
DELETE_EDGE_ERROR(50055, "delete edge error", "删除工作流任务连接线错误"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
/**

86
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

@ -17,12 +17,15 @@
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
@ -42,6 +45,7 @@ import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -52,6 +56,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
/**
@ -186,7 +192,85 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
*/
@Override
public Map<String, Object> moveTaskProcessRelation(User loginUser, long projectCode, long processDefinitionCode, long targetProcessDefinitionCode, long taskCode) {
return null;
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(targetProcessDefinitionCode);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, targetProcessDefinitionCode);
return result;
}
if (processDefinition.getProjectCode() != projectCode) {
putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH);
return result;
}
List<ProcessTaskRelation> downstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L);
if (CollectionUtils.isNotEmpty(downstreamList)) {
Set<Long> postTaskCodes = downstreamList
.stream()
.map(ProcessTaskRelation::getPostTaskCode)
.collect(Collectors.toSet());
putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(postTaskCodes, ","));
return result;
}
List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, taskCode);
if (upstreamList.isEmpty()) {
putMsg(result, Status.PROCESS_TASK_RELATION_NOT_EXIST, "taskCode:" + taskCode);
return result;
} else {
Set<Long> preTaskCodes = upstreamList
.stream()
.map(ProcessTaskRelation::getPreTaskCode)
.collect(Collectors.toSet());
if (preTaskCodes.size() > 1 || !preTaskCodes.contains(0L)) {
putMsg(result, Status.TASK_HAS_UPSTREAM, org.apache.commons.lang.StringUtils.join(preTaskCodes, ","));
return result;
}
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (null == taskDefinition) {
putMsg(result, Status.DATA_IS_NULL, "taskDefinition");
return result;
}
ObjectNode paramNode = JSONUtils.parseObject(taskDefinition.getTaskParams());
if (TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())) {
Set<Long> depProcessDefinitionCodes = new HashSet<>();
ObjectNode dependence = (ObjectNode) paramNode.get("dependence");
ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
for (int i = 0; i < dependTaskList.size(); i++) {
ObjectNode dependTask = (ObjectNode) dependTaskList.path(i);
ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
for (int j = 0; j < dependItemList.size(); j++) {
ObjectNode dependItem = (ObjectNode) dependItemList.path(j);
long definitionCode = dependItem.get("definitionCode").asLong();
depProcessDefinitionCodes.add(definitionCode);
}
}
if (depProcessDefinitionCodes.contains(targetProcessDefinitionCode)) {
putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode");
return result;
}
}
if (TaskType.SUB_PROCESS.getDesc().equals(taskDefinition.getTaskType())) {
long subProcessDefinitionCode = paramNode.get("processDefinitionCode").asLong();
if (targetProcessDefinitionCode == subProcessDefinitionCode) {
putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode");
return result;
}
}
Date now = new Date();
ProcessTaskRelation processTaskRelation = upstreamList.get(0);
processTaskRelation.setProcessDefinitionCode(processDefinition.getCode());
processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
processTaskRelation.setUpdateTime(now);
int update = processTaskRelationMapper.updateById(processTaskRelation);
if (update == 0) {
putMsg(result, Status.MOVE_PROCESS_TASK_RELATION_ERROR);
}
return result;
}
/**

35
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java

@ -246,6 +246,7 @@ public class ProcessTaskRelationServiceTest {
taskDefinition.setProjectCode(1L);
taskDefinition.setCode(1L);
taskDefinition.setVersion(1);
taskDefinition.setTaskType(TaskType.SHELL.getDesc());
return taskDefinition;
}
@ -280,6 +281,40 @@ public class ProcessTaskRelationServiceTest {
processTaskRelationList.add(processTaskRelationLog);
Mockito.when(processTaskRelationMapper.batchInsert(processTaskRelationList)).thenReturn(1);
Mockito.when(processTaskRelationLogMapper.batchInsert(processTaskRelationList)).thenReturn(1);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@Test
public void testMoveTaskProcessRelation() {
long projectCode = 1L;
long processDefinitionCode = 1L;
long taskCode = 1L;
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition());
Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L)).thenReturn(Lists.newArrayList());
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition());
List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setProjectCode(projectCode);
processTaskRelation.setProcessDefinitionCode(processDefinitionCode);
processTaskRelation.setPreTaskCode(0L);
processTaskRelation.setPreTaskVersion(0);
processTaskRelation.setPostTaskCode(taskCode);
processTaskRelation.setPostTaskVersion(1);
processTaskRelationList.add(processTaskRelation);
Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, taskCode)).thenReturn(processTaskRelationList);
Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@Test

Loading…
Cancel
Save