Browse Source

[Feature-#5273][server-master] Task node of SWITCH (#5922)

Co-authored-by: wangxj <wangxj31>
2.0.7-release
wangxj3 3 years ago committed by GitHub
parent
commit
2e1768aae6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 45
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
  2. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  3. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
  4. 19
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
  5. 91
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchParameters.java
  6. 49
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchResultVo.java
  7. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
  8. 25
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  9. 50
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  10. 99
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
  11. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  12. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  13. 180
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SwitchTaskExecThread.java
  14. 38
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SwitchTaskUtils.java
  15. 167
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
  16. 1
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  17. 14
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  18. 1
      pom.xml

45
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java

@ -43,8 +43,7 @@ public class CheckUtils {
/**
* check username
*
* @param userName
* user name
* @param userName user name
* @return true if user name regex valid,otherwise return false
*/
public static boolean checkUserName(String userName) {
@ -54,8 +53,7 @@ public class CheckUtils {
/**
* check email
*
* @param email
* email
* @param email email
* @return true if email regex valid, otherwise return false
*/
public static boolean checkEmail(String email) {
@ -69,8 +67,7 @@ public class CheckUtils {
/**
* check project description
*
* @param desc
* desc
* @param desc desc
* @return true if description regex valid, otherwise return false
*/
public static Map<String, Object> checkDesc(String desc) {
@ -78,7 +75,7 @@ public class CheckUtils {
if (StringUtils.isNotEmpty(desc) && desc.length() > 200) {
result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
result.put(Constants.MSG,
MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), "desc length"));
MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), "desc length"));
} else {
result.put(Constants.STATUS, Status.SUCCESS);
}
@ -88,8 +85,7 @@ public class CheckUtils {
/**
* check extra info
*
* @param otherParams
* other parames
* @param otherParams other parames
* @return true if other parameters are valid, otherwise return false
*/
public static boolean checkOtherParams(String otherParams) {
@ -99,8 +95,7 @@ public class CheckUtils {
/**
* check password
*
* @param password
* password
* @param password password
* @return true if password regex valid, otherwise return false
*/
public static boolean checkPassword(String password) {
@ -110,8 +105,7 @@ public class CheckUtils {
/**
* check phone phone can be empty.
*
* @param phone
* phone
* @param phone phone
* @return true if phone regex valid, otherwise return false
*/
public static boolean checkPhone(String phone) {
@ -121,8 +115,7 @@ public class CheckUtils {
/**
* check task node parameter
*
* @param taskNode
* TaskNode
* @param taskNode TaskNode
* @return true if task node parameters are valid, otherwise return false
*/
public static boolean checkTaskNodeParameters(TaskNode taskNode) {
@ -133,6 +126,8 @@ public class CheckUtils {
}
if (TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskType)) {
abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getDependence());
} else if (TaskType.SWITCH.getDesc().equalsIgnoreCase(taskType)) {
abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getSwitchResult());
} else {
abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getParams());
}
@ -147,28 +142,22 @@ public class CheckUtils {
/**
* check params
*
* @param userName
* user name
* @param password
* password
* @param email
* email
* @param phone
* phone
* @param userName user name
* @param password password
* @param email email
* @param phone phone
* @return true if user parameters are valid, other return false
*/
public static boolean checkUserParams(String userName, String password, String email, String phone) {
return CheckUtils.checkUserName(userName) && CheckUtils.checkEmail(email) && CheckUtils.checkPassword(password)
&& CheckUtils.checkPhone(phone);
&& CheckUtils.checkPhone(phone);
}
/**
* regex check
*
* @param str
* input string
* @param pattern
* regex pattern
* @param str input string
* @param pattern regex pattern
* @return true if regex pattern is right, otherwise return false
*/
private static boolean regexChecks(String str, Pattern pattern) {

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -776,6 +776,7 @@ public final class Constants {
public static final String PROCESS_INSTANCE_STATE = "processInstanceState";
public static final String PARENT_WORKFLOW_INSTANCE = "parentWorkflowInstance";
public static final String CONDITION_RESULT = "conditionResult";
public static final String SWITCH_RESULT = "switchResult";
public static final String DEPENDENCE = "dependence";
public static final String TASK_TYPE = "taskType";
public static final String TASK_LIST = "taskList";

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java

@ -51,7 +51,9 @@ public enum TaskType {
DATAX(10, "DATAX"),
CONDITIONS(11, "CONDITIONS"),
SQOOP(12, "SQOOP"),
WATERDROP(13, "WATERDROP");
WATERDROP(13, "WATERDROP"),
SWITCH(14, "SWITCH"),
;
TaskType(int code, String desc) {
this.code = code;

19
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.model;
import org.apache.dolphinscheduler.common.Constants;
@ -33,7 +34,6 @@ import java.util.Objects;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
public class TaskNode {
/**
@ -129,6 +129,10 @@ public class TaskNode {
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String conditionResult;
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String switchResult;
/**
* task instance priority
*/
@ -365,6 +369,10 @@ public class TaskNode {
return TaskType.CONDITIONS.getDesc().equalsIgnoreCase(this.getType());
}
public boolean isSwitchTask() {
return TaskType.SWITCH.toString().equalsIgnoreCase(this.getType());
}
public List<PreviousTaskNode> getPreTaskNodeList() {
return preTaskNodeList;
}
@ -380,6 +388,7 @@ public class TaskNode {
}
taskParams.put(Constants.CONDITION_RESULT, this.conditionResult);
taskParams.put(Constants.DEPENDENCE, this.dependence);
taskParams.put(Constants.SWITCH_RESULT, this.switchResult);
return JSONUtils.toJsonString(taskParams);
}
@ -417,4 +426,12 @@ public class TaskNode {
+ ", delayTime=" + delayTime
+ '}';
}
public String getSwitchResult() {
return switchResult;
}
public void setSwitchResult(String switchResult) {
this.switchResult = switchResult;
}
}

91
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchParameters.java

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.switchtask;
import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import java.util.ArrayList;
import java.util.List;
public class SwitchParameters extends AbstractParameters {
private DependentRelation dependRelation;
private String relation;
private List<String> nextNode;
@Override
public boolean checkParameters() {
return true;
}
@Override
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}
private int resultConditionLocation;
private List<SwitchResultVo> dependTaskList;
public DependentRelation getDependRelation() {
return dependRelation;
}
public void setDependRelation(DependentRelation dependRelation) {
this.dependRelation = dependRelation;
}
public int getResultConditionLocation() {
return resultConditionLocation;
}
public void setResultConditionLocation(int resultConditionLocation) {
this.resultConditionLocation = resultConditionLocation;
}
public String getRelation() {
return relation;
}
public void setRelation(String relation) {
this.relation = relation;
}
public List<SwitchResultVo> getDependTaskList() {
return dependTaskList;
}
public void setDependTaskList(List<SwitchResultVo> dependTaskList) {
this.dependTaskList = dependTaskList;
}
public List<String> getNextNode() {
return nextNode;
}
public void setNextNode(Object nextNode) {
if (nextNode instanceof String) {
List<String> nextNodeList = new ArrayList<>();
nextNodeList.add(String.valueOf(nextNode));
this.nextNode = nextNodeList;
} else {
this.nextNode = (ArrayList) nextNode;
}
}
}

49
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchResultVo.java

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.switchtask;
import java.util.ArrayList;
import java.util.List;
public class SwitchResultVo {
private String condition;
private List<String> nextNode;
public String getCondition() {
return condition;
}
public void setCondition(String condition) {
this.condition = condition;
}
public List<String> getNextNode() {
return nextNode;
}
public void setNextNode(Object nextNode) {
if (nextNode instanceof String) {
List<String> nextNodeList = new ArrayList<>();
nextNodeList.add(String.valueOf(nextNode));
this.nextNode = nextNodeList;
} else {
this.nextNode = (ArrayList) nextNode;
}
}
}

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -82,6 +83,8 @@ public class TaskParametersUtils {
return JSONUtils.parseObject(parameter, ConditionsParameters.class);
case "SQOOP":
return JSONUtils.parseObject(parameter, SqoopParameters.class);
case "SWITCH":
return JSONUtils.parseObject(parameter, SwitchParameters.class);
default:
logger.error("not support task type: {}", taskType);
return null;

25
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
@ -174,6 +175,12 @@ public class TaskInstance implements Serializable {
@TableField(exist = false)
private DependentParameters dependency;
/**
* switch dependency
*/
@TableField(exist = false)
private SwitchParameters switchDependency;
/**
* duration
*/
@ -426,6 +433,20 @@ public class TaskInstance implements Serializable {
this.dependency = dependency;
}
public SwitchParameters getSwitchDependency() {
if (this.switchDependency == null) {
Map<String, Object> taskParamsMap = JSONUtils.toMap(this.getTaskParams(), String.class, Object.class);
this.switchDependency = JSONUtils.parseObject((String) taskParamsMap.get(Constants.SWITCH_RESULT), SwitchParameters.class);
}
return this.switchDependency;
}
public void setSwitchDependency(SwitchParameters switchDependency) {
Map<String, Object> taskParamsMap = JSONUtils.toMap(this.getTaskParams(), String.class, Object.class);
taskParamsMap.put(Constants.SWITCH_RESULT,JSONUtils.toJsonString(switchDependency));
this.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
}
public Flag getFlag() {
return flag;
}
@ -510,6 +531,10 @@ public class TaskInstance implements Serializable {
return TaskType.CONDITIONS.getDesc().equalsIgnoreCase(this.taskType);
}
public boolean isSwitchTask() {
return TaskType.SWITCH.getDesc().equalsIgnoreCase(this.taskType);
}
/**
* determine if you can try again
*

50
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.utils;
package org.apache.dolphinscheduler.dao.utils;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.graph.DAG;
@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
@ -281,6 +283,9 @@ public class DagHelper {
} else if (dag.getNode(preNodeName).isConditionsTask()) {
List<String> conditionTaskList = parseConditionTask(preNodeName, skipTaskNodeList, dag, completeTaskList);
startVertexes.addAll(conditionTaskList);
} else if (dag.getNode(preNodeName).isSwitchTask()) {
List<String> conditionTaskList = parseSwitchTask(preNodeName, skipTaskNodeList, dag, completeTaskList);
startVertexes.addAll(conditionTaskList);
} else {
startVertexes = dag.getSubsequentNodes(preNodeName);
}
@ -355,6 +360,49 @@ public class DagHelper {
return conditionTaskList;
}
/**
* parse condition task find the branch process
* set skip flag for another one.
*
* @param nodeName
* @return
*/
public static List<String> parseSwitchTask(String nodeName,
Map<String, TaskNode> skipTaskNodeList,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList) {
List<String> conditionTaskList = new ArrayList<>();
TaskNode taskNode = dag.getNode(nodeName);
if (!taskNode.isSwitchTask()) {
return conditionTaskList;
}
if (!completeTaskList.containsKey(nodeName)) {
return conditionTaskList;
}
conditionTaskList = skipTaskNode4Switch(taskNode, skipTaskNodeList, completeTaskList, dag);
return conditionTaskList;
}
private static List<String> skipTaskNode4Switch(TaskNode taskNode, Map<String, TaskNode> skipTaskNodeList,
Map<String, TaskInstance> completeTaskList,
DAG<String, TaskNode, TaskNodeRelation> dag) {
SwitchParameters switchParameters = completeTaskList.get(taskNode.getName()).getSwitchDependency();
int resultConditionLocation = switchParameters.getResultConditionLocation();
List<SwitchResultVo> conditionResultVoList = switchParameters.getDependTaskList();
List<String> switchTaskList = conditionResultVoList.get(resultConditionLocation).getNextNode();
if (CollectionUtils.isEmpty(switchTaskList)) {
switchTaskList = new ArrayList<>();
}
conditionResultVoList.remove(resultConditionLocation);
for (SwitchResultVo info : conditionResultVoList) {
if (CollectionUtils.isEmpty(info.getNextNode())) {
continue;
}
setTaskNodeSkip(info.getNextNode().get(0), dag, completeTaskList, skipTaskNodeList);
}
return switchTaskList;
}
/**
* set task node and the post nodes skip flag
*/

99
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java

@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -251,6 +253,10 @@ public class DagHelperTest {
skipNodeList.clear();
completeTaskList.remove("3");
taskInstance = new TaskInstance();
Map<String, Object> taskParamsMap = new HashMap<>();
taskParamsMap.put(Constants.SWITCH_RESULT, "");
taskInstance.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
taskInstance.setState(ExecutionStatus.FAILURE);
completeTaskList.put("3", taskInstance);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
@ -259,6 +265,17 @@ public class DagHelperTest {
Assert.assertEquals(2, skipNodeList.size());
Assert.assertTrue(skipNodeList.containsKey("5"));
Assert.assertTrue(skipNodeList.containsKey("7"));
// dag: 1-2-3-5-7 4-3-6
// 3-if , complete:1/2/3/4
// 1.failure:3 expect post:6 skip:5/7
dag = generateDag2();
skipNodeList.clear();
completeTaskList.clear();
taskInstance.setSwitchDependency(getSwitchNode());
completeTaskList.put("1", taskInstance);
postNodes = DagHelper.parsePostNodes("1", skipNodeList, dag, completeTaskList);
Assert.assertEquals(1, postNodes.size());
}
/**
@ -286,7 +303,6 @@ public class DagHelperTest {
node2.setPreTasks(JSONUtils.toJsonString(dep2));
taskNodeList.add(node2);
TaskNode node4 = new TaskNode();
node4.setId("4");
node4.setName("4");
@ -351,6 +367,87 @@ public class DagHelperTest {
return DagHelper.buildDagGraph(processDag);
}
/**
* 1->2->3->5->7
* 4->3->6
* 2->8->5->7
*
* @return dag
* @throws JsonProcessingException if error throws JsonProcessingException
*/
private DAG<String, TaskNode, TaskNodeRelation> generateDag2() throws IOException {
List<TaskNode> taskNodeList = new ArrayList<>();
TaskNode node = new TaskNode();
node.setId("0");
node.setName("0");
node.setType("SHELL");
taskNodeList.add(node);
TaskNode node1 = new TaskNode();
node1.setId("1");
node1.setName("1");
node1.setType("switch");
node1.setDependence(JSONUtils.toJsonString(getSwitchNode()));
taskNodeList.add(node1);
TaskNode node2 = new TaskNode();
node2.setId("2");
node2.setName("2");
node2.setType("SHELL");
List<String> dep2 = new ArrayList<>();
dep2.add("1");
node2.setPreTasks(JSONUtils.toJsonString(dep2));
taskNodeList.add(node2);
TaskNode node4 = new TaskNode();
node4.setId("4");
node4.setName("4");
node4.setType("SHELL");
List<String> dep4 = new ArrayList<>();
dep4.add("1");
node4.setPreTasks(JSONUtils.toJsonString(dep4));
taskNodeList.add(node4);
TaskNode node5 = new TaskNode();
node5.setId("4");
node5.setName("4");
node5.setType("SHELL");
List<String> dep5 = new ArrayList<>();
dep5.add("1");
node5.setPreTasks(JSONUtils.toJsonString(dep5));
taskNodeList.add(node5);
List<String> startNodes = new ArrayList<>();
List<String> recoveryNodes = new ArrayList<>();
List<TaskNode> destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList,
startNodes, recoveryNodes, TaskDependType.TASK_POST);
List<TaskNodeRelation> taskNodeRelations = DagHelper.generateRelationListByFlowNodes(destTaskNodeList);
ProcessDag processDag = new ProcessDag();
processDag.setEdges(taskNodeRelations);
processDag.setNodes(destTaskNodeList);
return DagHelper.buildDagGraph(processDag);
}
private SwitchParameters getSwitchNode() {
SwitchParameters conditionsParameters = new SwitchParameters();
SwitchResultVo switchResultVo1 = new SwitchResultVo();
switchResultVo1.setCondition(" 2 == 1");
switchResultVo1.setNextNode("2");
SwitchResultVo switchResultVo2 = new SwitchResultVo();
switchResultVo2.setCondition(" 2 == 2");
switchResultVo2.setNextNode("4");
List<SwitchResultVo> list = new ArrayList<>();
list.add(switchResultVo1);
list.add(switchResultVo2);
conditionsParameters.setDependTaskList(list);
conditionsParameters.setNextNode("5");
conditionsParameters.setRelation("AND");
// in: AND(AND(1 is SUCCESS))
return conditionsParameters;
}
@Test
public void testBuildDagGraph() {
String shellJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-9527\",\"name\":\"shell-1\","

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -17,11 +17,13 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@ -201,7 +203,9 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
try {
if (taskInstance.isConditionsTask()
|| taskInstance.isDependTask()
|| taskInstance.isSubProcess()) {
|| taskInstance.isSubProcess()
|| taskInstance.isSwitchTask()
) {
return true;
}
if (taskInstance.getState().typeIsFinished()) {
@ -321,4 +325,13 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
long usedTime = (System.currentTimeMillis() - startTime.getTime()) / 1000;
return timeoutSeconds - usedTime;
}
protected String getThreadName() {
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
return String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
}
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -433,6 +433,8 @@ public class MasterExecThread implements Runnable {
abstractExecThread = new DependentTaskExecThread(taskInstance);
} else if (taskInstance.isConditionsTask()) {
abstractExecThread = new ConditionsTaskExecThread(taskInstance);
} else if (taskInstance.isSwitchTask()) {
abstractExecThread = new SwitchTaskExecThread(taskInstance);
} else {
abstractExecThread = new MasterTaskExecThread(taskInstance);
}

180
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SwitchTaskExecThread.java

@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class SwitchTaskExecThread extends MasterBaseTaskExecThread {
protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
/**
* complete task map
*/
private Map<String, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
/**
* switch result
*/
private DependResult conditionResult;
/**
* constructor of MasterBaseTaskExecThread
*
* @param taskInstance task instance
*/
public SwitchTaskExecThread(TaskInstance taskInstance) {
super(taskInstance);
taskInstance.setStartTime(new Date());
}
@Override
public Boolean submitWaitComplete() {
try {
this.taskInstance = submit();
logger.info("taskInstance submit end");
Thread.currentThread().setName(getThreadName());
initTaskParameters();
logger.info("switch task start");
waitTaskQuit();
updateTaskState();
} catch (Exception e) {
logger.error("switch task run exception", e);
}
return true;
}
private void waitTaskQuit() {
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(
taskInstance.getProcessInstanceId()
);
for (TaskInstance task : taskInstances) {
completeTaskList.putIfAbsent(task.getName(), task.getState());
}
SwitchParameters switchParameters = taskInstance.getSwitchDependency();
List<SwitchResultVo> switchResultVos = switchParameters.getDependTaskList();
SwitchResultVo switchResultVo = new SwitchResultVo();
switchResultVo.setNextNode(switchParameters.getNextNode());
switchResultVos.add(switchResultVo);
int finalConditionLocation = switchResultVos.size() - 1;
int i = 0;
conditionResult = DependResult.SUCCESS;
for (SwitchResultVo info : switchResultVos) {
logger.info("the {} execution ", (i + 1));
logger.info("original condition sentence:{}", info.getCondition());
if (StringUtils.isEmpty(info.getCondition())) {
finalConditionLocation = i;
break;
}
String content = setTaskParams(info.getCondition().replaceAll("'", "\""), rgex);
logger.info("format condition sentence::{}", content);
Boolean result = null;
try {
result = SwitchTaskUtils.evaluate(content);
} catch (Exception e) {
logger.info("error sentence : {}", content);
conditionResult = DependResult.FAILED;
//result = false;
break;
}
logger.info("condition result : {}", result);
if (result) {
finalConditionLocation = i;
break;
}
i++;
}
switchParameters.setDependTaskList(switchResultVos);
switchParameters.setResultConditionLocation(finalConditionLocation);
taskInstance.setSwitchDependency(switchParameters);
//conditionResult = DependResult.SUCCESS;
logger.info("the switch task depend result : {}", conditionResult);
}
/**
* update task state
*/
private void updateTaskState() {
ExecutionStatus status;
if (this.cancel) {
status = ExecutionStatus.KILL;
} else {
status = (conditionResult == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;
}
taskInstance.setEndTime(new Date());
taskInstance.setState(status);
processService.updateTaskInstance(taskInstance);
}
private void initTaskParameters() {
taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
this.taskInstance.setStartTime(new Date());
this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
this.taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
this.processService.saveTaskInstance(taskInstance);
}
public String setTaskParams(String content, String rgex) {
Pattern pattern = Pattern.compile(rgex);
Matcher m = pattern.matcher(content);
Map<String, Property> globalParams = JSONUtils.toList(processInstance.getGlobalParams(), Property.class).stream().collect(Collectors.toMap(Property::getProp, Property -> Property));
Map<String, Property> varParams = JSONUtils.toList(taskInstance.getVarPool(), Property.class).stream().collect(Collectors.toMap(Property::getProp, Property -> Property));
if (varParams.size() > 0) {
varParams.putAll(globalParams);
globalParams = varParams;
}
while (m.find()) {
String paramName = m.group(1);
Property property = globalParams.get(paramName);
if (property == null) {
return "";
}
String value = property.getValue();
if (!org.apache.commons.lang.math.NumberUtils.isNumber(value)) {
value = "\"" + value + "\"";
}
logger.info("paramName:{},paramValue{}", paramName, value);
content = content.replace("${" + paramName + "}", value);
}
return content;
}
}

38
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SwitchTaskUtils.java

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
public class SwitchTaskUtils {
private static ScriptEngineManager manager;
private static ScriptEngine engine;
static {
manager = new ScriptEngineManager();
engine = manager.getEngineByName("js");
}
public static boolean evaluate(String expression) throws ScriptException {
Object result = engine.eval(expression);
return (Boolean) result;
}
}

167
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java

@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.SwitchTaskExecThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
@RunWith(MockitoJUnitRunner.Silent.class)
public class SwitchTaskTest {
private static final Logger logger = LoggerFactory.getLogger(SwitchTaskTest.class);
/**
* TaskNode.runFlag : task can be run normally
*/
public static final String FLOWNODE_RUN_FLAG_NORMAL = "NORMAL";
private ProcessService processService;
private ProcessInstance processInstance;
@Before
public void before() {
ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
MasterConfig config = new MasterConfig();
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
config.setMasterTaskCommitRetryTimes(3);
config.setMasterTaskCommitInterval(1000);
processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
processInstance = getProcessInstance();
Mockito.when(processService
.findProcessInstanceById(processInstance.getId()))
.thenReturn(processInstance);
}
private TaskInstance testBasicInit(ExecutionStatus expectResult) {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
taskDefinition.setTimeout(0);
Mockito.when(processService.findTaskDefinition(1L, 1))
.thenReturn(taskDefinition);
TaskInstance taskInstance = getTaskInstance(getTaskNode(), processInstance);
// for MasterBaseTaskExecThread.submit
Mockito.when(processService
.submitTask(taskInstance))
.thenReturn(taskInstance);
// for MasterBaseTaskExecThread.call
Mockito.when(processService
.findTaskInstanceById(taskInstance.getId()))
.thenReturn(taskInstance);
// for SwitchTaskExecThread.initTaskParameters
Mockito.when(processService
.saveTaskInstance(taskInstance))
.thenReturn(true);
// for SwitchTaskExecThread.updateTaskState
Mockito.when(processService
.updateTaskInstance(taskInstance))
.thenReturn(true);
return taskInstance;
}
@Test
public void testExe() throws Exception {
TaskInstance taskInstance = testBasicInit(ExecutionStatus.SUCCESS);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
SwitchTaskExecThread taskExecThread = new SwitchTaskExecThread(taskInstance);
taskExecThread.call();
Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecThread.getTaskInstance().getState());
}
private SwitchParameters getTaskNode() {
SwitchParameters conditionsParameters = new SwitchParameters();
SwitchResultVo switchResultVo1 = new SwitchResultVo();
switchResultVo1.setCondition(" 2 == 1");
switchResultVo1.setNextNode("t1");
SwitchResultVo switchResultVo2 = new SwitchResultVo();
switchResultVo2.setCondition(" 2 == 2");
switchResultVo2.setNextNode("t2");
SwitchResultVo switchResultVo3 = new SwitchResultVo();
switchResultVo3.setCondition(" 3 == 2");
switchResultVo3.setNextNode("t3");
List<SwitchResultVo> list = new ArrayList<>();
list.add(switchResultVo1);
list.add(switchResultVo2);
list.add(switchResultVo3);
conditionsParameters.setDependTaskList(list);
conditionsParameters.setNextNode("t");
conditionsParameters.setRelation("AND");
return conditionsParameters;
}
private ProcessInstance getProcessInstance() {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1000);
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
processInstance.setProcessDefinitionCode(1L);
return processInstance;
}
private TaskInstance getTaskInstance(SwitchParameters conditionsParameters, ProcessInstance processInstance) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1000);
Map<String, Object> taskParamsMap = new HashMap<>();
taskParamsMap.put(Constants.SWITCH_RESULT, "");
taskInstance.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
taskInstance.setSwitchDependency(conditionsParameters);
taskInstance.setName("C");
taskInstance.setTaskType("SWITCH");
taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setTaskCode(1L);
taskInstance.setTaskDefinitionVersion(1);
return taskInstance;
}
}

1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2458,6 +2458,7 @@ public class ProcessService {
v.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
Map<String, Object> taskParamsMap = v.taskParamsToJsonObj(taskDefinitionLog.getTaskParams());
v.setConditionResult((String) taskParamsMap.get(Constants.CONDITION_RESULT));
v.setSwitchResult((String) taskParamsMap.get(Constants.SWITCH_RESULT));
v.setDependence((String) taskParamsMap.get(Constants.DEPENDENCE));
taskParamsMap.remove(Constants.CONDITION_RESULT);
taskParamsMap.remove(Constants.DEPENDENCE);

14
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -424,12 +424,14 @@ public class ProcessServiceTest {
@Test
public void testGenProcessData() {
String processDefinitionJson = "{\"tasks\":[{\"id\":null,\"code\":3,\"version\":0,\"name\":\"1-test\",\"desc\":null,"
+ "\"type\":\"SHELL\",\"runFlag\":\"FORBIDDEN\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":0,"
+ "\"params\":{},\"preTasks\":[\"unit-test\"],\"preTaskNodeList\":[{\"code\":2,\"name\":\"unit-test\","
+ "\"version\":0}],\"extras\":null,\"depList\":[\"unit-test\"],\"dependence\":null,\"conditionResult\":null,"
+ "\"taskInstancePriority\":null,\"workerGroup\":null,\"timeout\":{\"enable\":false,\"strategy\":null,"
+ "\"interval\":0},\"delayTime\":0}],\"globalParams\":[],\"timeout\":0,\"tenantId\":0}";
String processDefinitionJson = "{\"tasks\":[{\"id\":null,\"code\":3,\"version\":0,\"name\":\"1-test\","
+ "\"desc\":null,\"type\":\"SHELL\",\"runFlag\":\"FORBIDDEN\",\"loc\":null,\"maxRetryTimes\":0,"
+ "\"retryInterval\":0,\"params\":{},\"preTasks\":[\"unit-test\"],"
+ "\"preTaskNodeList\":[{\"code\":2,\"name\":\"unit-test\",\"version\":0}],"
+ "\"extras\":null,\"depList\":[\"unit-test\"],\"dependence\":null,\"conditionResult\":null,"
+ "\"switchResult\":null,\"taskInstancePriority\":null,\"workerGroup\":null,"
+ "\"timeout\":{\"enable\":false,\"strategy\":null,\"interval\":0},\"delayTime\":0}],"
+ "\"globalParams\":[],\"timeout\":0,\"tenantId\":0}";
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(1L);

1
pom.xml

@ -992,6 +992,7 @@
<include>**/server/master/MasterCommandTest.java</include>
<include>**/server/master/DependentTaskTest.java</include>
<include>**/server/master/ConditionsTaskTest.java</include>
<include>**/server/master/SwitchTaskTest.java</include>
<include>**/server/master/MasterExecThreadTest.java</include>
<include>**/server/master/ParamsTest.java</include>
<include>**/server/master/SubProcessTaskTest.java</include>

Loading…
Cancel
Save