Browse Source

[Feature][JsonSplit]refactor remove the json in process instance and definition (#4828)

* #4417 [JsonSplit] refactor process definition json

* [#4177] refactor json split. remove json in process definition and process instance

* [#4177] refactor json split. copy process definition need set task code 0L.

* code style

* code style

* code style

* code style

* update

* update
pull/3/MERGE
bao liang 3 years ago committed by GitHub
parent
commit
62e961e3f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java
  3. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  4. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
  5. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  6. 665
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
  7. 52
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  8. 17
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  9. 118
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  10. 104
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -427,9 +427,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
}
// get the processdefinitionjson before saving,and then save the name and taskid
String oldJson = processDefinition.getProcessDefinitionJson();
processDefinitionJson = processService.changeJson(processData, oldJson);
ProcessData newProcessData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
int saveResult = processService.saveProcessDefinition(loginUser, project, name, desc,
locations, connects, newProcessData, processDefinition);
@ -1452,6 +1449,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
return result;
} else {
ProcessData processData = JSONUtils.parseObject(processDefinition.getProcessDefinitionJson(), ProcessData.class);
List<TaskNode> taskNodeList = processData.getTasks();
taskNodeList.stream().forEach(taskNode -> {
taskNode.setCode(0L);
});
return createProcessDefinition(
loginUser,
targetProject.getName(),

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java

@ -71,7 +71,6 @@ public class ProcessDefinitionVersionServiceImpl extends BaseServiceImpl impleme
.newBuilder()
.processDefinitionId(processDefinition.getId())
.version(version)
.processDefinitionJson(processDefinition.getProcessDefinitionJson())
.description(processDefinition.getDescription())
.locations(processDefinition.getLocations())
.connects(processDefinition.getConnects())

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -457,11 +457,6 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
}
Tenant tenant = processService.getTenantForProcess(processData.getTenantId(),
processDefinition.getUserId());
// get the processinstancejson before saving,and then save the name and taskid
String oldJson = processInstance.getProcessInstanceJson();
if (StringUtils.isNotEmpty(oldJson)) {
processInstanceJson = processService.changeJson(processData, oldJson);
}
setProcessInstance(processInstance, tenant, scheduleTime, locations,
connects, processInstanceJson, processData);
int update = processService.updateProcessInstance(processInstance);
@ -530,7 +525,6 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processInstance.setProcessInstanceJson(processInstanceJson);
processInstance.setGlobalParams(globalParams);
}
/**
* query parent process instance detail info by sub process instance id
*

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

@ -41,7 +41,7 @@ import org.springframework.stereotype.Service;
* task definition service impl
*/
@Service
public class ProcessTaskRelationServiceImpl extends BaseService implements
public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements
ProcessTaskRelationService {
private static final Logger logger = LoggerFactory.getLogger(ProcessTaskRelationServiceImpl.class);

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -56,8 +56,7 @@ import org.springframework.transaction.annotation.Transactional;
* task definition service impl
*/
@Service
public class TaskDefinitionServiceImpl extends BaseService implements
TaskDefinitionService {
public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDefinitionService {
private static final Logger logger = LoggerFactory.getLogger(TaskDefinitionServiceImpl.class);

665
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java

@ -23,9 +23,11 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.*;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.io.IOException;
@ -35,330 +37,299 @@ import java.util.Objects;
public class TaskNode {
/**
* task node id
*/
private String id;
/**
* task node name
*/
private String name;
/**
* task node description
*/
private String desc;
/**
* task node type
*/
private String type;
/**
* the run flag has two states, NORMAL or FORBIDDEN
*/
private String runFlag;
/**
* the front field
*/
private String loc;
/**
* maximum number of retries
*/
private int maxRetryTimes;
/**
* Unit of retry interval: points
*/
private int retryInterval;
/**
* params information
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String params;
/**
* inner dependency information
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String preTasks;
/**
* users store additional information
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String extras;
/**
* node dependency list
*/
private List<String> depList;
/**
* outer dependency information
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String dependence;
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String conditionResult;
/**
* task instance priority
*/
private Priority taskInstancePriority;
/**
* worker group
*/
private String workerGroup;
/**
* worker group id
*/
private Integer workerGroupId;
/**
* task time out
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String timeout;
/**
* task node id
*/
private String id;
/**
* task node code
*/
private Long code;
/**
* task node version
*/
private int version;
/**
* task node name
*/
private String name;
/**
* task node description
*/
private String desc;
/**
* task node type
*/
private String type;
/**
* the run flag has two states, NORMAL or FORBIDDEN
*/
private String runFlag;
/**
* the front field
*/
private String loc;
/**
* maximum number of retries
*/
private int maxRetryTimes;
/**
* Unit of retry interval: points
*/
private int retryInterval;
/**
* params information
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String params;
/**
* inner dependency information
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String preTasks;
/**
* users store additional information
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String extras;
/**
* node dependency list
*/
private List<String> depList;
/**
* outer dependency information
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String dependence;
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String conditionResult;
/**
* task instance priority
*/
private Priority taskInstancePriority;
/**
* worker group
*/
private String workerGroup;
/**
* worker group id
*/
private Integer workerGroupId;
/**
* task time out
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String timeout;
/**
* delay execution time.
*/
private int delayTime;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getParams() {
return params;
}
public void setParams(String params) {
this.params = params;
}
public String getPreTasks() {
return preTasks;
}
public void setPreTasks(String preTasks) throws IOException {
this.preTasks = preTasks;
this.depList = JSONUtils.toList(preTasks, String.class);
}
public String getExtras() {
return extras;
}
public void setExtras(String extras) {
this.extras = extras;
}
public List<String> getDepList() {
return depList;
}
public void setDepList(List<String> depList) throws JsonProcessingException {
this.depList = depList;
this.preTasks = JSONUtils.toJsonString(depList);
}
public String getLoc() {
return loc;
}
public void setLoc(String loc) {
this.loc = loc;
}
public String getRunFlag(){
return runFlag;
}
public void setRunFlag(String runFlag) {
this.runFlag = runFlag;
}
public Boolean isForbidden(){
return (StringUtils.isNotEmpty(this.runFlag) &&
this.runFlag.equals(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN));
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TaskNode taskNode = (TaskNode) o;
return Objects.equals(name, taskNode.name) &&
Objects.equals(desc, taskNode.desc) &&
Objects.equals(type, taskNode.type) &&
Objects.equals(params, taskNode.params) &&
Objects.equals(preTasks, taskNode.preTasks) &&
Objects.equals(extras, taskNode.extras) &&
Objects.equals(runFlag, taskNode.runFlag) &&
Objects.equals(dependence, taskNode.dependence) &&
Objects.equals(workerGroup, taskNode.workerGroup) &&
Objects.equals(conditionResult, taskNode.conditionResult) &&
CollectionUtils.equalLists(depList, taskNode.depList);
}
@Override
public int hashCode() {
return Objects.hash(name, desc, type, params, preTasks, extras, depList, runFlag);
}
public String getDependence() {
return dependence;
}
public void setDependence(String dependence) {
this.dependence = dependence;
}
public int getMaxRetryTimes() {
return maxRetryTimes;
}
public void setMaxRetryTimes(int maxRetryTimes) {
this.maxRetryTimes = maxRetryTimes;
}
public int getRetryInterval() {
return retryInterval;
}
public void setRetryInterval(int retryInterval) {
this.retryInterval = retryInterval;
}
public Priority getTaskInstancePriority() {
return taskInstancePriority;
}
public void setTaskInstancePriority(Priority taskInstancePriority) {
this.taskInstancePriority = taskInstancePriority;
}
public String getTimeout() {
return timeout;
}
public void setTimeout(String timeout) {
this.timeout = timeout;
}
/**
* get task time out parameter
* @return task time out parameter
*/
public TaskTimeoutParameter getTaskTimeoutParameter() {
if(StringUtils.isNotEmpty(this.getTimeout())){
String formatStr = String.format("%s,%s", TaskTimeoutStrategy.WARN.name(), TaskTimeoutStrategy.FAILED.name());
String taskTimeout = this.getTimeout().replace(formatStr,TaskTimeoutStrategy.WARNFAILED.name());
return JSONUtils.parseObject(taskTimeout,TaskTimeoutParameter.class);
}
return new TaskTimeoutParameter(false);
}
public boolean isConditionsTask(){
return TaskType.CONDITIONS.toString().equalsIgnoreCase(this.getType());
}
@Override
public String toString() {
return "TaskNode{"
+ "id='" + id + '\''
+ ", name='" + name + '\''
+ ", desc='" + desc + '\''
+ ", type='" + type + '\''
+ ", runFlag='" + runFlag + '\''
+ ", loc='" + loc + '\''
+ ", maxRetryTimes=" + maxRetryTimes
+ ", retryInterval=" + retryInterval
+ ", params='" + params + '\''
+ ", preTasks='" + preTasks + '\''
+ ", extras='" + extras + '\''
+ ", depList=" + depList
+ ", dependence='" + dependence + '\''
+ ", taskInstancePriority=" + taskInstancePriority
+ ", timeout='" + timeout + '\''
+ ", workerGroup='" + workerGroup + '\''
+ ", delayTime=" + delayTime
+ '}';
}
public String getWorkerGroup() {
return workerGroup;
}
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
public String getConditionResult() {
return conditionResult;
}
public void setConditionResult(String conditionResult) {
this.conditionResult = conditionResult;
}
public Integer getWorkerGroupId() {
return workerGroupId;
}
public void setWorkerGroupId(Integer workerGroupId) {
this.workerGroupId = workerGroupId;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getParams() {
return params;
}
public void setParams(String params) {
this.params = params;
}
public String getPreTasks() {
return preTasks;
}
public void setPreTasks(String preTasks) throws IOException {
this.preTasks = preTasks;
this.depList = JSONUtils.toList(preTasks, String.class);
}
public String getExtras() {
return extras;
}
public void setExtras(String extras) {
this.extras = extras;
}
public List<String> getDepList() {
return depList;
}
public void setDepList(List<String> depList) throws JsonProcessingException {
this.depList = depList;
this.preTasks = JSONUtils.toJsonString(depList);
}
public String getLoc() {
return loc;
}
public void setLoc(String loc) {
this.loc = loc;
}
public String getRunFlag() {
return runFlag;
}
public void setRunFlag(String runFlag) {
this.runFlag = runFlag;
}
public Boolean isForbidden() {
return (StringUtils.isNotEmpty(this.runFlag)
&& this.runFlag.equals(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN));
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TaskNode taskNode = (TaskNode) o;
return Objects.equals(name, taskNode.name)
&& Objects.equals(desc, taskNode.desc)
&& Objects.equals(type, taskNode.type)
&& Objects.equals(params, taskNode.params)
&& Objects.equals(preTasks, taskNode.preTasks)
&& Objects.equals(extras, taskNode.extras)
&& Objects.equals(runFlag, taskNode.runFlag)
&& Objects.equals(dependence, taskNode.dependence)
&& Objects.equals(workerGroup, taskNode.workerGroup)
&& Objects.equals(conditionResult, taskNode.conditionResult)
&& CollectionUtils.equalLists(depList, taskNode.depList);
}
@Override
public int hashCode() {
return Objects.hash(name, desc, type, params, preTasks, extras, depList, runFlag);
}
public String getDependence() {
return dependence;
}
public void setDependence(String dependence) {
this.dependence = dependence;
}
public int getMaxRetryTimes() {
return maxRetryTimes;
}
public void setMaxRetryTimes(int maxRetryTimes) {
this.maxRetryTimes = maxRetryTimes;
}
public int getRetryInterval() {
return retryInterval;
}
public void setRetryInterval(int retryInterval) {
this.retryInterval = retryInterval;
}
public Priority getTaskInstancePriority() {
return taskInstancePriority;
}
public void setTaskInstancePriority(Priority taskInstancePriority) {
this.taskInstancePriority = taskInstancePriority;
}
public String getTimeout() {
return timeout;
}
public void setTimeout(String timeout) {
this.timeout = timeout;
}
public String getWorkerGroup() {
return workerGroup;
}
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
public String getConditionResult() {
return conditionResult;
}
public void setConditionResult(String conditionResult) {
this.conditionResult = conditionResult;
}
public Integer getWorkerGroupId() {
return workerGroupId;
}
public void setWorkerGroupId(Integer workerGroupId) {
this.workerGroupId = workerGroupId;
}
public int getDelayTime() {
return delayTime;
@ -367,4 +338,62 @@ public class TaskNode {
public void setDelayTime(int delayTime) {
this.delayTime = delayTime;
}
public Long getCode() {
return code;
}
public void setCode(Long code) {
this.code = code;
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
/**
* get task time out parameter
*
* @return task time out parameter
*/
public TaskTimeoutParameter getTaskTimeoutParameter() {
if (StringUtils.isNotEmpty(this.getTimeout())) {
String formatStr = String.format("%s,%s", TaskTimeoutStrategy.WARN.name(), TaskTimeoutStrategy.FAILED.name());
String taskTimeout = this.getTimeout().replace(formatStr, TaskTimeoutStrategy.WARNFAILED.name());
return JSONUtils.parseObject(taskTimeout, TaskTimeoutParameter.class);
}
return new TaskTimeoutParameter(false);
}
public boolean isConditionsTask() {
return TaskType.CONDITIONS.toString().equalsIgnoreCase(this.getType());
}
@Override
public String toString() {
return "TaskNode{"
+ "id='" + id + '\''
+ ", name='" + name + '\''
+ ", desc='" + desc + '\''
+ ", type='" + type + '\''
+ ", runFlag='" + runFlag + '\''
+ ", loc='" + loc + '\''
+ ", maxRetryTimes=" + maxRetryTimes
+ ", retryInterval=" + retryInterval
+ ", params='" + params + '\''
+ ", preTasks='" + preTasks + '\''
+ ", extras='" + extras + '\''
+ ", depList=" + depList
+ ", dependence='" + dependence + '\''
+ ", taskInstancePriority=" + taskInstancePriority
+ ", timeout='" + timeout + '\''
+ ", workerGroup='" + workerGroup + '\''
+ ", delayTime=" + delayTime
+ '}';
}
}

52
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -25,17 +25,13 @@ import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* dag tools
@ -193,24 +189,19 @@ public class DagHelper {
/**
* generate dag by start nodes and recovery nodes
*
* @param processDefinitionJson processDefinitionJson
* @param totalTaskNodeList totalTaskNodeList
* @param startNodeNameList startNodeNameList
* @param recoveryNodeNameList recoveryNodeNameList
* @param depNodeType depNodeType
* @return process dag
* @throws Exception if error throws Exception
*/
public static ProcessDag generateFlowDag(String processDefinitionJson,
public static ProcessDag generateFlowDag(List<TaskNode> totalTaskNodeList,
List<String> startNodeNameList,
List<String> recoveryNodeNameList,
TaskDependType depNodeType) throws Exception {
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
List<TaskNode> taskNodeList = new ArrayList<>();
if (null != processData) {
taskNodeList = processData.getTasks();
}
List<TaskNode> destTaskNodeList = generateFlowNodeListByStartNode(taskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType);
List<TaskNode> destTaskNodeList = generateFlowNodeListByStartNode(totalTaskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType);
if (destTaskNodeList.isEmpty()) {
return null;
}
@ -221,29 +212,6 @@ public class DagHelper {
return processDag;
}
/**
* parse the forbidden task nodes in process definition.
*
* @param processDefinitionJson processDefinitionJson
* @return task node map
*/
public static Map<String, TaskNode> getForbiddenTaskNodeMaps(String processDefinitionJson) {
Map<String, TaskNode> forbidTaskNodeMap = new ConcurrentHashMap<>();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
List<TaskNode> taskNodeList = new ArrayList<>();
if (null != processData) {
taskNodeList = processData.getTasks();
}
for (TaskNode node : taskNodeList) {
if (node.isForbidden()) {
forbidTaskNodeMap.putIfAbsent(node.getName(), node);
}
}
return forbidTaskNodeMap;
}
/**
* find node by node name
*
@ -470,18 +438,16 @@ public class DagHelper {
/**
* get process dag
*
* @param taskDefinitions task definition
* @param taskNodeList task node list
* @return Process dag
*/
public static ProcessDag getProcessDag(List<TaskDefinition> taskDefinitions,
public static ProcessDag getProcessDag(List<TaskNode> taskNodeList,
List<ProcessTaskRelation> processTaskRelations) {
Map<Long, TaskNode> taskNodeMap = new HashMap<>();
List<TaskNode> taskNodeList = new ArrayList<>();
for (TaskDefinition taskDefinition : taskDefinitions) {
TaskNode taskNode = JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinition), TaskNode.class);
taskNodeMap.put(taskDefinition.getCode(), taskNode);
taskNodeList.add(taskNode);
}
taskNodeList.stream().forEach(taskNode -> {
taskNodeMap.putIfAbsent(taskNode.getCode(), taskNode);
});
List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {

17
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -383,12 +383,17 @@ public class MasterExecThread implements Runnable {
*/
private void buildFlowDag() throws Exception {
recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
forbiddenTaskList = DagHelper.getForbiddenTaskNodeMaps(processInstance.getProcessInstanceJson());
List<TaskNode> taskNodeList = processService.getTaskNodeListByDefinitionId(processInstance.getProcessDefinitionId());
forbiddenTaskList.clear();
taskNodeList.stream().forEach(taskNode -> {
if (taskNode.isForbidden()) {
forbiddenTaskList.put(taskNode.getName(), taskNode);
}
});
// generate process to get DAG info
List<String> recoveryNameList = getRecoveryNodeNameList();
List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
ProcessDag processDag = generateFlowDag(processInstance.getProcessInstanceJson(),
ProcessDag processDag = generateFlowDag(taskNodeList,
startNodeNameList, recoveryNameList, processInstance.getTaskDependType());
if (processDag == null) {
logger.error("processDag is null");
@ -1229,17 +1234,17 @@ public class MasterExecThread implements Runnable {
/**
* generate flow dag
*
* @param processDefinitionJson process definition json
* @param totalTaskNodeList total task node list
* @param startNodeNameList start node name list
* @param recoveryNodeNameList recovery node name list
* @param depNodeType depend node type
* @return ProcessDag process dag
* @throws Exception exception
*/
public ProcessDag generateFlowDag(String processDefinitionJson,
public ProcessDag generateFlowDag(List<TaskNode> totalTaskNodeList,
List<String> startNodeNameList,
List<String> recoveryNodeNameList,
TaskDependType depNodeType) throws Exception {
return DagHelper.generateFlowDag(processDefinitionJson, startNodeNameList, recoveryNodeNameList, depNodeType);
return DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType);
}
}

118
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -341,20 +341,22 @@ public class ProcessService {
public List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId) {
ProcessDefinition processDefinition = processDefineMapper.selectById(defineId);
if (processDefinition == null) {
logger.info("process define not exists");
return null;
}
String processDefinitionJson = processDefinition.getProcessDefinitionJson();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
//process data check
if (null == processData) {
logger.error("process data is null");
logger.error("process define not exists");
return new ArrayList<>();
}
return processData.getTasks();
List<ProcessTaskRelation> processTaskRelations = getProcessTaskRelationList(processDefinition.getCode(), processDefinition.getVersion());
Map<Long, TaskDefinition> taskDefinitionMap = new HashMap<>();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (taskDefinitionMap.containsKey(processTaskRelation.getPostTaskCode())) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(processTaskRelation.getPostTaskCode());
taskDefinitionMap.put(processTaskRelation.getPostTaskCode(), taskDefinition);
}
}
return taskDefinitionMap.entrySet()
.stream()
.map(e -> JSONUtils.parseObject(JSONUtils.toJsonString(e.getValue()), TaskNode.class))
.collect(Collectors.toList());
}
/**
@ -495,13 +497,7 @@ public class ProcessService {
* @param ids ids
*/
public void recurseFindSubProcessId(int parentId, List<Integer> ids) {
ProcessDefinition processDefinition = processDefineMapper.selectById(parentId);
String processDefinitionJson = processDefinition.getProcessDefinitionJson();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
List<TaskNode> taskNodeList = processData.getTasks();
List<TaskNode> taskNodeList = this.getTaskNodeListByDefinitionId(parentId);
if (taskNodeList != null && !taskNodeList.isEmpty()) {
for (TaskNode taskNode : taskNodeList) {
@ -650,8 +646,6 @@ public class ProcessService {
getCommandTypeIfComplement(processInstance, command),
processInstance.getScheduleTime()));
//copy process define json to process instance
processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson());
// set process instance priority
processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
String workerGroup = StringUtils.isBlank(command.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP : command.getWorkerGroup();
@ -2064,60 +2058,6 @@ public class ProcessService {
taskInstance.getId());
}
/**
* solve the branch rename bug
*
* @return String
*/
public String changeJson(ProcessData processData, String oldJson) {
ProcessData oldProcessData = JSONUtils.parseObject(oldJson, ProcessData.class);
HashMap<String, String> oldNameTaskId = new HashMap<>();
List<TaskNode> oldTasks = oldProcessData.getTasks();
for (int i = 0; i < oldTasks.size(); i++) {
TaskNode taskNode = oldTasks.get(i);
String oldName = taskNode.getName();
String oldId = taskNode.getId();
oldNameTaskId.put(oldName, oldId);
}
// take the processdefinitionjson saved this time, and then save the taskid and name
HashMap<String, String> newNameTaskId = new HashMap<>();
List<TaskNode> newTasks = processData.getTasks();
for (int i = 0; i < newTasks.size(); i++) {
TaskNode taskNode = newTasks.get(i);
String newId = taskNode.getId();
String newName = taskNode.getName();
newNameTaskId.put(newId, newName);
}
// replace the previous conditionresult with a new one
List<TaskNode> tasks = processData.getTasks();
for (int i = 0; i < tasks.size(); i++) {
TaskNode taskNode = newTasks.get(i);
String type = taskNode.getType();
if (TaskType.CONDITIONS.getDescp().equalsIgnoreCase(type)) {
ConditionsParameters conditionsParameters = JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class);
String oldSuccessNodeName = conditionsParameters.getSuccessNode().get(0);
String oldFailedNodeName = conditionsParameters.getFailedNode().get(0);
String newSuccessNodeName = newNameTaskId.get(oldNameTaskId.get(oldSuccessNodeName));
String newFailedNodeName = newNameTaskId.get(oldNameTaskId.get(oldFailedNodeName));
if (newSuccessNodeName != null) {
ArrayList<String> successNode = new ArrayList<>();
successNode.add(newSuccessNodeName);
conditionsParameters.setSuccessNode(successNode);
}
if (newFailedNodeName != null) {
ArrayList<String> failedNode = new ArrayList<>();
failedNode.add(newFailedNodeName);
conditionsParameters.setFailedNode(failedNode);
}
String conditionResultStr = conditionsParameters.getConditionResult();
taskNode.setConditionResult(conditionResultStr);
tasks.set(i, taskNode);
}
}
return JSONUtils.toJsonString(processData);
}
/**
* switch process definition version to process definition log version
@ -2391,24 +2331,30 @@ public class ProcessService {
* @return dag graph
*/
public DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) {
List<TaskNode> taskNodeList = this.getTaskNodeListByDefinitionId(processDefinition.getId());
List<ProcessTaskRelation> processTaskRelations = getProcessTaskRelationList(processDefinition.getCode(), processDefinition.getVersion());
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, processTaskRelations);
// Generate concrete Dag to be executed
return DagHelper.buildDagGraph(processDag);
}
/**
* get process task relation list
* this function can be query relation list from log record
*
* @param processCode
* @param processVersion
* @return
*/
public List<ProcessTaskRelation> getProcessTaskRelationList(Long processCode, int processVersion) {
List<ProcessTaskRelationLog> taskRelationLogs = processTaskRelationLogMapper.queryByProcessCodeAndVersion(
processDefinition.getCode(),
processDefinition.getVersion());
processCode,
processVersion);
List<ProcessTaskRelation> processTaskRelations = new ArrayList<>();
List<TaskDefinition> taskDefinitions = new ArrayList<>();
for (ProcessTaskRelationLog processTaskRelationLog : taskRelationLogs) {
processTaskRelations.add(JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class));
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
processTaskRelationLog.getPostTaskCode(),
processTaskRelationLog.getPostNodeVersion());
taskDefinitions.add(JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinitionLog), TaskDefinition.class));
}
ProcessDag processDag = DagHelper.getProcessDag(taskDefinitions, processTaskRelations);
// Generate concrete Dag to be executed
return DagHelper.buildDagGraph(processDag);
return processTaskRelations;
}
}

104
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -338,108 +338,4 @@ public class ProcessServiceTest {
processService.recurseFindSubProcessId(parentId, ids);
}
@Test
public void testChangeJson() {
ProcessData oldProcessData = new ProcessData();
ConditionsParameters conditionsParameters = new ConditionsParameters();
ArrayList<TaskNode> tasks = new ArrayList<>();
TaskNode taskNode = new TaskNode();
TaskNode taskNode11 = new TaskNode();
TaskNode taskNode111 = new TaskNode();
ArrayList<String> successNode = new ArrayList<>();
ArrayList<String> faildNode = new ArrayList<>();
taskNode.setName("bbb");
taskNode.setType("SHELL");
taskNode.setId("222");
taskNode11.setName("vvv");
taskNode11.setType("CONDITIONS");
taskNode11.setId("444");
successNode.add("bbb");
faildNode.add("ccc");
taskNode111.setName("ccc");
taskNode111.setType("SHELL");
taskNode111.setId("333");
conditionsParameters.setSuccessNode(successNode);
conditionsParameters.setFailedNode(faildNode);
taskNode11.setConditionResult(conditionsParameters.getConditionResult());
tasks.add(taskNode);
tasks.add(taskNode11);
tasks.add(taskNode111);
oldProcessData.setTasks(tasks);
ProcessData newProcessData = new ProcessData();
ConditionsParameters conditionsParameters2 = new ConditionsParameters();
TaskNode taskNode2 = new TaskNode();
TaskNode taskNode22 = new TaskNode();
TaskNode taskNode222 = new TaskNode();
ArrayList<TaskNode> tasks2 = new ArrayList<>();
ArrayList<String> successNode2 = new ArrayList<>();
ArrayList<String> faildNode2 = new ArrayList<>();
taskNode2.setName("bbbchange");
taskNode2.setType("SHELL");
taskNode2.setId("222");
taskNode22.setName("vv");
taskNode22.setType("CONDITIONS");
taskNode22.setId("444");
successNode2.add("bbb");
faildNode2.add("ccc");
taskNode222.setName("ccc");
taskNode222.setType("SHELL");
taskNode222.setId("333");
conditionsParameters2.setSuccessNode(successNode2);
conditionsParameters2.setFailedNode(faildNode2);
taskNode22.setConditionResult(conditionsParameters2.getConditionResult());
tasks2.add(taskNode2);
tasks2.add(taskNode22);
tasks2.add(taskNode222);
newProcessData.setTasks(tasks2);
ProcessData exceptProcessData = new ProcessData();
ConditionsParameters conditionsParameters3 = new ConditionsParameters();
TaskNode taskNode3 = new TaskNode();
TaskNode taskNode33 = new TaskNode();
TaskNode taskNode333 = new TaskNode();
ArrayList<TaskNode> tasks3 = new ArrayList<>();
ArrayList<String> successNode3 = new ArrayList<>();
ArrayList<String> faildNode3 = new ArrayList<>();
taskNode3.setName("bbbchange");
taskNode3.setType("SHELL");
taskNode3.setId("222");
taskNode33.setName("vv");
taskNode33.setType("CONDITIONS");
taskNode33.setId("444");
successNode3.add("bbbchange");
faildNode3.add("ccc");
taskNode333.setName("ccc");
taskNode333.setType("SHELL");
taskNode333.setId("333");
conditionsParameters3.setSuccessNode(successNode3);
conditionsParameters3.setFailedNode(faildNode3);
taskNode33.setConditionResult(conditionsParameters3.getConditionResult());
tasks3.add(taskNode3);
tasks3.add(taskNode33);
tasks3.add(taskNode333);
exceptProcessData.setTasks(tasks3);
String expect = JSONUtils.toJsonString(exceptProcessData);
String oldJson = JSONUtils.toJsonString(oldProcessData);
Assert.assertEquals(expect, processService.changeJson(newProcessData, oldJson));
}
}

Loading…
Cancel
Save