diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index ebc2bc518b..21ba828d2b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -47,14 +47,18 @@ import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -93,6 +97,9 @@ public class ProcessInstanceService extends BaseService { private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceService.class); + public static final String TASK_TYPE = "taskType"; + public static final String LOCAL_PARAMS_LIST = "localParamsList"; + @Autowired ProjectMapper projectMapper; @@ -123,6 +130,11 @@ public class ProcessInstanceService extends BaseService { @Autowired LoggerService loggerService; + @Autowired + ProcessDefinitionLogMapper processDefinitionLogMapper; + + @Autowired + TaskDefinitionLogMapper taskDefinitionLogMapper; @Autowired UsersService usersService; @@ -608,34 +620,47 @@ public class ProcessInstanceService extends BaseService { Map timeParams = BusinessTimeUtils .getBusinessTime(processInstance.getCmdTypeIfComplement(), processInstance.getScheduleTime()); - - String workflowInstanceJson = processInstance.getProcessInstanceJson(); - - ProcessData workflowData = JSONUtils.parseObject(workflowInstanceJson, ProcessData.class); - String userDefinedParams = processInstance.getGlobalParams(); - // global params List globalParams = new ArrayList<>(); - if (userDefinedParams != null && userDefinedParams.length() > 0) { - globalParams = JSONUtils.toList(userDefinedParams, Property.class); - } - - List taskNodeList = workflowData.getTasks(); - // global param string - String globalParamStr = JSONUtils.toJsonString(globalParams); - globalParamStr = ParameterUtils.convertParameterPlaceholders(globalParamStr, timeParams); + String globalParamStr = ParameterUtils.convertParameterPlaceholders(JSONUtils.toJsonString(globalParams), timeParams); globalParams = JSONUtils.toList(globalParamStr, Property.class); for (Property property : globalParams) { timeParams.put(property.getProp(), property.getValue()); } - // local params + if (userDefinedParams != null && userDefinedParams.length() > 0) { + globalParams = JSONUtils.toList(userDefinedParams, Property.class); + } + + Map> localUserDefParams = getLocalParams(processInstance, timeParams); + + Map resultMap = new HashMap<>(); + + resultMap.put(GLOBAL_PARAMS, globalParams); + resultMap.put(LOCAL_PARAMS, localUserDefParams); + + result.put(DATA_LIST, resultMap); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * get local params + * + * @param processInstance + * @param timeParams + * @return + */ + private Map> getLocalParams(ProcessInstance processInstance, Map timeParams) { Map> localUserDefParams = new HashMap<>(); - for (TaskNode taskNode : taskNodeList) { - String parameter = taskNode.getParams(); + List taskInstanceList = taskInstanceMapper.findValidTaskListByProcessId(processInstance.getId(), Flag.YES); + for (TaskInstance taskInstance : taskInstanceList) { + TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion( + taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()); + String parameter = taskDefinitionLog.getTaskParams(); Map map = JSONUtils.toMap(parameter); String localParams = map.get(LOCAL_PARAMS); if (localParams != null && !localParams.isEmpty()) { @@ -643,23 +668,15 @@ public class ProcessInstanceService extends BaseService { List localParamsList = JSONUtils.toList(localParams, Property.class); Map localParamsMap = new HashMap<>(); - localParamsMap.put("taskType", taskNode.getType()); - localParamsMap.put("localParamsList", localParamsList); + localParamsMap.put(TASK_TYPE, taskDefinitionLog.getTaskType()); + localParamsMap.put(LOCAL_PARAMS_LIST, localParamsList); if (CollectionUtils.isNotEmpty(localParamsList)) { - localUserDefParams.put(taskNode.getName(), localParamsMap); + localUserDefParams.put(taskDefinitionLog.getName(), localParamsMap); } } } - - Map resultMap = new HashMap<>(); - - resultMap.put(GLOBAL_PARAMS, globalParams); - resultMap.put(LOCAL_PARAMS, localUserDefParams); - - result.put(DATA_LIST, resultMap); - putMsg(result, Status.SUCCESS); - return result; + return localUserDefParams; } /** @@ -678,9 +695,15 @@ public class ProcessInstanceService extends BaseService { throw new RuntimeException("workflow instance is null"); } - GanttDto ganttDto = new GanttDto(); + ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper.queryByDefinitionCodeAndVersion( + processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion() + ); + ProcessDefinition processDefinition = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinitionLog), + ProcessDefinition.class); - DAG dag = processInstance2DAG(processInstance); + GanttDto ganttDto = new GanttDto(); + DAG dag = processService.genDagGraph(processDefinition); //topological sort List nodeList = dag.topologicalSort(); @@ -712,21 +735,6 @@ public class ProcessInstanceService extends BaseService { return result; } - /** - * process instance to DAG - * - * @param processInstance input process instance - * @return process instance dag. - */ - private static DAG processInstance2DAG(ProcessInstance processInstance) { - - String processDefinitionJson = processInstance.getProcessInstanceJson(); - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - List taskNodeList = processData.getTasks(); - ProcessDag processDag = DagHelper.getProcessDag(taskNodeList); - return DagHelper.buildDagGraph(processDag); - } - /** * query process instance by processDefinitionId and stateArray * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index acdaae84e2..985011696a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -44,7 +44,6 @@ import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; -import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.thread.Stopper; @@ -66,11 +65,12 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; -import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.service.permission.PermissionCheck; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -150,6 +150,12 @@ public class ProcessDefinitionServiceImpl extends BaseService implements @Autowired private ProcessTaskRelationMapper processTaskRelationMapper; + @Autowired + private ProcessTaskRelationLogMapper processTaskRelationLogMapper; + + @Autowired + TaskDefinitionLogMapper taskDefinitionLogMapper; + /** * create process definition * @@ -1275,7 +1281,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinition); return result; } - DAG dag = genDagGraph(processDefinition); + DAG dag = processService.genDagGraph(processDefinition); /** * nodes that is running */ @@ -1385,30 +1391,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements return result; } - /** - * Generate the DAG Graph based on the process definition id - * - * @param processDefinition process definition - * @return dag graph - */ - private DAG genDagGraph(ProcessDefinition processDefinition) { - - String processDefinitionJson = processDefinition.getProcessDefinitionJson(); - - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - - //check process data - if (null != processData) { - List taskNodeList = processData.getTasks(); - processDefinition.setGlobalParamList(processData.getGlobalParams()); - ProcessDag processDag = DagHelper.getProcessDag(taskNodeList); - - // Generate concrete Dag to be executed - return DagHelper.buildDagGraph(processDag); - } - - return new DAG<>(); - } /** * whether the graph has a ring diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java index 13a804d2b2..9633abefc0 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java @@ -70,11 +70,21 @@ public class ProcessTaskRelation { */ private long preTaskCode; + /** + * pre node version + */ + private int preNodeVersion; + /** * post task code */ private long postTaskCode; + /** + * post node version + */ + private int postNodeVersion; + /** * condition type */ @@ -248,4 +258,20 @@ public class ProcessTaskRelation { public void setConditionType(ConditionType conditionType) { this.conditionType = conditionType; } + + public int getPreNodeVersion() { + return preNodeVersion; + } + + public void setPreNodeVersion(int preNodeVersion) { + this.preNodeVersion = preNodeVersion; + } + + public int getPostNodeVersion() { + return postNodeVersion; + } + + public void setPostNodeVersion(int postNodeVersion) { + this.postNodeVersion = postNodeVersion; + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java index 523bf4eba0..f858d98fad 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java @@ -71,11 +71,21 @@ public class ProcessTaskRelationLog { */ private long preTaskCode; + /** + * pre node version + */ + private int preNodeVersion; + /** * post task code */ private long postTaskCode; + /** + * post node version + */ + private int postNodeVersion; + /** * condition type */ @@ -262,4 +272,20 @@ public class ProcessTaskRelationLog { this.conditionType = processTaskRelation.getConditionType(); this.conditionParams = processTaskRelation.getConditionParams(); } + + public int getPostNodeVersion() { + return postNodeVersion; + } + + public void setPostNodeVersion(int postNodeVersion) { + this.postNodeVersion = postNodeVersion; + } + + public int getPreNodeVersion() { + return preNodeVersion; + } + + public void setPreNodeVersion(int preNodeVersion) { + this.preNodeVersion = preNodeVersion; + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index ba9ebaa77b..73d4bca60b 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -82,7 +82,7 @@ public class TaskInstance implements Serializable { /** * task defintion version */ - private String taskDefinitionVersion; + private int taskDefinitionVersion; /** * process instance name @@ -637,11 +637,11 @@ public class TaskInstance implements Serializable { this.processDefinitionCode = processDefinitionCode; } - public String getTaskDefinitionVersion() { + public int getTaskDefinitionVersion() { return taskDefinitionVersion; } - public void setTaskDefinitionVersion(String taskDefinitionVersion) { + public void setTaskDefinitionVersion(int taskDefinitionVersion) { this.taskDefinitionVersion = taskDefinitionVersion; } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index 6ee1c19e35..f7eaabcfda 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.ProcessData; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.slf4j.Logger; @@ -465,6 +467,36 @@ public class DagHelper { return processDag; } + /** + * get process dag + * + * @param taskDefinitions task definition + * @return Process dag + */ + public static ProcessDag getProcessDag(List taskDefinitions, + List processTaskRelations) { + Map taskNodeMap = new HashMap<>(); + List taskNodeList = new ArrayList<>(); + for (TaskDefinition taskDefinition : taskDefinitions) { + TaskNode taskNode = JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinition), TaskNode.class); + taskNodeMap.put(taskDefinition.getCode(), taskNode); + taskNodeList.add(taskNode); + } + + List taskNodeRelations = new ArrayList<>(); + for (ProcessTaskRelation processTaskRelation : processTaskRelations) { + if (processTaskRelation.getPreTaskCode() != 0) { + TaskNode preNode = taskNodeMap.get(processTaskRelation.getPreTaskCode()); + TaskNode postNode = taskNodeMap.get(processTaskRelation.getPostTaskCode()); + taskNodeRelations.add(new TaskNodeRelation(preNode.getName(), postNode.getName())); + } + } + ProcessDag processDag = new ProcessDag(); + processDag.setEdges(taskNodeRelations); + processDag.setNodes(taskNodeList); + return processDag; + } + /** * is there have conditions after the parent node * diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml index e719af6d24..b604a1d377 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml @@ -19,8 +19,8 @@ - id, `name`, process_definition_version, project_code, process_definition_code, pre_task_code, post_task_code, - condition_type, condition_params, operator, operate_time, create_time, update_time + id, `name`, process_definition_version, project_code, process_definition_code, pre_task_code, pre_task_version, + post_task_code, post_task_version, condition_type, condition_params, operator, operate_time, create_time, update_time select diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index e47a5ac567..31b2d143a5 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -42,8 +42,11 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.DateInterval; import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.model.TaskNodeRelation; +import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; @@ -96,6 +99,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; +import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.log.LogClientService; @@ -2384,4 +2388,32 @@ public class ProcessService { } return false; } + + /** + * Generate the DAG Graph based on the process definition id + * + * @param processDefinition process definition + * @return dag graph + */ + public DAG genDagGraph(ProcessDefinition processDefinition) { + + List taskRelationLogs = processTaskRelationLogMapper.queryByProcessCodeAndVersion( + processDefinition.getCode(), + processDefinition.getVersion()); + List processTaskRelations = new ArrayList<>(); + List taskDefinitions = new ArrayList<>(); + for (ProcessTaskRelationLog processTaskRelationLog : taskRelationLogs) { + processTaskRelations.add(JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class)); + + TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion( + processTaskRelationLog.getPostTaskCode(), + processTaskRelationLog.getPostNodeVersion()); + taskDefinitions.add(JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinitionLog), TaskDefinition.class)); + } + + ProcessDag processDag = DagHelper.getProcessDag(taskDefinitions, processTaskRelations); + // Generate concrete Dag to be executed + return DagHelper.buildDagGraph(processDag); + } + } diff --git a/sql/dolphinscheduler-postgre.sql b/sql/dolphinscheduler-postgre.sql index 04207899c9..6ffd638544 100644 --- a/sql/dolphinscheduler-postgre.sql +++ b/sql/dolphinscheduler-postgre.sql @@ -398,7 +398,9 @@ CREATE TABLE t_ds_process_task_relation ( project_code bigint DEFAULT NULL , process_definition_code bigint DEFAULT NULL , pre_task_code bigint DEFAULT NULL , + pre_task_version int DEFAULT 0 , post_task_code bigint DEFAULT NULL , + post_task_version int DEFAULT 0 , condition_type int DEFAULT NULL , condition_params text , create_time timestamp DEFAULT NULL , @@ -414,7 +416,9 @@ CREATE TABLE t_ds_process_task_relation_log ( project_code bigint DEFAULT NULL , process_definition_code bigint DEFAULT NULL , pre_task_code bigint DEFAULT NULL , + pre_task_version int DEFAULT 0 , post_task_code bigint DEFAULT NULL , + post_task_version int DEFAULT 0 , condition_type int DEFAULT NULL , condition_params text , operator int DEFAULT NULL , diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql index f5d212454b..f77d61fbf4 100644 --- a/sql/dolphinscheduler_mysql.sql +++ b/sql/dolphinscheduler_mysql.sql @@ -513,7 +513,9 @@ CREATE TABLE `t_ds_process_task_relation` ( `project_code` bigint(20) NOT NULL COMMENT 'project code', `process_definition_code` bigint(20) NOT NULL COMMENT 'process code', `pre_task_code` bigint(20) NOT NULL COMMENT 'pre task code', + `pre_task_version` int(11) NOT NULL COMMENT 'pre task version', `post_task_code` bigint(20) NOT NULL COMMENT 'post task code', + `post_task_version` int(11) NOT NULL COMMENT 'post task version', `condition_type` tinyint(2) DEFAULT NULL COMMENT 'condition type : 0 none, 1 judge 2 delay', `condition_params` text COMMENT 'condition params(json)', `create_time` datetime NOT NULL COMMENT 'create time', @@ -532,7 +534,9 @@ CREATE TABLE `t_ds_process_task_relation_log` ( `project_code` bigint(20) NOT NULL COMMENT 'project code', `process_definition_code` bigint(20) NOT NULL COMMENT 'process code', `pre_task_code` bigint(20) NOT NULL COMMENT 'pre task code', + `pre_task_version` int(11) NOT NULL COMMENT 'pre task version', `post_task_code` bigint(20) NOT NULL COMMENT 'post task code', + `post_task_version` int(11) NOT NULL COMMENT 'post task version', `condition_type` tinyint(2) DEFAULT NULL COMMENT 'condition type : 0 none, 1 judge 2 delay', `condition_params` text COMMENT 'condition params(json)', `operator` int(11) DEFAULT NULL COMMENT 'operator user id',