Browse Source

Merge branch 'json_split' of https://github.com/apache/incubator-dolphinscheduler into spilit

pull/3/MERGE
lenboo 4 years ago
parent
commit
df0e3e96b8
  1. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 90
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  3. 129
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java
  4. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  5. 11
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  6. 37
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  7. 19
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -354,12 +354,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.selectById(processId);
ProcessData processData = processService.genProcessData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
} else {
ProcessData processData = processService.genProcessData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
result.put(Constants.DATA_LIST, processDefinition);
putMsg(result, Status.SUCCESS);
}
@ -379,12 +378,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getId(), processDefinitionName);
ProcessData processData = processService.genProcessData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName);
} else {
ProcessData processData = processService.genProcessData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
result.put(Constants.DATA_LIST, processDefinition);
putMsg(result, Status.SUCCESS);
}

90
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
@ -48,6 +49,7 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
@ -238,6 +240,8 @@ public class ProcessDefinitionServiceTest {
@Mock
private ProcessDefinitionMapper processDefineMapper;
@Mock
private ProcessTaskRelationMapper processTaskRelationMapper;
@Mock
private ProjectMapper projectMapper;
@Mock
private ProjectServiceImpl projectService;
@ -342,6 +346,17 @@ public class ProcessDefinitionServiceTest {
putMsg(result, Status.SUCCESS, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Mockito.when(processDefineMapper.selectById(1)).thenReturn(null);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+ ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}"
+ ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+ ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}";
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData);
Map<String, Object> instanceNotexitRes = processDefinitionService.queryProcessDefinitionById(loginUser,
"project_test1", 1);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
@ -376,13 +391,23 @@ public class ProcessDefinitionServiceTest {
//project check auth success, instance not exist
putMsg(result, Status.SUCCESS, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Mockito.when(processDefineMapper.queryByDefineName(project.getId(),"test_def")).thenReturn(null);
Mockito.when(processDefineMapper.queryByDefineName(project.getId(), "test_def")).thenReturn(null);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+ ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}"
+ ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+ ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}";
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData);
Map<String, Object> instanceNotexitRes = processDefinitionService.queryProcessDefinitionByName(loginUser,
"project_test1", "test_def");
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
//instance exit
Mockito.when(processDefineMapper.queryByDefineName(project.getId(),"test")).thenReturn(getProcessDefinition());
Mockito.when(processDefineMapper.queryByDefineName(project.getId(), "test")).thenReturn(getProcessDefinition());
Map<String, Object> successRes = processDefinitionService.queryProcessDefinitionByName(loginUser,
"project_test1", "test");
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
@ -393,6 +418,11 @@ public class ProcessDefinitionServiceTest {
String projectName = "project_test1";
Project project = getProject(projectName);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\","
+ "\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234"
+ "\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
+ "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},"
+ "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}";
User loginUser = new User();
loginUser.setId(-1);
@ -435,15 +465,24 @@ public class ProcessDefinitionServiceTest {
// instance exit
ProcessDefinition definition = getProcessDefinition();
definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}");
definition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\","
+ "\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234"
+ "\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
+ "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},"
+ "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}");
definition.setProcessDefinitionJson(processDefinitionJson);
definition.setConnects("[]");
Mockito.when(processDefineMapper.selectById(46)).thenReturn(definition);
Mockito.when(processService.saveProcessDefinition(Mockito.eq(loginUser)
, Mockito.eq(project2)
, Mockito.anyString()
, Mockito.anyString()
, Mockito.anyString()
, Mockito.anyString()
, Mockito.any(ProcessData.class)
, Mockito.any(ProcessDefinition.class)))
.thenReturn(1);
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData);
Map<String, Object> map3 = processDefinitionService.batchCopyProcessDefinition(
loginUser, projectName, "46", 1);
Assert.assertEquals(Status.SUCCESS, map3.get(Constants.STATUS));
@ -568,6 +607,7 @@ public class ProcessDefinitionServiceTest {
schedules.add(schedule);
Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(schedules);
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(0);
Mockito.when(processTaskRelationMapper.deleteByCode(null, null)).thenReturn(0);
Map<String, Object> deleteFail = processDefinitionService.deleteProcessDefinitionById(loginUser,
"project_test1", 46);
Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, deleteFail.get(Constants.STATUS));
@ -708,6 +748,7 @@ public class ProcessDefinitionServiceTest {
Assert.assertEquals(Status.DATA_IS_NOT_VALID, successRes.get(Constants.STATUS));
//success
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(new ProcessData());
processDefinition.setProcessDefinitionJson(SHELL_JSON);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Map<String, Object> dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionId(46);
@ -729,6 +770,16 @@ public class ProcessDefinitionServiceTest {
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(processDefinition);
Mockito.when(processDefineMapper.queryDefinitionListByIdList(idArray)).thenReturn(processDefinitionList);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+ ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}"
+ ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+ ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}";
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Mockito.when(processService.genProcessData(processDefinition)).thenReturn(processData);
Map<String, Object> successRes = processDefinitionService.getTaskNodeListByDefinitionIdList(defineIdList);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
@ -776,12 +827,11 @@ public class ProcessDefinitionServiceTest {
//task instance not exist
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processInstanceService.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList);
Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), "shell-1")).thenReturn(null);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Map<String, Object> taskNullRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS));
//task instance exist
Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), "shell-1")).thenReturn(taskInstance);
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
@ -843,8 +893,8 @@ public class ProcessDefinitionServiceTest {
+ "}");
//task instance exist
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Mockito.when(processInstanceService.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList);
Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), "shell-1")).thenReturn(taskInstance);
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
@ -944,7 +994,6 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectMapper.queryByName(currentProjectName)).thenReturn(getProject(currentProjectName));
Mockito.when(projectService.checkProjectAndAuth(loginUser, getProject(currentProjectName), currentProjectName)).thenReturn(result);
Mockito.when(processDefineMapper.queryByDefineId(46)).thenReturn(shellDefinition2);
Map<String, Object> importProcessResult = processDefinitionService.importProcessDefinition(loginUser, multipartFile, currentProjectName);
@ -972,7 +1021,6 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Mockito.when(processService.findProcessDefineById(1)).thenReturn(processDefinition);
Mockito.when(processDefinitionVersionService.addProcessDefinitionVersion(processDefinition)).thenReturn(1);
String sqlDependentJson = "{\n"
+ " \"globalParams\": [\n"
@ -1074,6 +1122,13 @@ public class ProcessDefinitionServiceTest {
processDefinitionService.batchExportProcessDefinitionByIds(
null, null, null, null);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+ ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}"
+ ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+ ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}";
User loginUser = new User();
loginUser.setId(1);
loginUser.setUserType(UserType.ADMIN_USER);
@ -1091,13 +1146,7 @@ public class ProcessDefinitionServiceTest {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(1);
processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+ ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}"
+ ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+ ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
processDefinition.setProcessDefinitionJson(processDefinitionJson);
Map<String, Object> checkResult = new HashMap<>();
checkResult.put(Constants.STATUS, Status.SUCCESS);
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project);
@ -1105,6 +1154,9 @@ public class ProcessDefinitionServiceTest {
Mockito.when(processDefineMapper.queryByDefineId(1)).thenReturn(processDefinition);
HttpServletResponse response = mock(HttpServletResponse.class);
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Mockito.when(processService.genProcessData(processDefinition)).thenReturn(processData);
ServletOutputStream outputStream = mock(ServletOutputStream.class);
when(response.getOutputStream()).thenReturn(outputStream);
processDefinitionService.batchExportProcessDefinitionByIds(

129
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java

@ -28,6 +28,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class VarPoolUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(VarPoolUtilsTest.class);
@ -37,8 +39,8 @@ public class VarPoolUtilsTest {
String varPool = "p1,66$VarPool$p2,69$VarPool$";
ConcurrentHashMap<String, Object> propToValue = new ConcurrentHashMap<String, Object>();
VarPoolUtils.convertVarPoolToMap(propToValue, varPool);
Assert.assertEquals((String)propToValue.get("p1"), "66");
Assert.assertEquals((String)propToValue.get("p2"), "69");
Assert.assertEquals((String) propToValue.get("p1"), "66");
Assert.assertEquals((String) propToValue.get("p2"), "69");
logger.info(propToValue.toString());
}
@ -47,43 +49,112 @@ public class VarPoolUtilsTest {
String rawScript = "print(${p1});\n${setShareVar(${p1},3)};\n${setShareVar(${p2},4)};";
rawScript = VarPoolUtils.convertPythonScriptPlaceholders(rawScript);
Assert.assertEquals(rawScript, "print(${p1});\n"
+ "print(\"${{setValue({},{})}}\".format(\"p1\",3));\n"
+ "print(\"${{setValue({},{})}}\".format(\"p2\",4));");
+ "print(\"${{setValue({},{})}}\".format(\"p1\",3));\n"
+ "print(\"${{setValue({},{})}}\".format(\"p2\",4));");
logger.info(rawScript);
}
@Test
public void testSetTaskNodeLocalParams() throws Exception {
String taskJson = "{\"id\":\"tasks-66199\",\"name\":\"file-shell\",\"desc\":null,\"type\":\"SHELL\","
+ "\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\""
+ "params\":{\"rawScript\":\"sh n-1/n-1-1/run.sh\",\""
+ "localParams\":[{\"prop\":\"k1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v1\"},{\"prop\":\"k2\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v2\"},"
+ "{\"prop\":\"k3\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v3\"}],\""
+ "resourceList\":[{\"id\":\"dolphinschedule-code\",\"res\":\"n-1/n-1-1/dolphinscheduler-api-server.log\"},"
+ "{\"id\":\"mr-code\",\"res\":\"n-1/n-1-1/hadoop-mapreduce-examples-2.7.4.jar\"},"
+ "{\"id\":\"run\",\"res\":\"n-1/n-1-1/run.sh\"}]},\"preTasks\":[],\"extras\":null,\"depList\":[],\""
+ "dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\""
+ "workerGroup\":\"default\",\"workerGroupId\":null,\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}";
String changeTaskJson = "{\"id\":\"tasks-66199\",\"name\":\"file-shell\",\"desc\":null,\"type\":\"SHELL\","
+ "\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\""
+ "params\":{\"rawScript\":\"sh n-1/n-1-1/run.sh\",\""
+ "localParams\":[{\"prop\":\"k1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"k1-value-change\"},"
+ "{\"prop\":\"k2\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"k2-value-change\"},"
+ "{\"prop\":\"k3\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v3\"}],\""
+ "resourceList\":[{\"id\":\"dolphinschedule-code\",\"res\":\"n-1/n-1-1/dolphinscheduler-api-server.log\"},"
+ "{\"id\":\"mr-code\",\"res\":\"n-1/n-1-1/hadoop-mapreduce-examples-2.7.4.jar\"},"
+ "{\"id\":\"run\",\"res\":\"n-1/n-1-1/run.sh\"}]},\"preTasks\":[],\"extras\":null,\"depList\":[],\""
+ "dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\""
+ "workerGroup\":\"default\",\"workerGroupId\":null,\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}";
+ "\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\""
+ "params\":{\"rawScript\":\"sh n-1/n-1-1/run.sh\",\""
+ "localParams\":[{\"prop\":\"k1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v1\"},{\"prop\":\"k2\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v2\"},"
+ "{\"prop\":\"k3\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v3\"}],\""
+ "resourceList\":[{\"id\":\"dolphinschedule-code\",\"res\":\"n-1/n-1-1/dolphinscheduler-api-server.log\"},"
+ "{\"id\":\"mr-code\",\"res\":\"n-1/n-1-1/hadoop-mapreduce-examples-2.7.4.jar\"},"
+ "{\"id\":\"run\",\"res\":\"n-1/n-1-1/run.sh\"}]},\"preTasks\":[],\"extras\":null,\"depList\":[],\""
+ "dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\""
+ "workerGroup\":\"default\",\"workerGroupId\":null,\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}";
String changeTaskJson = "{"
+ " \"id\":\"tasks-66199\","
+ " \"code\":null,"
+ " \"version\":0,"
+ " \"name\":\"file-shell\","
+ " \"desc\":null,"
+ " \"type\":\"SHELL\","
+ " \"runFlag\":\"NORMAL\","
+ " \"loc\":null,"
+ " \"maxRetryTimes\":0,"
+ " \"retryInterval\":1,"
+ " \"params\":{"
+ " \"rawScript\":\"sh n-1/n-1-1/run.sh\","
+ " \"localParams\":["
+ " {"
+ " \"prop\":\"k1\","
+ " \"direct\":\"IN\","
+ " \"type\":\"VARCHAR\","
+ " \"value\":\"k1-value-change\""
+ " },"
+ " {"
+ " \"prop\":\"k2\","
+ " \"direct\":\"IN\","
+ " \"type\":\"VARCHAR\","
+ " \"value\":\"k2-value-change\""
+ " },"
+ " {"
+ " \"prop\":\"k3\","
+ " \"direct\":\"IN\","
+ " \"type\":\"VARCHAR\","
+ " \"value\":\"v3\""
+ " }"
+ " ],"
+ " \"resourceList\":["
+ " {"
+ " \"id\":\"dolphinschedule-code\","
+ " \"res\":\"n-1/n-1-1/dolphinscheduler-api-server.log\""
+ " },"
+ " {"
+ " \"id\":\"mr-code\","
+ " \"res\":\"n-1/n-1-1/hadoop-mapreduce-examples-2.7.4.jar\""
+ " },"
+ " {"
+ " \"id\":\"run\","
+ " \"res\":\"n-1/n-1-1/run.sh\""
+ " }"
+ " ]"
+ " },"
+ " \"preTasks\":["
+ ""
+ " ],"
+ " \"preTaskNodeList\":null,"
+ " \"extras\":null,"
+ " \"depList\":["
+ ""
+ " ],"
+ " \"dependence\":{"
+ ""
+ " },"
+ " \"conditionResult\":{"
+ " \"successNode\":["
+ " \"\""
+ " ],"
+ " \"failedNode\":["
+ " \"\""
+ " ]"
+ " },"
+ " \"taskInstancePriority\":\"MEDIUM\","
+ " \"workerGroup\":\"default\","
+ " \"workerGroupId\":null,"
+ " \"timeout\":{"
+ " \"strategy\":\"\","
+ " \"interval\":null,"
+ " \"enable\":false"
+ " },"
+ " \"delayTime\":0"
+ "}";
ObjectNode jsonNodes = JSONUtils.parseObject(changeTaskJson);
Map<String, Object> propToValue = new HashMap<String, Object>();
propToValue.put("k1","k1-value-change");
propToValue.put("k2","k2-value-change");
propToValue.put("k1", "k1-value-change");
propToValue.put("k2", "k2-value-change");
TaskNode taskNode = JSONUtils.parseObject(taskJson,TaskNode.class);
TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
VarPoolUtils.setTaskNodeLocalParams(taskNode,propToValue);
VarPoolUtils.setTaskNodeLocalParams(taskNode, propToValue);
Assert.assertEquals(changeTaskJson,JSONUtils.toJsonString(taskNode));
Assert.assertEquals(JSONUtils.toJsonString(jsonNodes), JSONUtils.toJsonString(taskNode).trim());
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -388,7 +388,7 @@ public class MasterExecThread implements Runnable {
*/
private void buildFlowDag() throws Exception {
recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
List<TaskNode> taskNodeList = processService.getTaskNodeListByDefinitionId(processInstance.getProcessDefinitionId());
List<TaskNode> taskNodeList = processService.genTaskNodeList(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
forbiddenTaskList.clear();
taskNodeList.stream().forEach(taskNode -> {
if (taskNode.isForbidden()) {

11
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -17,11 +17,13 @@
package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.dependent.DependentDateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.DependentTaskExecThread;
@ -182,11 +184,12 @@ public class DependentTaskTest {
}
private List<TaskNode> getTaskNodes(){
List<TaskNode> list = new ArrayList<>();
TaskNode taskNode = new TaskNode();
private List<TaskDefinition> getTaskNodes(){
List<TaskDefinition> list = new ArrayList<>();
TaskDefinition taskNode = new TaskDefinition();
taskNode.setCode(1111L);
taskNode.setName("C");
taskNode.setType("SQL");
taskNode.setTaskType(TaskType.SQL);
list.add(taskNode);
return list;
}

37
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -344,25 +344,23 @@ public class ProcessService {
/**
* get task node list by definitionId
*/
public List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId) {
public List<TaskDefinition> getTaskNodeListByDefinitionId(Integer defineId) {
ProcessDefinition processDefinition = processDefineMapper.selectById(defineId);
if (processDefinition == null) {
logger.error("process define not exists");
return new ArrayList<>();
}
List<ProcessTaskRelation> processTaskRelations = getProcessTaskRelationList(processDefinition.getCode(), processDefinition.getVersion());
Map<Long, TaskDefinition> taskDefinitionMap = new HashMap<>();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (taskDefinitionMap.containsKey(processTaskRelation.getPostTaskCode())) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(processTaskRelation.getPostTaskCode());
TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
processTaskRelation.getPostTaskCode(), processTaskRelation.getPostNodeVersion());
taskDefinitionMap.put(processTaskRelation.getPostTaskCode(), taskDefinition);
}
}
return taskDefinitionMap.entrySet()
.stream()
.map(e -> JSONUtils.parseObject(JSONUtils.toJsonString(e.getValue()), TaskNode.class))
.collect(Collectors.toList());
return new ArrayList<>(taskDefinitionMap.values());
}
/**
@ -394,23 +392,12 @@ public class ProcessService {
public ProcessDefinition findProcessDefinition(Long processDefinitionCode, int version) {
ProcessDefinition processDefinition = processDefineMapper.queryByCode(processDefinitionCode);
if (processDefinition.getVersion() != version) {
ProcessDefinitionLog log = processDefineLogMapper.queryByDefinitionCodeAndVersion(processDefinitionCode, version);
processDefinition = convertFromLog(log);
processDefinition = processDefineLogMapper.queryByDefinitionCodeAndVersion(processDefinitionCode, version);
processDefinition.setId(0);
}
return processDefinition;
}
/**
* covert log to process definition
*/
public ProcessDefinition convertFromLog(ProcessDefinitionLog processDefinitionLog) {
ProcessDefinition definition = processDefinitionLog;
if (null != definition) {
definition.setId(0);
}
return definition;
}
/**
* delete work process instance by id
*
@ -500,11 +487,13 @@ public class ProcessService {
* @param ids ids
*/
public void recurseFindSubProcessId(int parentId, List<Integer> ids) {
List<TaskNode> taskNodeList = this.getTaskNodeListByDefinitionId(parentId);
List<TaskDefinition> taskNodeList = this.getTaskNodeListByDefinitionId(parentId);
if (taskNodeList != null && !taskNodeList.isEmpty()) {
for (TaskNode taskNode : taskNodeList) {
String parameter = taskNode.getParams();
for (TaskDefinition taskNode : taskNodeList) {
String parameter = taskNode.getTaskParams();
ObjectNode parameterJson = JSONUtils.parseObject(parameter);
if (parameterJson.get(CMD_PARAM_SUB_PROCESS_DEFINE_ID) != null) {
SubProcessParameters subProcessParam = JSONUtils.parseObject(parameter, SubProcessParameters.class);
@ -2404,7 +2393,7 @@ public class ProcessService {
* @return dag graph
*/
public DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) {
List<TaskNode> taskNodeList = this.getTaskNodeListByDefinitionId(processDefinition.getId());
List<TaskNode> taskNodeList = genTaskNodeList(processDefinition.getCode(), processDefinition.getVersion());
List<ProcessTaskRelation> processTaskRelations = getProcessTaskRelationList(processDefinition.getCode(), processDefinition.getVersion());
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, processTaskRelations);
// Generate concrete Dag to be executed

19
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -34,12 +34,14 @@ import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest;
@ -72,15 +74,12 @@ public class ProcessServiceTest {
@InjectMocks
private ProcessService processService;
@Mock
private CommandMapper commandMapper;
@Mock
private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
@Mock
private ErrorCommandMapper errorCommandMapper;
@Mock
private ProcessDefinitionMapper processDefineMapper;
@Mock
@ -238,6 +237,7 @@ public class ProcessServiceTest {
processDefinition.setId(123);
processDefinition.setName("test");
processDefinition.setVersion(1);
processDefinition.setCode(11L);
processDefinition.setProcessDefinitionJson("{\"globalParams\":[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
@ -314,6 +314,7 @@ public class ProcessServiceTest {
@Test
public void testRecurseFindSubProcessId() {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(10L);
processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\""
+ ",\"dependence\":{},\"description\":\"\",\"id\":\"tasks-76544\""
@ -326,6 +327,7 @@ public class ProcessServiceTest {
int parentId = 111;
List<Integer> ids = new ArrayList<>();
ProcessDefinition processDefinition2 = new ProcessDefinition();
processDefinition2.setCode(11L);
processDefinition2.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\""
+ ":{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{},"
+ "\"description\":\"\",\"id\":\"tasks-76544\",\"maxRetryTimes\":\"0\",\"name\":\"test\","
@ -334,7 +336,12 @@ public class ProcessServiceTest {
+ "\"MEDIUM\",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":"
+ "\"SHELL\",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
Mockito.when(processDefineMapper.selectById(parentId)).thenReturn(processDefinition);
Mockito.when(processDefineMapper.selectById(222)).thenReturn(processDefinition2);
List<ProcessTaskRelationLog> relationLogList = new ArrayList<>();
Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong()
, Mockito.anyInt()))
.thenReturn(relationLogList);
processService.recurseFindSubProcessId(parentId, ids);
}

Loading…
Cancel
Save