Browse Source

[feature-4417][JsonSplit]refactor json in process instance and process definition (#4993)

* remove the use of json

* remove json in process definition and process instance
pull/3/MERGE
bao liang 4 years ago committed by GitHub
parent
commit
5e2b97b573
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 59
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  3. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
  4. 75
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  5. 1
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  6. 17
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

59
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -621,17 +621,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* get process definition list by ids
*/
private List<ProcessMeta> getProcessDefinitionList(String processDefinitionIds) {
List<ProcessMeta> processDefinitionList = new ArrayList<>();
String[] processDefinitionIdArray = processDefinitionIds.split(",");
List<ProcessMeta> processDefinitionList = new ArrayList<>();
for (String strProcessDefinitionId : processDefinitionIdArray) {
//get workflow info
int processDefinitionId = Integer.parseInt(strProcessDefinitionId);
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(processDefinitionId);
String processDefinitionJson = JSONUtils.toJsonString(processService.genProcessData(processDefinition));
processDefinition.setProcessDefinitionJson(processDefinitionJson);
processDefinitionList.add(exportProcessMetaData(processDefinitionId, processDefinition));
processDefinitionList.add(exportProcessMetaData(processDefinition));
}
return processDefinitionList;
}
@ -671,39 +669,25 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* get export process metadata string
*
* @param processDefinitionId process definition id
* @param processDefinition process definition
* @return export process metadata string
*/
public String exportProcessMetaDataStr(Integer processDefinitionId, ProcessDefinition processDefinition) {
//create workflow json file
return JSONUtils.toJsonString(exportProcessMetaData(processDefinitionId, processDefinition));
}
/**
* get export process metadata string
*
* @param processDefinitionId process definition id
* @param processDefinition process definition
* @return export process metadata string
*/
public ProcessMeta exportProcessMetaData(Integer processDefinitionId, ProcessDefinition processDefinition) {
String processDefinitionJson = processDefinition.getProcessDefinitionJson();
public ProcessMeta exportProcessMetaData(ProcessDefinition processDefinition) {
ProcessData processData = processService.genProcessData(processDefinition);
//correct task param which has data source or dependent param
String correctProcessDefinitionJson = addExportTaskNodeSpecialParam(processDefinitionJson);
processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson);
addExportTaskNodeSpecialParam(processData);
//export process metadata
ProcessMeta exportProcessMeta = new ProcessMeta();
exportProcessMeta.setProjectName(processDefinition.getProjectName());
exportProcessMeta.setProcessDefinitionName(processDefinition.getName());
exportProcessMeta.setProcessDefinitionJson(processDefinitionJson);
exportProcessMeta.setProcessDefinitionJson(JSONUtils.toJsonString(processService.genProcessData(processDefinition)));
exportProcessMeta.setProcessDefinitionDescription(processDefinition.getDescription());
exportProcessMeta.setProcessDefinitionLocations(processDefinition.getLocations());
exportProcessMeta.setProcessDefinitionConnects(processDefinition.getConnects());
//schedule info
List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinition.getId());
if (!schedules.isEmpty()) {
Schedule schedule = schedules.get(0);
exportProcessMeta.setScheduleWarningType(schedule.getWarningType().toString());
@ -723,26 +707,21 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* correct task param which has datasource or dependent
*
* @param processDefinitionJson processDefinitionJson
* @param processData process data
* @return correct processDefinitionJson
*/
private String addExportTaskNodeSpecialParam(String processDefinitionJson) {
ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson);
ArrayNode jsonArray = (ArrayNode) jsonObject.path(TASKS);
for (int i = 0; i < jsonArray.size(); i++) {
JsonNode taskNode = jsonArray.path(i);
if (StringUtils.isNotEmpty(taskNode.path("type").asText())) {
String taskType = taskNode.path("type").asText();
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
if (null != addTaskParam) {
addTaskParam.addExportSpecialParam(taskNode);
}
private void addExportTaskNodeSpecialParam(ProcessData processData) {
List<TaskNode> taskNodeList = processData.getTasks();
List<TaskNode> tmpNodeList = new ArrayList<>();
for (TaskNode taskNode : taskNodeList) {
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskNode.getType());
JsonNode jsonNode = JSONUtils.toJsonNode(taskNode);
if (null != addTaskParam) {
addTaskParam.addExportSpecialParam(jsonNode);
}
tmpNodeList.add(JSONUtils.parseObject(jsonNode.toString(), TaskNode.class));
}
jsonObject.set(TASKS, jsonArray);
return jsonObject.toString();
processData.setTasks(tmpNodeList);
}
/**

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -453,7 +453,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
int update = processService.updateProcessInstance(processInstance);
int updateDefine = 1;
if (Boolean.TRUE.equals(syncDefine)) {
updateDefine = syncDefinition(loginUser, project, processInstanceJson, locations, connects,
updateDefine = syncDefinition(loginUser, project, locations, connects,
processInstance, processDefinition, processData);
}
if (update > 0 && updateDefine > 0) {
@ -467,12 +467,11 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
/**
* sync definition according process instance
*/
private int syncDefinition(User loginUser, Project project, String processInstanceJson, String locations, String connects,
private int syncDefinition(User loginUser, Project project, String locations, String connects,
ProcessInstance processInstance, ProcessDefinition processDefinition,
ProcessData processData) {
String originDefParams = JSONUtils.toJsonString(processData.getGlobalParams());
processDefinition.setProcessDefinitionJson(processInstanceJson);
processDefinition.setGlobalParams(originDefParams);
processDefinition.setLocations(locations);
processDefinition.setConnects(connects);
@ -513,7 +512,6 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
if (tenant != null) {
processInstance.setTenantCode(tenant.getTenantCode());
}
processInstance.setProcessInstanceJson(processInstanceJson);
processInstance.setGlobalParams(globalParams);
}

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java

@ -184,7 +184,6 @@ public class ProcessDefinitionControllerTest {
processDefinition.setId(id);
processDefinition.setLocations(locations);
processDefinition.setName(name);
processDefinition.setProcessDefinitionJson(json);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
@ -267,7 +266,6 @@ public class ProcessDefinitionControllerTest {
processDefinition.setId(id);
processDefinition.setLocations(locations);
processDefinition.setName(name);
processDefinition.setProcessDefinitionJson(json);
String name2 = "dag_test";
int id2 = 2;
@ -279,7 +277,6 @@ public class ProcessDefinitionControllerTest {
processDefinition2.setId(id2);
processDefinition2.setLocations(locations);
processDefinition2.setName(name2);
processDefinition2.setProcessDefinitionJson(json);
resourceList.add(processDefinition);
resourceList.add(processDefinition2);

75
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils;
@ -391,14 +392,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test_def")).thenReturn(null);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+ ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}"
+ ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+ ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}";
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
ProcessData processData = getProcessData();
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData);
Map<String, Object> instanceNotexitRes = processDefinitionService.queryProcessDefinitionByName(loginUser,
"project_test1", "test_def");
@ -416,12 +410,6 @@ public class ProcessDefinitionServiceTest {
String projectName = "project_test1";
Project project = getProject(projectName);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\","
+ "\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234"
+ "\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
+ "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},"
+ "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}";
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
@ -463,7 +451,6 @@ public class ProcessDefinitionServiceTest {
// instance exit
ProcessDefinition definition = getProcessDefinition();
definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}");
definition.setProcessDefinitionJson(processDefinitionJson);
definition.setConnects("[]");
Mockito.when(processDefineMapper.selectById(46)).thenReturn(definition);
@ -478,8 +465,7 @@ public class ProcessDefinitionServiceTest {
, Mockito.any(ProcessDefinition.class)))
.thenReturn(1);
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData);
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(getProcessData());
Map<String, Object> map3 = processDefinitionService.batchCopyProcessDefinition(
loginUser, projectName, "46", 1);
@ -518,11 +504,6 @@ public class ProcessDefinitionServiceTest {
ProcessDefinition definition = getProcessDefinition();
definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}");
definition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\""
+ ",\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234"
+ "\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
+ "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},"
+ "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}");
definition.setConnects("[]");
// check target project result == null
@ -656,16 +637,6 @@ public class ProcessDefinitionServiceTest {
loginUser, "project_test1", 46, ReleaseState.getEnum(2));
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failRes.get(Constants.STATUS));
//FIXME has function exit code 1 when exception
//process definition offline
// List<Schedule> schedules = new ArrayList<>();
// Schedule schedule = getSchedule();
// schedules.add(schedule);
// Mockito.when(scheduleMapper.selectAllByProcessDefineArray(new int[]{46})).thenReturn(schedules);
// Mockito.when(scheduleMapper.updateById(schedule)).thenReturn(1);
// Map<String, Object> offlineRes = processDefinitionService.releaseProcessDefinition(loginUser, "project_test1",
// 46, ReleaseState.OFFLINE.getCode());
// Assert.assertEquals(Status.SUCCESS, offlineRes.get(Constants.STATUS));
}
@Test
@ -746,7 +717,6 @@ public class ProcessDefinitionServiceTest {
//success
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(new ProcessData());
processDefinition.setProcessDefinitionJson(SHELL_JSON);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Map<String, Object> dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionId(46);
Assert.assertEquals(Status.SUCCESS, dataNotValidRes.get(Constants.STATUS));
@ -763,29 +733,37 @@ public class ProcessDefinitionServiceTest {
//process definition exist
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(SHELL_JSON);
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(processDefinition);
Mockito.when(processDefineMapper.queryDefinitionListByIdList(idArray)).thenReturn(processDefinitionList);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+ ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}"
+ ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+ ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}";
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
ProcessData processData = getProcessData();
Mockito.when(processService.genProcessData(processDefinition)).thenReturn(processData);
Map<String, Object> successRes = processDefinitionService.getTaskNodeListByDefinitionIdList(defineIdList);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
private ProcessData getProcessData() {
ProcessData processData = new ProcessData();
List<TaskNode> taskNodeList = new ArrayList<>();
processData.setTasks(taskNodeList);
List<Property> properties = new ArrayList<>();
processData.setGlobalParams(properties);
processData.setTenantId(10);
processData.setTimeout(100);
return processData;
}
@Test
public void testQueryProcessDefinitionAllByProjectId() {
int projectId = 1;
Long projectCode = 2L;
Project project = new Project();
project.setId(projectId);
project.setCode(projectCode);
Mockito.when(projectMapper.selectById(projectId)).thenReturn(project);
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(SHELL_JSON);
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(processDefinition);
Project test = getProject("test");
@ -1188,6 +1166,7 @@ public class ProcessDefinitionServiceTest {
processDefinition.setProjectId(2);
processDefinition.setTenantId(1);
processDefinition.setDescription("");
processDefinition.setCode(9999L);
return processDefinition;
}
@ -1284,15 +1263,7 @@ public class ProcessDefinitionServiceTest {
Integer processDefinitionId = 111;
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(processDefinitionId);
processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":"
+ "[\"\"]},\"delayTime\":\"0\",\"dependence\":{},"
+ "\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\","
+ "\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]},"
+ "\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\","
+ "\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\","
+ "\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
Assert.assertNotNull(processDefinitionService.exportProcessMetaData(processDefinitionId, processDefinition));
Assert.assertNotNull(processDefinitionService.exportProcessMetaData(processDefinition));
}
@Test

1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -1582,7 +1582,6 @@ public class ProcessService {
String locations, String connects) {
ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId);
if (processInstance != null) {
processInstance.setProcessInstanceJson(processJson);
processInstance.setGlobalParams(globalParams);
processInstance.setScheduleTime(scheduleTime);
processInstance.setLocations(locations);

17
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -315,28 +315,11 @@ public class ProcessServiceTest {
public void testRecurseFindSubProcessId() {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(10L);
processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\""
+ ",\"dependence\":{},\"description\":\"\",\"id\":\"tasks-76544\""
+ ",\"maxRetryTimes\":\"0\",\"name\":\"test\",\"params\":{\"localParams\":[],"
+ "\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[],\"processDefinitionId\""
+ ":\"222\"},\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\",\"timeout\":{\"enable\":false,\"interval\":"
+ "null,\"strategy\":\"\"},\"type\":\"SHELL\",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],"
+ "\"tenantId\":4,\"timeout\":0}");
int parentId = 111;
List<Integer> ids = new ArrayList<>();
ProcessDefinition processDefinition2 = new ProcessDefinition();
processDefinition2.setCode(11L);
processDefinition2.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\""
+ ":{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{},"
+ "\"description\":\"\",\"id\":\"tasks-76544\",\"maxRetryTimes\":\"0\",\"name\":\"test\","
+ "\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]},"
+ "\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":"
+ "\"MEDIUM\",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":"
+ "\"SHELL\",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
Mockito.when(processDefineMapper.selectById(parentId)).thenReturn(processDefinition);
List<ProcessTaskRelationLog> relationLogList = new ArrayList<>();
Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong()
, Mockito.anyInt()))

Loading…
Cancel
Save