Browse Source

[Feature][JsonSplit] fix process bug (#5193)

* transform taskCode from long to string

* fix process bug

* code review

* code review

* code review

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 3 years ago committed by GitHub
parent
commit
dd36da8d1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 12
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
  3. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  4. 63
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

5
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -654,6 +654,11 @@ public final class Constants {
*/ */
public static final int EXIT_CODE_FAILURE = -1; public static final int EXIT_CODE_FAILURE = -1;
/**
* process or task definition failure
*/
public static final int DEFINITION_FAILURE = -1;
/** /**
* date format of yyyyMMdd * date format of yyyyMMdd
*/ */

12
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java

@ -119,4 +119,16 @@ public class StringUtils {
} }
return str.substring(pos + separator.length()); return str.substring(pos + separator.length());
} }
public static long strDigitToLong(String str, long defaultValue) {
if (str == null) {
return defaultValue;
} else {
try {
return Long.parseLong(str);
} catch (NumberFormatException var4) {
return defaultValue;
}
}
}
} }

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -65,6 +65,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;

63
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2181,7 +2181,7 @@ public class ProcessService {
*/ */
public int processDefinitionToDB(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) { public int processDefinitionToDB(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) {
if (null == processDefinition || null == processDefinitionLog) { if (null == processDefinition || null == processDefinitionLog) {
return Constants.EXIT_CODE_FAILURE; return Constants.DEFINITION_FAILURE;
} }
processDefinitionLog.setId(processDefinition.getId()); processDefinitionLog.setId(processDefinition.getId());
@ -2202,7 +2202,7 @@ public class ProcessService {
*/ */
public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) { public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) {
int switchResult = processDefinitionToDB(processDefinition, processDefinitionLog); int switchResult = processDefinitionToDB(processDefinition, processDefinitionLog);
if (switchResult != Constants.EXIT_CODE_FAILURE) { if (switchResult != Constants.DEFINITION_FAILURE) {
switchProcessTaskRelationVersion(processDefinition); switchProcessTaskRelationVersion(processDefinition);
} }
return switchResult; return switchResult;
@ -2227,7 +2227,7 @@ public class ProcessService {
Date now = new Date(); Date now = new Date();
taskDefinition.setProjectCode(projectCode); taskDefinition.setProjectCode(projectCode);
taskDefinition.setUserId(operator.getId()); taskDefinition.setUserId(operator.getId());
taskDefinition.setVersion(version == null || version == 0 ? 1 : version); taskDefinition.setVersion(version == null || version == 0 ? 1 : version + 1);
taskDefinition.setUpdateTime(now); taskDefinition.setUpdateTime(now);
setTaskFromTaskNode(taskNode, taskDefinition); setTaskFromTaskNode(taskNode, taskDefinition);
int update = taskDefinitionMapper.updateById(taskDefinition); int update = taskDefinitionMapper.updateById(taskDefinition);
@ -2284,14 +2284,17 @@ public class ProcessService {
*/ */
public int saveProcessDefinition(User operator, Project project, String name, String desc, String locations, public int saveProcessDefinition(User operator, Project project, String name, String desc, String locations,
String connects, ProcessData processData, ProcessDefinition processDefinition) { String connects, ProcessData processData, ProcessDefinition processDefinition) {
createTaskAndRelation(operator, project.getCode(), processDefinition, processData);
ProcessDefinitionLog processDefinitionLog = insertProcessDefinitionLog(operator, processDefinition.getCode(), ProcessDefinitionLog processDefinitionLog = insertProcessDefinitionLog(operator, processDefinition.getCode(),
name, processData, project, desc, locations, connects); name, processData, project, desc, locations, connects);
Map<String, TaskDefinition> taskDefinitionMap = handleTaskDefinition(operator, project.getCode(), processData.getTasks());
if (Constants.DEFINITION_FAILURE == handleTaskRelation(operator, project.getCode(), processDefinitionLog, processData.getTasks(), taskDefinitionMap)) {
return Constants.DEFINITION_FAILURE;
}
return processDefinitionToDB(processDefinition, processDefinitionLog); return processDefinitionToDB(processDefinition, processDefinitionLog);
} }
/** /**
* * save processDefinition
*/ */
public ProcessDefinitionLog insertProcessDefinitionLog(User operator, Long processDefinitionCode, String processDefinitionName, public ProcessDefinitionLog insertProcessDefinitionLog(User operator, Long processDefinitionCode, String processDefinitionName,
ProcessData processData, Project project, ProcessData processData, Project project,
@ -2300,7 +2303,7 @@ public class ProcessService {
Integer version = processDefineLogMapper.queryMaxVersionForDefinition(processDefinitionCode); Integer version = processDefineLogMapper.queryMaxVersionForDefinition(processDefinitionCode);
processDefinitionLog.setUserId(operator.getId()); processDefinitionLog.setUserId(operator.getId());
processDefinitionLog.setCode(processDefinitionCode); processDefinitionLog.setCode(processDefinitionCode);
processDefinitionLog.setVersion(version == null || version == 0 ? 1 : version); processDefinitionLog.setVersion(version == null || version == 0 ? 1 : version + 1);
processDefinitionLog.setName(processDefinitionName); processDefinitionLog.setName(processDefinitionName);
processDefinitionLog.setReleaseState(ReleaseState.OFFLINE); processDefinitionLog.setReleaseState(ReleaseState.OFFLINE);
processDefinitionLog.setProjectCode(project.getCode()); processDefinitionLog.setProjectCode(project.getCode());
@ -2331,16 +2334,15 @@ public class ProcessService {
} }
/** /**
* create task definition and task relations * handle task definition
*/ */
public void createTaskAndRelation(User operator, public Map<String, TaskDefinition> handleTaskDefinition(User operator, Long projectCode, List<TaskNode> taskNodes) {
Long projectCode, if (taskNodes == null) {
ProcessDefinition processDefinition, return null;
ProcessData processData) { }
List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); Map<String, TaskDefinition> taskDefinitionMap = new HashMap<>();
Map<String, TaskDefinition> taskNameAndCode = new HashMap<>(); for (TaskNode taskNode : taskNodes) {
for (TaskNode taskNode : taskNodeList) { TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(StringUtils.strDigitToLong(taskNode.getCode(), -1L));
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(Long.parseLong(taskNode.getCode()));
if (taskDefinition == null) { if (taskDefinition == null) {
try { try {
long code = SnowFlakeUtils.getInstance().nextId(); long code = SnowFlakeUtils.getInstance().nextId();
@ -2356,7 +2358,21 @@ public class ProcessService {
} }
updateTaskDefinition(operator, projectCode, taskNode, taskDefinition); updateTaskDefinition(operator, projectCode, taskNode, taskDefinition);
} }
taskNameAndCode.put(taskNode.getName(), taskDefinition); taskDefinitionMap.put(taskNode.getName(), taskDefinition);
}
return taskDefinitionMap;
}
/**
* handle task relations
*/
public int handleTaskRelation(User operator,
Long projectCode,
ProcessDefinition processDefinition,
List<TaskNode> taskNodes,
Map<String, TaskDefinition> taskDefinitionMap) {
if (null == processDefinition || null == taskNodes || null == taskDefinitionMap) {
return Constants.DEFINITION_FAILURE;
} }
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode()); List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
if (!processTaskRelationList.isEmpty()) { if (!processTaskRelationList.isEmpty()) {
@ -2364,7 +2380,7 @@ public class ProcessService {
} }
List<ProcessTaskRelation> builderRelationList = new ArrayList<>(); List<ProcessTaskRelation> builderRelationList = new ArrayList<>();
Date now = new Date(); Date now = new Date();
for (TaskNode taskNode : taskNodeList) { for (TaskNode taskNode : taskNodes) {
List<String> depList = taskNode.getDepList(); List<String> depList = taskNode.getDepList();
if (CollectionUtils.isNotEmpty(depList)) { if (CollectionUtils.isNotEmpty(depList)) {
for (String preTaskName : depList) { for (String preTaskName : depList) {
@ -2372,10 +2388,10 @@ public class ProcessService {
processDefinition.getVersion(), processDefinition.getVersion(),
projectCode, projectCode,
processDefinition.getCode(), processDefinition.getCode(),
taskNameAndCode.get(preTaskName).getCode(), taskDefinitionMap.get(preTaskName).getCode(),
taskNameAndCode.get(preTaskName).getVersion(), taskDefinitionMap.get(preTaskName).getVersion(),
taskNameAndCode.get(taskNode.getName()).getCode(), taskDefinitionMap.get(taskNode.getName()).getCode(),
taskNameAndCode.get(taskNode.getName()).getVersion(), taskDefinitionMap.get(taskNode.getName()).getVersion(),
ConditionType.of("none"), ConditionType.of("none"),
taskNode.getConditionResult(), taskNode.getConditionResult(),
now, now,
@ -2388,8 +2404,8 @@ public class ProcessService {
processDefinition.getCode(), processDefinition.getCode(),
0L, // this isn't previous task node, set zero 0L, // this isn't previous task node, set zero
0, 0,
taskNameAndCode.get(taskNode.getName()).getCode(), taskDefinitionMap.get(taskNode.getName()).getCode(),
taskNameAndCode.get(taskNode.getName()).getVersion(), taskDefinitionMap.get(taskNode.getName()).getVersion(),
ConditionType.of("none"), ConditionType.of("none"),
taskNode.getConditionResult(), taskNode.getConditionResult(),
now, now,
@ -2404,6 +2420,7 @@ public class ProcessService {
processTaskRelationLog.setOperateTime(now); processTaskRelationLog.setOperateTime(now);
processTaskRelationLogMapper.insert(processTaskRelationLog); processTaskRelationLogMapper.insert(processTaskRelationLog);
} }
return 0;
} }
public int saveTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) { public int saveTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) {

Loading…
Cancel
Save