Browse Source

check has cycle of ProcessDefinition (#5944)

Co-authored-by: JinyLeeChina <297062848@qq.com>
2.0.7-release
JinyLeeChina 3 years ago committed by GitHub
parent
commit
316b919d4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  2. 24
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  3. 61
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -324,14 +324,5 @@ public interface ProcessDefinitionService {
long projectCode,
int processDefinitionId,
long version);
/**
* check has associated process definition
*
* @param processDefinitionId process definition id
* @param version version
* @return The query result has a specific process definition return true
*/
boolean checkHasAssociatedProcessDefinition(int processDefinitionId, long version);
}

24
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -255,12 +255,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
// TODO check has cycle
// if (graphHasCycle(taskRelationList)) {
// logger.error("process DAG has cycle");
// putMsg(result, Status.PROCESS_NODE_HAS_CYCLE);
// return result;
// }
if (graphHasCycle(processService.transformTask(taskRelationList))) {
logger.error("process DAG has cycle");
putMsg(result, Status.PROCESS_NODE_HAS_CYCLE);
return result;
}
// check whether the task relation json is normal
for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
@ -1323,19 +1322,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
}
/**
* check has associated process definition
*
* @param processDefinitionId process definition id
* @param version version
* @return The query result has a specific process definition return true
*/
@Override
public boolean checkHasAssociatedProcessDefinition(int processDefinitionId, long version) {
Integer hasAssociatedDefinitionId = processDefinitionMapper.queryHasAssociatedDefinitionByIdAndVersion(processDefinitionId, version);
return Objects.nonNull(hasAssociatedDefinitionId);
}
/**
* query the pagination versions info by one certain process definition code
*

61
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -872,8 +872,6 @@ public class ProcessService {
* If it is a fault-tolerant command, get the specified version of ProcessDefinition through ProcessInstance
* Otherwise, get the latest version of ProcessDefinition
*
* @param processDefinitionCode
* @param cmdParam
* @return ProcessDefinition
*/
private ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode, Map<String, String> cmdParam) {
@ -1632,7 +1630,6 @@ public class ProcessService {
/**
* for show in page of taskInstance
* @param taskInstance
*/
public void changeOutParam(TaskInstance taskInstance) {
if (StringUtils.isEmpty(taskInstance.getVarPool())) {
@ -2389,6 +2386,10 @@ public class ProcessService {
*/
public DagData genDagData(ProcessDefinition processDefinition) {
List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
return new DagData(processDefinition, processTaskRelations, genTaskDefineList(processTaskRelations));
}
private List<TaskDefinitionLog> genTaskDefineList(List<ProcessTaskRelationLog> processTaskRelations) {
Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
for (ProcessTaskRelationLog processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPreTaskCode() > 0) {
@ -2398,8 +2399,7 @@ public class ProcessService {
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion()));
}
}
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
return new DagData(processDefinition, processTaskRelations, taskDefinitionLogs);
return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
}
/**
@ -2532,4 +2532,55 @@ public class ProcessService {
List<Resource> relationResources = CollectionUtils.isNotEmpty(relationResourceIds) ? resourceMapper.queryResourceListById(relationResourceIds) : new ArrayList<>();
ownResources.addAll(relationResources);
}
/**
* Use temporarily before refactoring taskNode
*/
public List<TaskNode> transformTask(List<ProcessTaskRelationLog> taskRelationList) {
Map<Long, List<Long>> taskCodeMap = new HashMap<>();
for (ProcessTaskRelationLog processTaskRelation : taskRelationList) {
taskCodeMap.compute(processTaskRelation.getPostTaskCode(), (k, v) -> {
if (v == null) {
v = new ArrayList<>();
}
if (processTaskRelation.getPreTaskCode() != 0L) {
v.add(processTaskRelation.getPreTaskCode());
}
return v;
});
}
List<TaskDefinitionLog> taskDefinitionLogs = genTaskDefineList(taskRelationList);
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = taskDefinitionLogs.stream()
.collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
List<TaskNode> taskNodeList = new ArrayList<>();
for (Entry<Long, List<Long>> code : taskCodeMap.entrySet()) {
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMap.get(code.getKey());
if (taskDefinitionLog != null) {
TaskNode taskNode = new TaskNode();
taskNode.setCode(taskDefinitionLog.getCode());
taskNode.setVersion(taskDefinitionLog.getVersion());
taskNode.setName(taskDefinitionLog.getName());
taskNode.setDesc(taskDefinitionLog.getDescription());
taskNode.setType(taskDefinitionLog.getTaskType().toUpperCase());
taskNode.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_NORMAL : Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
taskNode.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes());
taskNode.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
Map<String, Object> taskParamsMap = taskNode.taskParamsToJsonObj(taskDefinitionLog.getTaskParams());
taskNode.setConditionResult((String) taskParamsMap.get(Constants.CONDITION_RESULT));
taskNode.setDependence((String) taskParamsMap.get(Constants.DEPENDENCE));
taskParamsMap.remove(Constants.CONDITION_RESULT);
taskParamsMap.remove(Constants.DEPENDENCE);
taskNode.setParams(JSONUtils.toJsonString(taskParamsMap));
taskNode.setTaskInstancePriority(taskDefinitionLog.getTaskPriority());
taskNode.setWorkerGroup(taskDefinitionLog.getWorkerGroup());
taskNode.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
taskDefinitionLog.getTimeoutNotifyStrategy(),
taskDefinitionLog.getTimeout())));
taskNode.setDelayTime(taskDefinitionLog.getDelayTime());
taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getName).collect(Collectors.toList())));
taskNodeList.add(taskNode);
}
}
return taskNodeList;
}
}

Loading…
Cancel
Save