Browse Source

Fix task nodes under switch task will not be skipped (#16108)

upstream-dev
Wenjun Ruan 6 months ago committed by GitHub
parent
commit
5e4afe6f64
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 39
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java
  2. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtils.java
  3. 12
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtilsTest.java
  4. 8
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskInstanceLogHeader.java

39
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java

@ -61,26 +61,30 @@ public class SwitchLogicTask extends BaseSyncLogicTask<SwitchParameters> {
this.taskInstance = workflowExecuteRunnable.getTaskInstance(taskExecutionContext.getTaskInstanceId())
.orElseThrow(() -> new LogicTaskInitializeException(
"Cannot find the task instance in workflow execute runnable"));
// Since the default branch is not in the dependTaskList, we need to add it to the end
// otherwise the default branch will never be skipped in DAGHelper
addDefaultBranchToEnd();
}
@Override
public void handle() throws MasterTaskExecuteException {
// Calculate the condition result and get the next node
if (CollectionUtils.isEmpty(taskParameters.getDependTaskList())) {
// If the branch is empty then will go into the default branch
// This case shouldn't happen, we can directly throw exception and forbid the user to set branch
moveToDefaultBranch();
} else {
calculateSwitchBranch();
}
taskInstance.setSwitchDependency(taskParameters);
log.info("Switch task execute finished");
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
log.info("Switch task execute finished: {}", taskExecutionContext.getCurrentExecutionStatus().name());
}
private void moveToDefaultBranch() {
checkIfBranchExist(taskParameters.getNextNode());
List<SwitchResultVo> switchResultVos = taskParameters.getDependTaskList();
switchResultVos.add(new SwitchResultVo(null, taskParameters.getNextNode()));
SwitchResultVo defaultSwitchResultVo = getDefaultSwitchResultVo();
checkIfBranchExist(defaultSwitchResultVo.getNextNode());
taskParameters.setResultConditionLocation(switchResultVos.size() - 1);
log.info("The condition is not satisfied, move to the default branch: {}",
@ -90,9 +94,6 @@ public class SwitchLogicTask extends BaseSyncLogicTask<SwitchParameters> {
private void calculateSwitchBranch() {
List<SwitchResultVo> switchResultVos = taskParameters.getDependTaskList();
if (CollectionUtils.isEmpty(switchResultVos)) {
moveToDefaultBranch();
}
Map<String, Property> globalParams = taskExecutionContext.getPrepareParamsMap();
Map<String, Property> varParams = JSONUtils
.toList(taskInstance.getVarPool(), Property.class)
@ -100,7 +101,8 @@ public class SwitchLogicTask extends BaseSyncLogicTask<SwitchParameters> {
.collect(Collectors.toMap(Property::getProp, Property -> Property));
int finalConditionLocation = -1;
for (int i = 0; i < switchResultVos.size(); i++) {
// The last one is the default branch, no need to calculate
for (int i = 0; i < switchResultVos.size() - 1; i++) {
SwitchResultVo switchResultVo = switchResultVos.get(i);
log.info("Begin to execute {} condition: {} ", i, switchResultVo.getCondition());
String content = SwitchTaskUtils.generateContentWithTaskParams(switchResultVo.getCondition(), globalParams,
@ -111,14 +113,18 @@ public class SwitchLogicTask extends BaseSyncLogicTask<SwitchParameters> {
result = SwitchTaskUtils.evaluate(content);
log.info("Execute condition sentence: {} successfully: {}", content, result);
if (result) {
// If matched, break the loop
finalConditionLocation = i;
break;
}
} catch (Exception e) {
log.info("Execute condition sentence: {} failed", content, e);
}
}
// If the finalConditionLocation is -1, then the default branch will be executed
if (finalConditionLocation >= 0) {
checkIfBranchExist(switchResultVos.get(finalConditionLocation).getNextNode());
List<Long> nextNodes = switchResultVos.get(finalConditionLocation).getNextNode();
checkIfBranchExist(nextNodes);
log.info("The condition is satisfied, move to the branch: {}",
switchResultVos.get(finalConditionLocation).getNextNode().stream()
.map(node -> workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowGraph().getDag()
@ -126,7 +132,6 @@ public class SwitchLogicTask extends BaseSyncLogicTask<SwitchParameters> {
.collect(Collectors.toList()));
taskParameters.setResultConditionLocation(finalConditionLocation);
} else {
log.info("All conditions are not satisfied, move to the default branch");
moveToDefaultBranch();
}
}
@ -147,4 +152,16 @@ public class SwitchLogicTask extends BaseSyncLogicTask<SwitchParameters> {
}
}
private void addDefaultBranchToEnd() {
SwitchResultVo switchResultVo = new SwitchResultVo(null, taskParameters.getNextNode());
List<SwitchResultVo> dependTaskList = taskParameters.getDependTaskList();
if (!dependTaskList.contains(switchResultVo)) {
dependTaskList.add(switchResultVo);
}
}
private SwitchResultVo getDefaultSwitchResultVo() {
return taskParameters.getDependTaskList().get(taskParameters.getDependTaskList().size() - 1);
}
}

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtils.java

@ -63,7 +63,6 @@ public class SwitchTaskUtils {
if (MapUtils.isNotEmpty(varParams)) {
params.putAll(varParams);
}
String originContent = content;
Pattern pattern = Pattern.compile(rgex);
Matcher m = pattern.matcher(content);
while (m.find()) {
@ -82,10 +81,6 @@ public class SwitchTaskUtils {
content = content.replace("${" + paramName + "}", value);
}
// if not replace any params, throw exception to avoid illegal condition
if (originContent.equals(content)) {
throw new IllegalArgumentException("condition is not valid, please check it. condition: " + condition);
}
return content;
}

12
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/SwitchTaskUtilsTest.java

@ -50,7 +50,7 @@ public class SwitchTaskUtilsTest {
Map<String, Property> globalParams = new HashMap<>();
Map<String, Property> varParams = new HashMap<>();
globalParams.put("test", new Property("test", Direct.IN, DataType.INTEGER, "1"));
Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> {
Assertions.assertDoesNotThrow(() -> {
SwitchTaskUtils.generateContentWithTaskParams(content, globalParams, varParams);
});
@ -70,15 +70,5 @@ public class SwitchTaskUtilsTest {
SwitchTaskUtils.evaluate(script);
});
String contentWithSpecify1 = "cmd.abc";
Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> {
SwitchTaskUtils.generateContentWithTaskParams(contentWithSpecify1, globalParams, varParams);
});
String contentWithSpecify2 = "cmd()";
Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> {
SwitchTaskUtils.generateContentWithTaskParams(contentWithSpecify2, globalParams, varParams);
});
}
}

8
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskInstanceLogHeader.java

@ -27,11 +27,11 @@ import com.google.common.collect.Lists;
public class TaskInstanceLogHeader {
private static final List<String> INITIALIZE_TASK_CONTEXT_HEADER = Lists.newArrayList(
"***********************************************************************************************",
"\n***********************************************************************************************",
"********************************* Initialize task context ***********************************",
"***********************************************************************************************");
private static final List<String> LOAD_TASK_INSTANCE_PLUGIN_HEADER = Lists.newArrayList(
"***********************************************************************************************",
"\n***********************************************************************************************",
"********************************* Load task instance plugin *********************************",
"***********************************************************************************************");
@ -40,12 +40,12 @@ public class TaskInstanceLogHeader {
}
private static final List<String> EXECUTE_TASK_HEADER = Lists.newArrayList(
"***********************************************************************************************",
"\n***********************************************************************************************",
"********************************* Execute task instance *************************************",
"***********************************************************************************************");
private static final List<String> FINALIZE_TASK_HEADER = Lists.newArrayList(
"***********************************************************************************************",
"\n***********************************************************************************************",
"********************************* Finalize task instance ************************************",
"***********************************************************************************************");

Loading…
Cancel
Save