Browse Source

add process null check

Remove duplicate code in processDefinitionService
add processDefinitionService UT
pull/2/head
Yeleights 5 years ago
parent
commit
6c90575a39
  1. 40
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseDAGService.java
  2. 87
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  3. 177
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  4. 35
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  5. 19
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java

40
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseDAGService.java

@ -20,12 +20,11 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import java.util.ArrayList;
import java.util.List; import java.util.List;
/** /**
@ -48,41 +47,8 @@ public class BaseDAGService extends BaseService{
List<TaskNode> taskNodeList = processData.getTasks(); List<TaskNode> taskNodeList = processData.getTasks();
List<TaskNodeRelation> taskNodeRelations = new ArrayList<>(); ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);
//Traversing node information and building relationships return DagHelper.buildDagGraph(processDag);
for (TaskNode taskNode : taskNodeList) {
String preTasks = taskNode.getPreTasks();
List<String> preTasksList = JSONUtils.toList(preTasks, String.class);
//if previous tasks not empty
if (preTasksList != null) {
for (String depNode : preTasksList) {
taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getName()));
}
}
}
ProcessDag processDag = new ProcessDag();
processDag.setEdges(taskNodeRelations);
processDag.setNodes(taskNodeList);
// generate detail Dag, to be executed
DAG<String, TaskNode, TaskNodeRelation> dag = new DAG<>();
if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
for (TaskNode node : processDag.getNodes()) {
dag.addNode(node.getName(), node);
}
}
if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
for (TaskNodeRelation edge : processDag.getEdges()) {
dag.addEdge(edge.getStartNode(), edge.getEndNode());
}
}
return dag;
} }
} }

87
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -46,6 +46,7 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -948,11 +949,16 @@ public class ProcessDefinitionService extends BaseDAGService {
return result; return result;
} }
String processDefinitionJson = processDefinition.getProcessDefinitionJson(); String processDefinitionJson = processDefinition.getProcessDefinitionJson();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
//process data check
if (null == processData) {
logger.error("process data is null");
putMsg(result,Status.DATA_IS_NOT_VALID, processDefinitionJson);
return result;
}
List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
result.put(Constants.DATA_LIST, taskNodeList); result.put(Constants.DATA_LIST, taskNodeList);
@ -974,14 +980,13 @@ public class ProcessDefinitionService extends BaseDAGService {
Map<Integer, List<TaskNode>> taskNodeMap = new HashMap<>(); Map<Integer, List<TaskNode>> taskNodeMap = new HashMap<>();
String[] idList = defineIdList.split(","); String[] idList = defineIdList.split(",");
List<String> definitionIdList = Arrays.asList(idList);
List<Integer> idIntList = new ArrayList<>(); List<Integer> idIntList = new ArrayList<>();
for(String definitionId : definitionIdList) { for(String definitionId : idList) {
idIntList.add(Integer.parseInt(definitionId)); idIntList.add(Integer.parseInt(definitionId));
} }
Integer[] idArray = idIntList.toArray(new Integer[idIntList.size()]); Integer[] idArray = idIntList.toArray(new Integer[idIntList.size()]);
List<ProcessDefinition> processDefinitionList = processDefineMapper.queryDefinitionListByIdList(idArray); List<ProcessDefinition> processDefinitionList = processDefineMapper.queryDefinitionListByIdList(idArray);
if (processDefinitionList == null || processDefinitionList.size() ==0) { if (CollectionUtils.isEmpty(processDefinitionList)) {
logger.info("process definition not exists"); logger.info("process definition not exists");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineIdList); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineIdList);
return result; return result;
@ -1031,9 +1036,10 @@ public class ProcessDefinitionService extends BaseDAGService {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
ProcessDefinition processDefinition = processDefineMapper.selectById(processId); ProcessDefinition processDefinition = processDefineMapper.selectById(processId);
if (processDefinition == null) { if (null == processDefinition) {
logger.info("process define not exists"); logger.info("process define not exists");
throw new RuntimeException("process define not exists"); putMsg(result,Status.PROCESS_DEFINE_NOT_EXIST, processDefinition);
return result;
} }
DAG<String, TaskNode, TaskNodeRelation> dag = genDagGraph(processDefinition); DAG<String, TaskNode, TaskNodeRelation> dag = genDagGraph(processDefinition);
/** /**
@ -1121,10 +1127,10 @@ public class ProcessDefinitionService extends BaseDAGService {
pTreeViewDto.getChildren().add(treeViewDto); pTreeViewDto.getChildren().add(treeViewDto);
} }
postNodeList = dag.getSubsequentNodes(nodeName); postNodeList = dag.getSubsequentNodes(nodeName);
if (postNodeList != null && postNodeList.size() > 0) { if (CollectionUtils.isNotEmpty(postNodeList)) {
for (String nextNodeName : postNodeList) { for (String nextNodeName : postNodeList) {
List<TreeViewDto> treeViewDtoList = waitingRunningNodeMap.get(nextNodeName); List<TreeViewDto> treeViewDtoList = waitingRunningNodeMap.get(nextNodeName);
if (treeViewDtoList != null && treeViewDtoList.size() > 0) { if (CollectionUtils.isNotEmpty(treeViewDtoList)) {
treeViewDtoList.add(treeViewDto); treeViewDtoList.add(treeViewDto);
waitingRunningNodeMap.put(nextNodeName, treeViewDtoList); waitingRunningNodeMap.put(nextNodeName, treeViewDtoList);
} else { } else {
@ -1136,7 +1142,6 @@ public class ProcessDefinitionService extends BaseDAGService {
} }
runningNodeMap.remove(nodeName); runningNodeMap.remove(nodeName);
} }
if (waitingRunningNodeMap == null || waitingRunningNodeMap.size() == 0) { if (waitingRunningNodeMap == null || waitingRunningNodeMap.size() == 0) {
break; break;
} else { } else {
@ -1161,68 +1166,22 @@ public class ProcessDefinitionService extends BaseDAGService {
private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) throws Exception { private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) throws Exception {
String processDefinitionJson = processDefinition.getProcessDefinitionJson(); String processDefinitionJson = processDefinition.getProcessDefinitionJson();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
List<TaskNode> taskNodeList = processData.getTasks(); //check process data
if (null != processData) {
List<TaskNode> taskNodeList = processData.getTasks();
processDefinition.setGlobalParamList(processData.getGlobalParams());
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);
processDefinition.setGlobalParamList(processData.getGlobalParams()); // Generate concrete Dag to be executed
return DagHelper.buildDagGraph(processDag);
List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
// Traverse node information and build relationships
for (TaskNode taskNode : taskNodeList) {
String preTasks = taskNode.getPreTasks();
List<String> preTasksList = JSONUtils.toList(preTasks, String.class);
// If the dependency is not empty
if (preTasksList != null) {
for (String depNode : preTasksList) {
taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getName()));
}
}
} }
ProcessDag processDag = new ProcessDag(); return new DAG<>();
processDag.setEdges(taskNodeRelations);
processDag.setNodes(taskNodeList);
// Generate concrete Dag to be executed
return genDagGraph(processDag);
} }
/**
* Generate the DAG of process
*
* @return DAG
*/
private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDag processDag) {
DAG<String, TaskNode, TaskNodeRelation> dag = new DAG<>();
/**
* Add the ndoes
*/
if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
for (TaskNode node : processDag.getNodes()) {
dag.addNode(node.getName(), node);
}
}
/**
* Add the edges
*/
if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
for (TaskNodeRelation edge : processDag.getEdges()) {
dag.addEdge(edge.getStartNode(), edge.getEndNode());
}
}
return dag;
}
/** /**

177
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -16,10 +16,8 @@
*/ */
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.dto.ProcessMeta; import org.apache.dolphinscheduler.api.dto.ProcessMeta;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
@ -82,6 +80,12 @@ public class ProcessDefinitionServiceTest {
@Mock @Mock
private ProcessDao processDao; private ProcessDao processDao;
@Mock
private ProcessInstanceMapper processInstanceMapper;
@Mock
private TaskInstanceMapper taskInstanceMapper;
private String sqlDependentJson = "{\"globalParams\":[]," + private String sqlDependentJson = "{\"globalParams\":[]," +
"\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," + "\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," +
"\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\"," + "\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\"," +
@ -99,6 +103,12 @@ public class ProcessDefinitionServiceTest {
"\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," + "\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
"\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}"; "\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
private String shellJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-9527\",\"name\":\"shell-1\"," +
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"}," +
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"interval\":1,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
"\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
@Test @Test
public void testQueryProccessDefinitionList() { public void testQueryProccessDefinitionList() {
String projectName = "project_test1"; String projectName = "project_test1";
@ -149,7 +159,7 @@ public class ProcessDefinitionServiceTest {
} }
@Test @Test
public void testQueryProccessDefinitionById() { public void testQueryProcessDefinitionById() {
String projectName = "project_test1"; String projectName = "project_test1";
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName)); Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
@ -255,7 +265,7 @@ public class ProcessDefinitionServiceTest {
"project_test1", 46); "project_test1", 46);
Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, deleteFail.get(Constants.STATUS)); Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, deleteFail.get(Constants.STATUS));
//delte success //delete success
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1); Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
Map<String, Object> deleteSuccess = processDefinitionService.deleteProcessDefinitionById(loginUser, Map<String, Object> deleteSuccess = processDefinitionService.deleteProcessDefinitionById(loginUser,
"project_test1", 46); "project_test1", 46);
@ -304,6 +314,155 @@ public class ProcessDefinitionServiceTest {
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failRes.get(Constants.STATUS)); Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failRes.get(Constants.STATUS));
} }
@Test
public void testVerifyProcessDefinitionName() {
String projectName = "project_test1";
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
Project project = getProject(projectName);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
//project check auth fail
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.PROJECT_NOT_FOUNT, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result);
Map<String, Object> map = processDefinitionService.verifyProccessDefinitionName(loginUser,
"project_test1", "test_pdf");
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS));
//project check auth success, process not exist
putMsg(result, Status.SUCCESS, projectName);
Mockito.when(processDefineMapper.queryByDefineName(project.getId(),"test_pdf")).thenReturn(null);
Map<String, Object> processNotExistRes = processDefinitionService.verifyProccessDefinitionName(loginUser,
"project_test1", "test_pdf");
Assert.assertEquals(Status.SUCCESS, processNotExistRes.get(Constants.STATUS));
//process exist
Mockito.when(processDefineMapper.queryByDefineName(project.getId(),"test_pdf")).thenReturn(getProcessDefinition());
Map<String, Object> processExistRes = processDefinitionService.verifyProccessDefinitionName(loginUser,
"project_test1", "test_pdf");
Assert.assertEquals(Status.PROCESS_INSTANCE_EXIST, processExistRes.get(Constants.STATUS));
}
@Test
public void testCheckProcessNodeList() {
Map<String, Object> dataNotValidRes = processDefinitionService.checkProcessNodeList(null, "");
Assert.assertEquals(Status.DATA_IS_NOT_VALID, dataNotValidRes.get(Constants.STATUS));
//task not empty
String processDefinitionJson = shellJson;
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
assert processData != null;
Map<String, Object> taskEmptyRes = processDefinitionService.checkProcessNodeList(processData, processDefinitionJson);
Assert.assertEquals(Status.SUCCESS, taskEmptyRes.get(Constants.STATUS));
//task empty
processData.setTasks(null);
Map<String, Object> taskNotEmptyRes = processDefinitionService.checkProcessNodeList(processData, processDefinitionJson);
Assert.assertEquals(Status.DATA_IS_NULL, taskNotEmptyRes.get(Constants.STATUS));
//json abnormal
String abnormalJson = processDefinitionJson.replaceAll("SHELL","");
processData = JSONUtils.parseObject(abnormalJson, ProcessData.class);
Map<String, Object> abnormalTaskRes = processDefinitionService.checkProcessNodeList(processData, abnormalJson);
Assert.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID, abnormalTaskRes.get(Constants.STATUS));
}
@Test
public void testGetTaskNodeListByDefinitionId() throws Exception {
//process definition not exist
Mockito.when(processDefineMapper.selectById(46)).thenReturn(null);
Map<String, Object> processDefinitionNullRes = processDefinitionService.getTaskNodeListByDefinitionId(46);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
//process data null
ProcessDefinition processDefinition = getProcessDefinition();
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Map<String, Object> successRes = processDefinitionService.getTaskNodeListByDefinitionId(46);
Assert.assertEquals(Status.DATA_IS_NOT_VALID, successRes.get(Constants.STATUS));
//success
processDefinition.setProcessDefinitionJson(shellJson);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Map<String, Object> dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionId(46);
Assert.assertEquals(Status.SUCCESS, dataNotValidRes.get(Constants.STATUS));
}
@Test
public void testGetTaskNodeListByDefinitionIdList() throws Exception {
//process definition not exist
String defineIdList = "46";
Integer[] idArray = {46};
Mockito.when(processDefineMapper.queryDefinitionListByIdList(idArray)).thenReturn(null);
Map<String, Object> processNotExistRes = processDefinitionService.getTaskNodeListByDefinitionIdList(defineIdList);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processNotExistRes.get(Constants.STATUS));
//process definition exist
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(shellJson);
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(processDefinition);
Mockito.when(processDefineMapper.queryDefinitionListByIdList(idArray)).thenReturn(processDefinitionList);
Map<String, Object> successRes = processDefinitionService.getTaskNodeListByDefinitionIdList(defineIdList);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
@Test
public void testQueryProccessDefinitionAllByProjectId() {
int projectId = 1;
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(shellJson);
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(processDefinition);
Mockito.when(processDefineMapper.queryAllDefinitionList(projectId)).thenReturn(processDefinitionList);
Map<String, Object> successRes = processDefinitionService.queryProccessDefinitionAllByProjectId(projectId);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
@Test
public void testViewTree() throws Exception {
//process definition not exist
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(shellJson);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(null);
Map<String, Object> processDefinitionNullRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
List<ProcessInstance> processInstanceList = new ArrayList<>();
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setName("test_instance");
processInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
processInstance.setHost("192.168.xx.xx");
processInstance.setStartTime(new Date());
processInstance.setEndTime(new Date());
processInstanceList.add(processInstance);
TaskInstance taskInstance = new TaskInstance();
taskInstance.setStartTime(new Date());
taskInstance.setEndTime(new Date());
taskInstance.setTaskType("SHELL");
taskInstance.setId(1);
taskInstance.setName("test_task_instance");
taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
taskInstance.setHost("192.168.xx.xx");
//task instance not exist
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processInstanceMapper.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList);
Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), "shell-1")).thenReturn(null);
Map<String, Object> taskNullRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS));
//task instance exist
Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), "shell-1")).thenReturn(taskInstance);
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
}
/** /**
* add datasource param and dependent when export process * add datasource param and dependent when export process
* @throws JSONException * @throws JSONException
@ -334,13 +493,9 @@ public class ProcessDefinitionServiceTest {
@Test @Test
public void testAddExportTaskNodeSpecialParam() throws JSONException { public void testAddExportTaskNodeSpecialParam() throws JSONException {
String shellJson = "{\"globalParams\":[],\"tasks\":[{\"id\":\"tasks-9527\",\"name\":\"shell-1\"," + String shellData = shellJson;
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"}," +
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"interval\":1,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
"\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
String resultStr = processDefinitionService.addExportTaskNodeSpecialParam(shellJson); String resultStr = processDefinitionService.addExportTaskNodeSpecialParam(shellData);
JSONAssert.assertEquals(shellJson, resultStr, false); JSONAssert.assertEquals(shellJson, resultStr, false);
} }
@ -610,7 +765,7 @@ public class ProcessDefinitionServiceTest {
private ProcessDefinition getProcessDefinition(){ private ProcessDefinition getProcessDefinition(){
ProcessDefinition processDefinition = new ProcessDefinition(); ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(46); processDefinition.setId(46);
processDefinition.setName("testProject"); processDefinition.setName("test_pdf");
processDefinition.setProjectId(2); processDefinition.setProjectId(2);
processDefinition.setTenantId(1); processDefinition.setTenantId(1);
processDefinition.setDescription(""); processDefinition.setDescription("");

35
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -319,18 +319,14 @@ public class DagHelper {
DAG<String,TaskNode,TaskNodeRelation> dag = new DAG<>(); DAG<String,TaskNode,TaskNodeRelation> dag = new DAG<>();
/** //add vertex
* add vertex
*/
if (CollectionUtils.isNotEmpty(processDag.getNodes())){ if (CollectionUtils.isNotEmpty(processDag.getNodes())){
for (TaskNode node : processDag.getNodes()){ for (TaskNode node : processDag.getNodes()){
dag.addNode(node.getName(),node); dag.addNode(node.getName(),node);
} }
} }
/** //add edge
* add edge
*/
if (CollectionUtils.isNotEmpty(processDag.getEdges())){ if (CollectionUtils.isNotEmpty(processDag.getEdges())){
for (TaskNodeRelation edge : processDag.getEdges()){ for (TaskNodeRelation edge : processDag.getEdges()){
dag.addEdge(edge.getStartNode(),edge.getEndNode()); dag.addEdge(edge.getStartNode(),edge.getEndNode());
@ -338,4 +334,31 @@ public class DagHelper {
} }
return dag; return dag;
} }
/**
* get process dag
* @param taskNodeList task node list
* @return Process dag
*/
public static ProcessDag getProcessDag(List<TaskNode> taskNodeList) {
List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
// Traverse node information and build relationships
for (TaskNode taskNode : taskNodeList) {
String preTasks = taskNode.getPreTasks();
List<String> preTasksList = JSONUtils.toList(preTasks, String.class);
// If the dependency is not empty
if (preTasksList != null) {
for (String depNode : preTasksList) {
taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getName()));
}
}
}
ProcessDag processDag = new ProcessDag();
processDag.setEdges(taskNodeRelations);
processDag.setNodes(taskNodeList);
return processDag;
}
} }

19
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java

@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -37,7 +39,6 @@ import java.util.Map;
* dag helper test * dag helper test
*/ */
public class DagHelperTest { public class DagHelperTest {
/** /**
* test task node can submit * test task node can submit
* @throws JsonProcessingException if error throws JsonProcessingException * @throws JsonProcessingException if error throws JsonProcessingException
@ -131,4 +132,20 @@ public class DagHelperTest {
return DagHelper.buildDagGraph(processDag); return DagHelper.buildDagGraph(processDag);
} }
@Test
public void testBuildDagGraph() {
String shellJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-9527\",\"name\":\"shell-1\"," +
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"}," +
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"interval\":1,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
"\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
ProcessData processData = JSONUtils.parseObject(shellJson, ProcessData.class);
assert processData != null;
List<TaskNode> taskNodeList = processData.getTasks();
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);
DAG<String, TaskNode, TaskNodeRelation> dag = DagHelper.buildDagGraph(processDag);
Assert.assertNotNull(dag);
}
} }

Loading…
Cancel
Save