From 10d619924e693830d27f84b4c99a03156d96f745 Mon Sep 17 00:00:00 2001 From: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com> Date: Tue, 31 Jan 2023 11:32:54 +0800 Subject: [PATCH] [Fix-12966] Failed to export and then import the project (#13455) * fix 13347 and 12966 * fix switch Co-authored-by: JinyLeeChina --- .../impl/ProcessDefinitionServiceImpl.java | 66 +++++++++++++++++++ .../task/conditions/ConditionsParameters.java | 12 ++-- .../task/switchtask/SwitchParameters.java | 21 +++--- .../task/switchtask/SwitchResultVo.java | 21 +++--- .../dao/entity/TaskDefinition.java | 2 +- .../dolphinscheduler/dao/utils/DagHelper.java | 31 +++++---- .../dao/utils/DagHelperTest.java | 6 +- .../runner/task/SwitchTaskProcessor.java | 22 +++---- .../server/master/ConditionsTaskTest.java | 4 +- .../server/master/SwitchTaskTest.java | 8 +-- 10 files changed, 130 insertions(+), 63 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 23fee1bb0f..9eb277caa6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -43,11 +43,16 @@ import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; +import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters; +import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters; +import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.entity.DagData; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; @@ -107,6 +112,7 @@ import org.springframework.web.multipart.MultipartFile; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -962,6 +968,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro taskDefinitionLog.setUpdateTime(now); taskDefinitionLog.setOperator(loginUser.getId()); taskDefinitionLog.setOperateTime(now); + taskDefinitionLog.setTaskParams(taskDefinition.getTaskParams()); try { long code = CodeGenerateUtils.getInstance().genCode(); taskCodeMap.put(taskDefinitionLog.getCode(), code); @@ -973,6 +980,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } taskDefinitionLogList.add(taskDefinitionLog); } + taskDefinitionLogList.forEach(v -> v.setTaskParams(resetImportTaskParams(taskCodeMap, v))); int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogList); int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogList); if ((logInsert & insert) == 0) { @@ -1035,6 +1043,64 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return true; } + private String resetImportTaskParams(Map taskCodeMap, TaskDefinitionLog taskDefinition) { + String taskType = taskDefinition.getTaskType(); + if (!TaskType.CONDITIONS.getDesc().equals(taskType) && !TaskType.SWITCH.getDesc().equals(taskType)) { + return taskDefinition.getTaskParams(); + } + + Map taskParamsMap = JSONUtils.parseObject(taskDefinition.getTaskParams(), new TypeReference>() {}); + if (taskParamsMap == null) { + taskParamsMap = new HashMap<>(); + } + AbstractParameters switchParameters = TaskParametersUtils.getParameters(TaskType.SWITCH.getDesc(), JSONUtils.toJsonString(taskParamsMap.get(Constants.SWITCH_RESULT))); + if (switchParameters != null) { + taskParamsMap.put(Constants.SWITCH_RESULT, resetImportSwitchTaskParams(taskCodeMap, switchParameters)); + } + AbstractParameters conditionParameters = TaskParametersUtils.getParameters(TaskType.CONDITIONS.getDesc(), JSONUtils.toJsonString(taskParamsMap.get(Constants.CONDITION_RESULT))); + if (conditionParameters != null) { + taskParamsMap.put(Constants.CONDITION_RESULT, resetImportConditionTaskParams(taskCodeMap, conditionParameters)); + } + return JSONUtils.toJsonString(taskParamsMap); + } + + private AbstractParameters resetImportSwitchTaskParams(Map taskCodeMap, AbstractParameters parameter) { + SwitchParameters switchParameters = (SwitchParameters) parameter; + List dependTaskList = switchParameters.getDependTaskList(); + if (CollectionUtils.isEmpty(dependTaskList)) { + return switchParameters; + } + for (SwitchResultVo resultVo : dependTaskList) { + Long nextNode = resultVo.getNextNode(); + resultVo.setNextNode(taskCodeMap.get(nextNode)); + } + Long nextNode = switchParameters.getNextNode(); + switchParameters.setNextNode(taskCodeMap.get(nextNode)); + return switchParameters; + } + + private AbstractParameters resetImportConditionTaskParams(Map taskCodeMap, AbstractParameters parameter) { + ConditionsParameters conditionsParameters = (ConditionsParameters) parameter; + List originalSuccessNode = conditionsParameters.getSuccessNode(); + List originalFailedNode = conditionsParameters.getFailedNode(); + if (CollectionUtils.isEmpty(originalSuccessNode) || CollectionUtils.isEmpty(originalFailedNode)) { + return conditionsParameters; + } + List resultSuccessNode = new ArrayList<>(); + List resultFailedNode = new ArrayList<>(); + + if (CollectionUtils.isNotEmpty(originalSuccessNode)) { + originalSuccessNode.forEach(v -> resultSuccessNode.add(taskCodeMap.get(v))); + } + if (CollectionUtils.isNotEmpty(originalFailedNode)) { + originalFailedNode.forEach(v -> resultFailedNode.add(taskCodeMap.get(v))); + } + + conditionsParameters.setSuccessNode(resultSuccessNode); + conditionsParameters.setFailedNode(resultFailedNode); + return conditionsParameters; + } + /** * check importance params */ diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java index b136ae295d..0b592901f9 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java @@ -32,10 +32,10 @@ public class ConditionsParameters extends AbstractParameters { private DependentRelation dependRelation; // node list to run when success - private List successNode; + private List successNode; // node list to run when failed - private List failedNode; + private List failedNode; @Override public boolean checkParameters() { @@ -63,19 +63,19 @@ public class ConditionsParameters extends AbstractParameters { this.dependRelation = dependRelation; } - public List getSuccessNode() { + public List getSuccessNode() { return successNode; } - public void setSuccessNode(List successNode) { + public void setSuccessNode(List successNode) { this.successNode = successNode; } - public List getFailedNode() { + public List getFailedNode() { return failedNode; } - public void setFailedNode(List failedNode) { + public void setFailedNode(List failedNode) { this.failedNode = failedNode; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchParameters.java index ff04a55b33..f7b4f7bea8 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchParameters.java @@ -21,6 +21,8 @@ import org.apache.dolphinscheduler.common.enums.DependentRelation; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.commons.collections.CollectionUtils; + import java.util.ArrayList; import java.util.List; @@ -28,7 +30,7 @@ public class SwitchParameters extends AbstractParameters { private DependentRelation dependRelation; private String relation; - private List nextNode; + private Long nextNode; private int resultConditionLocation; private List dependTaskList; @@ -74,21 +76,18 @@ public class SwitchParameters extends AbstractParameters { this.dependTaskList = dependTaskList; } - public List getNextNode() { + public Long getNextNode() { return nextNode; } public void setNextNode(Object nextNode) { - if (nextNode instanceof String) { - List nextNodeList = new ArrayList<>(); - nextNodeList.add(String.valueOf(nextNode)); - this.nextNode = nextNodeList; - } else if (nextNode instanceof Number) { - List nextNodeList = new ArrayList<>(); - nextNodeList.add(nextNode.toString()); - this.nextNode = nextNodeList; + if (nextNode instanceof Long) { + this.nextNode = (Long) nextNode; } else { - this.nextNode = (ArrayList) nextNode; + List nextNodes = (ArrayList) nextNode; + if (CollectionUtils.isNotEmpty(nextNodes)) { + this.nextNode = Long.parseLong(nextNodes.get(0)); + } } } } \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchResultVo.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchResultVo.java index 07f0f94a54..58090c05be 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchResultVo.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchResultVo.java @@ -17,13 +17,15 @@ package org.apache.dolphinscheduler.common.task.switchtask; +import org.apache.commons.collections.CollectionUtils; + import java.util.ArrayList; import java.util.List; public class SwitchResultVo { private String condition; - private List nextNode; + private Long nextNode; public String getCondition() { return condition; @@ -33,21 +35,18 @@ public class SwitchResultVo { this.condition = condition; } - public List getNextNode() { + public Long getNextNode() { return nextNode; } public void setNextNode(Object nextNode) { - if (nextNode instanceof String) { - List nextNodeList = new ArrayList<>(); - nextNodeList.add(String.valueOf(nextNode)); - this.nextNode = nextNodeList; - } else if (nextNode instanceof Number) { - List nextNodeList = new ArrayList<>(); - nextNodeList.add(nextNode.toString()); - this.nextNode = nextNodeList; + if (nextNode instanceof Long) { + this.nextNode = (Long) nextNode; } else { - this.nextNode = (ArrayList) nextNode; + List nextNodes = (ArrayList) nextNode; + if (CollectionUtils.isNotEmpty(nextNodes)) { + this.nextNode = Long.parseLong(nextNodes.get(0)); + } } } } \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java index 8c47fbfc86..f8af169d24 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java @@ -294,7 +294,7 @@ public class TaskDefinition { public Map getTaskParamMap() { if (taskParamMap == null && StringUtils.isNotEmpty(taskParams)) { JsonNode localParams = JSONUtils.parseObject(taskParams).findValue("localParams"); - if (localParams != null) { + if (localParams != null && localParams.size() > 0) { List propList = JSONUtils.toList(localParams.toString(), Property.class); taskParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index 154c58473b..727e040287 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -38,6 +38,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -363,13 +364,16 @@ public class DagHelper { TaskInstance taskInstance = completeTaskList.get(nodeCode); ConditionsParameters conditionsParameters = JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class); + if (conditionsParameters == null) { + return conditionTaskList; + } List skipNodeList = new ArrayList<>(); if (taskInstance.getState().typeIsSuccess()) { - conditionTaskList = conditionsParameters.getSuccessNode(); - skipNodeList = conditionsParameters.getFailedNode(); + conditionTaskList = conditionsParameters.getSuccessNode().stream().map(String::valueOf).collect(Collectors.toList()); + skipNodeList = conditionsParameters.getFailedNode().stream().map(String::valueOf).collect(Collectors.toList()); } else if (taskInstance.getState().typeIsFailure()) { - conditionTaskList = conditionsParameters.getFailedNode(); - skipNodeList = conditionsParameters.getSuccessNode(); + conditionTaskList = conditionsParameters.getFailedNode().stream().map(String::valueOf).collect(Collectors.toList()); + skipNodeList = conditionsParameters.getSuccessNode().stream().map(String::valueOf).collect(Collectors.toList()); } else { conditionTaskList.add(nodeCode); } @@ -395,27 +399,28 @@ public class DagHelper { if (!completeTaskList.containsKey(nodeCode)) { return conditionTaskList; } - conditionTaskList = skipTaskNode4Switch(taskNode, skipTaskNodeList, completeTaskList, dag); + conditionTaskList.add(String.valueOf(skipTaskNode4Switch(taskNode, skipTaskNodeList, completeTaskList, dag))); return conditionTaskList; } - private static List skipTaskNode4Switch(TaskNode taskNode, Map skipTaskNodeList, - Map completeTaskList, - DAG dag) { + private static Long skipTaskNode4Switch(TaskNode taskNode, Map skipTaskNodeList, + Map completeTaskList, + DAG dag) { SwitchParameters switchParameters = completeTaskList.get(Long.toString(taskNode.getCode())).getSwitchDependency(); int resultConditionLocation = switchParameters.getResultConditionLocation(); List conditionResultVoList = switchParameters.getDependTaskList(); - List switchTaskList = conditionResultVoList.get(resultConditionLocation).getNextNode(); - if (CollectionUtils.isEmpty(switchTaskList)) { - switchTaskList = new ArrayList<>(); + Long switchTaskList = conditionResultVoList.get(resultConditionLocation).getNextNode(); + if (switchTaskList == null) { + switchTaskList = 0L; } conditionResultVoList.remove(resultConditionLocation); for (SwitchResultVo info : conditionResultVoList) { - if (CollectionUtils.isEmpty(info.getNextNode())) { + Long nextNode = info.getNextNode(); + if (nextNode == null || nextNode == 0L) { continue; } - setTaskNodeSkip(info.getNextNode().get(0), dag, completeTaskList, skipTaskNodeList); + setTaskNodeSkip(String.valueOf(nextNode), dag, completeTaskList, skipTaskNodeList); } return switchTaskList; } diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java index 8df92dd0ad..fd34c2589c 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java @@ -446,15 +446,15 @@ public class DagHelperTest { SwitchParameters conditionsParameters = new SwitchParameters(); SwitchResultVo switchResultVo1 = new SwitchResultVo(); switchResultVo1.setCondition(" 2 == 1"); - switchResultVo1.setNextNode("2"); + switchResultVo1.setNextNode(2L); SwitchResultVo switchResultVo2 = new SwitchResultVo(); switchResultVo2.setCondition(" 2 == 2"); - switchResultVo2.setNextNode("4"); + switchResultVo2.setNextNode(4L); List list = new ArrayList<>(); list.add(switchResultVo1); list.add(switchResultVo2); conditionsParameters.setDependTaskList(list); - conditionsParameters.setNextNode("5"); + conditionsParameters.setNextNode(5L); conditionsParameters.setRelation("AND"); // in: AND(AND(1 is SUCCESS)) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java index 9cc00d3158..bdb5e4f76d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner.task; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.DependResult; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; @@ -30,7 +31,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils; -import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang.StringUtils; import java.util.Date; @@ -225,9 +225,15 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { } while (m.find()) { String paramName = m.group(1); - Property property = globalParams.get(paramName); + Property property = globalParams.get(Constants.START_UP_PARAMS_PREFIX + paramName); if (property == null) { - return ""; + property = globalParams.get(paramName); + if (property == null) { + property = globalParams.get(Constants.GLOBAL_PARAMS_PREFIX + paramName); + if (property == null) { + return ""; + } + } } String value = property.getValue(); if (!org.apache.commons.lang.math.NumberUtils.isNumber(value)) { @@ -243,15 +249,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { * check whether switch result is valid */ private boolean isValidSwitchResult(SwitchResultVo switchResult) { - if (CollectionUtils.isEmpty(switchResult.getNextNode())) { - return false; - } - for (String nextNode : switchResult.getNextNode()) { - if (StringUtils.isEmpty(nextNode)) { - return false; - } - } - return true; + return switchResult.getNextNode() != null && switchResult.getNextNode() != 0L; } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java index 3b3b5855ce..718e14ccaf 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java @@ -155,8 +155,8 @@ public class ConditionsTaskTest { taskNode.setDependence(JSONUtils.toJsonString(dependentParameters)); ConditionsParameters conditionsParameters = new ConditionsParameters(); - conditionsParameters.setSuccessNode(Stream.of("2").collect(Collectors.toList())); - conditionsParameters.setFailedNode(Stream.of("3").collect(Collectors.toList())); + conditionsParameters.setSuccessNode(Stream.of(2L).collect(Collectors.toList())); + conditionsParameters.setFailedNode(Stream.of(3L).collect(Collectors.toList())); // out: SUCCESS => 2, FAILED => 3 taskNode.setConditionResult(JSONUtils.toJsonString(conditionsParameters)); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java index 61f1d6d800..38bd79d2e0 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java @@ -113,19 +113,19 @@ public class SwitchTaskTest { SwitchResultVo switchResultVo1 = new SwitchResultVo(); switchResultVo1.setCondition(" 2 == 1"); - switchResultVo1.setNextNode("t1"); + switchResultVo1.setNextNode(1L); SwitchResultVo switchResultVo2 = new SwitchResultVo(); switchResultVo2.setCondition(" 2 == 2"); - switchResultVo2.setNextNode("t2"); + switchResultVo2.setNextNode(2L); SwitchResultVo switchResultVo3 = new SwitchResultVo(); switchResultVo3.setCondition(" 3 == 2"); - switchResultVo3.setNextNode("t3"); + switchResultVo3.setNextNode(3L); List list = new ArrayList<>(); list.add(switchResultVo1); list.add(switchResultVo2); list.add(switchResultVo3); conditionsParameters.setDependTaskList(list); - conditionsParameters.setNextNode("t"); + conditionsParameters.setNextNode(4L); conditionsParameters.setRelation("AND"); return conditionsParameters;