Browse Source

[Feature-4417][JsonSplit] refactor process dag generate (#4790)

* #4417 [JsonSplit] refactor dag generate

* #4417 [JsonSplit] refactor dag generate

* #4417 [JsonSplit] add postNodeVersion and preNodeVersion

* #4417 [JsonSplit] code style

* #4417 [JsonSplit] code style

* #4417 [JsonSplit] code style
pull/3/MERGE
bao liang 4 years ago committed by GitHub
parent
commit
67eae43154
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 100
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  2. 36
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  3. 26
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
  4. 26
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java
  5. 6
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  6. 32
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  7. 4
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
  8. 4
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
  9. 32
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  10. 4
      sql/dolphinscheduler-postgre.sql
  11. 4
      sql/dolphinscheduler_mysql.sql

100
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -47,14 +47,18 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -93,6 +97,9 @@ public class ProcessInstanceService extends BaseService {
private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceService.class); private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceService.class);
public static final String TASK_TYPE = "taskType";
public static final String LOCAL_PARAMS_LIST = "localParamsList";
@Autowired @Autowired
ProjectMapper projectMapper; ProjectMapper projectMapper;
@ -123,6 +130,11 @@ public class ProcessInstanceService extends BaseService {
@Autowired @Autowired
LoggerService loggerService; LoggerService loggerService;
@Autowired
ProcessDefinitionLogMapper processDefinitionLogMapper;
@Autowired
TaskDefinitionLogMapper taskDefinitionLogMapper;
@Autowired @Autowired
UsersService usersService; UsersService usersService;
@ -608,34 +620,47 @@ public class ProcessInstanceService extends BaseService {
Map<String, String> timeParams = BusinessTimeUtils Map<String, String> timeParams = BusinessTimeUtils
.getBusinessTime(processInstance.getCmdTypeIfComplement(), .getBusinessTime(processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime()); processInstance.getScheduleTime());
String workflowInstanceJson = processInstance.getProcessInstanceJson();
ProcessData workflowData = JSONUtils.parseObject(workflowInstanceJson, ProcessData.class);
String userDefinedParams = processInstance.getGlobalParams(); String userDefinedParams = processInstance.getGlobalParams();
// global params // global params
List<Property> globalParams = new ArrayList<>(); List<Property> globalParams = new ArrayList<>();
if (userDefinedParams != null && userDefinedParams.length() > 0) {
globalParams = JSONUtils.toList(userDefinedParams, Property.class);
}
List<TaskNode> taskNodeList = workflowData.getTasks();
// global param string // global param string
String globalParamStr = JSONUtils.toJsonString(globalParams); String globalParamStr = ParameterUtils.convertParameterPlaceholders(JSONUtils.toJsonString(globalParams), timeParams);
globalParamStr = ParameterUtils.convertParameterPlaceholders(globalParamStr, timeParams);
globalParams = JSONUtils.toList(globalParamStr, Property.class); globalParams = JSONUtils.toList(globalParamStr, Property.class);
for (Property property : globalParams) { for (Property property : globalParams) {
timeParams.put(property.getProp(), property.getValue()); timeParams.put(property.getProp(), property.getValue());
} }
// local params if (userDefinedParams != null && userDefinedParams.length() > 0) {
globalParams = JSONUtils.toList(userDefinedParams, Property.class);
}
Map<String, Map<String, Object>> localUserDefParams = getLocalParams(processInstance, timeParams);
Map<String, Object> resultMap = new HashMap<>();
resultMap.put(GLOBAL_PARAMS, globalParams);
resultMap.put(LOCAL_PARAMS, localUserDefParams);
result.put(DATA_LIST, resultMap);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* get local params
*
* @param processInstance
* @param timeParams
* @return
*/
private Map<String, Map<String, Object>> getLocalParams(ProcessInstance processInstance, Map<String, String> timeParams) {
Map<String, Map<String, Object>> localUserDefParams = new HashMap<>(); Map<String, Map<String, Object>> localUserDefParams = new HashMap<>();
for (TaskNode taskNode : taskNodeList) { List<TaskInstance> taskInstanceList = taskInstanceMapper.findValidTaskListByProcessId(processInstance.getId(), Flag.YES);
String parameter = taskNode.getParams(); for (TaskInstance taskInstance : taskInstanceList) {
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
String parameter = taskDefinitionLog.getTaskParams();
Map<String, String> map = JSONUtils.toMap(parameter); Map<String, String> map = JSONUtils.toMap(parameter);
String localParams = map.get(LOCAL_PARAMS); String localParams = map.get(LOCAL_PARAMS);
if (localParams != null && !localParams.isEmpty()) { if (localParams != null && !localParams.isEmpty()) {
@ -643,23 +668,15 @@ public class ProcessInstanceService extends BaseService {
List<Property> localParamsList = JSONUtils.toList(localParams, Property.class); List<Property> localParamsList = JSONUtils.toList(localParams, Property.class);
Map<String, Object> localParamsMap = new HashMap<>(); Map<String, Object> localParamsMap = new HashMap<>();
localParamsMap.put("taskType", taskNode.getType()); localParamsMap.put(TASK_TYPE, taskDefinitionLog.getTaskType());
localParamsMap.put("localParamsList", localParamsList); localParamsMap.put(LOCAL_PARAMS_LIST, localParamsList);
if (CollectionUtils.isNotEmpty(localParamsList)) { if (CollectionUtils.isNotEmpty(localParamsList)) {
localUserDefParams.put(taskNode.getName(), localParamsMap); localUserDefParams.put(taskDefinitionLog.getName(), localParamsMap);
} }
} }
} }
return localUserDefParams;
Map<String, Object> resultMap = new HashMap<>();
resultMap.put(GLOBAL_PARAMS, globalParams);
resultMap.put(LOCAL_PARAMS, localUserDefParams);
result.put(DATA_LIST, resultMap);
putMsg(result, Status.SUCCESS);
return result;
} }
/** /**
@ -678,9 +695,15 @@ public class ProcessInstanceService extends BaseService {
throw new RuntimeException("workflow instance is null"); throw new RuntimeException("workflow instance is null");
} }
GanttDto ganttDto = new GanttDto(); ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper.queryByDefinitionCodeAndVersion(
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion()
);
ProcessDefinition processDefinition = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinitionLog),
ProcessDefinition.class);
DAG<String, TaskNode, TaskNodeRelation> dag = processInstance2DAG(processInstance); GanttDto ganttDto = new GanttDto();
DAG<String, TaskNode, TaskNodeRelation> dag = processService.genDagGraph(processDefinition);
//topological sort //topological sort
List<String> nodeList = dag.topologicalSort(); List<String> nodeList = dag.topologicalSort();
@ -712,21 +735,6 @@ public class ProcessInstanceService extends BaseService {
return result; return result;
} }
/**
* process instance to DAG
*
* @param processInstance input process instance
* @return process instance dag.
*/
private static DAG<String, TaskNode, TaskNodeRelation> processInstance2DAG(ProcessInstance processInstance) {
String processDefinitionJson = processInstance.getProcessInstanceJson();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
List<TaskNode> taskNodeList = processData.getTasks();
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);
return DagHelper.buildDagGraph(processDag);
}
/** /**
* query process instance by processDefinitionId and stateArray * query process instance by processDefinitionId and stateArray
* *

36
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -44,7 +44,6 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
@ -66,11 +65,12 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.permission.PermissionCheck; import org.apache.dolphinscheduler.service.permission.PermissionCheck;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -150,6 +150,12 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
@Autowired @Autowired
private ProcessTaskRelationMapper processTaskRelationMapper; private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
@Autowired
TaskDefinitionLogMapper taskDefinitionLogMapper;
/** /**
* create process definition * create process definition
* *
@ -1275,7 +1281,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinition); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinition);
return result; return result;
} }
DAG<String, TaskNode, TaskNodeRelation> dag = genDagGraph(processDefinition); DAG<String, TaskNode, TaskNodeRelation> dag = processService.genDagGraph(processDefinition);
/** /**
* nodes that is running * nodes that is running
*/ */
@ -1385,30 +1391,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return result; return result;
} }
/**
* Generate the DAG Graph based on the process definition id
*
* @param processDefinition process definition
* @return dag graph
*/
private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) {
String processDefinitionJson = processDefinition.getProcessDefinitionJson();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
//check process data
if (null != processData) {
List<TaskNode> taskNodeList = processData.getTasks();
processDefinition.setGlobalParamList(processData.getGlobalParams());
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);
// Generate concrete Dag to be executed
return DagHelper.buildDagGraph(processDag);
}
return new DAG<>();
}
/** /**
* whether the graph has a ring * whether the graph has a ring

26
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java

@ -70,11 +70,21 @@ public class ProcessTaskRelation {
*/ */
private long preTaskCode; private long preTaskCode;
/**
* pre node version
*/
private int preNodeVersion;
/** /**
* post task code * post task code
*/ */
private long postTaskCode; private long postTaskCode;
/**
* post node version
*/
private int postNodeVersion;
/** /**
* condition type * condition type
*/ */
@ -248,4 +258,20 @@ public class ProcessTaskRelation {
public void setConditionType(ConditionType conditionType) { public void setConditionType(ConditionType conditionType) {
this.conditionType = conditionType; this.conditionType = conditionType;
} }
public int getPreNodeVersion() {
return preNodeVersion;
}
public void setPreNodeVersion(int preNodeVersion) {
this.preNodeVersion = preNodeVersion;
}
public int getPostNodeVersion() {
return postNodeVersion;
}
public void setPostNodeVersion(int postNodeVersion) {
this.postNodeVersion = postNodeVersion;
}
} }

26
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java

@ -71,11 +71,21 @@ public class ProcessTaskRelationLog {
*/ */
private long preTaskCode; private long preTaskCode;
/**
* pre node version
*/
private int preNodeVersion;
/** /**
* post task code * post task code
*/ */
private long postTaskCode; private long postTaskCode;
/**
* post node version
*/
private int postNodeVersion;
/** /**
* condition type * condition type
*/ */
@ -262,4 +272,20 @@ public class ProcessTaskRelationLog {
this.conditionType = processTaskRelation.getConditionType(); this.conditionType = processTaskRelation.getConditionType();
this.conditionParams = processTaskRelation.getConditionParams(); this.conditionParams = processTaskRelation.getConditionParams();
} }
public int getPostNodeVersion() {
return postNodeVersion;
}
public void setPostNodeVersion(int postNodeVersion) {
this.postNodeVersion = postNodeVersion;
}
public int getPreNodeVersion() {
return preNodeVersion;
}
public void setPreNodeVersion(int preNodeVersion) {
this.preNodeVersion = preNodeVersion;
}
} }

6
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -82,7 +82,7 @@ public class TaskInstance implements Serializable {
/** /**
* task defintion version * task defintion version
*/ */
private String taskDefinitionVersion; private int taskDefinitionVersion;
/** /**
* process instance name * process instance name
@ -637,11 +637,11 @@ public class TaskInstance implements Serializable {
this.processDefinitionCode = processDefinitionCode; this.processDefinitionCode = processDefinitionCode;
} }
public String getTaskDefinitionVersion() { public int getTaskDefinitionVersion() {
return taskDefinitionVersion; return taskDefinitionVersion;
} }
public void setTaskDefinitionVersion(String taskDefinitionVersion) { public void setTaskDefinitionVersion(int taskDefinitionVersion) {
this.taskDefinitionVersion = taskDefinitionVersion; this.taskDefinitionVersion = taskDefinitionVersion;
} }
} }

32
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -465,6 +467,36 @@ public class DagHelper {
return processDag; return processDag;
} }
/**
* get process dag
*
* @param taskDefinitions task definition
* @return Process dag
*/
public static ProcessDag getProcessDag(List<TaskDefinition> taskDefinitions,
List<ProcessTaskRelation> processTaskRelations) {
Map<Long, TaskNode> taskNodeMap = new HashMap<>();
List<TaskNode> taskNodeList = new ArrayList<>();
for (TaskDefinition taskDefinition : taskDefinitions) {
TaskNode taskNode = JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinition), TaskNode.class);
taskNodeMap.put(taskDefinition.getCode(), taskNode);
taskNodeList.add(taskNode);
}
List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPreTaskCode() != 0) {
TaskNode preNode = taskNodeMap.get(processTaskRelation.getPreTaskCode());
TaskNode postNode = taskNodeMap.get(processTaskRelation.getPostTaskCode());
taskNodeRelations.add(new TaskNodeRelation(preNode.getName(), postNode.getName()));
}
}
ProcessDag processDag = new ProcessDag();
processDag.setEdges(taskNodeRelations);
processDag.setNodes(taskNodeList);
return processDag;
}
/** /**
* is there have conditions after the parent node * is there have conditions after the parent node
* *

4
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml

@ -19,8 +19,8 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper"> <mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper">
<sql id="baseSql"> <sql id="baseSql">
id, `name`, process_definition_version, project_code, process_definition_code, pre_task_code, post_task_code, id, `name`, process_definition_version, project_code, process_definition_code, pre_task_code, pre_task_version,
condition_type, condition_params, operator, operate_time, create_time, update_time post_task_code, post_task_version, condition_type, condition_params, operator, operate_time, create_time, update_time
</sql> </sql>
<select id="queryByProcessCodeAndVersion" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog"> <select id="queryByProcessCodeAndVersion" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog">
select select

4
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

@ -19,8 +19,8 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper"> <mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper">
<sql id="baseSql"> <sql id="baseSql">
id, `name`, process_definition_version, project_code, process_definition_code, pre_task_code, post_task_code, id, `name`, process_definition_version, project_code, process_definition_code, pre_task_code, pre_task_version,
condition_type, condition_params, create_time, update_time post_task_code, post_task_version, condition_type, condition_params, create_time, update_time
</sql> </sql>
<select id="queryByProcessCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation"> <select id="queryByProcessCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select select

32
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -42,8 +42,11 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.DateInterval; import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
@ -96,6 +99,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClientService;
@ -2384,4 +2388,32 @@ public class ProcessService {
} }
return false; return false;
} }
/**
* Generate the DAG Graph based on the process definition id
*
* @param processDefinition process definition
* @return dag graph
*/
public DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) {
List<ProcessTaskRelationLog> taskRelationLogs = processTaskRelationLogMapper.queryByProcessCodeAndVersion(
processDefinition.getCode(),
processDefinition.getVersion());
List<ProcessTaskRelation> processTaskRelations = new ArrayList<>();
List<TaskDefinition> taskDefinitions = new ArrayList<>();
for (ProcessTaskRelationLog processTaskRelationLog : taskRelationLogs) {
processTaskRelations.add(JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class));
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
processTaskRelationLog.getPostTaskCode(),
processTaskRelationLog.getPostNodeVersion());
taskDefinitions.add(JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinitionLog), TaskDefinition.class));
}
ProcessDag processDag = DagHelper.getProcessDag(taskDefinitions, processTaskRelations);
// Generate concrete Dag to be executed
return DagHelper.buildDagGraph(processDag);
}
} }

4
sql/dolphinscheduler-postgre.sql

@ -398,7 +398,9 @@ CREATE TABLE t_ds_process_task_relation (
project_code bigint DEFAULT NULL , project_code bigint DEFAULT NULL ,
process_definition_code bigint DEFAULT NULL , process_definition_code bigint DEFAULT NULL ,
pre_task_code bigint DEFAULT NULL , pre_task_code bigint DEFAULT NULL ,
pre_task_version int DEFAULT 0 ,
post_task_code bigint DEFAULT NULL , post_task_code bigint DEFAULT NULL ,
post_task_version int DEFAULT 0 ,
condition_type int DEFAULT NULL , condition_type int DEFAULT NULL ,
condition_params text , condition_params text ,
create_time timestamp DEFAULT NULL , create_time timestamp DEFAULT NULL ,
@ -414,7 +416,9 @@ CREATE TABLE t_ds_process_task_relation_log (
project_code bigint DEFAULT NULL , project_code bigint DEFAULT NULL ,
process_definition_code bigint DEFAULT NULL , process_definition_code bigint DEFAULT NULL ,
pre_task_code bigint DEFAULT NULL , pre_task_code bigint DEFAULT NULL ,
pre_task_version int DEFAULT 0 ,
post_task_code bigint DEFAULT NULL , post_task_code bigint DEFAULT NULL ,
post_task_version int DEFAULT 0 ,
condition_type int DEFAULT NULL , condition_type int DEFAULT NULL ,
condition_params text , condition_params text ,
operator int DEFAULT NULL , operator int DEFAULT NULL ,

4
sql/dolphinscheduler_mysql.sql

@ -513,7 +513,9 @@ CREATE TABLE `t_ds_process_task_relation` (
`project_code` bigint(20) NOT NULL COMMENT 'project code', `project_code` bigint(20) NOT NULL COMMENT 'project code',
`process_definition_code` bigint(20) NOT NULL COMMENT 'process code', `process_definition_code` bigint(20) NOT NULL COMMENT 'process code',
`pre_task_code` bigint(20) NOT NULL COMMENT 'pre task code', `pre_task_code` bigint(20) NOT NULL COMMENT 'pre task code',
`pre_task_version` int(11) NOT NULL COMMENT 'pre task version',
`post_task_code` bigint(20) NOT NULL COMMENT 'post task code', `post_task_code` bigint(20) NOT NULL COMMENT 'post task code',
`post_task_version` int(11) NOT NULL COMMENT 'post task version',
`condition_type` tinyint(2) DEFAULT NULL COMMENT 'condition type : 0 none, 1 judge 2 delay', `condition_type` tinyint(2) DEFAULT NULL COMMENT 'condition type : 0 none, 1 judge 2 delay',
`condition_params` text COMMENT 'condition params(json)', `condition_params` text COMMENT 'condition params(json)',
`create_time` datetime NOT NULL COMMENT 'create time', `create_time` datetime NOT NULL COMMENT 'create time',
@ -532,7 +534,9 @@ CREATE TABLE `t_ds_process_task_relation_log` (
`project_code` bigint(20) NOT NULL COMMENT 'project code', `project_code` bigint(20) NOT NULL COMMENT 'project code',
`process_definition_code` bigint(20) NOT NULL COMMENT 'process code', `process_definition_code` bigint(20) NOT NULL COMMENT 'process code',
`pre_task_code` bigint(20) NOT NULL COMMENT 'pre task code', `pre_task_code` bigint(20) NOT NULL COMMENT 'pre task code',
`pre_task_version` int(11) NOT NULL COMMENT 'pre task version',
`post_task_code` bigint(20) NOT NULL COMMENT 'post task code', `post_task_code` bigint(20) NOT NULL COMMENT 'post task code',
`post_task_version` int(11) NOT NULL COMMENT 'post task version',
`condition_type` tinyint(2) DEFAULT NULL COMMENT 'condition type : 0 none, 1 judge 2 delay', `condition_type` tinyint(2) DEFAULT NULL COMMENT 'condition type : 0 none, 1 judge 2 delay',
`condition_params` text COMMENT 'condition params(json)', `condition_params` text COMMENT 'condition params(json)',
`operator` int(11) DEFAULT NULL COMMENT 'operator user id', `operator` int(11) DEFAULT NULL COMMENT 'operator user id',

Loading…
Cancel
Save