Browse Source

Merge branch 'dev' of https://github.com/apache/incubator-dolphinscheduler into origin/dev

pull/3/MERGE
dailidong 5 years ago
parent
commit
9dae1aee42
  1. 40
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseDAGService.java
  2. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  3. 93
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  4. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
  5. 177
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  6. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
  7. 6
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java
  8. 35
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  9. 2
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
  10. 19
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
  11. 15
      dolphinscheduler-dist/src/main/assembly/dolphinscheduler-binary.xml
  12. 1
      pom.xml
  13. 2
      script/dolphinscheduler-daemon.sh

40
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseDAGService.java

@ -20,12 +20,11 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import java.util.ArrayList;
import java.util.List;
/**
@ -48,41 +47,8 @@ public class BaseDAGService extends BaseService{
List<TaskNode> taskNodeList = processData.getTasks();
List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);
//Traversing node information and building relationships
for (TaskNode taskNode : taskNodeList) {
String preTasks = taskNode.getPreTasks();
List<String> preTasksList = JSONUtils.toList(preTasks, String.class);
//if previous tasks not empty
if (preTasksList != null) {
for (String depNode : preTasksList) {
taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getName()));
}
}
}
ProcessDag processDag = new ProcessDag();
processDag.setEdges(taskNodeRelations);
processDag.setNodes(taskNodeList);
// generate detail Dag, to be executed
DAG<String, TaskNode, TaskNodeRelation> dag = new DAG<>();
if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
for (TaskNode node : processDag.getNodes()) {
dag.addNode(node.getName(), node);
}
}
if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
for (TaskNodeRelation edge : processDag.getEdges()) {
dag.addEdge(edge.getStartNode(), edge.getEndNode());
}
}
return dag;
return DagHelper.buildDagGraph(processDag);
}
}

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -117,7 +117,7 @@ public class ExecutorService extends BaseService{
}
if (!checkTenantSuitable(processDefinition)){
logger.error("there is not any vaild tenant for the process definition: id:{},name:{}, ",
logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
return result;
@ -206,7 +206,7 @@ public class ExecutorService extends BaseService{
return checkResult;
}
if (!checkTenantSuitable(processDefinition)){
logger.error("there is not any vaild tenant for the process definition: id:{},name:{}, ",
logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
}
@ -539,7 +539,7 @@ public class ExecutorService extends BaseService{
}
}
}else{
logger.error("there is not vaild schedule date for the process definition: id:{},date:{}",
logger.error("there is not valid schedule date for the process definition: id:{},date:{}",
processDefineId, schedule);
}
}else{

93
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -45,6 +45,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -465,7 +466,7 @@ public class ProcessDefinitionService extends BaseDAGService {
);
for(Schedule schedule:scheduleList){
logger.info("set schedule offline, schedule id: {}, process definition id: {}", project.getId(), schedule.getId(), id);
logger.info("set schedule offline, project id: {}, schedule id: {}, process definition id: {}", project.getId(), schedule.getId(), id);
// set status
schedule.setReleaseState(ReleaseState.OFFLINE);
scheduleMapper.updateById(schedule);
@ -948,11 +949,16 @@ public class ProcessDefinitionService extends BaseDAGService {
return result;
}
String processDefinitionJson = processDefinition.getProcessDefinitionJson();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
//process data check
if (null == processData) {
logger.error("process data is null");
putMsg(result,Status.DATA_IS_NOT_VALID, processDefinitionJson);
return result;
}
List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
result.put(Constants.DATA_LIST, taskNodeList);
@ -974,14 +980,13 @@ public class ProcessDefinitionService extends BaseDAGService {
Map<Integer, List<TaskNode>> taskNodeMap = new HashMap<>();
String[] idList = defineIdList.split(",");
List<String> definitionIdList = Arrays.asList(idList);
List<Integer> idIntList = new ArrayList<>();
for(String definitionId : definitionIdList) {
for(String definitionId : idList) {
idIntList.add(Integer.parseInt(definitionId));
}
Integer[] idArray = idIntList.toArray(new Integer[idIntList.size()]);
List<ProcessDefinition> processDefinitionList = processDefineMapper.queryDefinitionListByIdList(idArray);
if (processDefinitionList == null || processDefinitionList.size() ==0) {
if (CollectionUtils.isEmpty(processDefinitionList)) {
logger.info("process definition not exists");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineIdList);
return result;
@ -1031,9 +1036,10 @@ public class ProcessDefinitionService extends BaseDAGService {
Map<String, Object> result = new HashMap<>();
ProcessDefinition processDefinition = processDefineMapper.selectById(processId);
if (processDefinition == null) {
if (null == processDefinition) {
logger.info("process define not exists");
throw new RuntimeException("process define not exists");
putMsg(result,Status.PROCESS_DEFINE_NOT_EXIST, processDefinition);
return result;
}
DAG<String, TaskNode, TaskNodeRelation> dag = genDagGraph(processDefinition);
/**
@ -1121,10 +1127,10 @@ public class ProcessDefinitionService extends BaseDAGService {
pTreeViewDto.getChildren().add(treeViewDto);
}
postNodeList = dag.getSubsequentNodes(nodeName);
if (postNodeList != null && postNodeList.size() > 0) {
if (CollectionUtils.isNotEmpty(postNodeList)) {
for (String nextNodeName : postNodeList) {
List<TreeViewDto> treeViewDtoList = waitingRunningNodeMap.get(nextNodeName);
if (treeViewDtoList != null && treeViewDtoList.size() > 0) {
if (CollectionUtils.isNotEmpty(treeViewDtoList)) {
treeViewDtoList.add(treeViewDto);
waitingRunningNodeMap.put(nextNodeName, treeViewDtoList);
} else {
@ -1136,7 +1142,6 @@ public class ProcessDefinitionService extends BaseDAGService {
}
runningNodeMap.remove(nodeName);
}
if (waitingRunningNodeMap == null || waitingRunningNodeMap.size() == 0) {
break;
} else {
@ -1161,75 +1166,29 @@ public class ProcessDefinitionService extends BaseDAGService {
private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) throws Exception {
String processDefinitionJson = processDefinition.getProcessDefinitionJson();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
List<TaskNode> taskNodeList = processData.getTasks();
//check process data
if (null != processData) {
List<TaskNode> taskNodeList = processData.getTasks();
processDefinition.setGlobalParamList(processData.getGlobalParams());
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);
processDefinition.setGlobalParamList(processData.getGlobalParams());
List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
// Traverse node information and build relationships
for (TaskNode taskNode : taskNodeList) {
String preTasks = taskNode.getPreTasks();
List<String> preTasksList = JSONUtils.toList(preTasks, String.class);
// If the dependency is not empty
if (preTasksList != null) {
for (String depNode : preTasksList) {
taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getName()));
}
}
// Generate concrete Dag to be executed
return DagHelper.buildDagGraph(processDag);
}
ProcessDag processDag = new ProcessDag();
processDag.setEdges(taskNodeRelations);
processDag.setNodes(taskNodeList);
// Generate concrete Dag to be executed
return genDagGraph(processDag);
return new DAG<>();
}
/**
* Generate the DAG of process
*
* @return DAG
*/
private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDag processDag) {
DAG<String, TaskNode, TaskNodeRelation> dag = new DAG<>();
/**
* Add the ndoes
*/
if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
for (TaskNode node : processDag.getNodes()) {
dag.addNode(node.getName(), node);
}
}
/**
* Add the edges
*/
if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
for (TaskNodeRelation edge : processDag.getEdges()) {
dag.addEdge(edge.getStartNode(), edge.getEndNode());
}
}
return dag;
}
/**
* whether the graph has a ring
*
* @param taskNodeResponseList
* @return
* @param taskNodeResponseList task node response list
* @return if graph has cycle flag
*/
private boolean graphHasCycle(List<TaskNode> taskNodeResponseList) {
DAG<String, TaskNode, String> graph = new DAG<>();

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java

@ -86,7 +86,7 @@ public class CheckUtils {
* @return true if other parameters are valid, otherwise return false
*/
public static boolean checkOtherParams(String otherParams) {
return StringUtils.isNotEmpty(otherParams) && !JSONUtils.checkJsonVaild(otherParams);
return StringUtils.isNotEmpty(otherParams) && !JSONUtils.checkJsonValid(otherParams);
}
/**

177
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -16,10 +16,8 @@
*/
package org.apache.dolphinscheduler.api.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.dto.ProcessMeta;
import org.apache.dolphinscheduler.api.enums.Status;
@ -82,6 +80,12 @@ public class ProcessDefinitionServiceTest {
@Mock
private ProcessService processService;
@Mock
private ProcessInstanceMapper processInstanceMapper;
@Mock
private TaskInstanceMapper taskInstanceMapper;
private String sqlDependentJson = "{\"globalParams\":[]," +
"\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," +
"\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\"," +
@ -99,6 +103,12 @@ public class ProcessDefinitionServiceTest {
"\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
"\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
private String shellJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-9527\",\"name\":\"shell-1\"," +
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"}," +
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"interval\":1,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
"\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
@Test
public void testQueryProccessDefinitionList() {
String projectName = "project_test1";
@ -149,7 +159,7 @@ public class ProcessDefinitionServiceTest {
}
@Test
public void testQueryProccessDefinitionById() {
public void testQueryProcessDefinitionById() {
String projectName = "project_test1";
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
@ -255,7 +265,7 @@ public class ProcessDefinitionServiceTest {
"project_test1", 46);
Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, deleteFail.get(Constants.STATUS));
//delte success
//delete success
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
Map<String, Object> deleteSuccess = processDefinitionService.deleteProcessDefinitionById(loginUser,
"project_test1", 46);
@ -304,6 +314,155 @@ public class ProcessDefinitionServiceTest {
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failRes.get(Constants.STATUS));
}
@Test
public void testVerifyProcessDefinitionName() {
String projectName = "project_test1";
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
Project project = getProject(projectName);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
//project check auth fail
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.PROJECT_NOT_FOUNT, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result);
Map<String, Object> map = processDefinitionService.verifyProccessDefinitionName(loginUser,
"project_test1", "test_pdf");
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS));
//project check auth success, process not exist
putMsg(result, Status.SUCCESS, projectName);
Mockito.when(processDefineMapper.queryByDefineName(project.getId(),"test_pdf")).thenReturn(null);
Map<String, Object> processNotExistRes = processDefinitionService.verifyProccessDefinitionName(loginUser,
"project_test1", "test_pdf");
Assert.assertEquals(Status.SUCCESS, processNotExistRes.get(Constants.STATUS));
//process exist
Mockito.when(processDefineMapper.queryByDefineName(project.getId(),"test_pdf")).thenReturn(getProcessDefinition());
Map<String, Object> processExistRes = processDefinitionService.verifyProccessDefinitionName(loginUser,
"project_test1", "test_pdf");
Assert.assertEquals(Status.PROCESS_INSTANCE_EXIST, processExistRes.get(Constants.STATUS));
}
@Test
public void testCheckProcessNodeList() {
Map<String, Object> dataNotValidRes = processDefinitionService.checkProcessNodeList(null, "");
Assert.assertEquals(Status.DATA_IS_NOT_VALID, dataNotValidRes.get(Constants.STATUS));
//task not empty
String processDefinitionJson = shellJson;
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
assert processData != null;
Map<String, Object> taskEmptyRes = processDefinitionService.checkProcessNodeList(processData, processDefinitionJson);
Assert.assertEquals(Status.SUCCESS, taskEmptyRes.get(Constants.STATUS));
//task empty
processData.setTasks(null);
Map<String, Object> taskNotEmptyRes = processDefinitionService.checkProcessNodeList(processData, processDefinitionJson);
Assert.assertEquals(Status.DATA_IS_NULL, taskNotEmptyRes.get(Constants.STATUS));
//json abnormal
String abnormalJson = processDefinitionJson.replaceAll("SHELL","");
processData = JSONUtils.parseObject(abnormalJson, ProcessData.class);
Map<String, Object> abnormalTaskRes = processDefinitionService.checkProcessNodeList(processData, abnormalJson);
Assert.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID, abnormalTaskRes.get(Constants.STATUS));
}
@Test
public void testGetTaskNodeListByDefinitionId() throws Exception {
//process definition not exist
Mockito.when(processDefineMapper.selectById(46)).thenReturn(null);
Map<String, Object> processDefinitionNullRes = processDefinitionService.getTaskNodeListByDefinitionId(46);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
//process data null
ProcessDefinition processDefinition = getProcessDefinition();
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Map<String, Object> successRes = processDefinitionService.getTaskNodeListByDefinitionId(46);
Assert.assertEquals(Status.DATA_IS_NOT_VALID, successRes.get(Constants.STATUS));
//success
processDefinition.setProcessDefinitionJson(shellJson);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Map<String, Object> dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionId(46);
Assert.assertEquals(Status.SUCCESS, dataNotValidRes.get(Constants.STATUS));
}
@Test
public void testGetTaskNodeListByDefinitionIdList() throws Exception {
//process definition not exist
String defineIdList = "46";
Integer[] idArray = {46};
Mockito.when(processDefineMapper.queryDefinitionListByIdList(idArray)).thenReturn(null);
Map<String, Object> processNotExistRes = processDefinitionService.getTaskNodeListByDefinitionIdList(defineIdList);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processNotExistRes.get(Constants.STATUS));
//process definition exist
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(shellJson);
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(processDefinition);
Mockito.when(processDefineMapper.queryDefinitionListByIdList(idArray)).thenReturn(processDefinitionList);
Map<String, Object> successRes = processDefinitionService.getTaskNodeListByDefinitionIdList(defineIdList);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
@Test
public void testQueryProccessDefinitionAllByProjectId() {
int projectId = 1;
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(shellJson);
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(processDefinition);
Mockito.when(processDefineMapper.queryAllDefinitionList(projectId)).thenReturn(processDefinitionList);
Map<String, Object> successRes = processDefinitionService.queryProccessDefinitionAllByProjectId(projectId);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
@Test
public void testViewTree() throws Exception {
//process definition not exist
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(shellJson);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(null);
Map<String, Object> processDefinitionNullRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
List<ProcessInstance> processInstanceList = new ArrayList<>();
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setName("test_instance");
processInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
processInstance.setHost("192.168.xx.xx");
processInstance.setStartTime(new Date());
processInstance.setEndTime(new Date());
processInstanceList.add(processInstance);
TaskInstance taskInstance = new TaskInstance();
taskInstance.setStartTime(new Date());
taskInstance.setEndTime(new Date());
taskInstance.setTaskType("SHELL");
taskInstance.setId(1);
taskInstance.setName("test_task_instance");
taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
taskInstance.setHost("192.168.xx.xx");
//task instance not exist
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processInstanceMapper.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList);
Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), "shell-1")).thenReturn(null);
Map<String, Object> taskNullRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS));
//task instance exist
Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), "shell-1")).thenReturn(taskInstance);
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
}
/**
* add datasource param and dependent when export process
* @throws JSONException
@ -334,13 +493,9 @@ public class ProcessDefinitionServiceTest {
@Test
public void testAddExportTaskNodeSpecialParam() throws JSONException {
String shellJson = "{\"globalParams\":[],\"tasks\":[{\"id\":\"tasks-9527\",\"name\":\"shell-1\"," +
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"}," +
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"interval\":1,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
"\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
String shellData = shellJson;
String resultStr = processDefinitionService.addExportTaskNodeSpecialParam(shellJson);
String resultStr = processDefinitionService.addExportTaskNodeSpecialParam(shellData);
JSONAssert.assertEquals(shellJson, resultStr, false);
}
@ -610,7 +765,7 @@ public class ProcessDefinitionServiceTest {
private ProcessDefinition getProcessDefinition(){
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(46);
processDefinition.setName("testProject");
processDefinition.setName("test_pdf");
processDefinition.setProjectId(2);
processDefinition.setTenantId(1);
processDefinition.setDescription("");

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java

@ -126,7 +126,7 @@ public class JSONUtils {
* @param json json
* @return true if valid
*/
public static boolean checkJsonVaild(String json) {
public static boolean checkJsonValid(String json) {
if (StringUtils.isEmpty(json)) {
return false;

6
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java

@ -117,9 +117,9 @@ public class JSONUtilsTest {
}
@Test
public void testCheckJsonVaild() {
Assert.assertTrue(JSONUtils.checkJsonVaild("3"));
Assert.assertFalse(JSONUtils.checkJsonVaild(""));
public void testCheckJsonValid() {
Assert.assertTrue(JSONUtils.checkJsonValid("3"));
Assert.assertFalse(JSONUtils.checkJsonValid(""));
}
@Test

35
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -319,18 +319,14 @@ public class DagHelper {
DAG<String,TaskNode,TaskNodeRelation> dag = new DAG<>();
/**
* add vertex
*/
//add vertex
if (CollectionUtils.isNotEmpty(processDag.getNodes())){
for (TaskNode node : processDag.getNodes()){
dag.addNode(node.getName(),node);
}
}
/**
* add edge
*/
//add edge
if (CollectionUtils.isNotEmpty(processDag.getEdges())){
for (TaskNodeRelation edge : processDag.getEdges()){
dag.addEdge(edge.getStartNode(),edge.getEndNode());
@ -338,4 +334,31 @@ public class DagHelper {
}
return dag;
}
/**
* get process dag
* @param taskNodeList task node list
* @return Process dag
*/
public static ProcessDag getProcessDag(List<TaskNode> taskNodeList) {
List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
// Traverse node information and build relationships
for (TaskNode taskNode : taskNodeList) {
String preTasks = taskNode.getPreTasks();
List<String> preTasksList = JSONUtils.toList(preTasks, String.class);
// If the dependency is not empty
if (preTasksList != null) {
for (String depNode : preTasksList) {
taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getName()));
}
}
}
ProcessDag processDag = new ProcessDag();
processDag.setEdges(taskNodeRelations);
processDag.setNodes(taskNodeList);
return processDag;
}
}

2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java

@ -121,7 +121,7 @@ public class TaskInstanceMapperTest {
}
/**
* test find vaild task list by process instance id
* test find valid task list by process instance id
*/
@Test
public void testFindValidTaskListByProcessId() {

19
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java

@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.junit.Assert;
import org.junit.Test;
@ -37,7 +39,6 @@ import java.util.Map;
* dag helper test
*/
public class DagHelperTest {
/**
* test task node can submit
* @throws JsonProcessingException if error throws JsonProcessingException
@ -131,4 +132,20 @@ public class DagHelperTest {
return DagHelper.buildDagGraph(processDag);
}
@Test
public void testBuildDagGraph() {
String shellJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-9527\",\"name\":\"shell-1\"," +
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"}," +
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"interval\":1,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
"\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
ProcessData processData = JSONUtils.parseObject(shellJson, ProcessData.class);
assert processData != null;
List<TaskNode> taskNodeList = processData.getTasks();
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);
DAG<String, TaskNode, TaskNodeRelation> dag = DagHelper.buildDagGraph(processDag);
Assert.assertNotNull(dag);
}
}

15
dolphinscheduler-dist/src/main/assembly/dolphinscheduler-binary.xml vendored

@ -112,6 +112,21 @@
</fileSet>
<!--server end-->
<!--service end-->
<fileSet>
<directory>${basedir}/../dolphinscheduler-service/src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
<include>**/*.json</include>
<include>**/*.yml</include>
</includes>
<outputDirectory>conf</outputDirectory>
</fileSet>
<!--service end-->
<fileSet>
<directory>${basedir}/../dolphinscheduler-server/target/dolphinscheduler-server-${project.version}</directory>
<includes>

1
pom.xml

@ -721,6 +721,7 @@
<include>**/dao/mapper/AlertMapperTest.java</include>
<include>**/dao/mapper/CommandMapperTest.java</include>
<include>**/dao/cron/CronUtilsTest.java</include>
<include>**/dao/utils/DagHelperTest.java</include>
<include>**/alert/template/AlertTemplateFactoryTest.java</include>
<include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include>
<include>**/server/worker/task/datax/DataxTaskTest.java</include>

2
script/dolphinscheduler-daemon.sh

@ -69,7 +69,7 @@ elif [ "$command" = "alert-server" ]; then
LOG_FILE="-Dserver=alert-server"
CLASS=org.apache.dolphinscheduler.alert.AlertServer
elif [ "$command" = "logger-server" ]; then
CLASS=org.apache.dolphinscheduler.server.rpc.LoggerServer
CLASS=org.apache.dolphinscheduler.server.log.LoggerServer
elif [ "$command" = "combined-server" ]; then
LOG_FILE="-Dlogging.config=classpath:combined_logback.xml -Dspring.profiles.active=api -Dserver.is-combined-server=true"
CLASS=org.apache.dolphinscheduler.api.CombinedApplicationServer

Loading…
Cancel
Save