Browse Source

[Fix-6139][API] fix bug of view-tree api (#6188)

* fix bug of view-tree api

* fix bug of view-tree api

* fix ut

* fix ut

Co-authored-by: JinyLeeChina <297062848@qq.com>
2.0.7-release
JinyLeeChina 3 years ago committed by GitHub
parent
commit
14e4d7086d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
  3. 39
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/treeview/Instance.java
  4. 34
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/treeview/TreeViewDto.java
  5. 49
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  6. 6
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
  7. 14
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DagData.java
  8. 36
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  9. 28
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -533,7 +533,7 @@ public class ProcessDefinitionController extends BaseController {
*/ */
@ApiOperation(value = "getTaskListByDefinitionCodes", notes = "GET_TASK_LIST_BY_DEFINITION_CODE_NOTES") @ApiOperation(value = "getTaskListByDefinitionCodes", notes = "GET_TASK_LIST_BY_DEFINITION_CODE_NOTES")
@ApiImplicitParams({ @ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionCodes", value = "PROCESS_DEFINITION_CODES", required = true, type = "String", example = "100,200,300") @ApiImplicitParam(name = "codes", value = "PROCESS_DEFINITION_CODES", required = true, type = "String", example = "100,200,300")
}) })
@GetMapping(value = "/batch-query-tasks") @GetMapping(value = "/batch-query-tasks")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java

@ -96,7 +96,7 @@ public class WorkFlowLineageController extends BaseController {
@GetMapping(value = "/list") @GetMapping(value = "/list")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser") @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<Map<String, Object>> queryWorkFlowLineageByIds(@ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser, public Result<Map<String, Object>> queryWorkFlowLineage(@ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode) { @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode) {
try { try {
Map<String, Object> result = workFlowLineageService.queryWorkFlowLineage(projectCode); Map<String, Object> result = workFlowLineageService.queryWorkFlowLineage(projectCode);

39
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/treeview/Instance.java

@ -14,23 +14,30 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.api.dto.treeview;
import com.fasterxml.jackson.annotation.JsonFormat; package org.apache.dolphinscheduler.api.dto.treeview;
import java.util.Date; import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;
/** /**
* Instance * Instance
*/ */
public class Instance { public class Instance {
private int id; private int id;
/** /**
* node name * node name
*/ */
private String name; private String name;
/**
* node code
*/
private long code;
/** /**
* node type * node type
*/ */
@ -44,17 +51,16 @@ public class Instance {
/** /**
* node start time * node start time
*/ */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date startTime; private Date startTime;
/** /**
* node end time * node end time
*/ */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date endTime; private Date endTime;
/** /**
* node running on which host * node running on which host
*/ */
@ -67,18 +73,20 @@ public class Instance {
private int subflowId; private int subflowId;
public Instance() {
}
public Instance(){} public Instance(int id, String name, long code, String type) {
public Instance(int id,String name, String type){
this.id = id; this.id = id;
this.name = name; this.name = name;
this.code = code;
this.type = type; this.type = type;
} }
public Instance(int id,String name, String type,String state,Date startTime, Date endTime, String host, String duration,int subflowId) { public Instance(int id, String name, long code, String type, String state, Date startTime, Date endTime, String host, String duration, int subflowId) {
this.id = id; this.id = id;
this.name = name; this.name = name;
this.code = code;
this.type = type; this.type = type;
this.state = state; this.state = state;
this.startTime = startTime; this.startTime = startTime;
@ -88,12 +96,11 @@ public class Instance {
this.subflowId = subflowId; this.subflowId = subflowId;
} }
public Instance(int id,String name, String type,String state,Date startTime, Date endTime, String host, String duration) { public Instance(int id, String name, long code, String type, String state, Date startTime, Date endTime, String host, String duration) {
this(id, name, type, state, startTime, endTime,host,duration,0); this(id, name, code, type, state, startTime, endTime, host, duration, 0);
} }
public int getId() { public int getId() {
return id; return id;
} }
@ -110,6 +117,14 @@ public class Instance {
this.name = name; this.name = name;
} }
public long getCode() {
return code;
}
public void setCode(long code) {
this.code = code;
}
public String getType() { public String getType() {
return type; return type;
} }

34
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/treeview/TreeViewDto.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.api.dto.treeview; package org.apache.dolphinscheduler.api.dto.treeview;
import java.util.ArrayList; import java.util.ArrayList;
@ -34,6 +35,21 @@ public class TreeViewDto {
*/ */
private String type; private String type;
/**
* code
*/
private long code;
/**
* instances list
*/
private List<Instance> instances = new ArrayList<>();
/**
* children
*/
private List<TreeViewDto> children = new ArrayList<>();
public String getName() { public String getName() {
return name; return name;
@ -51,17 +67,13 @@ public class TreeViewDto {
this.type = type; this.type = type;
} }
/** public long getCode() {
* instances list return code;
*/ }
private List<Instance> instances = new ArrayList<>();
/**
* children
*/
private List<TreeViewDto> children = new ArrayList<>();
public void setCode(long code) {
this.code = code;
}
public List<Instance> getInstances() { public List<Instance> getInstances() {
return instances; return instances;
@ -78,6 +90,4 @@ public class TreeViewDto {
public void setChildren(List<TreeViewDto> children) { public void setChildren(List<TreeViewDto> children) {
this.children = children; this.children = children;
} }
} }

49
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -281,8 +281,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.DATA_IS_NOT_VALID, taskRelationJson); putMsg(result, Status.DATA_IS_NOT_VALID, taskRelationJson);
return result; return result;
} }
List<ProcessTaskRelation> processTaskRelations = taskRelationList.stream()
List<TaskNode> taskNodeList = processService.transformTask(taskRelationList, taskDefinitionLogs); .map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class))
.collect(Collectors.toList());
List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);
if (taskNodeList.size() != taskRelationList.size()) { if (taskNodeList.size() != taskRelationList.size()) {
Set<Long> postTaskCodes = taskRelationList.stream().map(ProcessTaskRelationLog::getPostTaskCode).collect(Collectors.toSet()); Set<Long> postTaskCodes = taskRelationList.stream().map(ProcessTaskRelationLog::getPostTaskCode).collect(Collectors.toSet());
Set<Long> taskNodeCodes = taskNodeList.stream().map(TaskNode::getCode).collect(Collectors.toSet()); Set<Long> taskNodeCodes = taskNodeList.stream().map(TaskNode::getCode).collect(Collectors.toSet());
@ -827,10 +829,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR); putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
return false; return false;
} }
List<TaskDefinitionLog> taskDefinitionList = dagDataSchedule.getTaskDefinitionList(); List<TaskDefinition> taskDefinitionList = dagDataSchedule.getTaskDefinitionList();
Map<Long, Long> taskCodeMap = new HashMap<>(); Map<Long, Long> taskCodeMap = new HashMap<>();
Date now = new Date(); Date now = new Date();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionList) { List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
for (TaskDefinition taskDefinition : taskDefinitionList) {
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition);
taskDefinitionLog.setName(taskDefinitionLog.getName() + "_import_" + DateUtils.getCurrentTimeStamp()); taskDefinitionLog.setName(taskDefinitionLog.getName() + "_import_" + DateUtils.getCurrentTimeStamp());
taskDefinitionLog.setProjectCode(projectCode); taskDefinitionLog.setProjectCode(projectCode);
taskDefinitionLog.setUserId(loginUser.getId()); taskDefinitionLog.setUserId(loginUser.getId());
@ -848,22 +852,26 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code"); putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
return false; return false;
} }
taskDefinitionLogList.add(taskDefinitionLog);
} }
int insert = taskDefinitionMapper.batchInsert(taskDefinitionList); int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogList);
int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionList); int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogList);
if ((logInsert & insert) == 0) { if ((logInsert & insert) == 0) {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
return false; return false;
} }
List<ProcessTaskRelationLog> taskRelationList = dagDataSchedule.getProcessTaskRelationList(); List<ProcessTaskRelation> taskRelationList = dagDataSchedule.getProcessTaskRelationList();
taskRelationList.forEach(processTaskRelationLog -> { List<ProcessTaskRelationLog> taskRelationLogList = new ArrayList<>();
for (ProcessTaskRelation processTaskRelation : taskRelationList) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
processTaskRelationLog.setPreTaskCode(taskCodeMap.get(processTaskRelationLog.getPreTaskCode())); processTaskRelationLog.setPreTaskCode(taskCodeMap.get(processTaskRelationLog.getPreTaskCode()));
processTaskRelationLog.setPostTaskCode(taskCodeMap.get(processTaskRelationLog.getPostTaskCode())); processTaskRelationLog.setPostTaskCode(taskCodeMap.get(processTaskRelationLog.getPostTaskCode()));
processTaskRelationLog.setPreTaskVersion(Constants.VERSION_FIRST); processTaskRelationLog.setPreTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST); processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
}); taskRelationLogList.add(processTaskRelationLog);
Map<String, Object> createProcessResult = createProcessDefine(loginUser, result, taskRelationList, processDefinition, null); }
Map<String, Object> createProcessResult = createProcessDefine(loginUser, result, taskRelationLogList, processDefinition, null);
if (Status.SUCCESS.equals(createProcessResult.get(Constants.STATUS))) { if (Status.SUCCESS.equals(createProcessResult.get(Constants.STATUS))) {
putMsg(createProcessResult, Status.SUCCESS); putMsg(createProcessResult, Status.SUCCESS);
} else { } else {
@ -937,7 +945,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result; return result;
} }
List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(processTaskRelationJson, ProcessTaskRelationLog.class); List<ProcessTaskRelation> taskRelationList = JSONUtils.toList(processTaskRelationJson, ProcessTaskRelation.class);
// Check whether the task node is normal // Check whether the task node is normal
List<TaskNode> taskNodes = processService.transformTask(taskRelationList, Lists.newArrayList()); List<TaskNode> taskNodes = processService.transformTask(taskRelationList, Lists.newArrayList());
@ -1026,7 +1034,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes);
return result; return result;
} }
Map<Long, List<TaskDefinitionLog>> taskNodeMap = new HashMap<>(); Map<Long, List<TaskDefinition>> taskNodeMap = new HashMap<>();
for (ProcessDefinition processDefinition : processDefinitionList) { for (ProcessDefinition processDefinition : processDefinitionList) {
DagData dagData = processService.genDagData(processDefinition); DagData dagData = processService.genDagData(processDefinition);
taskNodeMap.put(processDefinition.getCode(), dagData.getTaskDefinitionList()); taskNodeMap.put(processDefinition.getCode(), dagData.getTaskDefinitionList());
@ -1087,7 +1095,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
// List of process instances // List of process instances
List<ProcessInstance> processInstanceList = processInstanceService.queryByProcessDefineCode(code, limit); List<ProcessInstance> processInstanceList = processInstanceService.queryByProcessDefineCode(code, limit);
processInstanceList.forEach(processInstance -> processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime()))); processInstanceList.forEach(processInstance -> processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime())));
List<TaskDefinitionLog> taskDefinitionList = processService.queryTaskDefinitionListByProcess(code, processDefinition.getVersion()); List<TaskDefinitionLog> taskDefinitionList = processService.genTaskDefineList(processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()));
Map<Long, TaskDefinitionLog> taskDefinitionMap = taskDefinitionList.stream() Map<Long, TaskDefinitionLog> taskDefinitionMap = taskDefinitionList.stream()
.collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
@ -1098,12 +1106,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
TreeViewDto parentTreeViewDto = new TreeViewDto(); TreeViewDto parentTreeViewDto = new TreeViewDto();
parentTreeViewDto.setName("DAG"); parentTreeViewDto.setName("DAG");
parentTreeViewDto.setType(""); parentTreeViewDto.setType("");
parentTreeViewDto.setCode(0L);
// Specify the process definition, because it is a TreeView for a process definition // Specify the process definition, because it is a TreeView for a process definition
for (int i = limit - 1; i >= 0; i--) { for (int i = limit - 1; i >= 0; i--) {
ProcessInstance processInstance = processInstanceList.get(i); ProcessInstance processInstance = processInstanceList.get(i);
Date endTime = processInstance.getEndTime() == null ? new Date() : processInstance.getEndTime(); Date endTime = processInstance.getEndTime() == null ? new Date() : processInstance.getEndTime();
parentTreeViewDto.getInstances().add(new Instance(processInstance.getId(), processInstance.getName(), "", parentTreeViewDto.getInstances().add(new Instance(processInstance.getId(), processInstance.getName(), processInstance.getProcessDefinitionCode(),
processInstance.getState().toString(), processInstance.getStartTime(), endTime, processInstance.getHost(), "", processInstance.getState().toString(), processInstance.getStartTime(), endTime, processInstance.getHost(),
DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime()))); DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime())));
} }
@ -1126,13 +1135,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
treeViewDto.setName(nodeName); treeViewDto.setName(nodeName);
TaskNode taskNode = dag.getNode(nodeName); TaskNode taskNode = dag.getNode(nodeName);
treeViewDto.setType(taskNode.getType()); treeViewDto.setType(taskNode.getType());
treeViewDto.setCode(taskNode.getCode());
//set treeViewDto instances //set treeViewDto instances
for (int i = limit - 1; i >= 0; i--) { for (int i = limit - 1; i >= 0; i--) {
ProcessInstance processInstance = processInstanceList.get(i); ProcessInstance processInstance = processInstanceList.get(i);
TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), nodeName); TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), nodeName);
if (taskInstance == null) { if (taskInstance == null) {
treeViewDto.getInstances().add(new Instance(-1, "not running", "null")); treeViewDto.getInstances().add(new Instance(-1, "not running", 0, "null"));
} else { } else {
Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime(); Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime();
Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime(); Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime();
@ -1144,9 +1153,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
subProcessId = Integer.parseInt(JSONUtils.parseObject( subProcessId = Integer.parseInt(JSONUtils.parseObject(
taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText()); taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText());
} }
treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskType(), treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskCode(),
taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(), taskInstance.getHost(), taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(),
DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessId)); taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessId));
} }
} }
for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) { for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) {

6
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java

@ -45,7 +45,7 @@ public class TaskNode {
/** /**
* task node code * task node code
*/ */
private Long code; private long code;
/** /**
* task node version * task node version
@ -342,11 +342,11 @@ public class TaskNode {
this.delayTime = delayTime; this.delayTime = delayTime;
} }
public Long getCode() { public long getCode() {
return code; return code;
} }
public void setCode(Long code) { public void setCode(long code) {
this.code = code; this.code = code;
} }

14
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DagData.java

@ -32,14 +32,14 @@ public class DagData {
/** /**
* processTaskRelationList * processTaskRelationList
*/ */
private List<ProcessTaskRelationLog> processTaskRelationList; private List<ProcessTaskRelation> processTaskRelationList;
/** /**
* processTaskRelationList * processTaskRelationList
*/ */
private List<TaskDefinitionLog> taskDefinitionList; private List<TaskDefinition> taskDefinitionList;
public DagData(ProcessDefinition processDefinition, List<ProcessTaskRelationLog> processTaskRelationList, List<TaskDefinitionLog> taskDefinitionList) { public DagData(ProcessDefinition processDefinition, List<ProcessTaskRelation> processTaskRelationList, List<TaskDefinition> taskDefinitionList) {
this.processDefinition = processDefinition; this.processDefinition = processDefinition;
this.processTaskRelationList = processTaskRelationList; this.processTaskRelationList = processTaskRelationList;
this.taskDefinitionList = taskDefinitionList; this.taskDefinitionList = taskDefinitionList;
@ -56,19 +56,19 @@ public class DagData {
this.processDefinition = processDefinition; this.processDefinition = processDefinition;
} }
public List<ProcessTaskRelationLog> getProcessTaskRelationList() { public List<ProcessTaskRelation> getProcessTaskRelationList() {
return processTaskRelationList; return processTaskRelationList;
} }
public void setProcessTaskRelationList(List<ProcessTaskRelationLog> processTaskRelationList) { public void setProcessTaskRelationList(List<ProcessTaskRelation> processTaskRelationList) {
this.processTaskRelationList = processTaskRelationList; this.processTaskRelationList = processTaskRelationList;
} }
public List<TaskDefinitionLog> getTaskDefinitionList() { public List<TaskDefinition> getTaskDefinitionList() {
return taskDefinitionList; return taskDefinitionList;
} }
public void setTaskDefinitionList(List<TaskDefinitionLog> taskDefinitionList) { public void setTaskDefinitionList(List<TaskDefinition> taskDefinitionList) {
this.taskDefinitionList = taskDefinitionList; this.taskDefinitionList = taskDefinitionList;
} }
} }

36
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2350,7 +2350,7 @@ public class ProcessService {
* @return dag graph * @return dag graph
*/ */
public DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) { public DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) {
List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
List<TaskNode> taskNodeList = transformTask(processTaskRelations, Lists.newArrayList()); List<TaskNode> taskNodeList = transformTask(processTaskRelations, Lists.newArrayList());
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new ArrayList<>(processTaskRelations)); ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new ArrayList<>(processTaskRelations));
// Generate concrete Dag to be executed // Generate concrete Dag to be executed
@ -2361,13 +2361,17 @@ public class ProcessService {
* generate DagData * generate DagData
*/ */
public DagData genDagData(ProcessDefinition processDefinition) { public DagData genDagData(ProcessDefinition processDefinition) {
List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
return new DagData(processDefinition, processTaskRelations, genTaskDefineList(processTaskRelations)); List<TaskDefinitionLog> taskDefinitionLogList = genTaskDefineList(processTaskRelations);
List<TaskDefinition> taskDefinitions = taskDefinitionLogList.stream()
.map(taskDefinitionLog -> JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinitionLog), TaskDefinition.class))
.collect(Collectors.toList());
return new DagData(processDefinition, processTaskRelations, taskDefinitions);
} }
private List<TaskDefinitionLog> genTaskDefineList(List<ProcessTaskRelationLog> processTaskRelations) { public List<TaskDefinitionLog> genTaskDefineList(List<ProcessTaskRelation> processTaskRelations) {
Set<TaskDefinition> taskDefinitionSet = new HashSet<>(); Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
for (ProcessTaskRelationLog processTaskRelation : processTaskRelations) { for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPreTaskCode() > 0) { if (processTaskRelation.getPreTaskCode() > 0) {
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion())); taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion()));
} }
@ -2446,24 +2450,6 @@ public class ProcessService {
return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion); return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion);
} }
/**
* query tasks definition list by process code and process version
*/
public List<TaskDefinitionLog> queryTaskDefinitionListByProcess(long processCode, int processVersion) {
List<ProcessTaskRelationLog> processTaskRelationLogs =
processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion);
Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
for (ProcessTaskRelationLog processTaskRelationLog : processTaskRelationLogs) {
if (processTaskRelationLog.getPreTaskCode() > 0) {
taskDefinitionSet.add(new TaskDefinition(processTaskRelationLog.getPreTaskCode(), processTaskRelationLog.getPreTaskVersion()));
}
if (processTaskRelationLog.getPostTaskCode() > 0) {
taskDefinitionSet.add(new TaskDefinition(processTaskRelationLog.getPostTaskCode(), processTaskRelationLog.getPostTaskVersion()));
}
}
return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
}
/** /**
* add authorized resources * add authorized resources
* *
@ -2479,9 +2465,9 @@ public class ProcessService {
/** /**
* Use temporarily before refactoring taskNode * Use temporarily before refactoring taskNode
*/ */
public List<TaskNode> transformTask(List<ProcessTaskRelationLog> taskRelationList, List<TaskDefinitionLog> taskDefinitionLogs) { public List<TaskNode> transformTask(List<ProcessTaskRelation> taskRelationList, List<TaskDefinitionLog> taskDefinitionLogs) {
Map<Long, List<Long>> taskCodeMap = new HashMap<>(); Map<Long, List<Long>> taskCodeMap = new HashMap<>();
for (ProcessTaskRelationLog processTaskRelation : taskRelationList) { for (ProcessTaskRelation processTaskRelation : taskRelationList) {
taskCodeMap.compute(processTaskRelation.getPostTaskCode(), (k, v) -> { taskCodeMap.compute(processTaskRelation.getPostTaskCode(), (k, v) -> {
if (v == null) { if (v == null) {
v = new ArrayList<>(); v = new ArrayList<>();

28
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -368,17 +369,17 @@ public class ProcessServiceTest {
processDefinition.setVersion(1); processDefinition.setVersion(1);
processDefinition.setCode(11L); processDefinition.setCode(11L);
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelationLog.setName("def 1"); processTaskRelation.setName("def 1");
processTaskRelationLog.setProcessDefinitionVersion(1); processTaskRelation.setProcessDefinitionVersion(1);
processTaskRelationLog.setProjectCode(1L); processTaskRelation.setProjectCode(1L);
processTaskRelationLog.setProcessDefinitionCode(1L); processTaskRelation.setProcessDefinitionCode(1L);
processTaskRelationLog.setPostTaskCode(3L); processTaskRelation.setPostTaskCode(3L);
processTaskRelationLog.setPreTaskCode(2L); processTaskRelation.setPreTaskCode(2L);
processTaskRelationLog.setUpdateTime(new Date()); processTaskRelation.setUpdateTime(new Date());
processTaskRelationLog.setCreateTime(new Date()); processTaskRelation.setCreateTime(new Date());
List<ProcessTaskRelationLog> list = new ArrayList<>(); List<ProcessTaskRelation> list = new ArrayList<>();
list.add(processTaskRelationLog); list.add(processTaskRelation);
TaskDefinitionLog taskDefinition = new TaskDefinitionLog(); TaskDefinitionLog taskDefinition = new TaskDefinitionLog();
taskDefinition.setCode(3L); taskDefinition.setCode(3L);
@ -405,11 +406,10 @@ public class ProcessServiceTest {
taskDefinitionLogs.add(td2); taskDefinitionLogs.add(td2);
Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(any())).thenReturn(taskDefinitionLogs); Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(any())).thenReturn(taskDefinitionLogs);
Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong(), Mockito.anyInt())).thenReturn(list); Mockito.when(processTaskRelationMapper.queryByProcessCode(Mockito.anyLong(), Mockito.anyLong())).thenReturn(list);
DAG<String, TaskNode, TaskNodeRelation> stringTaskNodeTaskNodeRelationDAG = processService.genDagGraph(processDefinition); DAG<String, TaskNode, TaskNodeRelation> stringTaskNodeTaskNodeRelationDAG = processService.genDagGraph(processDefinition);
Assert.assertNotEquals(0, stringTaskNodeTaskNodeRelationDAG.getNodesCount()); Assert.assertEquals(1, stringTaskNodeTaskNodeRelationDAG.getNodesCount());
} }
@Test @Test

Loading…
Cancel
Save