Browse Source

Merge remote-tracking branch 'remotes/upstream/dev' into dev

pull/2/head
qiaozhanwei 5 years ago
parent
commit
5b6ce80a39
  1. 40
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseDAGService.java
  2. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  3. 93
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  4. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
  5. 177
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  6. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
  7. 84
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
  8. 6
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java
  9. 23
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java
  10. 35
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  11. 2
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
  12. 19
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
  13. 1
      pom.xml

40
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseDAGService.java

@ -20,12 +20,11 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import java.util.ArrayList;
import java.util.List; import java.util.List;
/** /**
@ -48,41 +47,8 @@ public class BaseDAGService extends BaseService{
List<TaskNode> taskNodeList = processData.getTasks(); List<TaskNode> taskNodeList = processData.getTasks();
List<TaskNodeRelation> taskNodeRelations = new ArrayList<>(); ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);
//Traversing node information and building relationships return DagHelper.buildDagGraph(processDag);
for (TaskNode taskNode : taskNodeList) {
String preTasks = taskNode.getPreTasks();
List<String> preTasksList = JSONUtils.toList(preTasks, String.class);
//if previous tasks not empty
if (preTasksList != null) {
for (String depNode : preTasksList) {
taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getName()));
}
}
}
ProcessDag processDag = new ProcessDag();
processDag.setEdges(taskNodeRelations);
processDag.setNodes(taskNodeList);
// generate detail Dag, to be executed
DAG<String, TaskNode, TaskNodeRelation> dag = new DAG<>();
if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
for (TaskNode node : processDag.getNodes()) {
dag.addNode(node.getName(), node);
}
}
if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
for (TaskNodeRelation edge : processDag.getEdges()) {
dag.addEdge(edge.getStartNode(), edge.getEndNode());
}
}
return dag;
} }
} }

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -117,7 +117,7 @@ public class ExecutorService extends BaseService{
} }
if (!checkTenantSuitable(processDefinition)){ if (!checkTenantSuitable(processDefinition)){
logger.error("there is not any vaild tenant for the process definition: id:{},name:{}, ", logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
processDefinition.getId(), processDefinition.getName()); processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE); putMsg(result, Status.TENANT_NOT_SUITABLE);
return result; return result;
@ -206,7 +206,7 @@ public class ExecutorService extends BaseService{
return checkResult; return checkResult;
} }
if (!checkTenantSuitable(processDefinition)){ if (!checkTenantSuitable(processDefinition)){
logger.error("there is not any vaild tenant for the process definition: id:{},name:{}, ", logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
processDefinition.getId(), processDefinition.getName()); processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE); putMsg(result, Status.TENANT_NOT_SUITABLE);
} }
@ -539,7 +539,7 @@ public class ExecutorService extends BaseService{
} }
} }
}else{ }else{
logger.error("there is not vaild schedule date for the process definition: id:{},date:{}", logger.error("there is not valid schedule date for the process definition: id:{},date:{}",
processDefineId, schedule); processDefineId, schedule);
} }
}else{ }else{

93
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -45,6 +45,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -465,7 +466,7 @@ public class ProcessDefinitionService extends BaseDAGService {
); );
for(Schedule schedule:scheduleList){ for(Schedule schedule:scheduleList){
logger.info("set schedule offline, schedule id: {}, process definition id: {}", project.getId(), schedule.getId(), id); logger.info("set schedule offline, project id: {}, schedule id: {}, process definition id: {}", project.getId(), schedule.getId(), id);
// set status // set status
schedule.setReleaseState(ReleaseState.OFFLINE); schedule.setReleaseState(ReleaseState.OFFLINE);
scheduleMapper.updateById(schedule); scheduleMapper.updateById(schedule);
@ -948,11 +949,16 @@ public class ProcessDefinitionService extends BaseDAGService {
return result; return result;
} }
String processDefinitionJson = processDefinition.getProcessDefinitionJson(); String processDefinitionJson = processDefinition.getProcessDefinitionJson();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
//process data check
if (null == processData) {
logger.error("process data is null");
putMsg(result,Status.DATA_IS_NOT_VALID, processDefinitionJson);
return result;
}
List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
result.put(Constants.DATA_LIST, taskNodeList); result.put(Constants.DATA_LIST, taskNodeList);
@ -974,14 +980,13 @@ public class ProcessDefinitionService extends BaseDAGService {
Map<Integer, List<TaskNode>> taskNodeMap = new HashMap<>(); Map<Integer, List<TaskNode>> taskNodeMap = new HashMap<>();
String[] idList = defineIdList.split(","); String[] idList = defineIdList.split(",");
List<String> definitionIdList = Arrays.asList(idList);
List<Integer> idIntList = new ArrayList<>(); List<Integer> idIntList = new ArrayList<>();
for(String definitionId : definitionIdList) { for(String definitionId : idList) {
idIntList.add(Integer.parseInt(definitionId)); idIntList.add(Integer.parseInt(definitionId));
} }
Integer[] idArray = idIntList.toArray(new Integer[idIntList.size()]); Integer[] idArray = idIntList.toArray(new Integer[idIntList.size()]);
List<ProcessDefinition> processDefinitionList = processDefineMapper.queryDefinitionListByIdList(idArray); List<ProcessDefinition> processDefinitionList = processDefineMapper.queryDefinitionListByIdList(idArray);
if (processDefinitionList == null || processDefinitionList.size() ==0) { if (CollectionUtils.isEmpty(processDefinitionList)) {
logger.info("process definition not exists"); logger.info("process definition not exists");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineIdList); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineIdList);
return result; return result;
@ -1031,9 +1036,10 @@ public class ProcessDefinitionService extends BaseDAGService {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
ProcessDefinition processDefinition = processDefineMapper.selectById(processId); ProcessDefinition processDefinition = processDefineMapper.selectById(processId);
if (processDefinition == null) { if (null == processDefinition) {
logger.info("process define not exists"); logger.info("process define not exists");
throw new RuntimeException("process define not exists"); putMsg(result,Status.PROCESS_DEFINE_NOT_EXIST, processDefinition);
return result;
} }
DAG<String, TaskNode, TaskNodeRelation> dag = genDagGraph(processDefinition); DAG<String, TaskNode, TaskNodeRelation> dag = genDagGraph(processDefinition);
/** /**
@ -1121,10 +1127,10 @@ public class ProcessDefinitionService extends BaseDAGService {
pTreeViewDto.getChildren().add(treeViewDto); pTreeViewDto.getChildren().add(treeViewDto);
} }
postNodeList = dag.getSubsequentNodes(nodeName); postNodeList = dag.getSubsequentNodes(nodeName);
if (postNodeList != null && postNodeList.size() > 0) { if (CollectionUtils.isNotEmpty(postNodeList)) {
for (String nextNodeName : postNodeList) { for (String nextNodeName : postNodeList) {
List<TreeViewDto> treeViewDtoList = waitingRunningNodeMap.get(nextNodeName); List<TreeViewDto> treeViewDtoList = waitingRunningNodeMap.get(nextNodeName);
if (treeViewDtoList != null && treeViewDtoList.size() > 0) { if (CollectionUtils.isNotEmpty(treeViewDtoList)) {
treeViewDtoList.add(treeViewDto); treeViewDtoList.add(treeViewDto);
waitingRunningNodeMap.put(nextNodeName, treeViewDtoList); waitingRunningNodeMap.put(nextNodeName, treeViewDtoList);
} else { } else {
@ -1136,7 +1142,6 @@ public class ProcessDefinitionService extends BaseDAGService {
} }
runningNodeMap.remove(nodeName); runningNodeMap.remove(nodeName);
} }
if (waitingRunningNodeMap == null || waitingRunningNodeMap.size() == 0) { if (waitingRunningNodeMap == null || waitingRunningNodeMap.size() == 0) {
break; break;
} else { } else {
@ -1161,75 +1166,29 @@ public class ProcessDefinitionService extends BaseDAGService {
private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) throws Exception { private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) throws Exception {
String processDefinitionJson = processDefinition.getProcessDefinitionJson(); String processDefinitionJson = processDefinition.getProcessDefinitionJson();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
List<TaskNode> taskNodeList = processData.getTasks(); //check process data
if (null != processData) {
List<TaskNode> taskNodeList = processData.getTasks();
processDefinition.setGlobalParamList(processData.getGlobalParams());
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);
processDefinition.setGlobalParamList(processData.getGlobalParams()); // Generate concrete Dag to be executed
return DagHelper.buildDagGraph(processDag);
List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
// Traverse node information and build relationships
for (TaskNode taskNode : taskNodeList) {
String preTasks = taskNode.getPreTasks();
List<String> preTasksList = JSONUtils.toList(preTasks, String.class);
// If the dependency is not empty
if (preTasksList != null) {
for (String depNode : preTasksList) {
taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getName()));
}
}
} }
ProcessDag processDag = new ProcessDag(); return new DAG<>();
processDag.setEdges(taskNodeRelations);
processDag.setNodes(taskNodeList);
// Generate concrete Dag to be executed
return genDagGraph(processDag);
} }
/**
* Generate the DAG of process
*
* @return DAG
*/
private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDag processDag) {
DAG<String, TaskNode, TaskNodeRelation> dag = new DAG<>();
/**
* Add the ndoes
*/
if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
for (TaskNode node : processDag.getNodes()) {
dag.addNode(node.getName(), node);
}
}
/**
* Add the edges
*/
if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
for (TaskNodeRelation edge : processDag.getEdges()) {
dag.addEdge(edge.getStartNode(), edge.getEndNode());
}
}
return dag;
}
/** /**
* whether the graph has a ring * whether the graph has a ring
* *
* @param taskNodeResponseList * @param taskNodeResponseList task node response list
* @return * @return if graph has cycle flag
*/ */
private boolean graphHasCycle(List<TaskNode> taskNodeResponseList) { private boolean graphHasCycle(List<TaskNode> taskNodeResponseList) {
DAG<String, TaskNode, String> graph = new DAG<>(); DAG<String, TaskNode, String> graph = new DAG<>();

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java

@ -86,7 +86,7 @@ public class CheckUtils {
* @return true if other parameters are valid, otherwise return false * @return true if other parameters are valid, otherwise return false
*/ */
public static boolean checkOtherParams(String otherParams) { public static boolean checkOtherParams(String otherParams) {
return StringUtils.isNotEmpty(otherParams) && !JSONUtils.checkJsonVaild(otherParams); return StringUtils.isNotEmpty(otherParams) && !JSONUtils.checkJsonValid(otherParams);
} }
/** /**

177
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -16,10 +16,8 @@
*/ */
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.dto.ProcessMeta; import org.apache.dolphinscheduler.api.dto.ProcessMeta;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
@ -82,6 +80,12 @@ public class ProcessDefinitionServiceTest {
@Mock @Mock
private ProcessService processService; private ProcessService processService;
@Mock
private ProcessInstanceMapper processInstanceMapper;
@Mock
private TaskInstanceMapper taskInstanceMapper;
private String sqlDependentJson = "{\"globalParams\":[]," + private String sqlDependentJson = "{\"globalParams\":[]," +
"\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," + "\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," +
"\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\"," + "\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\"," +
@ -99,6 +103,12 @@ public class ProcessDefinitionServiceTest {
"\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," + "\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
"\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}"; "\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
private String shellJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-9527\",\"name\":\"shell-1\"," +
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"}," +
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"interval\":1,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
"\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
@Test @Test
public void testQueryProccessDefinitionList() { public void testQueryProccessDefinitionList() {
String projectName = "project_test1"; String projectName = "project_test1";
@ -149,7 +159,7 @@ public class ProcessDefinitionServiceTest {
} }
@Test @Test
public void testQueryProccessDefinitionById() { public void testQueryProcessDefinitionById() {
String projectName = "project_test1"; String projectName = "project_test1";
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName)); Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
@ -255,7 +265,7 @@ public class ProcessDefinitionServiceTest {
"project_test1", 46); "project_test1", 46);
Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, deleteFail.get(Constants.STATUS)); Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, deleteFail.get(Constants.STATUS));
//delte success //delete success
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1); Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
Map<String, Object> deleteSuccess = processDefinitionService.deleteProcessDefinitionById(loginUser, Map<String, Object> deleteSuccess = processDefinitionService.deleteProcessDefinitionById(loginUser,
"project_test1", 46); "project_test1", 46);
@ -304,6 +314,155 @@ public class ProcessDefinitionServiceTest {
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failRes.get(Constants.STATUS)); Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failRes.get(Constants.STATUS));
} }
@Test
public void testVerifyProcessDefinitionName() {
String projectName = "project_test1";
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
Project project = getProject(projectName);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
//project check auth fail
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.PROJECT_NOT_FOUNT, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result);
Map<String, Object> map = processDefinitionService.verifyProccessDefinitionName(loginUser,
"project_test1", "test_pdf");
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS));
//project check auth success, process not exist
putMsg(result, Status.SUCCESS, projectName);
Mockito.when(processDefineMapper.queryByDefineName(project.getId(),"test_pdf")).thenReturn(null);
Map<String, Object> processNotExistRes = processDefinitionService.verifyProccessDefinitionName(loginUser,
"project_test1", "test_pdf");
Assert.assertEquals(Status.SUCCESS, processNotExistRes.get(Constants.STATUS));
//process exist
Mockito.when(processDefineMapper.queryByDefineName(project.getId(),"test_pdf")).thenReturn(getProcessDefinition());
Map<String, Object> processExistRes = processDefinitionService.verifyProccessDefinitionName(loginUser,
"project_test1", "test_pdf");
Assert.assertEquals(Status.PROCESS_INSTANCE_EXIST, processExistRes.get(Constants.STATUS));
}
@Test
public void testCheckProcessNodeList() {
Map<String, Object> dataNotValidRes = processDefinitionService.checkProcessNodeList(null, "");
Assert.assertEquals(Status.DATA_IS_NOT_VALID, dataNotValidRes.get(Constants.STATUS));
//task not empty
String processDefinitionJson = shellJson;
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
assert processData != null;
Map<String, Object> taskEmptyRes = processDefinitionService.checkProcessNodeList(processData, processDefinitionJson);
Assert.assertEquals(Status.SUCCESS, taskEmptyRes.get(Constants.STATUS));
//task empty
processData.setTasks(null);
Map<String, Object> taskNotEmptyRes = processDefinitionService.checkProcessNodeList(processData, processDefinitionJson);
Assert.assertEquals(Status.DATA_IS_NULL, taskNotEmptyRes.get(Constants.STATUS));
//json abnormal
String abnormalJson = processDefinitionJson.replaceAll("SHELL","");
processData = JSONUtils.parseObject(abnormalJson, ProcessData.class);
Map<String, Object> abnormalTaskRes = processDefinitionService.checkProcessNodeList(processData, abnormalJson);
Assert.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID, abnormalTaskRes.get(Constants.STATUS));
}
@Test
public void testGetTaskNodeListByDefinitionId() throws Exception {
//process definition not exist
Mockito.when(processDefineMapper.selectById(46)).thenReturn(null);
Map<String, Object> processDefinitionNullRes = processDefinitionService.getTaskNodeListByDefinitionId(46);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
//process data null
ProcessDefinition processDefinition = getProcessDefinition();
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Map<String, Object> successRes = processDefinitionService.getTaskNodeListByDefinitionId(46);
Assert.assertEquals(Status.DATA_IS_NOT_VALID, successRes.get(Constants.STATUS));
//success
processDefinition.setProcessDefinitionJson(shellJson);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Map<String, Object> dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionId(46);
Assert.assertEquals(Status.SUCCESS, dataNotValidRes.get(Constants.STATUS));
}
@Test
public void testGetTaskNodeListByDefinitionIdList() throws Exception {
//process definition not exist
String defineIdList = "46";
Integer[] idArray = {46};
Mockito.when(processDefineMapper.queryDefinitionListByIdList(idArray)).thenReturn(null);
Map<String, Object> processNotExistRes = processDefinitionService.getTaskNodeListByDefinitionIdList(defineIdList);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processNotExistRes.get(Constants.STATUS));
//process definition exist
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(shellJson);
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(processDefinition);
Mockito.when(processDefineMapper.queryDefinitionListByIdList(idArray)).thenReturn(processDefinitionList);
Map<String, Object> successRes = processDefinitionService.getTaskNodeListByDefinitionIdList(defineIdList);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
@Test
public void testQueryProccessDefinitionAllByProjectId() {
int projectId = 1;
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(shellJson);
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(processDefinition);
Mockito.when(processDefineMapper.queryAllDefinitionList(projectId)).thenReturn(processDefinitionList);
Map<String, Object> successRes = processDefinitionService.queryProccessDefinitionAllByProjectId(projectId);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
@Test
public void testViewTree() throws Exception {
//process definition not exist
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(shellJson);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(null);
Map<String, Object> processDefinitionNullRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
List<ProcessInstance> processInstanceList = new ArrayList<>();
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setName("test_instance");
processInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
processInstance.setHost("192.168.xx.xx");
processInstance.setStartTime(new Date());
processInstance.setEndTime(new Date());
processInstanceList.add(processInstance);
TaskInstance taskInstance = new TaskInstance();
taskInstance.setStartTime(new Date());
taskInstance.setEndTime(new Date());
taskInstance.setTaskType("SHELL");
taskInstance.setId(1);
taskInstance.setName("test_task_instance");
taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
taskInstance.setHost("192.168.xx.xx");
//task instance not exist
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processInstanceMapper.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList);
Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), "shell-1")).thenReturn(null);
Map<String, Object> taskNullRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS));
//task instance exist
Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), "shell-1")).thenReturn(taskInstance);
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
}
/** /**
* add datasource param and dependent when export process * add datasource param and dependent when export process
* @throws JSONException * @throws JSONException
@ -334,13 +493,9 @@ public class ProcessDefinitionServiceTest {
@Test @Test
public void testAddExportTaskNodeSpecialParam() throws JSONException { public void testAddExportTaskNodeSpecialParam() throws JSONException {
String shellJson = "{\"globalParams\":[],\"tasks\":[{\"id\":\"tasks-9527\",\"name\":\"shell-1\"," + String shellData = shellJson;
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"}," +
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"interval\":1,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
"\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
String resultStr = processDefinitionService.addExportTaskNodeSpecialParam(shellJson); String resultStr = processDefinitionService.addExportTaskNodeSpecialParam(shellData);
JSONAssert.assertEquals(shellJson, resultStr, false); JSONAssert.assertEquals(shellJson, resultStr, false);
} }
@ -610,7 +765,7 @@ public class ProcessDefinitionServiceTest {
private ProcessDefinition getProcessDefinition(){ private ProcessDefinition getProcessDefinition(){
ProcessDefinition processDefinition = new ProcessDefinition(); ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(46); processDefinition.setId(46);
processDefinition.setName("testProject"); processDefinition.setName("test_pdf");
processDefinition.setProjectId(2); processDefinition.setProjectId(2);
processDefinition.setTenantId(1); processDefinition.setTenantId(1);
processDefinition.setDescription(""); processDefinition.setDescription("");

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java

@ -126,7 +126,7 @@ public class JSONUtils {
* @param json json * @param json json
* @return true if valid * @return true if valid
*/ */
public static boolean checkJsonVaild(String json) { public static boolean checkJsonValid(String json) {
if (StringUtils.isEmpty(json)) { if (StringUtils.isEmpty(json)) {
return false; return false;

84
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java

@ -40,6 +40,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.regex.Pattern;
/** /**
* os utils * os utils
@ -138,7 +139,7 @@ public class OSUtils {
if (isMacOS()) { if (isMacOS()) {
return getUserListFromMac(); return getUserListFromMac();
} else if (isWindows()) { } else if (isWindows()) {
// do something return getUserListFromWindows();
} else { } else {
return getUserListFromLinux(); return getUserListFromLinux();
} }
@ -185,6 +186,47 @@ public class OSUtils {
return Collections.emptyList(); return Collections.emptyList();
} }
/**
* get user list from windows
* @return user list
* @throws IOException
*/
private static List<String> getUserListFromWindows() throws IOException {
String result = exeCmd("net user");
String[] lines = result.split("\n");
int startPos = 0;
int endPos = lines.length - 2;
for (int i = 0; i < lines.length; i++) {
if (lines[i].isEmpty()) {
continue;
}
int count = 0;
if (lines[i].charAt(0) == '-') {
for (int j = 0; j < lines[i].length(); j++) {
if (lines[i].charAt(i) == '-') {
count++;
}
}
}
if (count == lines[i].length()) {
startPos = i + 1;
break;
}
}
List<String> users = new ArrayList<>();
while (startPos <= endPos) {
Pattern pattern = Pattern.compile("\\s+");
users.addAll(Arrays.asList(pattern.split(lines[startPos])));
startPos++;
}
return users;
}
/** /**
* create user * create user
* @param userName user name * @param userName user name
@ -200,7 +242,7 @@ public class OSUtils {
if (isMacOS()) { if (isMacOS()) {
createMacUser(userName, userGroup); createMacUser(userName, userGroup);
} else if (isWindows()) { } else if (isWindows()) {
// do something createWindowsUser(userName, userGroup);
} else { } else {
createLinuxUser(userName, userGroup); createLinuxUser(userName, userGroup);
} }
@ -243,16 +285,46 @@ public class OSUtils {
OSUtils.exeCmd(appendGroupCmd); OSUtils.exeCmd(appendGroupCmd);
} }
/**
* create windows user
* @param userName user name
* @param userGroup user group
* @throws IOException in case of an I/O error
*/
private static void createWindowsUser(String userName, String userGroup) throws IOException {
logger.info("create windows os user : {}", userName);
String userCreateCmd = String.format("net user \"%s\" /add", userName);
String appendGroupCmd = String.format("net localgroup \"%s\" \"%s\" /add", userGroup, userName);
logger.info("execute create user command : {}", userCreateCmd);
OSUtils.exeCmd(userCreateCmd);
logger.info("execute append user to group : {}", appendGroupCmd);
OSUtils.exeCmd(appendGroupCmd);
}
/** /**
* get system group information * get system group information
* @return system group info * @return system group info
* @throws IOException errors * @throws IOException errors
*/ */
public static String getGroup() throws IOException { public static String getGroup() throws IOException {
String result = exeCmd("groups"); if (isWindows()) {
if (StringUtils.isNotEmpty(result)) { String currentProcUserName = System.getProperty("user.name");
String[] groupInfo = result.split(" "); String result = exeCmd(String.format("net user \"%s\"", currentProcUserName));
return groupInfo[0]; String line = result.split("\n")[22];
String group = Pattern.compile("\\s+").split(line)[1];
if (group.charAt(0) == '*') {
return group.substring(1);
} else {
return group;
}
} else {
String result = exeCmd("groups");
if (StringUtils.isNotEmpty(result)) {
String[] groupInfo = result.split(" ");
return groupInfo[0];
}
} }
return null; return null;

6
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java

@ -117,9 +117,9 @@ public class JSONUtilsTest {
} }
@Test @Test
public void testCheckJsonVaild() { public void testCheckJsonValid() {
Assert.assertTrue(JSONUtils.checkJsonVaild("3")); Assert.assertTrue(JSONUtils.checkJsonValid("3"));
Assert.assertFalse(JSONUtils.checkJsonVaild("")); Assert.assertFalse(JSONUtils.checkJsonValid(""));
} }
@Test @Test

23
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java

@ -36,9 +36,9 @@ public class OSUtilsTest {
Assert.assertNotEquals("System user list should not be empty", userList.size(), 0); Assert.assertNotEquals("System user list should not be empty", userList.size(), 0);
logger.info("OS user list : {}", userList.toString()); logger.info("OS user list : {}", userList.toString());
} }
@Test @Test
public void testOSMetric(){ public void testOSMetric(){
double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
Assert.assertTrue(availablePhysicalMemorySize > 0.0f); Assert.assertTrue(availablePhysicalMemorySize > 0.0f);
double totalMemorySize = OSUtils.totalMemorySize(); double totalMemorySize = OSUtils.totalMemorySize();
@ -50,17 +50,23 @@ public class OSUtilsTest {
double cpuUsage = OSUtils.cpuUsage(); double cpuUsage = OSUtils.cpuUsage();
Assert.assertTrue(cpuUsage > 0.0f); Assert.assertTrue(cpuUsage > 0.0f);
} }
@Test @Test
public void getGroup() { public void getGroup() {
if(OSUtils.isMacOS() || !OSUtils.isWindows()){ try {
try { String group = OSUtils.getGroup();
String group = OSUtils.getGroup(); Assert.assertNotNull(group);
Assert.assertNotNull(group); } catch (IOException e) {
} catch (IOException e) { Assert.fail("get group failed " + e.getMessage());
Assert.fail("get group failed " + e.getMessage());
}
} }
} }
@Test
public void createUser() {
boolean result = OSUtils.createUser("test123");
Assert.assertTrue(result);
}
@Test @Test
public void exeCmd() { public void exeCmd() {
if(OSUtils.isMacOS() || !OSUtils.isWindows()){ if(OSUtils.isMacOS() || !OSUtils.isWindows()){
@ -113,4 +119,5 @@ public class OSUtilsTest {
Assert.assertFalse(resource); Assert.assertFalse(resource);
} }
} }

35
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -319,18 +319,14 @@ public class DagHelper {
DAG<String,TaskNode,TaskNodeRelation> dag = new DAG<>(); DAG<String,TaskNode,TaskNodeRelation> dag = new DAG<>();
/** //add vertex
* add vertex
*/
if (CollectionUtils.isNotEmpty(processDag.getNodes())){ if (CollectionUtils.isNotEmpty(processDag.getNodes())){
for (TaskNode node : processDag.getNodes()){ for (TaskNode node : processDag.getNodes()){
dag.addNode(node.getName(),node); dag.addNode(node.getName(),node);
} }
} }
/** //add edge
* add edge
*/
if (CollectionUtils.isNotEmpty(processDag.getEdges())){ if (CollectionUtils.isNotEmpty(processDag.getEdges())){
for (TaskNodeRelation edge : processDag.getEdges()){ for (TaskNodeRelation edge : processDag.getEdges()){
dag.addEdge(edge.getStartNode(),edge.getEndNode()); dag.addEdge(edge.getStartNode(),edge.getEndNode());
@ -338,4 +334,31 @@ public class DagHelper {
} }
return dag; return dag;
} }
/**
* get process dag
* @param taskNodeList task node list
* @return Process dag
*/
public static ProcessDag getProcessDag(List<TaskNode> taskNodeList) {
List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
// Traverse node information and build relationships
for (TaskNode taskNode : taskNodeList) {
String preTasks = taskNode.getPreTasks();
List<String> preTasksList = JSONUtils.toList(preTasks, String.class);
// If the dependency is not empty
if (preTasksList != null) {
for (String depNode : preTasksList) {
taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getName()));
}
}
}
ProcessDag processDag = new ProcessDag();
processDag.setEdges(taskNodeRelations);
processDag.setNodes(taskNodeList);
return processDag;
}
} }

2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java

@ -121,7 +121,7 @@ public class TaskInstanceMapperTest {
} }
/** /**
* test find vaild task list by process instance id * test find valid task list by process instance id
*/ */
@Test @Test
public void testFindValidTaskListByProcessId() { public void testFindValidTaskListByProcessId() {

19
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java

@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -37,7 +39,6 @@ import java.util.Map;
* dag helper test * dag helper test
*/ */
public class DagHelperTest { public class DagHelperTest {
/** /**
* test task node can submit * test task node can submit
* @throws JsonProcessingException if error throws JsonProcessingException * @throws JsonProcessingException if error throws JsonProcessingException
@ -131,4 +132,20 @@ public class DagHelperTest {
return DagHelper.buildDagGraph(processDag); return DagHelper.buildDagGraph(processDag);
} }
@Test
public void testBuildDagGraph() {
String shellJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-9527\",\"name\":\"shell-1\"," +
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"}," +
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"interval\":1,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
"\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
ProcessData processData = JSONUtils.parseObject(shellJson, ProcessData.class);
assert processData != null;
List<TaskNode> taskNodeList = processData.getTasks();
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);
DAG<String, TaskNode, TaskNodeRelation> dag = DagHelper.buildDagGraph(processDag);
Assert.assertNotNull(dag);
}
} }

1
pom.xml

@ -721,6 +721,7 @@
<include>**/dao/mapper/AlertMapperTest.java</include> <include>**/dao/mapper/AlertMapperTest.java</include>
<include>**/dao/mapper/CommandMapperTest.java</include> <include>**/dao/mapper/CommandMapperTest.java</include>
<include>**/dao/cron/CronUtilsTest.java</include> <include>**/dao/cron/CronUtilsTest.java</include>
<include>**/dao/utils/DagHelperTest.java</include>
<include>**/alert/template/AlertTemplateFactoryTest.java</include> <include>**/alert/template/AlertTemplateFactoryTest.java</include>
<include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include> <include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include>
<include>**/server/worker/task/datax/DataxTaskTest.java</include> <include>**/server/worker/task/datax/DataxTaskTest.java</include>

Loading…
Cancel
Save