From dd36da8d1f2c9d37172be493f83221819a26c51f Mon Sep 17 00:00:00 2001 From: JinyLeeChina <42576980+JinyLeeChina@users.noreply.github.com> Date: Thu, 1 Apr 2021 17:45:44 +0800 Subject: [PATCH] [Feature][JsonSplit] fix process bug (#5193) * transform taskCode from long to string * fix process bug * code review * code review * code review Co-authored-by: JinyLeeChina <297062848@qq.com> --- .../dolphinscheduler/common/Constants.java | 5 ++ .../common/utils/StringUtils.java | 12 ++++ .../master/runner/MasterExecThread.java | 1 + .../service/process/ProcessService.java | 63 ++++++++++++------- 4 files changed, 58 insertions(+), 23 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 736eb83f04..db3cd14326 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -654,6 +654,11 @@ public final class Constants { */ public static final int EXIT_CODE_FAILURE = -1; + /** + * process or task definition failure + */ + public static final int DEFINITION_FAILURE = -1; + /** * date format of yyyyMMdd */ diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java index 3ec1a2e39b..dae799cdf8 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java @@ -119,4 +119,16 @@ public class StringUtils { } return str.substring(pos + separator.length()); } + + public static long strDigitToLong(String str, long defaultValue) { + if (str == null) { + return defaultValue; + } else { + try { + return Long.parseLong(str); + } catch (NumberFormatException var4) { + return defaultValue; + } + } + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 104aa87326..41eb15017f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -65,6 +65,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Date; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index fd357c4af3..bccd926a34 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -2181,7 +2181,7 @@ public class ProcessService { */ public int processDefinitionToDB(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) { if (null == processDefinition || null == processDefinitionLog) { - return Constants.EXIT_CODE_FAILURE; + return Constants.DEFINITION_FAILURE; } processDefinitionLog.setId(processDefinition.getId()); @@ -2202,7 +2202,7 @@ public class ProcessService { */ public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) { int switchResult = processDefinitionToDB(processDefinition, processDefinitionLog); - if (switchResult != Constants.EXIT_CODE_FAILURE) { + if (switchResult != Constants.DEFINITION_FAILURE) { switchProcessTaskRelationVersion(processDefinition); } return switchResult; @@ -2227,7 +2227,7 @@ public class ProcessService { Date now = new Date(); taskDefinition.setProjectCode(projectCode); taskDefinition.setUserId(operator.getId()); - taskDefinition.setVersion(version == null || version == 0 ? 1 : version); + taskDefinition.setVersion(version == null || version == 0 ? 1 : version + 1); taskDefinition.setUpdateTime(now); setTaskFromTaskNode(taskNode, taskDefinition); int update = taskDefinitionMapper.updateById(taskDefinition); @@ -2284,14 +2284,17 @@ public class ProcessService { */ public int saveProcessDefinition(User operator, Project project, String name, String desc, String locations, String connects, ProcessData processData, ProcessDefinition processDefinition) { - createTaskAndRelation(operator, project.getCode(), processDefinition, processData); ProcessDefinitionLog processDefinitionLog = insertProcessDefinitionLog(operator, processDefinition.getCode(), name, processData, project, desc, locations, connects); + Map taskDefinitionMap = handleTaskDefinition(operator, project.getCode(), processData.getTasks()); + if (Constants.DEFINITION_FAILURE == handleTaskRelation(operator, project.getCode(), processDefinitionLog, processData.getTasks(), taskDefinitionMap)) { + return Constants.DEFINITION_FAILURE; + } return processDefinitionToDB(processDefinition, processDefinitionLog); } /** - * + * save processDefinition */ public ProcessDefinitionLog insertProcessDefinitionLog(User operator, Long processDefinitionCode, String processDefinitionName, ProcessData processData, Project project, @@ -2300,7 +2303,7 @@ public class ProcessService { Integer version = processDefineLogMapper.queryMaxVersionForDefinition(processDefinitionCode); processDefinitionLog.setUserId(operator.getId()); processDefinitionLog.setCode(processDefinitionCode); - processDefinitionLog.setVersion(version == null || version == 0 ? 1 : version); + processDefinitionLog.setVersion(version == null || version == 0 ? 1 : version + 1); processDefinitionLog.setName(processDefinitionName); processDefinitionLog.setReleaseState(ReleaseState.OFFLINE); processDefinitionLog.setProjectCode(project.getCode()); @@ -2331,16 +2334,15 @@ public class ProcessService { } /** - * create task definition and task relations + * handle task definition */ - public void createTaskAndRelation(User operator, - Long projectCode, - ProcessDefinition processDefinition, - ProcessData processData) { - List taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); - Map taskNameAndCode = new HashMap<>(); - for (TaskNode taskNode : taskNodeList) { - TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(Long.parseLong(taskNode.getCode())); + public Map handleTaskDefinition(User operator, Long projectCode, List taskNodes) { + if (taskNodes == null) { + return null; + } + Map taskDefinitionMap = new HashMap<>(); + for (TaskNode taskNode : taskNodes) { + TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(StringUtils.strDigitToLong(taskNode.getCode(), -1L)); if (taskDefinition == null) { try { long code = SnowFlakeUtils.getInstance().nextId(); @@ -2356,7 +2358,21 @@ public class ProcessService { } updateTaskDefinition(operator, projectCode, taskNode, taskDefinition); } - taskNameAndCode.put(taskNode.getName(), taskDefinition); + taskDefinitionMap.put(taskNode.getName(), taskDefinition); + } + return taskDefinitionMap; + } + + /** + * handle task relations + */ + public int handleTaskRelation(User operator, + Long projectCode, + ProcessDefinition processDefinition, + List taskNodes, + Map taskDefinitionMap) { + if (null == processDefinition || null == taskNodes || null == taskDefinitionMap) { + return Constants.DEFINITION_FAILURE; } List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode()); if (!processTaskRelationList.isEmpty()) { @@ -2364,7 +2380,7 @@ public class ProcessService { } List builderRelationList = new ArrayList<>(); Date now = new Date(); - for (TaskNode taskNode : taskNodeList) { + for (TaskNode taskNode : taskNodes) { List depList = taskNode.getDepList(); if (CollectionUtils.isNotEmpty(depList)) { for (String preTaskName : depList) { @@ -2372,10 +2388,10 @@ public class ProcessService { processDefinition.getVersion(), projectCode, processDefinition.getCode(), - taskNameAndCode.get(preTaskName).getCode(), - taskNameAndCode.get(preTaskName).getVersion(), - taskNameAndCode.get(taskNode.getName()).getCode(), - taskNameAndCode.get(taskNode.getName()).getVersion(), + taskDefinitionMap.get(preTaskName).getCode(), + taskDefinitionMap.get(preTaskName).getVersion(), + taskDefinitionMap.get(taskNode.getName()).getCode(), + taskDefinitionMap.get(taskNode.getName()).getVersion(), ConditionType.of("none"), taskNode.getConditionResult(), now, @@ -2388,8 +2404,8 @@ public class ProcessService { processDefinition.getCode(), 0L, // this isn't previous task node, set zero 0, - taskNameAndCode.get(taskNode.getName()).getCode(), - taskNameAndCode.get(taskNode.getName()).getVersion(), + taskDefinitionMap.get(taskNode.getName()).getCode(), + taskDefinitionMap.get(taskNode.getName()).getVersion(), ConditionType.of("none"), taskNode.getConditionResult(), now, @@ -2404,6 +2420,7 @@ public class ProcessService { processTaskRelationLog.setOperateTime(now); processTaskRelationLogMapper.insert(processTaskRelationLog); } + return 0; } public int saveTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) {