diff --git a/ambari_plugin/common-services/DOLPHIN/1.3.2/configuration/dolphin-worker.xml b/ambari_plugin/common-services/DOLPHIN/1.3.2/configuration/dolphin-worker.xml
index f162b0882b..1ae7a1a765 100644
--- a/ambari_plugin/common-services/DOLPHIN/1.3.2/configuration/dolphin-worker.xml
+++ b/ambari_plugin/common-services/DOLPHIN/1.3.2/configuration/dolphin-worker.xml
@@ -59,7 +59,7 @@
- worker.group
+ worker.groups
default
default worker group
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java
index 4e58201bf3..32a2a6b05d 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java
@@ -216,7 +216,7 @@ public class SparkParameters extends AbstractParameters {
@Override
public boolean checkParameters() {
- return mainJar != null && programType != null && sparkVersion != null;
+ return mainJar != null && programType != null;
}
@Override
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index c0bc3aaea8..6692598ce6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -663,37 +663,58 @@ public class MasterExecThread implements Runnable {
if(startNodes.contains(taskName)){
return DependResult.SUCCESS;
}
-
TaskNode taskNode = dag.getNode(taskName);
List depNameList = taskNode.getDepList();
for(String depsNode : depNameList ){
if(!dag.containsNode(depsNode)
- || forbiddenTaskList.containsKey(depsNode)
- || skipTaskNodeList.containsKey(depsNode)){
+ || forbiddenTaskList.containsKey(depsNode)){
continue;
}
- // dependencies must be fully completed
+ if(skipTaskNodeList.containsKey(depsNode)){
+ return DependResult.FAILED;
+ }
+ // all the dependencies must be completed
if(!completeTaskList.containsKey(depsNode)){
return DependResult.WAITING;
}
ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState();
- // conditions task would not return failed.
- if(depTaskState.typeIsFailure()
- && !DagHelper.haveConditionsAfterNode(depsNode, dag )
- && !dag.getNode(depsNode).isConditionsTask()){
- return DependResult.FAILED;
- }
-
if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){
return DependResult.WAITING;
}
+ // ignore task state if current task is condition
+ if(taskNode.isConditionsTask()){
+ continue;
+ }
+ if(!dependTaskSuccess(depsNode, taskName)){
+ return DependResult.FAILED;
+ }
}
-
logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray()));
-
return DependResult.SUCCESS;
}
+ /**
+ * depend node is completed, but here need check the condition task branch is the next node
+ * @param dependNodeName
+ * @param nextNodeName
+ * @return
+ */
+ private boolean dependTaskSuccess(String dependNodeName, String nextNodeName){
+ if(dag.getNode(dependNodeName).isConditionsTask()){
+ //condition task need check the branch to run
+ List nextTaskList = parseConditionTask(dependNodeName);
+ if(!nextTaskList.contains(nextNodeName)){
+ return false;
+ }
+ }else {
+ ExecutionStatus depTaskState = completeTaskList.get(dependNodeName).getState();
+ if(depTaskState.typeIsFailure()){
+ return false;
+ }
+ }
+ return true;
+ }
+
/**
* query task instance by complete state