Browse Source

[Feature][JsonSplit] fix taskId in json (#5165)

* modify checkDAGRing and ProcessService method

* merge

* modify dagRing

* modify process instance for project home page

* fix save process bug

* codeStyle

* Fix logical bug in saving process definition

* codeSytle

* Fix bug in interface of  queryProcessDefinitionList

* codeSytle

* Fix api bug"

* fix taskId in processDefinitionJson

* fix json taskId

* codeStyle

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 4 years ago committed by GitHub
parent
commit
a467fffd7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  2. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
  3. 17
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  4. 47
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  5. 15
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  6. 2
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -292,7 +292,8 @@ public class ProcessDefinitionController extends BaseController {
@RequestParam(value = "pageNo") int pageNo, @RequestParam(value = "pageNo") int pageNo,
@RequestParam(value = "pageSize") int pageSize, @RequestParam(value = "pageSize") int pageSize,
@RequestParam(value = "processDefinitionId") int processDefinitionId) { @RequestParam(value = "processDefinitionId") int processDefinitionId) {
logger.info("login user {}, query process versions, project name: {}, pageNo: {}, pageSize: {}, processDefinitionId: {}",
loginUser.getUserName(), projectName, pageNo, pageSize, processDefinitionId);
Map<String, Object> result = processDefinitionService.queryProcessDefinitionVersions(loginUser Map<String, Object> result = processDefinitionService.queryProcessDefinitionVersions(loginUser
, projectName, pageNo, pageSize, processDefinitionId); , projectName, pageNo, pageSize, processDefinitionId);
@ -320,7 +321,8 @@ public class ProcessDefinitionController extends BaseController {
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam(value = "processDefinitionId") int processDefinitionId, @RequestParam(value = "processDefinitionId") int processDefinitionId,
@RequestParam(value = "version") long version) { @RequestParam(value = "version") long version) {
logger.info("login user {}, switch process version, project name: {}, processDefinitionId: {}, version: {}",
loginUser.getUserName(), projectName, processDefinitionId, version);
Map<String, Object> result = processDefinitionService.switchProcessDefinitionVersion(loginUser, projectName Map<String, Object> result = processDefinitionService.switchProcessDefinitionVersion(loginUser, projectName
, processDefinitionId, version); , processDefinitionId, version);
return returnDataList(result); return returnDataList(result);
@ -347,7 +349,8 @@ public class ProcessDefinitionController extends BaseController {
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam(value = "processDefinitionId") int processDefinitionId, @RequestParam(value = "processDefinitionId") int processDefinitionId,
@RequestParam(value = "version") long version) { @RequestParam(value = "version") long version) {
logger.info("login user {}, delete process definition, project name: {}, processDefinitionId: {}, version: {}",
loginUser.getUserName(), projectName, processDefinitionId, version);
Map<String, Object> result = processDefinitionService.deleteByProcessDefinitionIdAndVersion(loginUser, projectName, processDefinitionId, version); Map<String, Object> result = processDefinitionService.deleteByProcessDefinitionIdAndVersion(loginUser, projectName, processDefinitionId, version);
return returnDataList(result); return returnDataList(result);
} }

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java

@ -206,7 +206,7 @@ public class JSONUtils {
return null; return null;
} }
return node.toString(); return node.asText();
} }
/** /**

17
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -78,7 +78,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -185,8 +184,8 @@ public class MasterExecThread implements Runnable {
/** /**
* constructor of MasterExecThread * constructor of MasterExecThread
* *
* @param processInstance processInstance * @param processInstance processInstance
* @param processService processService * @param processService processService
* @param nettyRemotingClient nettyRemotingClient * @param nettyRemotingClient nettyRemotingClient
*/ */
public MasterExecThread(ProcessInstance processInstance public MasterExecThread(ProcessInstance processInstance
@ -388,7 +387,7 @@ public class MasterExecThread implements Runnable {
*/ */
private void buildFlowDag() throws Exception { private void buildFlowDag() throws Exception {
recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam()); recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
List<TaskNode> taskNodeList = processService.genTaskNodeList(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); List<TaskNode> taskNodeList = processService.genTaskNodeList(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), new HashMap<>());
forbiddenTaskList.clear(); forbiddenTaskList.clear();
taskNodeList.stream().forEach(taskNode -> { taskNodeList.stream().forEach(taskNode -> {
if (taskNode.isForbidden()) { if (taskNode.isForbidden()) {
@ -496,7 +495,7 @@ public class MasterExecThread implements Runnable {
* encapsulation task * encapsulation task
* *
* @param processInstance process instance * @param processInstance process instance
* @param nodeName node name * @param nodeName node name
* @return TaskInstance * @return TaskInstance
*/ */
private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName, private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName,
@ -1274,10 +1273,10 @@ public class MasterExecThread implements Runnable {
/** /**
* generate flow dag * generate flow dag
* *
* @param totalTaskNodeList total task node list * @param totalTaskNodeList total task node list
* @param startNodeNameList start node name list * @param startNodeNameList start node name list
* @param recoveryNodeNameList recovery node name list * @param recoveryNodeNameList recovery node name list
* @param depNodeType depend node type * @param depNodeType depend node type
* @return ProcessDag process dag * @return ProcessDag process dag
* @throws Exception exception * @throws Exception exception
*/ */

47
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -120,6 +120,7 @@ import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -1121,7 +1122,7 @@ public class ProcessService {
/** /**
* complement data needs transform parent parameter to child. * complement data needs transform parent parameter to child.
*/ */
private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance,Map<String,String> fatherParams) { private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance, Map<String, String> fatherParams) {
// set sub work process command // set sub work process command
String processMapStr = JSONUtils.toJsonString(instanceMap); String processMapStr = JSONUtils.toJsonString(instanceMap);
Map<String, String> cmdParam = JSONUtils.toMap(processMapStr); Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
@ -1165,13 +1166,13 @@ public class ProcessService {
Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS); Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
Map<String, String> globalMap = this.getGlobalParamMap(parentProcessInstance.getGlobalParams()); Map<String, String> globalMap = this.getGlobalParamMap(parentProcessInstance.getGlobalParams());
Map<String,String> fatherParams = new HashMap<>(); Map<String, String> fatherParams = new HashMap<>();
if (CollectionUtils.isNotEmpty(allParam)) { if (CollectionUtils.isNotEmpty(allParam)) {
for (Property info : allParam) { for (Property info : allParam) {
fatherParams.put(info.getProp(), globalMap.get(info.getProp())); fatherParams.put(info.getProp(), globalMap.get(info.getProp()));
} }
} }
String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance,fatherParams); String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams);
return new Command( return new Command(
commandType, commandType,
@ -2239,8 +2240,7 @@ public class ProcessService {
} }
private void setTaskFromTaskNode(TaskNode taskNode, TaskDefinition taskDefinition) { private void setTaskFromTaskNode(TaskNode taskNode, TaskDefinition taskDefinition) {
// TODO for the front-end UI, name with id taskDefinition.setName(taskNode.getName());
taskDefinition.setName(taskNode.getId() + "|" + taskNode.getName());
taskDefinition.setDescription(taskNode.getDesc()); taskDefinition.setDescription(taskNode.getDesc());
taskDefinition.setTaskType(TaskType.of(taskNode.getType())); taskDefinition.setTaskType(TaskType.of(taskNode.getType()));
taskDefinition.setTaskParams(TaskType.of(taskNode.getType()) == TaskType.DEPENDENT ? taskNode.getDependence() : taskNode.getParams()); taskDefinition.setTaskParams(TaskType.of(taskNode.getType()) == TaskType.DEPENDENT ? taskNode.getDependence() : taskNode.getParams());
@ -2340,7 +2340,7 @@ public class ProcessService {
List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
Map<String, TaskDefinition> taskNameAndCode = new HashMap<>(); Map<String, TaskDefinition> taskNameAndCode = new HashMap<>();
for (TaskNode taskNode : taskNodeList) { for (TaskNode taskNode : taskNodeList) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionName(projectCode, taskNode.getName()); TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(taskNode.getCode());
if (taskDefinition == null) { if (taskDefinition == null) {
try { try {
long code = SnowFlakeUtils.getInstance().nextId(); long code = SnowFlakeUtils.getInstance().nextId();
@ -2448,7 +2448,8 @@ public class ProcessService {
* @return dag graph * @return dag graph
*/ */
public DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) { public DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) {
List<TaskNode> taskNodeList = genTaskNodeList(processDefinition.getCode(), processDefinition.getVersion()); Map<String, String> locationMap = locationToMap(processDefinition.getLocations());
List<TaskNode> taskNodeList = genTaskNodeList(processDefinition.getCode(), processDefinition.getVersion(), locationMap);
List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new ArrayList<>(processTaskRelations)); ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new ArrayList<>(processTaskRelations));
// Generate concrete Dag to be executed // Generate concrete Dag to be executed
@ -2459,7 +2460,8 @@ public class ProcessService {
* generate ProcessData * generate ProcessData
*/ */
public ProcessData genProcessData(ProcessDefinition processDefinition) { public ProcessData genProcessData(ProcessDefinition processDefinition) {
List<TaskNode> taskNodes = genTaskNodeList(processDefinition.getCode(), processDefinition.getVersion()); Map<String, String> locationMap = locationToMap(processDefinition.getLocations());
List<TaskNode> taskNodes = genTaskNodeList(processDefinition.getCode(), processDefinition.getVersion(), locationMap);
ProcessData processData = new ProcessData(); ProcessData processData = new ProcessData();
processData.setTasks(taskNodes); processData.setTasks(taskNodes);
processData.setGlobalParams(JSONUtils.toList(processDefinition.getGlobalParams(), Property.class)); processData.setGlobalParams(JSONUtils.toList(processDefinition.getGlobalParams(), Property.class));
@ -2468,7 +2470,7 @@ public class ProcessService {
return processData; return processData;
} }
public List<TaskNode> genTaskNodeList(Long processCode, int processVersion) { public List<TaskNode> genTaskNodeList(Long processCode, int processVersion, Map<String, String> locationMap) {
List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion); List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion);
Set<TaskDefinition> taskDefinitionSet = new HashSet<>(); Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
Map<Long, TaskNode> taskNodeMap = new HashMap<>(); Map<Long, TaskNode> taskNodeMap = new HashMap<>();
@ -2501,10 +2503,9 @@ public class ProcessService {
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = taskDefinitionLogs.stream().collect(Collectors.toMap(TaskDefinitionLog::getCode, log -> log)); Map<Long, TaskDefinitionLog> taskDefinitionLogMap = taskDefinitionLogs.stream().collect(Collectors.toMap(TaskDefinitionLog::getCode, log -> log));
taskNodeMap.forEach((k, v) -> { taskNodeMap.forEach((k, v) -> {
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMap.get(k); TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMap.get(k);
// TODO split from name v.setId(locationMap.get(taskDefinitionLog.getName()));
v.setId(StringUtils.substringBefore(taskDefinitionLog.getName(), "|"));
v.setCode(taskDefinitionLog.getCode()); v.setCode(taskDefinitionLog.getCode());
v.setName(StringUtils.substringAfter(taskDefinitionLog.getName(), "|")); v.setName(taskDefinitionLog.getName());
v.setDesc(taskDefinitionLog.getDescription()); v.setDesc(taskDefinitionLog.getDescription());
v.setType(taskDefinitionLog.getTaskType().getDescp().toUpperCase()); v.setType(taskDefinitionLog.getTaskType().getDescp().toUpperCase());
v.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_NORMAL : Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); v.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_NORMAL : Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
@ -2517,15 +2518,35 @@ public class ProcessService {
v.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN, v.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
taskDefinitionLog.getTimeoutNotifyStrategy(), taskDefinitionLog.getTimeoutNotifyStrategy(),
taskDefinitionLog.getTimeout()))); taskDefinitionLog.getTimeout())));
// TODO name will be remove
v.getPreTaskNodeList().forEach(task -> task.setName(taskDefinitionLogMap.get(task.getCode()).getName())); v.getPreTaskNodeList().forEach(task -> task.setName(taskDefinitionLogMap.get(task.getCode()).getName()));
v.setPreTasks(JSONUtils.toJsonString(v.getPreTaskNodeList().stream().map(PreviousTaskNode::getName).collect(Collectors.toList()))); v.setPreTasks(JSONUtils.toJsonString(v.getPreTaskNodeList().stream().map(PreviousTaskNode::getName).collect(Collectors.toList())));
}); });
return new ArrayList<>(taskNodeMap.values()); return new ArrayList<>(taskNodeMap.values());
} }
/**
* parse locations
*
* @param locations processDefinition locations
* @return key:taskName,value:taskId
*/
public Map<String, String> locationToMap(String locations) {
Map<String, String> frontTaskIdAndNameMap = new HashMap<>();
if (StringUtils.isBlank(locations)) {
return frontTaskIdAndNameMap;
}
ObjectNode jsonNodes = JSONUtils.parseObject(locations);
Iterator<Entry<String, JsonNode>> fields = jsonNodes.fields();
while (fields.hasNext()) {
Entry<String, JsonNode> jsonNodeEntry = fields.next();
frontTaskIdAndNameMap.put(JSONUtils.findValue(jsonNodeEntry.getValue(), "name"), jsonNodeEntry.getKey());
}
return frontTaskIdAndNameMap;
}
/** /**
* add authorized resources * add authorized resources
*
* @param ownResources own resources * @param ownResources own resources
* @param userId userId * @param userId userId
*/ */

15
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -503,4 +503,19 @@ public class ProcessServiceTest {
Assert.assertEquals(processDefinitionJson, json); Assert.assertEquals(processDefinitionJson, json);
} }
@Test
public void locationToMap() {
String locations = "{\"tasks-64888\":{\"name\":\"test_a\",\"targetarr\":\"\",\"nodenumber\":\"1\",\"x\":134,\"y\":183},"
+ "\"tasks-24501\":{\"name\":\"test_b\",\"targetarr\":\"tasks-64888\",\"nodenumber\":\"0\",\"x\":392,\"y\":184},"
+ "\"tasks-81137\":{\"name\":\"test_c\",\"targetarr\":\"\",\"nodenumber\":\"1\",\"x\":122,\"y\":327},"
+ "\"tasks-41367\":{\"name\":\"test_d\",\"targetarr\":\"tasks-81137\",\"nodenumber\":\"0\",\"x\":409,\"y\":324}}";
Map<String, String> frontTaskIdAndNameMap = new HashMap<>();
frontTaskIdAndNameMap.put("test_a", "tasks-64888");
frontTaskIdAndNameMap.put("test_b", "tasks-24501");
frontTaskIdAndNameMap.put("test_c", "tasks-81137");
frontTaskIdAndNameMap.put("test_d", "tasks-41367");
Map<String, String> locationToMap = processService.locationToMap(locations);
Assert.assertEquals(frontTaskIdAndNameMap, locationToMap);
}
} }

2
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -46,7 +46,7 @@ export default {
Minute: '分', Minute: '分',
'Delay execution time': '延时执行时间', 'Delay execution time': '延时执行时间',
'Delay execution': '延时执行', 'Delay execution': '延时执行',
'Forced success': '强制成功', 'Forced success': '强制成功',
Cancel: '取消', Cancel: '取消',
'Confirm add': '确认添加', 'Confirm add': '确认添加',
'The newly created sub-Process has not yet been executed and cannot enter the sub-Process': '新创建子工作流还未执行不能进入子工作流', 'The newly created sub-Process has not yet been executed and cannot enter the sub-Process': '新创建子工作流还未执行不能进入子工作流',

Loading…
Cancel
Save