Browse Source

[Feature][JsonSplit] fix processDefinitionService ut and bug (#4894)

* Modify Project and ProjectUser Mapper

* Modify Project and ProjectUser Mapper

* project_code is bigint(20)

* modify ERROR name

* modify saveProcessDefine, remove the duplicate code with createTaskAndRelation

* modify import/export processdefinition, add genProcessData

* fix ut and bug

* code style
pull/3/MERGE
Simon 4 years ago committed by GitHub
parent
commit
6d087ccce0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 90
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  3. 133
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java
  4. 19
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -354,12 +354,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.selectById(processId); ProcessDefinition processDefinition = processDefinitionMapper.selectById(processId);
ProcessData processData = processService.genProcessData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
if (processDefinition == null) { if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
} else { } else {
ProcessData processData = processService.genProcessData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
result.put(Constants.DATA_LIST, processDefinition); result.put(Constants.DATA_LIST, processDefinition);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} }
@ -379,12 +378,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getId(), processDefinitionName); ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getId(), processDefinitionName);
ProcessData processData = processService.genProcessData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
if (processDefinition == null) { if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName);
} else { } else {
ProcessData processData = processService.genProcessData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
result.put(Constants.DATA_LIST, processDefinition); result.put(Constants.DATA_LIST, processDefinition);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} }

90
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
@ -48,6 +49,7 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
@ -238,6 +240,8 @@ public class ProcessDefinitionServiceTest {
@Mock @Mock
private ProcessDefinitionMapper processDefineMapper; private ProcessDefinitionMapper processDefineMapper;
@Mock @Mock
private ProcessTaskRelationMapper processTaskRelationMapper;
@Mock
private ProjectMapper projectMapper; private ProjectMapper projectMapper;
@Mock @Mock
private ProjectServiceImpl projectService; private ProjectServiceImpl projectService;
@ -342,6 +346,17 @@ public class ProcessDefinitionServiceTest {
putMsg(result, Status.SUCCESS, projectName); putMsg(result, Status.SUCCESS, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Mockito.when(processDefineMapper.selectById(1)).thenReturn(null); Mockito.when(processDefineMapper.selectById(1)).thenReturn(null);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+ ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}"
+ ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+ ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}";
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData);
Map<String, Object> instanceNotexitRes = processDefinitionService.queryProcessDefinitionById(loginUser, Map<String, Object> instanceNotexitRes = processDefinitionService.queryProcessDefinitionById(loginUser,
"project_test1", 1); "project_test1", 1);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS)); Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
@ -376,13 +391,23 @@ public class ProcessDefinitionServiceTest {
//project check auth success, instance not exist //project check auth success, instance not exist
putMsg(result, Status.SUCCESS, projectName); putMsg(result, Status.SUCCESS, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Mockito.when(processDefineMapper.queryByDefineName(project.getId(),"test_def")).thenReturn(null); Mockito.when(processDefineMapper.queryByDefineName(project.getId(), "test_def")).thenReturn(null);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+ ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}"
+ ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+ ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}";
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData);
Map<String, Object> instanceNotexitRes = processDefinitionService.queryProcessDefinitionByName(loginUser, Map<String, Object> instanceNotexitRes = processDefinitionService.queryProcessDefinitionByName(loginUser,
"project_test1", "test_def"); "project_test1", "test_def");
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS)); Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
//instance exit //instance exit
Mockito.when(processDefineMapper.queryByDefineName(project.getId(),"test")).thenReturn(getProcessDefinition()); Mockito.when(processDefineMapper.queryByDefineName(project.getId(), "test")).thenReturn(getProcessDefinition());
Map<String, Object> successRes = processDefinitionService.queryProcessDefinitionByName(loginUser, Map<String, Object> successRes = processDefinitionService.queryProcessDefinitionByName(loginUser,
"project_test1", "test"); "project_test1", "test");
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
@ -393,6 +418,11 @@ public class ProcessDefinitionServiceTest {
String projectName = "project_test1"; String projectName = "project_test1";
Project project = getProject(projectName); Project project = getProject(projectName);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\","
+ "\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234"
+ "\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
+ "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},"
+ "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}";
User loginUser = new User(); User loginUser = new User();
loginUser.setId(-1); loginUser.setId(-1);
@ -435,15 +465,24 @@ public class ProcessDefinitionServiceTest {
// instance exit // instance exit
ProcessDefinition definition = getProcessDefinition(); ProcessDefinition definition = getProcessDefinition();
definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"); definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}");
definition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\"," definition.setProcessDefinitionJson(processDefinitionJson);
+ "\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234"
+ "\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
+ "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},"
+ "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}");
definition.setConnects("[]"); definition.setConnects("[]");
Mockito.when(processDefineMapper.selectById(46)).thenReturn(definition); Mockito.when(processDefineMapper.selectById(46)).thenReturn(definition);
Mockito.when(processService.saveProcessDefinition(Mockito.eq(loginUser)
, Mockito.eq(project2)
, Mockito.anyString()
, Mockito.anyString()
, Mockito.anyString()
, Mockito.anyString()
, Mockito.any(ProcessData.class)
, Mockito.any(ProcessDefinition.class)))
.thenReturn(1);
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData);
Map<String, Object> map3 = processDefinitionService.batchCopyProcessDefinition( Map<String, Object> map3 = processDefinitionService.batchCopyProcessDefinition(
loginUser, projectName, "46", 1); loginUser, projectName, "46", 1);
Assert.assertEquals(Status.SUCCESS, map3.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, map3.get(Constants.STATUS));
@ -568,6 +607,7 @@ public class ProcessDefinitionServiceTest {
schedules.add(schedule); schedules.add(schedule);
Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(schedules); Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(schedules);
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(0); Mockito.when(processDefineMapper.deleteById(46)).thenReturn(0);
Mockito.when(processTaskRelationMapper.deleteByCode(null, null)).thenReturn(0);
Map<String, Object> deleteFail = processDefinitionService.deleteProcessDefinitionById(loginUser, Map<String, Object> deleteFail = processDefinitionService.deleteProcessDefinitionById(loginUser,
"project_test1", 46); "project_test1", 46);
Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, deleteFail.get(Constants.STATUS)); Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, deleteFail.get(Constants.STATUS));
@ -708,6 +748,7 @@ public class ProcessDefinitionServiceTest {
Assert.assertEquals(Status.DATA_IS_NOT_VALID, successRes.get(Constants.STATUS)); Assert.assertEquals(Status.DATA_IS_NOT_VALID, successRes.get(Constants.STATUS));
//success //success
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(new ProcessData());
processDefinition.setProcessDefinitionJson(SHELL_JSON); processDefinition.setProcessDefinitionJson(SHELL_JSON);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Map<String, Object> dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionId(46); Map<String, Object> dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionId(46);
@ -729,6 +770,16 @@ public class ProcessDefinitionServiceTest {
List<ProcessDefinition> processDefinitionList = new ArrayList<>(); List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(processDefinition); processDefinitionList.add(processDefinition);
Mockito.when(processDefineMapper.queryDefinitionListByIdList(idArray)).thenReturn(processDefinitionList); Mockito.when(processDefineMapper.queryDefinitionListByIdList(idArray)).thenReturn(processDefinitionList);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+ ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}"
+ ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+ ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}";
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Mockito.when(processService.genProcessData(processDefinition)).thenReturn(processData);
Map<String, Object> successRes = processDefinitionService.getTaskNodeListByDefinitionIdList(defineIdList); Map<String, Object> successRes = processDefinitionService.getTaskNodeListByDefinitionIdList(defineIdList);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
} }
@ -776,12 +827,11 @@ public class ProcessDefinitionServiceTest {
//task instance not exist //task instance not exist
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processInstanceService.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList); Mockito.when(processInstanceService.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList);
Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), "shell-1")).thenReturn(null); Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Map<String, Object> taskNullRes = processDefinitionService.viewTree(46, 10); Map<String, Object> taskNullRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS));
//task instance exist //task instance exist
Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), "shell-1")).thenReturn(taskInstance);
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10); Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
@ -843,8 +893,8 @@ public class ProcessDefinitionServiceTest {
+ "}"); + "}");
//task instance exist //task instance exist
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Mockito.when(processInstanceService.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList); Mockito.when(processInstanceService.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList);
Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), "shell-1")).thenReturn(taskInstance);
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10); Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
@ -944,7 +994,6 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectMapper.queryByName(currentProjectName)).thenReturn(getProject(currentProjectName)); Mockito.when(projectMapper.queryByName(currentProjectName)).thenReturn(getProject(currentProjectName));
Mockito.when(projectService.checkProjectAndAuth(loginUser, getProject(currentProjectName), currentProjectName)).thenReturn(result); Mockito.when(projectService.checkProjectAndAuth(loginUser, getProject(currentProjectName), currentProjectName)).thenReturn(result);
Mockito.when(processDefineMapper.queryByDefineId(46)).thenReturn(shellDefinition2);
Map<String, Object> importProcessResult = processDefinitionService.importProcessDefinition(loginUser, multipartFile, currentProjectName); Map<String, Object> importProcessResult = processDefinitionService.importProcessDefinition(loginUser, multipartFile, currentProjectName);
@ -972,7 +1021,6 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName)); Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Mockito.when(processService.findProcessDefineById(1)).thenReturn(processDefinition); Mockito.when(processService.findProcessDefineById(1)).thenReturn(processDefinition);
Mockito.when(processDefinitionVersionService.addProcessDefinitionVersion(processDefinition)).thenReturn(1);
String sqlDependentJson = "{\n" String sqlDependentJson = "{\n"
+ " \"globalParams\": [\n" + " \"globalParams\": [\n"
@ -1074,6 +1122,13 @@ public class ProcessDefinitionServiceTest {
processDefinitionService.batchExportProcessDefinitionByIds( processDefinitionService.batchExportProcessDefinitionByIds(
null, null, null, null); null, null, null, null);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+ ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}"
+ ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+ ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}";
User loginUser = new User(); User loginUser = new User();
loginUser.setId(1); loginUser.setId(1);
loginUser.setUserType(UserType.ADMIN_USER); loginUser.setUserType(UserType.ADMIN_USER);
@ -1091,13 +1146,7 @@ public class ProcessDefinitionServiceTest {
ProcessDefinition processDefinition = new ProcessDefinition(); ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(1); processDefinition.setId(1);
processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" processDefinition.setProcessDefinitionJson(processDefinitionJson);
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+ ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}"
+ ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+ ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
Map<String, Object> checkResult = new HashMap<>(); Map<String, Object> checkResult = new HashMap<>();
checkResult.put(Constants.STATUS, Status.SUCCESS); checkResult.put(Constants.STATUS, Status.SUCCESS);
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project); Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project);
@ -1105,6 +1154,9 @@ public class ProcessDefinitionServiceTest {
Mockito.when(processDefineMapper.queryByDefineId(1)).thenReturn(processDefinition); Mockito.when(processDefineMapper.queryByDefineId(1)).thenReturn(processDefinition);
HttpServletResponse response = mock(HttpServletResponse.class); HttpServletResponse response = mock(HttpServletResponse.class);
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Mockito.when(processService.genProcessData(processDefinition)).thenReturn(processData);
ServletOutputStream outputStream = mock(ServletOutputStream.class); ServletOutputStream outputStream = mock(ServletOutputStream.class);
when(response.getOutputStream()).thenReturn(outputStream); when(response.getOutputStream()).thenReturn(outputStream);
processDefinitionService.batchExportProcessDefinitionByIds( processDefinitionService.batchExportProcessDefinitionByIds(

133
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java

@ -28,8 +28,10 @@ import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class VarPoolUtilsTest { public class VarPoolUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(VarPoolUtilsTest.class); private static final Logger logger = LoggerFactory.getLogger(VarPoolUtilsTest.class);
@Test @Test
@ -37,53 +39,122 @@ public class VarPoolUtilsTest {
String varPool = "p1,66$VarPool$p2,69$VarPool$"; String varPool = "p1,66$VarPool$p2,69$VarPool$";
ConcurrentHashMap<String, Object> propToValue = new ConcurrentHashMap<String, Object>(); ConcurrentHashMap<String, Object> propToValue = new ConcurrentHashMap<String, Object>();
VarPoolUtils.convertVarPoolToMap(propToValue, varPool); VarPoolUtils.convertVarPoolToMap(propToValue, varPool);
Assert.assertEquals((String)propToValue.get("p1"), "66"); Assert.assertEquals((String) propToValue.get("p1"), "66");
Assert.assertEquals((String)propToValue.get("p2"), "69"); Assert.assertEquals((String) propToValue.get("p2"), "69");
logger.info(propToValue.toString()); logger.info(propToValue.toString());
} }
@Test @Test
public void testConvertPythonScriptPlaceholders() throws Exception { public void testConvertPythonScriptPlaceholders() throws Exception {
String rawScript = "print(${p1});\n${setShareVar(${p1},3)};\n${setShareVar(${p2},4)};"; String rawScript = "print(${p1});\n${setShareVar(${p1},3)};\n${setShareVar(${p2},4)};";
rawScript = VarPoolUtils.convertPythonScriptPlaceholders(rawScript); rawScript = VarPoolUtils.convertPythonScriptPlaceholders(rawScript);
Assert.assertEquals(rawScript, "print(${p1});\n" Assert.assertEquals(rawScript, "print(${p1});\n"
+ "print(\"${{setValue({},{})}}\".format(\"p1\",3));\n" + "print(\"${{setValue({},{})}}\".format(\"p1\",3));\n"
+ "print(\"${{setValue({},{})}}\".format(\"p2\",4));"); + "print(\"${{setValue({},{})}}\".format(\"p2\",4));");
logger.info(rawScript); logger.info(rawScript);
} }
@Test @Test
public void testSetTaskNodeLocalParams() throws Exception { public void testSetTaskNodeLocalParams() throws Exception {
String taskJson = "{\"id\":\"tasks-66199\",\"name\":\"file-shell\",\"desc\":null,\"type\":\"SHELL\"," String taskJson = "{\"id\":\"tasks-66199\",\"name\":\"file-shell\",\"desc\":null,\"type\":\"SHELL\","
+ "\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"" + "\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\""
+ "params\":{\"rawScript\":\"sh n-1/n-1-1/run.sh\",\"" + "params\":{\"rawScript\":\"sh n-1/n-1-1/run.sh\",\""
+ "localParams\":[{\"prop\":\"k1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v1\"},{\"prop\":\"k2\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v2\"}," + "localParams\":[{\"prop\":\"k1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v1\"},{\"prop\":\"k2\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v2\"},"
+ "{\"prop\":\"k3\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v3\"}],\"" + "{\"prop\":\"k3\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v3\"}],\""
+ "resourceList\":[{\"id\":\"dolphinschedule-code\",\"res\":\"n-1/n-1-1/dolphinscheduler-api-server.log\"}," + "resourceList\":[{\"id\":\"dolphinschedule-code\",\"res\":\"n-1/n-1-1/dolphinscheduler-api-server.log\"},"
+ "{\"id\":\"mr-code\",\"res\":\"n-1/n-1-1/hadoop-mapreduce-examples-2.7.4.jar\"}," + "{\"id\":\"mr-code\",\"res\":\"n-1/n-1-1/hadoop-mapreduce-examples-2.7.4.jar\"},"
+ "{\"id\":\"run\",\"res\":\"n-1/n-1-1/run.sh\"}]},\"preTasks\":[],\"extras\":null,\"depList\":[],\"" + "{\"id\":\"run\",\"res\":\"n-1/n-1-1/run.sh\"}]},\"preTasks\":[],\"extras\":null,\"depList\":[],\""
+ "dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"" + "dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\""
+ "workerGroup\":\"default\",\"workerGroupId\":null,\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}"; + "workerGroup\":\"default\",\"workerGroupId\":null,\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}";
String changeTaskJson = "{\"id\":\"tasks-66199\",\"name\":\"file-shell\",\"desc\":null,\"type\":\"SHELL\","
+ "\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"" String changeTaskJson = "{"
+ "params\":{\"rawScript\":\"sh n-1/n-1-1/run.sh\",\"" + " \"id\":\"tasks-66199\","
+ "localParams\":[{\"prop\":\"k1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"k1-value-change\"}," + " \"code\":null,"
+ "{\"prop\":\"k2\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"k2-value-change\"}," + " \"version\":0,"
+ "{\"prop\":\"k3\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v3\"}],\"" + " \"name\":\"file-shell\","
+ "resourceList\":[{\"id\":\"dolphinschedule-code\",\"res\":\"n-1/n-1-1/dolphinscheduler-api-server.log\"}," + " \"desc\":null,"
+ "{\"id\":\"mr-code\",\"res\":\"n-1/n-1-1/hadoop-mapreduce-examples-2.7.4.jar\"}," + " \"type\":\"SHELL\","
+ "{\"id\":\"run\",\"res\":\"n-1/n-1-1/run.sh\"}]},\"preTasks\":[],\"extras\":null,\"depList\":[],\"" + " \"runFlag\":\"NORMAL\","
+ "dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"" + " \"loc\":null,"
+ "workerGroup\":\"default\",\"workerGroupId\":null,\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}"; + " \"maxRetryTimes\":0,"
+ " \"retryInterval\":1,"
+ " \"params\":{"
+ " \"rawScript\":\"sh n-1/n-1-1/run.sh\","
+ " \"localParams\":["
+ " {"
+ " \"prop\":\"k1\","
+ " \"direct\":\"IN\","
+ " \"type\":\"VARCHAR\","
+ " \"value\":\"k1-value-change\""
+ " },"
+ " {"
+ " \"prop\":\"k2\","
+ " \"direct\":\"IN\","
+ " \"type\":\"VARCHAR\","
+ " \"value\":\"k2-value-change\""
+ " },"
+ " {"
+ " \"prop\":\"k3\","
+ " \"direct\":\"IN\","
+ " \"type\":\"VARCHAR\","
+ " \"value\":\"v3\""
+ " }"
+ " ],"
+ " \"resourceList\":["
+ " {"
+ " \"id\":\"dolphinschedule-code\","
+ " \"res\":\"n-1/n-1-1/dolphinscheduler-api-server.log\""
+ " },"
+ " {"
+ " \"id\":\"mr-code\","
+ " \"res\":\"n-1/n-1-1/hadoop-mapreduce-examples-2.7.4.jar\""
+ " },"
+ " {"
+ " \"id\":\"run\","
+ " \"res\":\"n-1/n-1-1/run.sh\""
+ " }"
+ " ]"
+ " },"
+ " \"preTasks\":["
+ ""
+ " ],"
+ " \"preTaskNodeList\":null,"
+ " \"extras\":null,"
+ " \"depList\":["
+ ""
+ " ],"
+ " \"dependence\":{"
+ ""
+ " },"
+ " \"conditionResult\":{"
+ " \"successNode\":["
+ " \"\""
+ " ],"
+ " \"failedNode\":["
+ " \"\""
+ " ]"
+ " },"
+ " \"taskInstancePriority\":\"MEDIUM\","
+ " \"workerGroup\":\"default\","
+ " \"workerGroupId\":null,"
+ " \"timeout\":{"
+ " \"strategy\":\"\","
+ " \"interval\":null,"
+ " \"enable\":false"
+ " },"
+ " \"delayTime\":0"
+ "}";
ObjectNode jsonNodes = JSONUtils.parseObject(changeTaskJson);
Map<String, Object> propToValue = new HashMap<String, Object>(); Map<String, Object> propToValue = new HashMap<String, Object>();
propToValue.put("k1","k1-value-change"); propToValue.put("k1", "k1-value-change");
propToValue.put("k2","k2-value-change"); propToValue.put("k2", "k2-value-change");
TaskNode taskNode = JSONUtils.parseObject(taskJson,TaskNode.class); TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
VarPoolUtils.setTaskNodeLocalParams(taskNode,propToValue); VarPoolUtils.setTaskNodeLocalParams(taskNode, propToValue);
Assert.assertEquals(changeTaskJson,JSONUtils.toJsonString(taskNode)); Assert.assertEquals(JSONUtils.toJsonString(jsonNodes), JSONUtils.toJsonString(taskNode).trim());
} }

19
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -34,12 +34,14 @@ import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper; import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper; import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest; import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest;
@ -72,15 +74,12 @@ public class ProcessServiceTest {
@InjectMocks @InjectMocks
private ProcessService processService; private ProcessService processService;
@Mock @Mock
private CommandMapper commandMapper; private CommandMapper commandMapper;
@Mock
private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
@Mock @Mock
private ErrorCommandMapper errorCommandMapper; private ErrorCommandMapper errorCommandMapper;
@Mock @Mock
private ProcessDefinitionMapper processDefineMapper; private ProcessDefinitionMapper processDefineMapper;
@Mock @Mock
@ -238,6 +237,7 @@ public class ProcessServiceTest {
processDefinition.setId(123); processDefinition.setId(123);
processDefinition.setName("test"); processDefinition.setName("test");
processDefinition.setVersion(1); processDefinition.setVersion(1);
processDefinition.setCode(11L);
processDefinition.setProcessDefinitionJson("{\"globalParams\":[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}],\"tasks\":[{\"conditionResult\":" processDefinition.setProcessDefinitionJson("{\"globalParams\":[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}" + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\"" + ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
@ -314,6 +314,7 @@ public class ProcessServiceTest {
@Test @Test
public void testRecurseFindSubProcessId() { public void testRecurseFindSubProcessId() {
ProcessDefinition processDefinition = new ProcessDefinition(); ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(10L);
processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\"" + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\""
+ ",\"dependence\":{},\"description\":\"\",\"id\":\"tasks-76544\"" + ",\"dependence\":{},\"description\":\"\",\"id\":\"tasks-76544\""
@ -326,6 +327,7 @@ public class ProcessServiceTest {
int parentId = 111; int parentId = 111;
List<Integer> ids = new ArrayList<>(); List<Integer> ids = new ArrayList<>();
ProcessDefinition processDefinition2 = new ProcessDefinition(); ProcessDefinition processDefinition2 = new ProcessDefinition();
processDefinition2.setCode(11L);
processDefinition2.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\"" processDefinition2.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\""
+ ":{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}," + ":{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{},"
+ "\"description\":\"\",\"id\":\"tasks-76544\",\"maxRetryTimes\":\"0\",\"name\":\"test\"," + "\"description\":\"\",\"id\":\"tasks-76544\",\"maxRetryTimes\":\"0\",\"name\":\"test\","
@ -334,7 +336,12 @@ public class ProcessServiceTest {
+ "\"MEDIUM\",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":" + "\"MEDIUM\",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":"
+ "\"SHELL\",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}"); + "\"SHELL\",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
Mockito.when(processDefineMapper.selectById(parentId)).thenReturn(processDefinition); Mockito.when(processDefineMapper.selectById(parentId)).thenReturn(processDefinition);
Mockito.when(processDefineMapper.selectById(222)).thenReturn(processDefinition2);
List<ProcessTaskRelationLog> relationLogList = new ArrayList<>();
Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong()
, Mockito.anyInt()))
.thenReturn(relationLogList);
processService.recurseFindSubProcessId(parentId, ids); processService.recurseFindSubProcessId(parentId, ids);
} }

Loading…
Cancel
Save