Browse Source

[Feature][JsonSplit-api] checkProcessNode of processDefinition (#5946)

* check has cycle of ProcessDefinition

* checkProcessNode of processDefinition

Co-authored-by: JinyLeeChina <297062848@qq.com>
2.0.7-release
JinyLeeChina 3 years ago committed by GitHub
parent
commit
8bd88d90c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  2. 15
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  3. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  4. 329
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  5. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  6. 5
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -229,14 +229,12 @@ public interface ProcessDefinitionService {
MultipartFile file);
/**
* check the process definition node meets the specifications
* check the process task relation json
*
* @param processData process data
* @param processDefinitionJson process definition json
* @param processTaskRelationJson process task relation json
* @return check result code
*/
Map<String, Object> checkProcessNodeList(ProcessData processData,
String processDefinitionJson);
Map<String, Object> checkProcessNodeList(String processTaskRelationJson);
/**
* get task node details based on process definition

15
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -868,25 +868,24 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
/**
* check the process definition node meets the specifications
* check the process task relation json
*
* @param processData process data
* @param processDefinitionJson process definition json
* @param processTaskRelationJson process task relation json
* @return check result code
*/
@Override
public Map<String, Object> checkProcessNodeList(ProcessData processData, String processDefinitionJson) {
public Map<String, Object> checkProcessNodeList(String processTaskRelationJson) {
Map<String, Object> result = new HashMap<>();
try {
if (processData == null) {
if (processTaskRelationJson == null) {
logger.error("process data is null");
putMsg(result, Status.DATA_IS_NOT_VALID, processDefinitionJson);
putMsg(result, Status.DATA_IS_NOT_VALID, processTaskRelationJson);
return result;
}
List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(processTaskRelationJson, ProcessTaskRelationLog.class);
// Check whether the task node is normal
List<TaskNode> taskNodes = processData.getTasks();
List<TaskNode> taskNodes = processService.transformTask(taskRelationList);
if (CollectionUtils.isEmpty(taskNodes)) {
logger.error("process node info is empty");

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -450,8 +450,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class);
//check workflow json is valid
result = processDefinitionService.checkProcessNodeList(processData, processInstanceJson);
//check workflow json is valid TODO processInstanceJson --> processTaskRelationJson
result = processDefinitionService.checkProcessNodeList(processInstanceJson);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}

329
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -23,27 +23,18 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
@ -52,7 +43,6 @@ import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
@ -83,148 +73,10 @@ import com.google.common.collect.Lists;
@RunWith(MockitoJUnitRunner.class)
public class ProcessDefinitionServiceTest {
private static final String SHELL_JSON = "{\n"
+ " \"globalParams\": [\n"
+ " \n"
+ " ],\n"
+ " \"tasks\": [\n"
+ " {\n"
+ " \"type\": \"SHELL\",\n"
+ " \"id\": \"tasks-9527\",\n"
+ " \"name\": \"shell-1\",\n"
+ " \"params\": {\n"
+ " \"resourceList\": [\n"
+ " \n"
+ " ],\n"
+ " \"localParams\": [\n"
+ " \n"
+ " ],\n"
+ " \"rawScript\": \"#!/bin/bash\\necho \\\"shell-1\\\"\"\n"
+ " },\n"
+ " \"description\": \"\",\n"
+ " \"runFlag\": \"NORMAL\",\n"
+ " \"dependence\": {\n"
+ " \n"
+ " },\n"
+ " \"maxRetryTimes\": \"0\",\n"
+ " \"retryInterval\": \"1\",\n"
+ " \"timeout\": {\n"
+ " \"strategy\": \"\",\n"
+ " \"interval\": 1,\n"
+ " \"enable\": false\n"
+ " },\n"
+ " \"taskInstancePriority\": \"MEDIUM\",\n"
+ " \"workerGroupId\": -1,\n"
+ " \"preTasks\": [\n"
+ " \n"
+ " ]\n"
+ " }\n"
+ " ],\n"
+ " \"tenantId\": 1,\n"
+ " \"timeout\": 0\n"
+ "}";
private static final String CYCLE_SHELL_JSON = "{\n"
+ " \"globalParams\": [\n"
+ " \n"
+ " ],\n"
+ " \"tasks\": [\n"
+ " {\n"
+ " \"type\": \"SHELL\",\n"
+ " \"id\": \"tasks-9527\",\n"
+ " \"name\": \"shell-1\",\n"
+ " \"params\": {\n"
+ " \"resourceList\": [\n"
+ " \n"
+ " ],\n"
+ " \"localParams\": [\n"
+ " \n"
+ " ],\n"
+ " \"rawScript\": \"#!/bin/bash\\necho \\\"shell-1\\\"\"\n"
+ " },\n"
+ " \"description\": \"\",\n"
+ " \"runFlag\": \"NORMAL\",\n"
+ " \"dependence\": {\n"
+ " \n"
+ " },\n"
+ " \"maxRetryTimes\": \"0\",\n"
+ " \"retryInterval\": \"1\",\n"
+ " \"timeout\": {\n"
+ " \"strategy\": \"\",\n"
+ " \"interval\": 1,\n"
+ " \"enable\": false\n"
+ " },\n"
+ " \"taskInstancePriority\": \"MEDIUM\",\n"
+ " \"workerGroupId\": -1,\n"
+ " \"preTasks\": [\n"
+ " \"tasks-9529\"\n"
+ " ]\n"
+ " },\n"
+ " {\n"
+ " \"type\": \"SHELL\",\n"
+ " \"id\": \"tasks-9528\",\n"
+ " \"name\": \"shell-1\",\n"
+ " \"params\": {\n"
+ " \"resourceList\": [\n"
+ " \n"
+ " ],\n"
+ " \"localParams\": [\n"
+ " \n"
+ " ],\n"
+ " \"rawScript\": \"#!/bin/bash\\necho \\\"shell-1\\\"\"\n"
+ " },\n"
+ " \"description\": \"\",\n"
+ " \"runFlag\": \"NORMAL\",\n"
+ " \"dependence\": {\n"
+ " \n"
+ " },\n"
+ " \"maxRetryTimes\": \"0\",\n"
+ " \"retryInterval\": \"1\",\n"
+ " \"timeout\": {\n"
+ " \"strategy\": \"\",\n"
+ " \"interval\": 1,\n"
+ " \"enable\": false\n"
+ " },\n"
+ " \"taskInstancePriority\": \"MEDIUM\",\n"
+ " \"workerGroupId\": -1,\n"
+ " \"preTasks\": [\n"
+ " \"tasks-9527\"\n"
+ " ]\n"
+ " },\n"
+ " {\n"
+ " \"type\": \"SHELL\",\n"
+ " \"id\": \"tasks-9529\",\n"
+ " \"name\": \"shell-1\",\n"
+ " \"params\": {\n"
+ " \"resourceList\": [\n"
+ " \n"
+ " ],\n"
+ " \"localParams\": [\n"
+ " \n"
+ " ],\n"
+ " \"rawScript\": \"#!/bin/bash\\necho \\\"shell-1\\\"\"\n"
+ " },\n"
+ " \"description\": \"\",\n"
+ " \"runFlag\": \"NORMAL\",\n"
+ " \"dependence\": {\n"
+ " \n"
+ " },\n"
+ " \"maxRetryTimes\": \"0\",\n"
+ " \"retryInterval\": \"1\",\n"
+ " \"timeout\": {\n"
+ " \"strategy\": \"\",\n"
+ " \"interval\": 1,\n"
+ " \"enable\": false\n"
+ " },\n"
+ " \"taskInstancePriority\": \"MEDIUM\",\n"
+ " \"workerGroupId\": -1,\n"
+ " \"preTasks\": [\n"
+ " \"tasks-9528\"\n"
+ " ]\n"
+ " }\n"
+ " ],\n"
+ " \"tenantId\": 1,\n"
+ " \"timeout\": 0\n"
+ "}";
private static final String taskRelationJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
+ "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
@InjectMocks
private ProcessDefinitionServiceImpl processDefinitionService;
@Mock
@ -298,14 +150,14 @@ public class ProcessDefinitionServiceTest {
Page<ProcessDefinition> page = new Page<>(1, 10);
page.setTotal(30);
Mockito.when(processDefineMapper.queryDefineListPaging(
Mockito.any(IPage.class)
, Mockito.eq("")
, Mockito.eq(loginUser.getId())
, Mockito.eq(project.getCode())
, Mockito.anyBoolean())).thenReturn(page);
Mockito.any(IPage.class)
, Mockito.eq("")
, Mockito.eq(loginUser.getId())
, Mockito.eq(project.getCode())
, Mockito.anyBoolean())).thenReturn(page);
Map<String, Object> map1 = processDefinitionService.queryProcessDefinitionListPaging(
loginUser, 1L, "", 1, 10, loginUser.getId());
loginUser, 1L, "", 1, 10, loginUser.getId());
Assert.assertEquals(Status.SUCCESS, map1.get(Constants.STATUS));
}
@ -402,7 +254,7 @@ public class ProcessDefinitionServiceTest {
putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result);
Map<String, Object> map1 = processDefinitionService.batchCopyProcessDefinition(
loginUser, projectCode, String.valueOf(project.getId()), 2L);
loginUser, projectCode, String.valueOf(project.getId()), 2L);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map1.get(Constants.STATUS));
// project check auth success, target project name not equal project name, check auth target project fail
@ -419,7 +271,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
Map<String, Object> map3 = processDefinitionService.batchCopyProcessDefinition(
loginUser, projectCode, "46", 1L);
loginUser, projectCode, "46", 1L);
Assert.assertEquals(Status.COPY_PROCESS_DEFINITION_ERROR, map3.get(Constants.STATUS));
}
@ -448,11 +300,11 @@ public class ProcessDefinitionServiceTest {
processDefinitionList.add(definition);
Set<Long> definitionCodes = Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 46L)).thenReturn(getProcessTaskRelation(projectCode, 46L));
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 46L)).thenReturn(getProcessTaskRelation(projectCode));
putMsg(result, Status.SUCCESS);
Map<String, Object> successRes = processDefinitionService.batchMoveProcessDefinition(
loginUser, projectCode, "46", projectCode2);
loginUser, projectCode, "46", projectCode2);
Assert.assertEquals(Status.MOVE_PROCESS_DEFINITION_ERROR, successRes.get(Constants.STATUS));
}
@ -495,7 +347,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result);
processDefinition.setReleaseState(ReleaseState.ONLINE);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Map<String, Object> dfOnlineRes = processDefinitionService.deleteProcessDefinitionById(loginUser,projectCode, 46);
Map<String, Object> dfOnlineRes = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46);
Assert.assertEquals(Status.PROCESS_DEFINE_STATE_ONLINE, dfOnlineRes.get(Constants.STATUS));
//scheduler list elements > 1
@ -555,26 +407,26 @@ public class ProcessDefinitionServiceTest {
putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result);
Map<String, Object> map = processDefinitionService.releaseProcessDefinition(loginUser, projectCode,
6, ReleaseState.OFFLINE);
6, ReleaseState.OFFLINE);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS));
// project check auth success, processs definition online
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(getProcessDefinition());
Map<String, Object> onlineRes = processDefinitionService.releaseProcessDefinition(
loginUser, projectCode, 46, ReleaseState.ONLINE);
loginUser, projectCode, 46, ReleaseState.ONLINE);
Assert.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS));
// project check auth success, processs definition online
ProcessDefinition processDefinition1 = getProcessDefinition();
processDefinition1.setResourceIds("1,2");
Map<String, Object> onlineWithResourceRes = processDefinitionService.releaseProcessDefinition(
loginUser, projectCode, 46, ReleaseState.ONLINE);
loginUser, projectCode, 46, ReleaseState.ONLINE);
Assert.assertEquals(Status.SUCCESS, onlineWithResourceRes.get(Constants.STATUS));
// release error code
Map<String, Object> failRes = processDefinitionService.releaseProcessDefinition(
loginUser, projectCode, 46, ReleaseState.getEnum(2));
loginUser, projectCode, 46, ReleaseState.getEnum(2));
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failRes.get(Constants.STATUS));
}
@ -594,51 +446,30 @@ public class ProcessDefinitionServiceTest {
putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result);
Map<String, Object> map = processDefinitionService.verifyProcessDefinitionName(loginUser,
projectCode, "test_pdf");
projectCode, "test_pdf");
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS));
//project check auth success, process not exist
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(null);
Map<String, Object> processNotExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser,
projectCode, "test_pdf");
projectCode, "test_pdf");
Assert.assertEquals(Status.SUCCESS, processNotExistRes.get(Constants.STATUS));
//process exist
Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(getProcessDefinition());
Map<String, Object> processExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser,
projectCode, "test_pdf");
projectCode, "test_pdf");
Assert.assertEquals(Status.PROCESS_DEFINITION_NAME_EXIST, processExistRes.get(Constants.STATUS));
}
@Test
public void testCheckProcessNodeList() {
Map<String, Object> dataNotValidRes = processDefinitionService.checkProcessNodeList(null, "");
Map<String, Object> dataNotValidRes = processDefinitionService.checkProcessNodeList(null);
Assert.assertEquals(Status.DATA_IS_NOT_VALID, dataNotValidRes.get(Constants.STATUS));
// task not empty
String processDefinitionJson = SHELL_JSON;
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Assert.assertNotNull(processData);
Map<String, Object> taskEmptyRes = processDefinitionService.checkProcessNodeList(processData, processDefinitionJson);
Assert.assertEquals(Status.SUCCESS, taskEmptyRes.get(Constants.STATUS));
// task empty
processData.setTasks(null);
Map<String, Object> taskNotEmptyRes = processDefinitionService.checkProcessNodeList(processData, processDefinitionJson);
Assert.assertEquals(Status.PROCESS_DAG_IS_EMPTY, taskNotEmptyRes.get(Constants.STATUS));
// task cycle
String processDefinitionJsonCycle = CYCLE_SHELL_JSON;
ProcessData processDataCycle = JSONUtils.parseObject(processDefinitionJsonCycle, ProcessData.class);
Map<String, Object> taskCycleRes = processDefinitionService.checkProcessNodeList(processDataCycle, processDefinitionJsonCycle);
Assert.assertEquals(Status.PROCESS_NODE_HAS_CYCLE, taskCycleRes.get(Constants.STATUS));
//json abnormal
String abnormalJson = processDefinitionJson.replaceAll(TaskType.SHELL.getDesc(), "");
processData = JSONUtils.parseObject(abnormalJson, ProcessData.class);
Map<String, Object> abnormalTaskRes = processDefinitionService.checkProcessNodeList(processData, abnormalJson);
Assert.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID, abnormalTaskRes.get(Constants.STATUS));
Map<String, Object> taskEmptyRes = processDefinitionService.checkProcessNodeList(taskRelationJson);
Assert.assertEquals(Status.PROCESS_DAG_IS_EMPTY, taskEmptyRes.get(Constants.STATUS));
}
@Test
@ -701,17 +532,6 @@ public class ProcessDefinitionServiceTest {
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
private ProcessData getProcessData() {
ProcessData processData = new ProcessData();
List<TaskNode> taskNodeList = new ArrayList<>();
processData.setTasks(taskNodeList);
List<Property> properties = new ArrayList<>();
processData.setGlobalParams(properties);
processData.setTenantId(10);
processData.setTimeout(100);
return processData;
}
@Test
public void testQueryAllProcessDefinitionByProjectCode() {
User loginUser = new User();
@ -735,29 +555,9 @@ public class ProcessDefinitionServiceTest {
public void testViewTree() {
//process definition not exist
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(SHELL_JSON);
Map<String, Object> processDefinitionNullRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
List<ProcessInstance> processInstanceList = new ArrayList<>();
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setName("test_instance");
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
processInstance.setHost("192.168.xx.xx");
processInstance.setStartTime(new Date());
processInstance.setEndTime(new Date());
processInstanceList.add(processInstance);
TaskInstance taskInstance = new TaskInstance();
taskInstance.setStartTime(new Date());
taskInstance.setEndTime(new Date());
taskInstance.setTaskType(TaskType.SHELL.getDesc());
taskInstance.setId(1);
taskInstance.setName("test_task_instance");
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setHost("192.168.xx.xx");
//task instance not exist
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
@ -767,38 +567,15 @@ public class ProcessDefinitionServiceTest {
//task instance exist
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
}
@Test
public void testSubProcessViewTree() {
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(SHELL_JSON);
List<ProcessInstance> processInstanceList = new ArrayList<>();
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setName("test_instance");
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
processInstance.setHost("192.168.xx.xx");
processInstance.setStartTime(new Date());
processInstance.setEndTime(new Date());
processInstanceList.add(processInstance);
TaskInstance taskInstance = new TaskInstance();
taskInstance.setStartTime(new Date());
taskInstance.setEndTime(new Date());
taskInstance.setTaskType(TaskType.SUB_PROCESS.getDesc());
taskInstance.setId(1);
taskInstance.setName("test_task_instance");
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setHost("192.168.xx.xx");
taskInstance.setTaskParams("\"processDefinitionId\": \"222\",\n");
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
}
@Test
@ -812,25 +589,17 @@ public class ProcessDefinitionServiceTest {
long projectCode = 1L;
Project project = getProject(projectCode);
ProcessDefinition processDefinition = getProcessDefinition();
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result);
String taskRelationJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":{}},{\"name\":\"\",\"preTaskCode\":123456789,"
+ "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":{}}]";
Map<String, Object> updateResult = processDefinitionService.updateProcessDefinition(loginUser, projectCode, "test", 1,
"", "", "", 0, "root", taskRelationJson);
"", "", "", 0, "root", null);
Assert.assertEquals(Status.DATA_IS_NOT_VALID, updateResult.get(Constants.STATUS));
}
@Test
public void testBatchExportProcessDefinitionByCodes() throws IOException {
processDefinitionService.batchExportProcessDefinitionByCodes(
null, 1L, null, null);
public void testBatchExportProcessDefinitionByCodes() {
processDefinitionService.batchExportProcessDefinitionByCodes(null, 1L, null, null);
User loginUser = new User();
loginUser.setId(1);
@ -845,7 +614,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result);
processDefinitionService.batchExportProcessDefinitionByCodes(
loginUser, projectCode, "1", null);
loginUser, projectCode, "1", null);
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(1);
@ -857,23 +626,10 @@ public class ProcessDefinitionServiceTest {
DagData dagData = new DagData(getProcessDefinition(), null, null);
Mockito.when(processService.genDagData(Mockito.any())).thenReturn(dagData);
processDefinitionService.batchExportProcessDefinitionByCodes(
loginUser, projectCode, "1", response);
processDefinitionService.batchExportProcessDefinitionByCodes(loginUser, projectCode, "1", response);
Assert.assertNotNull(processDefinitionService.exportProcessDagData(processDefinition));
}
/**
* get mock datasource
*
* @return DataSource
*/
private DataSource getDataSource() {
DataSource dataSource = new DataSource();
dataSource.setId(2);
dataSource.setName("test");
return dataSource;
}
/**
* get mock processDefinition
*
@ -888,7 +644,6 @@ public class ProcessDefinitionServiceTest {
processDefinition.setTenantId(1);
processDefinition.setDescription("");
processDefinition.setCode(46L);
return processDefinition;
}
@ -907,30 +662,16 @@ public class ProcessDefinitionServiceTest {
return project;
}
private List<ProcessTaskRelation> getProcessTaskRelation(long projectCode, long processCode) {
private List<ProcessTaskRelation> getProcessTaskRelation(long projectCode) {
List<ProcessTaskRelation> processTaskRelations = new ArrayList<>();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setProjectCode(projectCode);
processTaskRelation.setProcessDefinitionCode(processCode);
processTaskRelation.setProcessDefinitionCode(46L);
processTaskRelation.setProcessDefinitionVersion(1);
processTaskRelations.add(processTaskRelation);
return processTaskRelations;
}
/**
* get mock Project
*
* @param projectId projectId
* @return Project
*/
private Project getProjectById(int projectId) {
Project project = new Project();
project.setId(projectId);
project.setName("project_test2");
project.setUserId(1);
return project;
}
/**
* get mock schedule
*
@ -954,12 +695,6 @@ public class ProcessDefinitionServiceTest {
return schedule;
}
private List<Schedule> getSchedulerList() {
List<Schedule> scheduleList = new ArrayList<>();
scheduleList.add(getSchedule());
return scheduleList;
}
private void putMsg(Map<String, Object> result, Status status, Object... statusParams) {
result.put(Constants.STATUS, status);
if (statusParams != null && statusParams.length > 0) {

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -400,7 +400,7 @@ public class ProcessInstanceServiceTest {
when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant);
when(processService.updateProcessInstance(processInstance)).thenReturn(1);
when(processDefinitionService.checkProcessNodeList(Mockito.any(), eq(shellJson))).thenReturn(result);
when(processDefinitionService.checkProcessNodeList(shellJson)).thenReturn(result);
when(processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(processDefinition);

5
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2371,11 +2371,9 @@ public class ProcessService {
* @param processDefinition process definition
* @return dag graph
*/
@Deprecated
public DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) {
Map<String, String> locationMap = locationToMap(processDefinition.getLocations());
List<TaskNode> taskNodeList = genTaskNodeList(processDefinition.getCode(), processDefinition.getVersion(), locationMap);
List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
List<TaskNode> taskNodeList = transformTask(processTaskRelations);
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new ArrayList<>(processTaskRelations));
// Generate concrete Dag to be executed
return DagHelper.buildDagGraph(processDag);
@ -2418,6 +2416,7 @@ public class ProcessService {
return processData;
}
@Deprecated
public List<TaskNode> genTaskNodeList(Long processCode, int processVersion, Map<String, String> locationMap) {
List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion);
Set<TaskDefinition> taskDefinitionSet = new HashSet<>();

Loading…
Cancel
Save