Browse Source

[Fix-12966] Failed to export and then import the project (#13455)

* fix 13347 and 12966
* fix switch

Co-authored-by: JinyLeeChina <jiny.li@foxmail.com>
2.0.8-prepare
JinYong Li 1 year ago committed by GitHub
parent
commit
10d619924e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 66
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 12
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java
  3. 21
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchParameters.java
  4. 21
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchResultVo.java
  5. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
  6. 31
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  7. 6
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
  8. 22
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
  9. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
  10. 8
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java

66
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -43,11 +43,16 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
@ -107,6 +112,7 @@ import org.springframework.web.multipart.MultipartFile;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@ -962,6 +968,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
taskDefinitionLog.setUpdateTime(now);
taskDefinitionLog.setOperator(loginUser.getId());
taskDefinitionLog.setOperateTime(now);
taskDefinitionLog.setTaskParams(taskDefinition.getTaskParams());
try {
long code = CodeGenerateUtils.getInstance().genCode();
taskCodeMap.put(taskDefinitionLog.getCode(), code);
@ -973,6 +980,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
taskDefinitionLogList.add(taskDefinitionLog);
}
taskDefinitionLogList.forEach(v -> v.setTaskParams(resetImportTaskParams(taskCodeMap, v)));
int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogList);
int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogList);
if ((logInsert & insert) == 0) {
@ -1035,6 +1043,64 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return true;
}
private String resetImportTaskParams(Map<Long, Long> taskCodeMap, TaskDefinitionLog taskDefinition) {
String taskType = taskDefinition.getTaskType();
if (!TaskType.CONDITIONS.getDesc().equals(taskType) && !TaskType.SWITCH.getDesc().equals(taskType)) {
return taskDefinition.getTaskParams();
}
Map<String, Object> taskParamsMap = JSONUtils.parseObject(taskDefinition.getTaskParams(), new TypeReference<Map<String, Object>>() {});
if (taskParamsMap == null) {
taskParamsMap = new HashMap<>();
}
AbstractParameters switchParameters = TaskParametersUtils.getParameters(TaskType.SWITCH.getDesc(), JSONUtils.toJsonString(taskParamsMap.get(Constants.SWITCH_RESULT)));
if (switchParameters != null) {
taskParamsMap.put(Constants.SWITCH_RESULT, resetImportSwitchTaskParams(taskCodeMap, switchParameters));
}
AbstractParameters conditionParameters = TaskParametersUtils.getParameters(TaskType.CONDITIONS.getDesc(), JSONUtils.toJsonString(taskParamsMap.get(Constants.CONDITION_RESULT)));
if (conditionParameters != null) {
taskParamsMap.put(Constants.CONDITION_RESULT, resetImportConditionTaskParams(taskCodeMap, conditionParameters));
}
return JSONUtils.toJsonString(taskParamsMap);
}
private AbstractParameters resetImportSwitchTaskParams(Map<Long, Long> taskCodeMap, AbstractParameters parameter) {
SwitchParameters switchParameters = (SwitchParameters) parameter;
List<SwitchResultVo> dependTaskList = switchParameters.getDependTaskList();
if (CollectionUtils.isEmpty(dependTaskList)) {
return switchParameters;
}
for (SwitchResultVo resultVo : dependTaskList) {
Long nextNode = resultVo.getNextNode();
resultVo.setNextNode(taskCodeMap.get(nextNode));
}
Long nextNode = switchParameters.getNextNode();
switchParameters.setNextNode(taskCodeMap.get(nextNode));
return switchParameters;
}
private AbstractParameters resetImportConditionTaskParams(Map<Long, Long> taskCodeMap, AbstractParameters parameter) {
ConditionsParameters conditionsParameters = (ConditionsParameters) parameter;
List<Long> originalSuccessNode = conditionsParameters.getSuccessNode();
List<Long> originalFailedNode = conditionsParameters.getFailedNode();
if (CollectionUtils.isEmpty(originalSuccessNode) || CollectionUtils.isEmpty(originalFailedNode)) {
return conditionsParameters;
}
List<Long> resultSuccessNode = new ArrayList<>();
List<Long> resultFailedNode = new ArrayList<>();
if (CollectionUtils.isNotEmpty(originalSuccessNode)) {
originalSuccessNode.forEach(v -> resultSuccessNode.add(taskCodeMap.get(v)));
}
if (CollectionUtils.isNotEmpty(originalFailedNode)) {
originalFailedNode.forEach(v -> resultFailedNode.add(taskCodeMap.get(v)));
}
conditionsParameters.setSuccessNode(resultSuccessNode);
conditionsParameters.setFailedNode(resultFailedNode);
return conditionsParameters;
}
/**
* check importance params
*/

12
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java

@ -32,10 +32,10 @@ public class ConditionsParameters extends AbstractParameters {
private DependentRelation dependRelation;
// node list to run when success
private List<String> successNode;
private List<Long> successNode;
// node list to run when failed
private List<String> failedNode;
private List<Long> failedNode;
@Override
public boolean checkParameters() {
@ -63,19 +63,19 @@ public class ConditionsParameters extends AbstractParameters {
this.dependRelation = dependRelation;
}
public List<String> getSuccessNode() {
public List<Long> getSuccessNode() {
return successNode;
}
public void setSuccessNode(List<String> successNode) {
public void setSuccessNode(List<Long> successNode) {
this.successNode = successNode;
}
public List<String> getFailedNode() {
public List<Long> getFailedNode() {
return failedNode;
}
public void setFailedNode(List<String> failedNode) {
public void setFailedNode(List<Long> failedNode) {
this.failedNode = failedNode;
}

21
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchParameters.java

@ -21,6 +21,8 @@ import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
@ -28,7 +30,7 @@ public class SwitchParameters extends AbstractParameters {
private DependentRelation dependRelation;
private String relation;
private List<String> nextNode;
private Long nextNode;
private int resultConditionLocation;
private List<SwitchResultVo> dependTaskList;
@ -74,21 +76,18 @@ public class SwitchParameters extends AbstractParameters {
this.dependTaskList = dependTaskList;
}
public List<String> getNextNode() {
public Long getNextNode() {
return nextNode;
}
public void setNextNode(Object nextNode) {
if (nextNode instanceof String) {
List<String> nextNodeList = new ArrayList<>();
nextNodeList.add(String.valueOf(nextNode));
this.nextNode = nextNodeList;
} else if (nextNode instanceof Number) {
List<String> nextNodeList = new ArrayList<>();
nextNodeList.add(nextNode.toString());
this.nextNode = nextNodeList;
if (nextNode instanceof Long) {
this.nextNode = (Long) nextNode;
} else {
this.nextNode = (ArrayList) nextNode;
List<String> nextNodes = (ArrayList) nextNode;
if (CollectionUtils.isNotEmpty(nextNodes)) {
this.nextNode = Long.parseLong(nextNodes.get(0));
}
}
}
}

21
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchResultVo.java

@ -17,13 +17,15 @@
package org.apache.dolphinscheduler.common.task.switchtask;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
public class SwitchResultVo {
private String condition;
private List<String> nextNode;
private Long nextNode;
public String getCondition() {
return condition;
@ -33,21 +35,18 @@ public class SwitchResultVo {
this.condition = condition;
}
public List<String> getNextNode() {
public Long getNextNode() {
return nextNode;
}
public void setNextNode(Object nextNode) {
if (nextNode instanceof String) {
List<String> nextNodeList = new ArrayList<>();
nextNodeList.add(String.valueOf(nextNode));
this.nextNode = nextNodeList;
} else if (nextNode instanceof Number) {
List<String> nextNodeList = new ArrayList<>();
nextNodeList.add(nextNode.toString());
this.nextNode = nextNodeList;
if (nextNode instanceof Long) {
this.nextNode = (Long) nextNode;
} else {
this.nextNode = (ArrayList) nextNode;
List<String> nextNodes = (ArrayList) nextNode;
if (CollectionUtils.isNotEmpty(nextNodes)) {
this.nextNode = Long.parseLong(nextNodes.get(0));
}
}
}
}

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java

@ -294,7 +294,7 @@ public class TaskDefinition {
public Map<String, String> getTaskParamMap() {
if (taskParamMap == null && StringUtils.isNotEmpty(taskParams)) {
JsonNode localParams = JSONUtils.parseObject(taskParams).findValue("localParams");
if (localParams != null) {
if (localParams != null && localParams.size() > 0) {
List<Property> propList = JSONUtils.toList(localParams.toString(), Property.class);
taskParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
}

31
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -38,6 +38,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -363,13 +364,16 @@ public class DagHelper {
TaskInstance taskInstance = completeTaskList.get(nodeCode);
ConditionsParameters conditionsParameters =
JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class);
if (conditionsParameters == null) {
return conditionTaskList;
}
List<String> skipNodeList = new ArrayList<>();
if (taskInstance.getState().typeIsSuccess()) {
conditionTaskList = conditionsParameters.getSuccessNode();
skipNodeList = conditionsParameters.getFailedNode();
conditionTaskList = conditionsParameters.getSuccessNode().stream().map(String::valueOf).collect(Collectors.toList());
skipNodeList = conditionsParameters.getFailedNode().stream().map(String::valueOf).collect(Collectors.toList());
} else if (taskInstance.getState().typeIsFailure()) {
conditionTaskList = conditionsParameters.getFailedNode();
skipNodeList = conditionsParameters.getSuccessNode();
conditionTaskList = conditionsParameters.getFailedNode().stream().map(String::valueOf).collect(Collectors.toList());
skipNodeList = conditionsParameters.getSuccessNode().stream().map(String::valueOf).collect(Collectors.toList());
} else {
conditionTaskList.add(nodeCode);
}
@ -395,27 +399,28 @@ public class DagHelper {
if (!completeTaskList.containsKey(nodeCode)) {
return conditionTaskList;
}
conditionTaskList = skipTaskNode4Switch(taskNode, skipTaskNodeList, completeTaskList, dag);
conditionTaskList.add(String.valueOf(skipTaskNode4Switch(taskNode, skipTaskNodeList, completeTaskList, dag)));
return conditionTaskList;
}
private static List<String> skipTaskNode4Switch(TaskNode taskNode, Map<String, TaskNode> skipTaskNodeList,
Map<String, TaskInstance> completeTaskList,
DAG<String, TaskNode, TaskNodeRelation> dag) {
private static Long skipTaskNode4Switch(TaskNode taskNode, Map<String, TaskNode> skipTaskNodeList,
Map<String, TaskInstance> completeTaskList,
DAG<String, TaskNode, TaskNodeRelation> dag) {
SwitchParameters switchParameters = completeTaskList.get(Long.toString(taskNode.getCode())).getSwitchDependency();
int resultConditionLocation = switchParameters.getResultConditionLocation();
List<SwitchResultVo> conditionResultVoList = switchParameters.getDependTaskList();
List<String> switchTaskList = conditionResultVoList.get(resultConditionLocation).getNextNode();
if (CollectionUtils.isEmpty(switchTaskList)) {
switchTaskList = new ArrayList<>();
Long switchTaskList = conditionResultVoList.get(resultConditionLocation).getNextNode();
if (switchTaskList == null) {
switchTaskList = 0L;
}
conditionResultVoList.remove(resultConditionLocation);
for (SwitchResultVo info : conditionResultVoList) {
if (CollectionUtils.isEmpty(info.getNextNode())) {
Long nextNode = info.getNextNode();
if (nextNode == null || nextNode == 0L) {
continue;
}
setTaskNodeSkip(info.getNextNode().get(0), dag, completeTaskList, skipTaskNodeList);
setTaskNodeSkip(String.valueOf(nextNode), dag, completeTaskList, skipTaskNodeList);
}
return switchTaskList;
}

6
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java

@ -446,15 +446,15 @@ public class DagHelperTest {
SwitchParameters conditionsParameters = new SwitchParameters();
SwitchResultVo switchResultVo1 = new SwitchResultVo();
switchResultVo1.setCondition(" 2 == 1");
switchResultVo1.setNextNode("2");
switchResultVo1.setNextNode(2L);
SwitchResultVo switchResultVo2 = new SwitchResultVo();
switchResultVo2.setCondition(" 2 == 2");
switchResultVo2.setNextNode("4");
switchResultVo2.setNextNode(4L);
List<SwitchResultVo> list = new ArrayList<>();
list.add(switchResultVo1);
list.add(switchResultVo2);
conditionsParameters.setDependTaskList(list);
conditionsParameters.setNextNode("5");
conditionsParameters.setNextNode(5L);
conditionsParameters.setRelation("AND");
// in: AND(AND(1 is SUCCESS))

22
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
@ -30,7 +31,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import java.util.Date;
@ -225,9 +225,15 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
}
while (m.find()) {
String paramName = m.group(1);
Property property = globalParams.get(paramName);
Property property = globalParams.get(Constants.START_UP_PARAMS_PREFIX + paramName);
if (property == null) {
return "";
property = globalParams.get(paramName);
if (property == null) {
property = globalParams.get(Constants.GLOBAL_PARAMS_PREFIX + paramName);
if (property == null) {
return "";
}
}
}
String value = property.getValue();
if (!org.apache.commons.lang.math.NumberUtils.isNumber(value)) {
@ -243,15 +249,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
* check whether switch result is valid
*/
private boolean isValidSwitchResult(SwitchResultVo switchResult) {
if (CollectionUtils.isEmpty(switchResult.getNextNode())) {
return false;
}
for (String nextNode : switchResult.getNextNode()) {
if (StringUtils.isEmpty(nextNode)) {
return false;
}
}
return true;
return switchResult.getNextNode() != null && switchResult.getNextNode() != 0L;
}
}

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java

@ -155,8 +155,8 @@ public class ConditionsTaskTest {
taskNode.setDependence(JSONUtils.toJsonString(dependentParameters));
ConditionsParameters conditionsParameters = new ConditionsParameters();
conditionsParameters.setSuccessNode(Stream.of("2").collect(Collectors.toList()));
conditionsParameters.setFailedNode(Stream.of("3").collect(Collectors.toList()));
conditionsParameters.setSuccessNode(Stream.of(2L).collect(Collectors.toList()));
conditionsParameters.setFailedNode(Stream.of(3L).collect(Collectors.toList()));
// out: SUCCESS => 2, FAILED => 3
taskNode.setConditionResult(JSONUtils.toJsonString(conditionsParameters));

8
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java

@ -113,19 +113,19 @@ public class SwitchTaskTest {
SwitchResultVo switchResultVo1 = new SwitchResultVo();
switchResultVo1.setCondition(" 2 == 1");
switchResultVo1.setNextNode("t1");
switchResultVo1.setNextNode(1L);
SwitchResultVo switchResultVo2 = new SwitchResultVo();
switchResultVo2.setCondition(" 2 == 2");
switchResultVo2.setNextNode("t2");
switchResultVo2.setNextNode(2L);
SwitchResultVo switchResultVo3 = new SwitchResultVo();
switchResultVo3.setCondition(" 3 == 2");
switchResultVo3.setNextNode("t3");
switchResultVo3.setNextNode(3L);
List<SwitchResultVo> list = new ArrayList<>();
list.add(switchResultVo1);
list.add(switchResultVo2);
list.add(switchResultVo3);
conditionsParameters.setDependTaskList(list);
conditionsParameters.setNextNode("t");
conditionsParameters.setNextNode(4L);
conditionsParameters.setRelation("AND");
return conditionsParameters;

Loading…
Cancel
Save