Browse Source

[Feature][JsonSplit] modify checkDAGRing and ProcessService method (#4931)

* modify checkDAGRing and ProcessService method

* merge

* modify dagRing

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 4 years ago committed by GitHub
parent
commit
779596cf05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 76
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -61,7 +61,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
@ -144,9 +143,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
@Autowired
TaskDefinitionLogMapper taskDefinitionLogMapper;
@ -190,9 +186,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return checkProcessJson;
}
Long processDefinitionCode;
try {
processDefinitionCode = SnowFlakeUtils.getInstance().nextId();
long processDefinitionCode = SnowFlakeUtils.getInstance().nextId();
processDefinition.setCode(processDefinitionCode);
} catch (SnowFlakeException e) {
putMsg(result, Status.CREATE_PROCESS_DEFINITION);
@ -1346,7 +1341,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
runningNodeMap.remove(nodeName);
}
if (waitingRunningNodeMap == null || waitingRunningNodeMap.size() == 0) {
if (waitingRunningNodeMap.size() == 0) {
break;
} else {
runningNodeMap.putAll(waitingRunningNodeMap);
@ -1359,7 +1354,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
/**
* whether the graph has a ring
*
@ -1368,15 +1362,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
*/
private boolean graphHasCycle(List<TaskNode> taskNodeResponseList) {
DAG<String, TaskNode, String> graph = new DAG<>();
// Fill the vertices
for (TaskNode taskNodeResponse : taskNodeResponseList) {
graph.addNode(taskNodeResponse.getName(), taskNodeResponse);
}
// Fill edge relations
for (TaskNode taskNodeResponse : taskNodeResponseList) {
taskNodeResponse.getPreTasks();
List<String> preTasks = JSONUtils.toList(taskNodeResponse.getPreTasks(), String.class);
if (CollectionUtils.isNotEmpty(preTasks)) {
for (String preTask : preTasks) {
@ -1386,7 +1377,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
}
}
return graph.hasCycle();
}
@ -1634,7 +1624,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @param processDefinitionId processDefinitionId
*/
private void setFailedProcessList(List<String> failedProcessList, String processDefinitionId) {
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(Integer.valueOf(processDefinitionId));
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(Integer.parseInt(processDefinitionId));
if (processDefinition != null) {
failedProcessList.add(processDefinitionId + "[" + processDefinition.getName() + "]");
} else {

76
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -350,17 +350,15 @@ public class ProcessService {
logger.error("process define not exists");
return new ArrayList<>();
}
List<ProcessTaskRelation> processTaskRelations = getProcessTaskRelationList(processDefinition.getCode(), processDefinition.getVersion());
Map<Long, TaskDefinition> taskDefinitionMap = new HashMap<>();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (taskDefinitionMap.containsKey(processTaskRelation.getPostTaskCode())) {
TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
processTaskRelation.getPostTaskCode(), processTaskRelation.getPostNodeVersion());
taskDefinitionMap.put(processTaskRelation.getPostTaskCode(), taskDefinition);
List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
for (ProcessTaskRelationLog processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPostTaskCode() > 0) {
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostNodeVersion()));
}
}
return new ArrayList<>(taskDefinitionMap.values());
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
return new ArrayList<>(taskDefinitionLogs);
}
/**
@ -614,7 +612,7 @@ public class ProcessService {
processInstance.setConnects(processDefinition.getConnects());
// reset global params while there are start parameters
setGlobalParamIfCommanded(processDefinition,cmdParam);
setGlobalParamIfCommanded(processDefinition, cmdParam);
// curing global params
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
@ -753,10 +751,10 @@ public class ProcessService {
// Recalculate global parameters after rerun.
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
commandTypeIfComplement,
processInstance.getScheduleTime()));
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
commandTypeIfComplement,
processInstance.getScheduleTime()));
}
processDefinition = processDefineMapper.selectById(processInstance.getProcessDefinitionId());
processInstance.setProcessDefinition(processDefinition);
@ -2090,13 +2088,13 @@ public class ProcessService {
}
/**
* get resource by resoruce id
* get resource by resource id
*
* @param resoruceId resource id
* @param resourceId resource id
* @return Resource
*/
public Resource getResourceById(int resoruceId) {
return resourceMapper.selectById(resoruceId);
public Resource getResourceById(int resourceId) {
return resourceMapper.selectById(resourceId);
}
/**
@ -2134,16 +2132,15 @@ public class ProcessService {
return Constants.EXIT_CODE_FAILURE;
}
ProcessDefinition tmpDefinition = processDefinitionLog;
tmpDefinition.setId(processDefinition.getId());
tmpDefinition.setReleaseState(ReleaseState.OFFLINE);
tmpDefinition.setFlag(Flag.YES);
processDefinitionLog.setId(processDefinition.getId());
processDefinitionLog.setReleaseState(ReleaseState.OFFLINE);
processDefinitionLog.setFlag(Flag.YES);
int switchResult = 0;
if (0 == processDefinition.getId()) {
switchResult = processDefineMapper.insert(tmpDefinition);
switchResult = processDefineMapper.insert(processDefinitionLog);
} else {
switchResult = processDefineMapper.updateById(tmpDefinition);
switchResult = processDefineMapper.updateById(processDefinitionLog);
}
switchProcessTaskRelationVersion(processDefinition);
return switchResult;
@ -2156,8 +2153,7 @@ public class ProcessService {
}
List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
for (ProcessTaskRelationLog processTaskRelationLog : processTaskRelationLogList) {
ProcessTaskRelation processTaskRelation = processTaskRelationLog;
processTaskRelationMapper.insert(processTaskRelation);
processTaskRelationMapper.insert(processTaskRelationLog);
}
}
@ -2327,14 +2323,12 @@ public class ProcessService {
now));
}
} else {
// todo relation name
builderRelationList.add(new ProcessTaskRelation("",
processDefinition.getVersion(),
projectCode,
processDefinition.getCode(),
0L,
taskNameAndCode.get(taskNode.getName()),
// todo conditionType
ConditionType.of("none"),
taskNode.getConditionResult(),
now,
@ -2394,33 +2388,17 @@ public class ProcessService {
*/
public DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) {
List<TaskNode> taskNodeList = genTaskNodeList(processDefinition.getCode(), processDefinition.getVersion());
List<ProcessTaskRelation> processTaskRelations = getProcessTaskRelationList(processDefinition.getCode(), processDefinition.getVersion());
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, processTaskRelations);
List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new ArrayList<>(processTaskRelations));
// Generate concrete Dag to be executed
return DagHelper.buildDagGraph(processDag);
}
/**
* get process task relation list
* this function can be query relation list from log record
*/
public List<ProcessTaskRelation> getProcessTaskRelationList(Long processCode, int processVersion) {
List<ProcessTaskRelationLog> taskRelationLogs = processTaskRelationLogMapper.queryByProcessCodeAndVersion(
processCode,
processVersion);
List<ProcessTaskRelation> processTaskRelations = new ArrayList<>();
for (ProcessTaskRelationLog processTaskRelationLog : taskRelationLogs) {
processTaskRelations.add(processTaskRelationLog);
}
return processTaskRelations;
}
/**
* generate ProcessData
*/
public ProcessData genProcessData(ProcessDefinition processDefinition) {
List<TaskNode> taskNodes = genTaskNodeList(processDefinition.getCode()
, processDefinition.getVersion());
List<TaskNode> taskNodes = genTaskNodeList(processDefinition.getCode(), processDefinition.getVersion());
ProcessData processData = new ProcessData();
processData.setTasks(taskNodes);
processData.setGlobalParams(JSONUtils.toList(processDefinition.getGlobalParams(), Property.class));
@ -2430,10 +2408,10 @@ public class ProcessService {
}
public List<TaskNode> genTaskNodeList(Long processCode, int processVersion) {
List<ProcessTaskRelation> processTaskRelations = this.getProcessTaskRelationList(processCode, processVersion);
List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion);
Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
Map<Long, TaskNode> taskNodeMap = new HashMap<>();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
for (ProcessTaskRelationLog processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPreTaskCode() > 0) {
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreNodeVersion()));
}

Loading…
Cancel
Save