diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java index aca977125e..ad2f574cbe 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java @@ -43,8 +43,7 @@ public class CheckUtils { /** * check username * - * @param userName - * user name + * @param userName user name * @return true if user name regex valid,otherwise return false */ public static boolean checkUserName(String userName) { @@ -54,8 +53,7 @@ public class CheckUtils { /** * check email * - * @param email - * email + * @param email email * @return true if email regex valid, otherwise return false */ public static boolean checkEmail(String email) { @@ -69,8 +67,7 @@ public class CheckUtils { /** * check project description * - * @param desc - * desc + * @param desc desc * @return true if description regex valid, otherwise return false */ public static Map checkDesc(String desc) { @@ -78,7 +75,7 @@ public class CheckUtils { if (StringUtils.isNotEmpty(desc) && desc.length() > 200) { result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR); result.put(Constants.MSG, - MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), "desc length")); + MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), "desc length")); } else { result.put(Constants.STATUS, Status.SUCCESS); } @@ -88,8 +85,7 @@ public class CheckUtils { /** * check extra info * - * @param otherParams - * other parames + * @param otherParams other parames * @return true if other parameters are valid, otherwise return false */ public static boolean checkOtherParams(String otherParams) { @@ -99,8 +95,7 @@ public class CheckUtils { /** * check password * - * @param password - * password + * @param password password * @return true if password regex valid, otherwise return false */ public static boolean checkPassword(String password) { @@ -110,8 +105,7 @@ public class CheckUtils { /** * check phone phone can be empty. * - * @param phone - * phone + * @param phone phone * @return true if phone regex valid, otherwise return false */ public static boolean checkPhone(String phone) { @@ -121,8 +115,7 @@ public class CheckUtils { /** * check task node parameter * - * @param taskNode - * TaskNode + * @param taskNode TaskNode * @return true if task node parameters are valid, otherwise return false */ public static boolean checkTaskNodeParameters(TaskNode taskNode) { @@ -133,6 +126,8 @@ public class CheckUtils { } if (TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskType)) { abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getDependence()); + } else if (TaskType.SWITCH.getDesc().equalsIgnoreCase(taskType)) { + abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getSwitchResult()); } else { abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getParams()); } @@ -147,28 +142,22 @@ public class CheckUtils { /** * check params * - * @param userName - * user name - * @param password - * password - * @param email - * email - * @param phone - * phone + * @param userName user name + * @param password password + * @param email email + * @param phone phone * @return true if user parameters are valid, other return false */ public static boolean checkUserParams(String userName, String password, String email, String phone) { return CheckUtils.checkUserName(userName) && CheckUtils.checkEmail(email) && CheckUtils.checkPassword(password) - && CheckUtils.checkPhone(phone); + && CheckUtils.checkPhone(phone); } /** * regex check * - * @param str - * input string - * @param pattern - * regex pattern + * @param str input string + * @param pattern regex pattern * @return true if regex pattern is right, otherwise return false */ private static boolean regexChecks(String str, Pattern pattern) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 53645b7e00..e2b8a0c0e8 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -776,6 +776,7 @@ public final class Constants { public static final String PROCESS_INSTANCE_STATE = "processInstanceState"; public static final String PARENT_WORKFLOW_INSTANCE = "parentWorkflowInstance"; public static final String CONDITION_RESULT = "conditionResult"; + public static final String SWITCH_RESULT = "switchResult"; public static final String DEPENDENCE = "dependence"; public static final String TASK_TYPE = "taskType"; public static final String TASK_LIST = "taskList"; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java index d0842e4ba7..3792368aee 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java @@ -51,7 +51,9 @@ public enum TaskType { DATAX(10, "DATAX"), CONDITIONS(11, "CONDITIONS"), SQOOP(12, "SQOOP"), - WATERDROP(13, "WATERDROP"); + WATERDROP(13, "WATERDROP"), + SWITCH(14, "SWITCH"), + ; TaskType(int code, String desc) { this.code = code; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java index b9c5a282ff..2e9262dd6b 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.common.model; import org.apache.dolphinscheduler.common.Constants; @@ -33,7 +34,6 @@ import java.util.Objects; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; - public class TaskNode { /** @@ -129,6 +129,10 @@ public class TaskNode { @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) private String conditionResult; + @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) + @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) + private String switchResult; + /** * task instance priority */ @@ -365,6 +369,10 @@ public class TaskNode { return TaskType.CONDITIONS.getDesc().equalsIgnoreCase(this.getType()); } + public boolean isSwitchTask() { + return TaskType.SWITCH.toString().equalsIgnoreCase(this.getType()); + } + public List getPreTaskNodeList() { return preTaskNodeList; } @@ -380,6 +388,7 @@ public class TaskNode { } taskParams.put(Constants.CONDITION_RESULT, this.conditionResult); taskParams.put(Constants.DEPENDENCE, this.dependence); + taskParams.put(Constants.SWITCH_RESULT, this.switchResult); return JSONUtils.toJsonString(taskParams); } @@ -417,4 +426,12 @@ public class TaskNode { + ", delayTime=" + delayTime + '}'; } + + public String getSwitchResult() { + return switchResult; + } + + public void setSwitchResult(String switchResult) { + this.switchResult = switchResult; + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchParameters.java new file mode 100644 index 0000000000..dc59795308 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchParameters.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.task.switchtask; + +import org.apache.dolphinscheduler.common.enums.DependentRelation; +import org.apache.dolphinscheduler.common.process.ResourceInfo; +import org.apache.dolphinscheduler.common.task.AbstractParameters; + +import java.util.ArrayList; +import java.util.List; + +public class SwitchParameters extends AbstractParameters { + + private DependentRelation dependRelation; + private String relation; + private List nextNode; + + @Override + public boolean checkParameters() { + return true; + } + + @Override + public List getResourceFilesList() { + return new ArrayList<>(); + } + + private int resultConditionLocation; + private List dependTaskList; + + public DependentRelation getDependRelation() { + return dependRelation; + } + + public void setDependRelation(DependentRelation dependRelation) { + this.dependRelation = dependRelation; + } + + public int getResultConditionLocation() { + return resultConditionLocation; + } + + public void setResultConditionLocation(int resultConditionLocation) { + this.resultConditionLocation = resultConditionLocation; + } + + public String getRelation() { + return relation; + } + + public void setRelation(String relation) { + this.relation = relation; + } + + public List getDependTaskList() { + return dependTaskList; + } + + public void setDependTaskList(List dependTaskList) { + this.dependTaskList = dependTaskList; + } + + public List getNextNode() { + return nextNode; + } + + public void setNextNode(Object nextNode) { + if (nextNode instanceof String) { + List nextNodeList = new ArrayList<>(); + nextNodeList.add(String.valueOf(nextNode)); + this.nextNode = nextNodeList; + } else { + this.nextNode = (ArrayList) nextNode; + } + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchResultVo.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchResultVo.java new file mode 100644 index 0000000000..558a6f1b83 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchResultVo.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.task.switchtask; + +import java.util.ArrayList; +import java.util.List; + +public class SwitchResultVo { + + private String condition; + private List nextNode; + + public String getCondition() { + return condition; + } + + public void setCondition(String condition) { + this.condition = condition; + } + + public List getNextNode() { + return nextNode; + } + + public void setNextNode(Object nextNode) { + if (nextNode instanceof String) { + List nextNodeList = new ArrayList<>(); + nextNodeList.add(String.valueOf(nextNode)); + this.nextNode = nextNodeList; + } else { + this.nextNode = (ArrayList) nextNode; + } + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java index 740635cd0e..f5e9dec369 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.common.task.spark.SparkParameters; import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters; +import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,6 +83,8 @@ public class TaskParametersUtils { return JSONUtils.parseObject(parameter, ConditionsParameters.class); case "SQOOP": return JSONUtils.parseObject(parameter, SqoopParameters.class); + case "SWITCH": + return JSONUtils.parseObject(parameter, SwitchParameters.class); default: logger.error("not support task type: {}", taskType); return null; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index aa8727225a..2be4ad659e 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; +import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import java.io.Serializable; @@ -174,6 +175,12 @@ public class TaskInstance implements Serializable { @TableField(exist = false) private DependentParameters dependency; + /** + * switch dependency + */ + @TableField(exist = false) + private SwitchParameters switchDependency; + /** * duration */ @@ -426,6 +433,20 @@ public class TaskInstance implements Serializable { this.dependency = dependency; } + public SwitchParameters getSwitchDependency() { + if (this.switchDependency == null) { + Map taskParamsMap = JSONUtils.toMap(this.getTaskParams(), String.class, Object.class); + this.switchDependency = JSONUtils.parseObject((String) taskParamsMap.get(Constants.SWITCH_RESULT), SwitchParameters.class); + } + return this.switchDependency; + } + + public void setSwitchDependency(SwitchParameters switchDependency) { + Map taskParamsMap = JSONUtils.toMap(this.getTaskParams(), String.class, Object.class); + taskParamsMap.put(Constants.SWITCH_RESULT,JSONUtils.toJsonString(switchDependency)); + this.setTaskParams(JSONUtils.toJsonString(taskParamsMap)); + } + public Flag getFlag() { return flag; } @@ -510,6 +531,10 @@ public class TaskInstance implements Serializable { return TaskType.CONDITIONS.getDesc().equalsIgnoreCase(this.taskType); } + public boolean isSwitchTask() { + return TaskType.SWITCH.getDesc().equalsIgnoreCase(this.taskType); + } + /** * determine if you can try again * diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index 025b8250fe..de27f173ea 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -14,8 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.dao.utils; +package org.apache.dolphinscheduler.dao.utils; import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.graph.DAG; @@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters; +import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters; +import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; @@ -281,6 +283,9 @@ public class DagHelper { } else if (dag.getNode(preNodeName).isConditionsTask()) { List conditionTaskList = parseConditionTask(preNodeName, skipTaskNodeList, dag, completeTaskList); startVertexes.addAll(conditionTaskList); + } else if (dag.getNode(preNodeName).isSwitchTask()) { + List conditionTaskList = parseSwitchTask(preNodeName, skipTaskNodeList, dag, completeTaskList); + startVertexes.addAll(conditionTaskList); } else { startVertexes = dag.getSubsequentNodes(preNodeName); } @@ -355,6 +360,49 @@ public class DagHelper { return conditionTaskList; } + /** + * parse condition task find the branch process + * set skip flag for another one. + * + * @param nodeName + * @return + */ + public static List parseSwitchTask(String nodeName, + Map skipTaskNodeList, + DAG dag, + Map completeTaskList) { + List conditionTaskList = new ArrayList<>(); + TaskNode taskNode = dag.getNode(nodeName); + if (!taskNode.isSwitchTask()) { + return conditionTaskList; + } + if (!completeTaskList.containsKey(nodeName)) { + return conditionTaskList; + } + conditionTaskList = skipTaskNode4Switch(taskNode, skipTaskNodeList, completeTaskList, dag); + return conditionTaskList; + } + + private static List skipTaskNode4Switch(TaskNode taskNode, Map skipTaskNodeList, + Map completeTaskList, + DAG dag) { + SwitchParameters switchParameters = completeTaskList.get(taskNode.getName()).getSwitchDependency(); + int resultConditionLocation = switchParameters.getResultConditionLocation(); + List conditionResultVoList = switchParameters.getDependTaskList(); + List switchTaskList = conditionResultVoList.get(resultConditionLocation).getNextNode(); + if (CollectionUtils.isEmpty(switchTaskList)) { + switchTaskList = new ArrayList<>(); + } + conditionResultVoList.remove(resultConditionLocation); + for (SwitchResultVo info : conditionResultVoList) { + if (CollectionUtils.isEmpty(info.getNextNode())) { + continue; + } + setTaskNodeSkip(info.getNextNode().get(0), dag, completeTaskList, skipTaskNodeList); + } + return switchTaskList; + } + /** * set task node and the post nodes skip flag */ diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java index c486ed9a15..18c17fe00b 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java @@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.ProcessDag; +import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters; +import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -251,6 +253,10 @@ public class DagHelperTest { skipNodeList.clear(); completeTaskList.remove("3"); taskInstance = new TaskInstance(); + + Map taskParamsMap = new HashMap<>(); + taskParamsMap.put(Constants.SWITCH_RESULT, ""); + taskInstance.setTaskParams(JSONUtils.toJsonString(taskParamsMap)); taskInstance.setState(ExecutionStatus.FAILURE); completeTaskList.put("3", taskInstance); postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); @@ -259,6 +265,17 @@ public class DagHelperTest { Assert.assertEquals(2, skipNodeList.size()); Assert.assertTrue(skipNodeList.containsKey("5")); Assert.assertTrue(skipNodeList.containsKey("7")); + + // dag: 1-2-3-5-7 4-3-6 + // 3-if , complete:1/2/3/4 + // 1.failure:3 expect post:6 skip:5/7 + dag = generateDag2(); + skipNodeList.clear(); + completeTaskList.clear(); + taskInstance.setSwitchDependency(getSwitchNode()); + completeTaskList.put("1", taskInstance); + postNodes = DagHelper.parsePostNodes("1", skipNodeList, dag, completeTaskList); + Assert.assertEquals(1, postNodes.size()); } /** @@ -286,7 +303,6 @@ public class DagHelperTest { node2.setPreTasks(JSONUtils.toJsonString(dep2)); taskNodeList.add(node2); - TaskNode node4 = new TaskNode(); node4.setId("4"); node4.setName("4"); @@ -351,6 +367,87 @@ public class DagHelperTest { return DagHelper.buildDagGraph(processDag); } + /** + * 1->2->3->5->7 + * 4->3->6 + * 2->8->5->7 + * + * @return dag + * @throws JsonProcessingException if error throws JsonProcessingException + */ + private DAG generateDag2() throws IOException { + List taskNodeList = new ArrayList<>(); + + TaskNode node = new TaskNode(); + node.setId("0"); + node.setName("0"); + node.setType("SHELL"); + taskNodeList.add(node); + + TaskNode node1 = new TaskNode(); + node1.setId("1"); + node1.setName("1"); + node1.setType("switch"); + node1.setDependence(JSONUtils.toJsonString(getSwitchNode())); + taskNodeList.add(node1); + + TaskNode node2 = new TaskNode(); + node2.setId("2"); + node2.setName("2"); + node2.setType("SHELL"); + List dep2 = new ArrayList<>(); + dep2.add("1"); + node2.setPreTasks(JSONUtils.toJsonString(dep2)); + taskNodeList.add(node2); + + TaskNode node4 = new TaskNode(); + node4.setId("4"); + node4.setName("4"); + node4.setType("SHELL"); + List dep4 = new ArrayList<>(); + dep4.add("1"); + node4.setPreTasks(JSONUtils.toJsonString(dep4)); + taskNodeList.add(node4); + + TaskNode node5 = new TaskNode(); + node5.setId("4"); + node5.setName("4"); + node5.setType("SHELL"); + List dep5 = new ArrayList<>(); + dep5.add("1"); + node5.setPreTasks(JSONUtils.toJsonString(dep5)); + taskNodeList.add(node5); + + List startNodes = new ArrayList<>(); + List recoveryNodes = new ArrayList<>(); + List destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList, + startNodes, recoveryNodes, TaskDependType.TASK_POST); + List taskNodeRelations = DagHelper.generateRelationListByFlowNodes(destTaskNodeList); + ProcessDag processDag = new ProcessDag(); + processDag.setEdges(taskNodeRelations); + processDag.setNodes(destTaskNodeList); + return DagHelper.buildDagGraph(processDag); + } + + private SwitchParameters getSwitchNode() { + SwitchParameters conditionsParameters = new SwitchParameters(); + SwitchResultVo switchResultVo1 = new SwitchResultVo(); + switchResultVo1.setCondition(" 2 == 1"); + switchResultVo1.setNextNode("2"); + SwitchResultVo switchResultVo2 = new SwitchResultVo(); + switchResultVo2.setCondition(" 2 == 2"); + switchResultVo2.setNextNode("4"); + List list = new ArrayList<>(); + list.add(switchResultVo1); + list.add(switchResultVo2); + conditionsParameters.setDependTaskList(list); + conditionsParameters.setNextNode("5"); + conditionsParameters.setRelation("AND"); + + // in: AND(AND(1 is SUCCESS)) + return conditionsParameters; + } + @Test public void testBuildDagGraph() { String shellJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-9527\",\"name\":\"shell-1\"," diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index cfd8a9a0d0..da62982970 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -17,11 +17,13 @@ package org.apache.dolphinscheduler.server.master.runner; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; @@ -201,7 +203,9 @@ public class MasterBaseTaskExecThread implements Callable { try { if (taskInstance.isConditionsTask() || taskInstance.isDependTask() - || taskInstance.isSubProcess()) { + || taskInstance.isSubProcess() + || taskInstance.isSwitchTask() + ) { return true; } if (taskInstance.getState().typeIsFinished()) { @@ -321,4 +325,13 @@ public class MasterBaseTaskExecThread implements Callable { long usedTime = (System.currentTimeMillis() - startTime.getTime()) / 1000; return timeoutSeconds - usedTime; } + + protected String getThreadName() { + logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, + processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion(), + taskInstance.getProcessInstanceId(), + taskInstance.getId())); + return String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance)); + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 1863087fca..856b833865 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -433,6 +433,8 @@ public class MasterExecThread implements Runnable { abstractExecThread = new DependentTaskExecThread(taskInstance); } else if (taskInstance.isConditionsTask()) { abstractExecThread = new ConditionsTaskExecThread(taskInstance); + } else if (taskInstance.isSwitchTask()) { + abstractExecThread = new SwitchTaskExecThread(taskInstance); } else { abstractExecThread = new MasterTaskExecThread(taskInstance); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SwitchTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SwitchTaskExecThread.java new file mode 100644 index 0000000000..f9e7f426dc --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SwitchTaskExecThread.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +import org.apache.dolphinscheduler.common.enums.DependResult; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters; +import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.utils.LogUtils; +import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils; + +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class SwitchTaskExecThread extends MasterBaseTaskExecThread { + + protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*"; + + /** + * complete task map + */ + private Map completeTaskList = new ConcurrentHashMap<>(); + + /** + * switch result + */ + private DependResult conditionResult; + + /** + * constructor of MasterBaseTaskExecThread + * + * @param taskInstance task instance + */ + public SwitchTaskExecThread(TaskInstance taskInstance) { + super(taskInstance); + taskInstance.setStartTime(new Date()); + } + + @Override + public Boolean submitWaitComplete() { + try { + this.taskInstance = submit(); + logger.info("taskInstance submit end"); + Thread.currentThread().setName(getThreadName()); + initTaskParameters(); + logger.info("switch task start"); + waitTaskQuit(); + updateTaskState(); + } catch (Exception e) { + logger.error("switch task run exception", e); + } + return true; + } + + private void waitTaskQuit() { + List taskInstances = processService.findValidTaskListByProcessId( + taskInstance.getProcessInstanceId() + ); + for (TaskInstance task : taskInstances) { + completeTaskList.putIfAbsent(task.getName(), task.getState()); + } + + SwitchParameters switchParameters = taskInstance.getSwitchDependency(); + List switchResultVos = switchParameters.getDependTaskList(); + SwitchResultVo switchResultVo = new SwitchResultVo(); + switchResultVo.setNextNode(switchParameters.getNextNode()); + switchResultVos.add(switchResultVo); + int finalConditionLocation = switchResultVos.size() - 1; + int i = 0; + conditionResult = DependResult.SUCCESS; + for (SwitchResultVo info : switchResultVos) { + logger.info("the {} execution ", (i + 1)); + logger.info("original condition sentence:{}", info.getCondition()); + if (StringUtils.isEmpty(info.getCondition())) { + finalConditionLocation = i; + break; + } + String content = setTaskParams(info.getCondition().replaceAll("'", "\""), rgex); + logger.info("format condition sentence::{}", content); + Boolean result = null; + try { + result = SwitchTaskUtils.evaluate(content); + } catch (Exception e) { + logger.info("error sentence : {}", content); + conditionResult = DependResult.FAILED; + //result = false; + break; + } + logger.info("condition result : {}", result); + if (result) { + finalConditionLocation = i; + break; + } + i++; + } + switchParameters.setDependTaskList(switchResultVos); + switchParameters.setResultConditionLocation(finalConditionLocation); + taskInstance.setSwitchDependency(switchParameters); + + //conditionResult = DependResult.SUCCESS; + logger.info("the switch task depend result : {}", conditionResult); + } + + /** + * update task state + */ + private void updateTaskState() { + ExecutionStatus status; + if (this.cancel) { + status = ExecutionStatus.KILL; + } else { + status = (conditionResult == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE; + } + taskInstance.setEndTime(new Date()); + taskInstance.setState(status); + processService.updateTaskInstance(taskInstance); + } + + private void initTaskParameters() { + taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion(), + taskInstance.getProcessInstanceId(), + taskInstance.getId())); + this.taskInstance.setStartTime(new Date()); + this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); + this.taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + this.processService.saveTaskInstance(taskInstance); + } + + public String setTaskParams(String content, String rgex) { + Pattern pattern = Pattern.compile(rgex); + Matcher m = pattern.matcher(content); + Map globalParams = JSONUtils.toList(processInstance.getGlobalParams(), Property.class).stream().collect(Collectors.toMap(Property::getProp, Property -> Property)); + Map varParams = JSONUtils.toList(taskInstance.getVarPool(), Property.class).stream().collect(Collectors.toMap(Property::getProp, Property -> Property)); + if (varParams.size() > 0) { + varParams.putAll(globalParams); + globalParams = varParams; + } + while (m.find()) { + String paramName = m.group(1); + Property property = globalParams.get(paramName); + if (property == null) { + return ""; + } + String value = property.getValue(); + if (!org.apache.commons.lang.math.NumberUtils.isNumber(value)) { + value = "\"" + value + "\""; + } + logger.info("paramName:{},paramValue{}", paramName, value); + content = content.replace("${" + paramName + "}", value); + } + return content; + } + +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SwitchTaskUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SwitchTaskUtils.java new file mode 100644 index 0000000000..6320febc9b --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SwitchTaskUtils.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.utils; + +import javax.script.ScriptEngine; +import javax.script.ScriptEngineManager; +import javax.script.ScriptException; + +public class SwitchTaskUtils { + private static ScriptEngineManager manager; + private static ScriptEngine engine; + + static { + manager = new ScriptEngineManager(); + engine = manager.getEngineByName("js"); + } + + public static boolean evaluate(String expression) throws ScriptException { + Object result = engine.eval(expression); + return (Boolean) result; + } + +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java new file mode 100644 index 0000000000..0c2d74a0a2 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.common.enums.TimeoutFlag; +import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters; +import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.runner.SwitchTaskExecThread; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; + +@RunWith(MockitoJUnitRunner.Silent.class) +public class SwitchTaskTest { + + private static final Logger logger = LoggerFactory.getLogger(SwitchTaskTest.class); + + /** + * TaskNode.runFlag : task can be run normally + */ + public static final String FLOWNODE_RUN_FLAG_NORMAL = "NORMAL"; + + private ProcessService processService; + + private ProcessInstance processInstance; + + @Before + public void before() { + ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + + MasterConfig config = new MasterConfig(); + Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); + config.setMasterTaskCommitRetryTimes(3); + config.setMasterTaskCommitInterval(1000); + + processService = Mockito.mock(ProcessService.class); + Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + + processInstance = getProcessInstance(); + Mockito.when(processService + .findProcessInstanceById(processInstance.getId())) + .thenReturn(processInstance); + } + + private TaskInstance testBasicInit(ExecutionStatus expectResult) { + TaskDefinition taskDefinition = new TaskDefinition(); + taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN); + taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN); + taskDefinition.setTimeout(0); + Mockito.when(processService.findTaskDefinition(1L, 1)) + .thenReturn(taskDefinition); + TaskInstance taskInstance = getTaskInstance(getTaskNode(), processInstance); + + // for MasterBaseTaskExecThread.submit + Mockito.when(processService + .submitTask(taskInstance)) + .thenReturn(taskInstance); + // for MasterBaseTaskExecThread.call + Mockito.when(processService + .findTaskInstanceById(taskInstance.getId())) + .thenReturn(taskInstance); + // for SwitchTaskExecThread.initTaskParameters + Mockito.when(processService + .saveTaskInstance(taskInstance)) + .thenReturn(true); + // for SwitchTaskExecThread.updateTaskState + Mockito.when(processService + .updateTaskInstance(taskInstance)) + .thenReturn(true); + + return taskInstance; + } + + @Test + public void testExe() throws Exception { + TaskInstance taskInstance = testBasicInit(ExecutionStatus.SUCCESS); + taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); + SwitchTaskExecThread taskExecThread = new SwitchTaskExecThread(taskInstance); + taskExecThread.call(); + Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecThread.getTaskInstance().getState()); + } + + private SwitchParameters getTaskNode() { + SwitchParameters conditionsParameters = new SwitchParameters(); + + SwitchResultVo switchResultVo1 = new SwitchResultVo(); + switchResultVo1.setCondition(" 2 == 1"); + switchResultVo1.setNextNode("t1"); + SwitchResultVo switchResultVo2 = new SwitchResultVo(); + switchResultVo2.setCondition(" 2 == 2"); + switchResultVo2.setNextNode("t2"); + SwitchResultVo switchResultVo3 = new SwitchResultVo(); + switchResultVo3.setCondition(" 3 == 2"); + switchResultVo3.setNextNode("t3"); + List list = new ArrayList<>(); + list.add(switchResultVo1); + list.add(switchResultVo2); + list.add(switchResultVo3); + conditionsParameters.setDependTaskList(list); + conditionsParameters.setNextNode("t"); + conditionsParameters.setRelation("AND"); + + return conditionsParameters; + } + + private ProcessInstance getProcessInstance() { + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(1000); + processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + processInstance.setProcessDefinitionCode(1L); + return processInstance; + } + + private TaskInstance getTaskInstance(SwitchParameters conditionsParameters, ProcessInstance processInstance) { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1000); + Map taskParamsMap = new HashMap<>(); + taskParamsMap.put(Constants.SWITCH_RESULT, ""); + taskInstance.setTaskParams(JSONUtils.toJsonString(taskParamsMap)); + taskInstance.setSwitchDependency(conditionsParameters); + taskInstance.setName("C"); + taskInstance.setTaskType("SWITCH"); + taskInstance.setProcessInstanceId(processInstance.getId()); + taskInstance.setTaskCode(1L); + taskInstance.setTaskDefinitionVersion(1); + return taskInstance; + } +} \ No newline at end of file diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index f7b5de33e4..ac3e78d7af 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -2458,6 +2458,7 @@ public class ProcessService { v.setRetryInterval(taskDefinitionLog.getFailRetryInterval()); Map taskParamsMap = v.taskParamsToJsonObj(taskDefinitionLog.getTaskParams()); v.setConditionResult((String) taskParamsMap.get(Constants.CONDITION_RESULT)); + v.setSwitchResult((String) taskParamsMap.get(Constants.SWITCH_RESULT)); v.setDependence((String) taskParamsMap.get(Constants.DEPENDENCE)); taskParamsMap.remove(Constants.CONDITION_RESULT); taskParamsMap.remove(Constants.DEPENDENCE); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 643dc09c6a..d0a735173a 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -424,12 +424,14 @@ public class ProcessServiceTest { @Test public void testGenProcessData() { - String processDefinitionJson = "{\"tasks\":[{\"id\":null,\"code\":3,\"version\":0,\"name\":\"1-test\",\"desc\":null," - + "\"type\":\"SHELL\",\"runFlag\":\"FORBIDDEN\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":0," - + "\"params\":{},\"preTasks\":[\"unit-test\"],\"preTaskNodeList\":[{\"code\":2,\"name\":\"unit-test\"," - + "\"version\":0}],\"extras\":null,\"depList\":[\"unit-test\"],\"dependence\":null,\"conditionResult\":null," - + "\"taskInstancePriority\":null,\"workerGroup\":null,\"timeout\":{\"enable\":false,\"strategy\":null," - + "\"interval\":0},\"delayTime\":0}],\"globalParams\":[],\"timeout\":0,\"tenantId\":0}"; + String processDefinitionJson = "{\"tasks\":[{\"id\":null,\"code\":3,\"version\":0,\"name\":\"1-test\"," + + "\"desc\":null,\"type\":\"SHELL\",\"runFlag\":\"FORBIDDEN\",\"loc\":null,\"maxRetryTimes\":0," + + "\"retryInterval\":0,\"params\":{},\"preTasks\":[\"unit-test\"]," + + "\"preTaskNodeList\":[{\"code\":2,\"name\":\"unit-test\",\"version\":0}]," + + "\"extras\":null,\"depList\":[\"unit-test\"],\"dependence\":null,\"conditionResult\":null," + + "\"switchResult\":null,\"taskInstancePriority\":null,\"workerGroup\":null," + + "\"timeout\":{\"enable\":false,\"strategy\":null,\"interval\":0},\"delayTime\":0}]," + + "\"globalParams\":[],\"timeout\":0,\"tenantId\":0}"; ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setCode(1L); diff --git a/pom.xml b/pom.xml index 705a54b95b..522d9b1ab9 100644 --- a/pom.xml +++ b/pom.xml @@ -992,6 +992,7 @@ **/server/master/MasterCommandTest.java **/server/master/DependentTaskTest.java **/server/master/ConditionsTaskTest.java + **/server/master/SwitchTaskTest.java **/server/master/MasterExecThreadTest.java **/server/master/ParamsTest.java **/server/master/SubProcessTaskTest.java