Browse Source

[Fix-6194] [Server] fix master buildFlowDag error (#6195)

* fix bug of view-tree api

* fix bug of view-tree api

* fix ut

* fix ut

* fix master buildFlowDag error

Co-authored-by: JinyLeeChina <297062848@qq.com>
2.0.7-release
JinyLeeChina 3 years ago committed by GitHub
parent
commit
71e2c8808b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  2. 90
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -495,7 +495,7 @@ public class WorkflowExecuteThread implements Runnable {
processInstance.getProcessDefinitionVersion());
recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
List<TaskNode> taskNodeList =
processService.genTaskNodeList(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), new HashMap<>());
processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()), Lists.newArrayList());
forbiddenTaskList.clear();
taskNodeList.forEach(taskNode -> {
if (taskNode.isForbidden()) {

90
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2144,26 +2144,6 @@ public class ProcessService {
return result;
}
/**
* update task definition
*/
public int updateTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) {
Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinition.getCode());
Date now = new Date();
taskDefinition.setProjectCode(projectCode);
taskDefinition.setUserId(operator.getId());
taskDefinition.setVersion(version == null || version == 0 ? 1 : version + 1);
taskDefinition.setUpdateTime(now);
setTaskFromTaskNode(taskNode, taskDefinition);
int update = taskDefinitionMapper.updateById(taskDefinition);
// save task definition log
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition);
taskDefinitionLog.setOperator(operator.getId());
taskDefinitionLog.setOperateTime(now);
int insert = taskDefinitionLogMapper.insert(taskDefinitionLog);
return insert & update;
}
private void setTaskFromTaskNode(TaskNode taskNode, TaskDefinition taskDefinition) {
taskDefinition.setName(taskNode.getName());
taskDefinition.setDescription(taskNode.getDesc());
@ -2382,67 +2362,6 @@ public class ProcessService {
return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
}
@Deprecated
public List<TaskNode> genTaskNodeList(Long processCode, int processVersion, Map<String, String> locationMap) {
List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion);
Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
Map<Long, TaskNode> taskNodeMap = new HashMap<>();
for (ProcessTaskRelationLog processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPreTaskCode() > 0) {
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion()));
}
if (processTaskRelation.getPostTaskCode() > 0) {
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion()));
}
taskNodeMap.compute(processTaskRelation.getPostTaskCode(), (k, v) -> {
if (v == null) {
v = new TaskNode();
v.setCode(processTaskRelation.getPostTaskCode());
v.setVersion(processTaskRelation.getPostTaskVersion());
List<PreviousTaskNode> preTaskNodeList = new ArrayList<>();
if (processTaskRelation.getPreTaskCode() > 0) {
preTaskNodeList.add(new PreviousTaskNode(processTaskRelation.getPreTaskCode(), "", processTaskRelation.getPreTaskVersion()));
}
v.setPreTaskNodeList(preTaskNodeList);
} else {
List<PreviousTaskNode> preTaskDefinitionList = v.getPreTaskNodeList();
preTaskDefinitionList.add(new PreviousTaskNode(processTaskRelation.getPreTaskCode(), "", processTaskRelation.getPreTaskVersion()));
}
return v;
});
}
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = taskDefinitionLogs.stream().collect(Collectors.toMap(TaskDefinitionLog::getCode, log -> log));
taskNodeMap.forEach((k, v) -> {
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMap.get(k);
v.setId(locationMap.get(taskDefinitionLog.getName()));
v.setCode(taskDefinitionLog.getCode());
v.setName(taskDefinitionLog.getName());
v.setDesc(taskDefinitionLog.getDescription());
v.setType(taskDefinitionLog.getTaskType().toUpperCase());
v.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_NORMAL : Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
v.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes());
v.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
Map<String, Object> taskParamsMap = v.taskParamsToJsonObj(taskDefinitionLog.getTaskParams());
v.setConditionResult(JSONUtils.toJsonString(taskParamsMap.get(Constants.CONDITION_RESULT)));
v.setSwitchResult(JSONUtils.toJsonString(taskParamsMap.get(Constants.SWITCH_RESULT)));
v.setDependence(JSONUtils.toJsonString(taskParamsMap.get(Constants.DEPENDENCE)));
taskParamsMap.remove(Constants.CONDITION_RESULT);
taskParamsMap.remove(Constants.DEPENDENCE);
v.setParams(JSONUtils.toJsonString(taskParamsMap));
v.setTaskInstancePriority(taskDefinitionLog.getTaskPriority());
v.setWorkerGroup(taskDefinitionLog.getWorkerGroup());
v.setEnvironmentCode(taskDefinitionLog.getEnvironmentCode());
v.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
taskDefinitionLog.getTimeoutNotifyStrategy(),
taskDefinitionLog.getTimeout())));
v.setDelayTime(taskDefinitionLog.getDelayTime());
v.getPreTaskNodeList().forEach(task -> task.setName(taskDefinitionLogMap.get(task.getCode()).getName()));
v.setPreTasks(JSONUtils.toJsonString(v.getPreTaskNodeList().stream().map(PreviousTaskNode::getName).collect(Collectors.toList())));
});
return new ArrayList<>(taskNodeMap.values());
}
/**
* find task definition by code and version
*/
@ -2450,6 +2369,13 @@ public class ProcessService {
return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion);
}
/**
* find process task relation list by projectCode and processDefinitionCode
*/
public List<ProcessTaskRelation> findRelationByCode(long projectCode, long processDefinitionCode) {
return processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
}
/**
* add authorized resources
*
@ -2498,12 +2424,14 @@ public class ProcessService {
taskNode.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
Map<String, Object> taskParamsMap = taskNode.taskParamsToJsonObj(taskDefinitionLog.getTaskParams());
taskNode.setConditionResult(JSONUtils.toJsonString(taskParamsMap.get(Constants.CONDITION_RESULT)));
taskNode.setSwitchResult(JSONUtils.toJsonString(taskParamsMap.get(Constants.SWITCH_RESULT)));
taskNode.setDependence(JSONUtils.toJsonString(taskParamsMap.get(Constants.DEPENDENCE)));
taskParamsMap.remove(Constants.CONDITION_RESULT);
taskParamsMap.remove(Constants.DEPENDENCE);
taskNode.setParams(JSONUtils.toJsonString(taskParamsMap));
taskNode.setTaskInstancePriority(taskDefinitionLog.getTaskPriority());
taskNode.setWorkerGroup(taskDefinitionLog.getWorkerGroup());
taskNode.setEnvironmentCode(taskDefinitionLog.getEnvironmentCode());
taskNode.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
taskDefinitionLog.getTimeoutNotifyStrategy(),
taskDefinitionLog.getTimeout())));

Loading…
Cancel
Save