From 62e961e3f1bab666781df982d2284720db434461 Mon Sep 17 00:00:00 2001 From: bao liang <29528966+lenboo@users.noreply.github.com> Date: Mon, 22 Feb 2021 10:13:32 +0800 Subject: [PATCH] [Feature][JsonSplit]refactor remove the json in process instance and definition (#4828) * #4417 [JsonSplit] refactor process definition json * [#4177] refactor json split. remove json in process definition and process instance * [#4177] refactor json split. copy process definition need set task code 0L. * code style * code style * code style * code style * update * update --- .../impl/ProcessDefinitionServiceImpl.java | 8 +- .../ProcessDefinitionVersionServiceImpl.java | 1 - .../impl/ProcessInstanceServiceImpl.java | 6 - .../impl/ProcessTaskRelationServiceImpl.java | 2 +- .../impl/TaskDefinitionServiceImpl.java | 3 +- .../common/model/TaskNode.java | 665 +++++++++--------- .../dolphinscheduler/dao/utils/DagHelper.java | 52 +- .../master/runner/MasterExecThread.java | 17 +- .../service/process/ProcessService.java | 118 +--- .../service/process/ProcessServiceTest.java | 104 --- 10 files changed, 406 insertions(+), 570 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 2b9e7a31b2..f7ddf06672 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -427,9 +427,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } } - // get the processdefinitionjson before saving,and then save the name and taskid - String oldJson = processDefinition.getProcessDefinitionJson(); - processDefinitionJson = processService.changeJson(processData, oldJson); ProcessData newProcessData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); int saveResult = processService.saveProcessDefinition(loginUser, project, name, desc, locations, connects, newProcessData, processDefinition); @@ -1452,6 +1449,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); return result; } else { + ProcessData processData = JSONUtils.parseObject(processDefinition.getProcessDefinitionJson(), ProcessData.class); + List taskNodeList = processData.getTasks(); + taskNodeList.stream().forEach(taskNode -> { + taskNode.setCode(0L); + }); return createProcessDefinition( loginUser, targetProject.getName(), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java index 64dcd8710d..0764810cd7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java @@ -71,7 +71,6 @@ public class ProcessDefinitionVersionServiceImpl extends BaseServiceImpl impleme .newBuilder() .processDefinitionId(processDefinition.getId()) .version(version) - .processDefinitionJson(processDefinition.getProcessDefinitionJson()) .description(processDefinition.getDescription()) .locations(processDefinition.getLocations()) .connects(processDefinition.getConnects()) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index aeb23df03c..905b24576b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -457,11 +457,6 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce } Tenant tenant = processService.getTenantForProcess(processData.getTenantId(), processDefinition.getUserId()); - // get the processinstancejson before saving,and then save the name and taskid - String oldJson = processInstance.getProcessInstanceJson(); - if (StringUtils.isNotEmpty(oldJson)) { - processInstanceJson = processService.changeJson(processData, oldJson); - } setProcessInstance(processInstance, tenant, scheduleTime, locations, connects, processInstanceJson, processData); int update = processService.updateProcessInstance(processInstance); @@ -530,7 +525,6 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce processInstance.setProcessInstanceJson(processInstanceJson); processInstance.setGlobalParams(globalParams); } - /** * query parent process instance detail info by sub process instance id * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java index 5ea01bd9d1..e57ef002ea 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java @@ -41,7 +41,7 @@ import org.springframework.stereotype.Service; * task definition service impl */ @Service -public class ProcessTaskRelationServiceImpl extends BaseService implements +public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements ProcessTaskRelationService { private static final Logger logger = LoggerFactory.getLogger(ProcessTaskRelationServiceImpl.class); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 8b1e6720ae..8e321339ae 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -56,8 +56,7 @@ import org.springframework.transaction.annotation.Transactional; * task definition service impl */ @Service -public class TaskDefinitionServiceImpl extends BaseService implements - TaskDefinitionService { +public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDefinitionService { private static final Logger logger = LoggerFactory.getLogger(TaskDefinitionServiceImpl.class); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java index cd3e573b16..0072b93496 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java @@ -23,9 +23,11 @@ import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.*; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; + import org.apache.dolphinscheduler.common.utils.StringUtils; import java.io.IOException; @@ -35,330 +37,299 @@ import java.util.Objects; public class TaskNode { - /** - * task node id - */ - private String id; - - /** - * task node name - */ - private String name; - - /** - * task node description - */ - private String desc; - - /** - * task node type - */ - private String type; - - /** - * the run flag has two states, NORMAL or FORBIDDEN - */ - private String runFlag; - - /** - * the front field - */ - private String loc; - - /** - * maximum number of retries - */ - private int maxRetryTimes; - - /** - * Unit of retry interval: points - */ - private int retryInterval; - - /** - * params information - */ - @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) - @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) - private String params; - - /** - * inner dependency information - */ - @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) - @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) - private String preTasks; - - /** - * users store additional information - */ - @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) - @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) - private String extras; - - /** - * node dependency list - */ - private List depList; - - /** - * outer dependency information - */ - @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) - @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) - private String dependence; - - - @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) - @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) - private String conditionResult; - - /** - * task instance priority - */ - private Priority taskInstancePriority; - - /** - * worker group - */ - private String workerGroup; - - /** - * worker group id - */ - private Integer workerGroupId; - - - /** - * task time out - */ - @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) - @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) - private String timeout; + /** + * task node id + */ + private String id; + + /** + * task node code + */ + private Long code; + + /** + * task node version + */ + private int version; + + /** + * task node name + */ + private String name; + + /** + * task node description + */ + private String desc; + + /** + * task node type + */ + private String type; + + /** + * the run flag has two states, NORMAL or FORBIDDEN + */ + private String runFlag; + + /** + * the front field + */ + private String loc; + + /** + * maximum number of retries + */ + private int maxRetryTimes; + + /** + * Unit of retry interval: points + */ + private int retryInterval; + + /** + * params information + */ + @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) + @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) + private String params; + + /** + * inner dependency information + */ + @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) + @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) + private String preTasks; + + /** + * users store additional information + */ + @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) + @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) + private String extras; + + /** + * node dependency list + */ + private List depList; + + /** + * outer dependency information + */ + @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) + @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) + private String dependence; + + + @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) + @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) + private String conditionResult; + + /** + * task instance priority + */ + private Priority taskInstancePriority; + + /** + * worker group + */ + private String workerGroup; + + /** + * worker group id + */ + private Integer workerGroupId; + + + /** + * task time out + */ + @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) + @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) + private String timeout; /** * delay execution time. */ private int delayTime; - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getDesc() { - return desc; - } - - public void setDesc(String desc) { - this.desc = desc; - } - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public String getParams() { - return params; - } - - public void setParams(String params) { - this.params = params; - } - - public String getPreTasks() { - return preTasks; - } - - public void setPreTasks(String preTasks) throws IOException { - this.preTasks = preTasks; - this.depList = JSONUtils.toList(preTasks, String.class); - } - - public String getExtras() { - return extras; - } - - public void setExtras(String extras) { - this.extras = extras; - } - - public List getDepList() { - return depList; - } - - public void setDepList(List depList) throws JsonProcessingException { - this.depList = depList; - this.preTasks = JSONUtils.toJsonString(depList); - } - - public String getLoc() { - return loc; - } - - public void setLoc(String loc) { - this.loc = loc; - } - - public String getRunFlag(){ - return runFlag; - } - - public void setRunFlag(String runFlag) { - this.runFlag = runFlag; - } - - public Boolean isForbidden(){ - return (StringUtils.isNotEmpty(this.runFlag) && - this.runFlag.equals(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN)); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TaskNode taskNode = (TaskNode) o; - return Objects.equals(name, taskNode.name) && - Objects.equals(desc, taskNode.desc) && - Objects.equals(type, taskNode.type) && - Objects.equals(params, taskNode.params) && - Objects.equals(preTasks, taskNode.preTasks) && - Objects.equals(extras, taskNode.extras) && - Objects.equals(runFlag, taskNode.runFlag) && - Objects.equals(dependence, taskNode.dependence) && - Objects.equals(workerGroup, taskNode.workerGroup) && - Objects.equals(conditionResult, taskNode.conditionResult) && - - CollectionUtils.equalLists(depList, taskNode.depList); - } - - @Override - public int hashCode() { - return Objects.hash(name, desc, type, params, preTasks, extras, depList, runFlag); - } - - public String getDependence() { - return dependence; - } - - public void setDependence(String dependence) { - this.dependence = dependence; - } - - public int getMaxRetryTimes() { - return maxRetryTimes; - } - - public void setMaxRetryTimes(int maxRetryTimes) { - this.maxRetryTimes = maxRetryTimes; - } - - public int getRetryInterval() { - return retryInterval; - } - - public void setRetryInterval(int retryInterval) { - this.retryInterval = retryInterval; - } - - public Priority getTaskInstancePriority() { - return taskInstancePriority; - } - - public void setTaskInstancePriority(Priority taskInstancePriority) { - this.taskInstancePriority = taskInstancePriority; - } - - public String getTimeout() { - return timeout; - } - - public void setTimeout(String timeout) { - this.timeout = timeout; - } - - /** - * get task time out parameter - * @return task time out parameter - */ - public TaskTimeoutParameter getTaskTimeoutParameter() { - if(StringUtils.isNotEmpty(this.getTimeout())){ - String formatStr = String.format("%s,%s", TaskTimeoutStrategy.WARN.name(), TaskTimeoutStrategy.FAILED.name()); - String taskTimeout = this.getTimeout().replace(formatStr,TaskTimeoutStrategy.WARNFAILED.name()); - return JSONUtils.parseObject(taskTimeout,TaskTimeoutParameter.class); - } - return new TaskTimeoutParameter(false); - } - - public boolean isConditionsTask(){ - return TaskType.CONDITIONS.toString().equalsIgnoreCase(this.getType()); - } - - @Override - public String toString() { - return "TaskNode{" - + "id='" + id + '\'' - + ", name='" + name + '\'' - + ", desc='" + desc + '\'' - + ", type='" + type + '\'' - + ", runFlag='" + runFlag + '\'' - + ", loc='" + loc + '\'' - + ", maxRetryTimes=" + maxRetryTimes - + ", retryInterval=" + retryInterval - + ", params='" + params + '\'' - + ", preTasks='" + preTasks + '\'' - + ", extras='" + extras + '\'' - + ", depList=" + depList - + ", dependence='" + dependence + '\'' - + ", taskInstancePriority=" + taskInstancePriority - + ", timeout='" + timeout + '\'' - + ", workerGroup='" + workerGroup + '\'' - + ", delayTime=" + delayTime - + '}'; - } - - public String getWorkerGroup() { - return workerGroup; - } - - public void setWorkerGroup(String workerGroup) { - this.workerGroup = workerGroup; - } - - public String getConditionResult() { - return conditionResult; - } - - public void setConditionResult(String conditionResult) { - this.conditionResult = conditionResult; - } - - public Integer getWorkerGroupId() { - return workerGroupId; - } - - public void setWorkerGroupId(Integer workerGroupId) { - this.workerGroupId = workerGroupId; - } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getDesc() { + return desc; + } + + public void setDesc(String desc) { + this.desc = desc; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getParams() { + return params; + } + + public void setParams(String params) { + this.params = params; + } + + public String getPreTasks() { + return preTasks; + } + + public void setPreTasks(String preTasks) throws IOException { + this.preTasks = preTasks; + this.depList = JSONUtils.toList(preTasks, String.class); + } + + public String getExtras() { + return extras; + } + + public void setExtras(String extras) { + this.extras = extras; + } + + public List getDepList() { + return depList; + } + + public void setDepList(List depList) throws JsonProcessingException { + this.depList = depList; + this.preTasks = JSONUtils.toJsonString(depList); + } + + public String getLoc() { + return loc; + } + + public void setLoc(String loc) { + this.loc = loc; + } + + public String getRunFlag() { + return runFlag; + } + + public void setRunFlag(String runFlag) { + this.runFlag = runFlag; + } + + public Boolean isForbidden() { + return (StringUtils.isNotEmpty(this.runFlag) + && this.runFlag.equals(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskNode taskNode = (TaskNode) o; + return Objects.equals(name, taskNode.name) + && Objects.equals(desc, taskNode.desc) + && Objects.equals(type, taskNode.type) + && Objects.equals(params, taskNode.params) + && Objects.equals(preTasks, taskNode.preTasks) + && Objects.equals(extras, taskNode.extras) + && Objects.equals(runFlag, taskNode.runFlag) + && Objects.equals(dependence, taskNode.dependence) + && Objects.equals(workerGroup, taskNode.workerGroup) + && Objects.equals(conditionResult, taskNode.conditionResult) + && CollectionUtils.equalLists(depList, taskNode.depList); + } + + @Override + public int hashCode() { + return Objects.hash(name, desc, type, params, preTasks, extras, depList, runFlag); + } + + public String getDependence() { + return dependence; + } + + public void setDependence(String dependence) { + this.dependence = dependence; + } + + public int getMaxRetryTimes() { + return maxRetryTimes; + } + + public void setMaxRetryTimes(int maxRetryTimes) { + this.maxRetryTimes = maxRetryTimes; + } + + public int getRetryInterval() { + return retryInterval; + } + + public void setRetryInterval(int retryInterval) { + this.retryInterval = retryInterval; + } + + public Priority getTaskInstancePriority() { + return taskInstancePriority; + } + + public void setTaskInstancePriority(Priority taskInstancePriority) { + this.taskInstancePriority = taskInstancePriority; + } + + public String getTimeout() { + return timeout; + } + + public void setTimeout(String timeout) { + this.timeout = timeout; + } + + public String getWorkerGroup() { + return workerGroup; + } + + public void setWorkerGroup(String workerGroup) { + this.workerGroup = workerGroup; + } + + public String getConditionResult() { + return conditionResult; + } + + public void setConditionResult(String conditionResult) { + this.conditionResult = conditionResult; + } + + public Integer getWorkerGroupId() { + return workerGroupId; + } + + public void setWorkerGroupId(Integer workerGroupId) { + this.workerGroupId = workerGroupId; + } public int getDelayTime() { return delayTime; @@ -367,4 +338,62 @@ public class TaskNode { public void setDelayTime(int delayTime) { this.delayTime = delayTime; } + + public Long getCode() { + return code; + } + + public void setCode(Long code) { + this.code = code; + } + + public int getVersion() { + return version; + } + + public void setVersion(int version) { + this.version = version; + } + + /** + * get task time out parameter + * + * @return task time out parameter + */ + public TaskTimeoutParameter getTaskTimeoutParameter() { + if (StringUtils.isNotEmpty(this.getTimeout())) { + String formatStr = String.format("%s,%s", TaskTimeoutStrategy.WARN.name(), TaskTimeoutStrategy.FAILED.name()); + String taskTimeout = this.getTimeout().replace(formatStr, TaskTimeoutStrategy.WARNFAILED.name()); + return JSONUtils.parseObject(taskTimeout, TaskTimeoutParameter.class); + } + return new TaskTimeoutParameter(false); + } + + public boolean isConditionsTask() { + return TaskType.CONDITIONS.toString().equalsIgnoreCase(this.getType()); + } + + @Override + public String toString() { + return "TaskNode{" + + "id='" + id + '\'' + + ", name='" + name + '\'' + + ", desc='" + desc + '\'' + + ", type='" + type + '\'' + + ", runFlag='" + runFlag + '\'' + + ", loc='" + loc + '\'' + + ", maxRetryTimes=" + maxRetryTimes + + ", retryInterval=" + retryInterval + + ", params='" + params + '\'' + + ", preTasks='" + preTasks + '\'' + + ", extras='" + extras + '\'' + + ", depList=" + depList + + ", dependence='" + dependence + '\'' + + ", taskInstancePriority=" + taskInstancePriority + + ", timeout='" + timeout + '\'' + + ", workerGroup='" + workerGroup + '\'' + + ", delayTime=" + delayTime + + '}'; + } + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index f7eaabcfda..970fc47de8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -25,17 +25,13 @@ import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.*; -import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; /** * dag tools @@ -193,24 +189,19 @@ public class DagHelper { /** * generate dag by start nodes and recovery nodes * - * @param processDefinitionJson processDefinitionJson + * @param totalTaskNodeList totalTaskNodeList * @param startNodeNameList startNodeNameList * @param recoveryNodeNameList recoveryNodeNameList * @param depNodeType depNodeType * @return process dag * @throws Exception if error throws Exception */ - public static ProcessDag generateFlowDag(String processDefinitionJson, + public static ProcessDag generateFlowDag(List totalTaskNodeList, List startNodeNameList, List recoveryNodeNameList, TaskDependType depNodeType) throws Exception { - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - List taskNodeList = new ArrayList<>(); - if (null != processData) { - taskNodeList = processData.getTasks(); - } - List destTaskNodeList = generateFlowNodeListByStartNode(taskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType); + List destTaskNodeList = generateFlowNodeListByStartNode(totalTaskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType); if (destTaskNodeList.isEmpty()) { return null; } @@ -221,29 +212,6 @@ public class DagHelper { return processDag; } - /** - * parse the forbidden task nodes in process definition. - * - * @param processDefinitionJson processDefinitionJson - * @return task node map - */ - public static Map getForbiddenTaskNodeMaps(String processDefinitionJson) { - Map forbidTaskNodeMap = new ConcurrentHashMap<>(); - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - - List taskNodeList = new ArrayList<>(); - if (null != processData) { - taskNodeList = processData.getTasks(); - } - for (TaskNode node : taskNodeList) { - if (node.isForbidden()) { - forbidTaskNodeMap.putIfAbsent(node.getName(), node); - } - } - return forbidTaskNodeMap; - } - - /** * find node by node name * @@ -470,18 +438,16 @@ public class DagHelper { /** * get process dag * - * @param taskDefinitions task definition + * @param taskNodeList task node list * @return Process dag */ - public static ProcessDag getProcessDag(List taskDefinitions, + public static ProcessDag getProcessDag(List taskNodeList, List processTaskRelations) { Map taskNodeMap = new HashMap<>(); - List taskNodeList = new ArrayList<>(); - for (TaskDefinition taskDefinition : taskDefinitions) { - TaskNode taskNode = JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinition), TaskNode.class); - taskNodeMap.put(taskDefinition.getCode(), taskNode); - taskNodeList.add(taskNode); - } + + taskNodeList.stream().forEach(taskNode -> { + taskNodeMap.putIfAbsent(taskNode.getCode(), taskNode); + }); List taskNodeRelations = new ArrayList<>(); for (ProcessTaskRelation processTaskRelation : processTaskRelations) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 3b113b6536..ff448423fa 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -383,12 +383,17 @@ public class MasterExecThread implements Runnable { */ private void buildFlowDag() throws Exception { recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam()); - - forbiddenTaskList = DagHelper.getForbiddenTaskNodeMaps(processInstance.getProcessInstanceJson()); + List taskNodeList = processService.getTaskNodeListByDefinitionId(processInstance.getProcessDefinitionId()); + forbiddenTaskList.clear(); + taskNodeList.stream().forEach(taskNode -> { + if (taskNode.isForbidden()) { + forbiddenTaskList.put(taskNode.getName(), taskNode); + } + }); // generate process to get DAG info List recoveryNameList = getRecoveryNodeNameList(); List startNodeNameList = parseStartNodeName(processInstance.getCommandParam()); - ProcessDag processDag = generateFlowDag(processInstance.getProcessInstanceJson(), + ProcessDag processDag = generateFlowDag(taskNodeList, startNodeNameList, recoveryNameList, processInstance.getTaskDependType()); if (processDag == null) { logger.error("processDag is null"); @@ -1229,17 +1234,17 @@ public class MasterExecThread implements Runnable { /** * generate flow dag * - * @param processDefinitionJson process definition json + * @param totalTaskNodeList total task node list * @param startNodeNameList start node name list * @param recoveryNodeNameList recovery node name list * @param depNodeType depend node type * @return ProcessDag process dag * @throws Exception exception */ - public ProcessDag generateFlowDag(String processDefinitionJson, + public ProcessDag generateFlowDag(List totalTaskNodeList, List startNodeNameList, List recoveryNodeNameList, TaskDependType depNodeType) throws Exception { - return DagHelper.generateFlowDag(processDefinitionJson, startNodeNameList, recoveryNodeNameList, depNodeType); + return DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType); } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 472f98aa5c..cc82844669 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -341,20 +341,22 @@ public class ProcessService { public List getTaskNodeListByDefinitionId(Integer defineId) { ProcessDefinition processDefinition = processDefineMapper.selectById(defineId); if (processDefinition == null) { - logger.info("process define not exists"); - return null; - } - - String processDefinitionJson = processDefinition.getProcessDefinitionJson(); - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - - //process data check - if (null == processData) { - logger.error("process data is null"); + logger.error("process define not exists"); return new ArrayList<>(); } - return processData.getTasks(); + List processTaskRelations = getProcessTaskRelationList(processDefinition.getCode(), processDefinition.getVersion()); + Map taskDefinitionMap = new HashMap<>(); + for (ProcessTaskRelation processTaskRelation : processTaskRelations) { + if (taskDefinitionMap.containsKey(processTaskRelation.getPostTaskCode())) { + TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(processTaskRelation.getPostTaskCode()); + taskDefinitionMap.put(processTaskRelation.getPostTaskCode(), taskDefinition); + } + } + return taskDefinitionMap.entrySet() + .stream() + .map(e -> JSONUtils.parseObject(JSONUtils.toJsonString(e.getValue()), TaskNode.class)) + .collect(Collectors.toList()); } /** @@ -495,13 +497,7 @@ public class ProcessService { * @param ids ids */ public void recurseFindSubProcessId(int parentId, List ids) { - ProcessDefinition processDefinition = processDefineMapper.selectById(parentId); - String processDefinitionJson = processDefinition.getProcessDefinitionJson(); - - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - - List taskNodeList = processData.getTasks(); - + List taskNodeList = this.getTaskNodeListByDefinitionId(parentId); if (taskNodeList != null && !taskNodeList.isEmpty()) { for (TaskNode taskNode : taskNodeList) { @@ -650,8 +646,6 @@ public class ProcessService { getCommandTypeIfComplement(processInstance, command), processInstance.getScheduleTime())); - //copy process define json to process instance - processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson()); // set process instance priority processInstance.setProcessInstancePriority(command.getProcessInstancePriority()); String workerGroup = StringUtils.isBlank(command.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP : command.getWorkerGroup(); @@ -2064,60 +2058,6 @@ public class ProcessService { taskInstance.getId()); } - /** - * solve the branch rename bug - * - * @return String - */ - public String changeJson(ProcessData processData, String oldJson) { - ProcessData oldProcessData = JSONUtils.parseObject(oldJson, ProcessData.class); - HashMap oldNameTaskId = new HashMap<>(); - List oldTasks = oldProcessData.getTasks(); - for (int i = 0; i < oldTasks.size(); i++) { - TaskNode taskNode = oldTasks.get(i); - String oldName = taskNode.getName(); - String oldId = taskNode.getId(); - oldNameTaskId.put(oldName, oldId); - } - - // take the processdefinitionjson saved this time, and then save the taskid and name - HashMap newNameTaskId = new HashMap<>(); - List newTasks = processData.getTasks(); - for (int i = 0; i < newTasks.size(); i++) { - TaskNode taskNode = newTasks.get(i); - String newId = taskNode.getId(); - String newName = taskNode.getName(); - newNameTaskId.put(newId, newName); - } - - // replace the previous conditionresult with a new one - List tasks = processData.getTasks(); - for (int i = 0; i < tasks.size(); i++) { - TaskNode taskNode = newTasks.get(i); - String type = taskNode.getType(); - if (TaskType.CONDITIONS.getDescp().equalsIgnoreCase(type)) { - ConditionsParameters conditionsParameters = JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class); - String oldSuccessNodeName = conditionsParameters.getSuccessNode().get(0); - String oldFailedNodeName = conditionsParameters.getFailedNode().get(0); - String newSuccessNodeName = newNameTaskId.get(oldNameTaskId.get(oldSuccessNodeName)); - String newFailedNodeName = newNameTaskId.get(oldNameTaskId.get(oldFailedNodeName)); - if (newSuccessNodeName != null) { - ArrayList successNode = new ArrayList<>(); - successNode.add(newSuccessNodeName); - conditionsParameters.setSuccessNode(successNode); - } - if (newFailedNodeName != null) { - ArrayList failedNode = new ArrayList<>(); - failedNode.add(newFailedNodeName); - conditionsParameters.setFailedNode(failedNode); - } - String conditionResultStr = conditionsParameters.getConditionResult(); - taskNode.setConditionResult(conditionResultStr); - tasks.set(i, taskNode); - } - } - return JSONUtils.toJsonString(processData); - } /** * switch process definition version to process definition log version @@ -2391,24 +2331,30 @@ public class ProcessService { * @return dag graph */ public DAG genDagGraph(ProcessDefinition processDefinition) { + List taskNodeList = this.getTaskNodeListByDefinitionId(processDefinition.getId()); + List processTaskRelations = getProcessTaskRelationList(processDefinition.getCode(), processDefinition.getVersion()); + ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, processTaskRelations); + // Generate concrete Dag to be executed + return DagHelper.buildDagGraph(processDag); + } + /** + * get process task relation list + * this function can be query relation list from log record + * + * @param processCode + * @param processVersion + * @return + */ + public List getProcessTaskRelationList(Long processCode, int processVersion) { List taskRelationLogs = processTaskRelationLogMapper.queryByProcessCodeAndVersion( - processDefinition.getCode(), - processDefinition.getVersion()); + processCode, + processVersion); List processTaskRelations = new ArrayList<>(); - List taskDefinitions = new ArrayList<>(); for (ProcessTaskRelationLog processTaskRelationLog : taskRelationLogs) { processTaskRelations.add(JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class)); - - TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion( - processTaskRelationLog.getPostTaskCode(), - processTaskRelationLog.getPostNodeVersion()); - taskDefinitions.add(JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinitionLog), TaskDefinition.class)); } - - ProcessDag processDag = DagHelper.getProcessDag(taskDefinitions, processTaskRelations); - // Generate concrete Dag to be executed - return DagHelper.buildDagGraph(processDag); + return processTaskRelations; } } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index b6d518c9b4..cfa8caa85c 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -338,108 +338,4 @@ public class ProcessServiceTest { processService.recurseFindSubProcessId(parentId, ids); } - - @Test - public void testChangeJson() { - - ProcessData oldProcessData = new ProcessData(); - ConditionsParameters conditionsParameters = new ConditionsParameters(); - ArrayList tasks = new ArrayList<>(); - TaskNode taskNode = new TaskNode(); - TaskNode taskNode11 = new TaskNode(); - TaskNode taskNode111 = new TaskNode(); - ArrayList successNode = new ArrayList<>(); - ArrayList faildNode = new ArrayList<>(); - - taskNode.setName("bbb"); - taskNode.setType("SHELL"); - taskNode.setId("222"); - - taskNode11.setName("vvv"); - taskNode11.setType("CONDITIONS"); - taskNode11.setId("444"); - successNode.add("bbb"); - faildNode.add("ccc"); - - taskNode111.setName("ccc"); - taskNode111.setType("SHELL"); - taskNode111.setId("333"); - - conditionsParameters.setSuccessNode(successNode); - conditionsParameters.setFailedNode(faildNode); - taskNode11.setConditionResult(conditionsParameters.getConditionResult()); - tasks.add(taskNode); - tasks.add(taskNode11); - tasks.add(taskNode111); - oldProcessData.setTasks(tasks); - - ProcessData newProcessData = new ProcessData(); - ConditionsParameters conditionsParameters2 = new ConditionsParameters(); - TaskNode taskNode2 = new TaskNode(); - TaskNode taskNode22 = new TaskNode(); - TaskNode taskNode222 = new TaskNode(); - ArrayList tasks2 = new ArrayList<>(); - ArrayList successNode2 = new ArrayList<>(); - ArrayList faildNode2 = new ArrayList<>(); - - taskNode2.setName("bbbchange"); - taskNode2.setType("SHELL"); - taskNode2.setId("222"); - - taskNode22.setName("vv"); - taskNode22.setType("CONDITIONS"); - taskNode22.setId("444"); - successNode2.add("bbb"); - faildNode2.add("ccc"); - - taskNode222.setName("ccc"); - taskNode222.setType("SHELL"); - taskNode222.setId("333"); - - conditionsParameters2.setSuccessNode(successNode2); - conditionsParameters2.setFailedNode(faildNode2); - taskNode22.setConditionResult(conditionsParameters2.getConditionResult()); - tasks2.add(taskNode2); - tasks2.add(taskNode22); - tasks2.add(taskNode222); - - newProcessData.setTasks(tasks2); - - ProcessData exceptProcessData = new ProcessData(); - ConditionsParameters conditionsParameters3 = new ConditionsParameters(); - TaskNode taskNode3 = new TaskNode(); - TaskNode taskNode33 = new TaskNode(); - TaskNode taskNode333 = new TaskNode(); - ArrayList tasks3 = new ArrayList<>(); - ArrayList successNode3 = new ArrayList<>(); - ArrayList faildNode3 = new ArrayList<>(); - - taskNode3.setName("bbbchange"); - taskNode3.setType("SHELL"); - taskNode3.setId("222"); - - taskNode33.setName("vv"); - taskNode33.setType("CONDITIONS"); - taskNode33.setId("444"); - successNode3.add("bbbchange"); - faildNode3.add("ccc"); - - taskNode333.setName("ccc"); - taskNode333.setType("SHELL"); - taskNode333.setId("333"); - - conditionsParameters3.setSuccessNode(successNode3); - conditionsParameters3.setFailedNode(faildNode3); - taskNode33.setConditionResult(conditionsParameters3.getConditionResult()); - tasks3.add(taskNode3); - tasks3.add(taskNode33); - tasks3.add(taskNode333); - exceptProcessData.setTasks(tasks3); - - String expect = JSONUtils.toJsonString(exceptProcessData); - String oldJson = JSONUtils.toJsonString(oldProcessData); - - Assert.assertEquals(expect, processService.changeJson(newProcessData, oldJson)); - - } }