Browse Source

refactor task node

pull/3/MERGE
lenboo 4 years ago
parent
commit
929655fdbe
  1. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  2. 11
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  3. 37
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -388,7 +388,7 @@ public class MasterExecThread implements Runnable {
*/
private void buildFlowDag() throws Exception {
recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
List<TaskNode> taskNodeList = processService.getTaskNodeListByDefinitionId(processInstance.getProcessDefinitionId());
List<TaskNode> taskNodeList = processService.genTaskNodeList(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
forbiddenTaskList.clear();
taskNodeList.stream().forEach(taskNode -> {
if (taskNode.isForbidden()) {

11
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -17,11 +17,13 @@
package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.dependent.DependentDateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.DependentTaskExecThread;
@ -182,11 +184,12 @@ public class DependentTaskTest {
}
private List<TaskNode> getTaskNodes(){
List<TaskNode> list = new ArrayList<>();
TaskNode taskNode = new TaskNode();
private List<TaskDefinition> getTaskNodes(){
List<TaskDefinition> list = new ArrayList<>();
TaskDefinition taskNode = new TaskDefinition();
taskNode.setCode(1111L);
taskNode.setName("C");
taskNode.setType("SQL");
taskNode.setTaskType(TaskType.SQL);
list.add(taskNode);
return list;
}

37
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -344,25 +344,23 @@ public class ProcessService {
/**
* get task node list by definitionId
*/
public List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId) {
public List<TaskDefinition> getTaskNodeListByDefinitionId(Integer defineId) {
ProcessDefinition processDefinition = processDefineMapper.selectById(defineId);
if (processDefinition == null) {
logger.error("process define not exists");
return new ArrayList<>();
}
List<ProcessTaskRelation> processTaskRelations = getProcessTaskRelationList(processDefinition.getCode(), processDefinition.getVersion());
Map<Long, TaskDefinition> taskDefinitionMap = new HashMap<>();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (taskDefinitionMap.containsKey(processTaskRelation.getPostTaskCode())) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(processTaskRelation.getPostTaskCode());
TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
processTaskRelation.getPostTaskCode(), processTaskRelation.getPostNodeVersion());
taskDefinitionMap.put(processTaskRelation.getPostTaskCode(), taskDefinition);
}
}
return taskDefinitionMap.entrySet()
.stream()
.map(e -> JSONUtils.parseObject(JSONUtils.toJsonString(e.getValue()), TaskNode.class))
.collect(Collectors.toList());
return new ArrayList<>(taskDefinitionMap.values());
}
/**
@ -394,23 +392,12 @@ public class ProcessService {
public ProcessDefinition findProcessDefinition(Long processDefinitionCode, int version) {
ProcessDefinition processDefinition = processDefineMapper.queryByCode(processDefinitionCode);
if (processDefinition.getVersion() != version) {
ProcessDefinitionLog log = processDefineLogMapper.queryByDefinitionCodeAndVersion(processDefinitionCode, version);
processDefinition = convertFromLog(log);
processDefinition = processDefineLogMapper.queryByDefinitionCodeAndVersion(processDefinitionCode, version);
processDefinition.setId(0);
}
return processDefinition;
}
/**
* covert log to process definition
*/
public ProcessDefinition convertFromLog(ProcessDefinitionLog processDefinitionLog) {
ProcessDefinition definition = processDefinitionLog;
if (null != definition) {
definition.setId(0);
}
return definition;
}
/**
* delete work process instance by id
*
@ -500,11 +487,13 @@ public class ProcessService {
* @param ids ids
*/
public void recurseFindSubProcessId(int parentId, List<Integer> ids) {
List<TaskNode> taskNodeList = this.getTaskNodeListByDefinitionId(parentId);
List<TaskDefinition> taskNodeList = this.getTaskNodeListByDefinitionId(parentId);
if (taskNodeList != null && !taskNodeList.isEmpty()) {
for (TaskNode taskNode : taskNodeList) {
String parameter = taskNode.getParams();
for (TaskDefinition taskNode : taskNodeList) {
String parameter = taskNode.getTaskParams();
ObjectNode parameterJson = JSONUtils.parseObject(parameter);
if (parameterJson.get(CMD_PARAM_SUB_PROCESS_DEFINE_ID) != null) {
SubProcessParameters subProcessParam = JSONUtils.parseObject(parameter, SubProcessParameters.class);
@ -2404,7 +2393,7 @@ public class ProcessService {
* @return dag graph
*/
public DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) {
List<TaskNode> taskNodeList = this.getTaskNodeListByDefinitionId(processDefinition.getId());
List<TaskNode> taskNodeList = genTaskNodeList(processDefinition.getCode(), processDefinition.getVersion());
List<ProcessTaskRelation> processTaskRelations = getProcessTaskRelationList(processDefinition.getCode(), processDefinition.getVersion());
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, processTaskRelations);
// Generate concrete Dag to be executed

Loading…
Cancel
Save