diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 2aded0f943..163607265b 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -121,7 +121,7 @@ public final class Constants { /** * MasterServer directory registered in zookeeper */ - public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "/nodes/masters"; + public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "/nodes/master"; /** * WorkerServer directory registered in zookeeper diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index df1eac39ac..db7cdbbcd2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -725,7 +725,7 @@ public class MasterExecThread implements Runnable { ProcessInstance instance = processService.findProcessInstanceById(processInstance.getId()); ExecutionStatus state = instance.getState(); - if(activeTaskNode.size() > 0){ + if(activeTaskNode.size() > 0 || haveRetryTaskStandBy()){ return runningState(state); } // process failure @@ -768,6 +768,24 @@ public class MasterExecThread implements Runnable { return state; } + /** + * whether standby task list have retry tasks + * @return + */ + private boolean haveRetryTaskStandBy() { + + boolean result = false; + + for(String taskName : readyToSubmitTaskList.keySet()){ + TaskInstance task = readyToSubmitTaskList.get(taskName); + if(task.getState().typeIsFailure()){ + result = true; + break; + } + } + return result; + } + /** * whether complement end * @return Boolean whether is complement end @@ -856,7 +874,11 @@ public class MasterExecThread implements Runnable { // submit start node submitPostNode(null); boolean sendTimeWarning = false; - while(!processInstance.IsProcessInstanceStop()){ + while(Stopper.isRunning()){ + + if(processInstance.IsProcessInstanceStop()){ + break; + } // send warning email if process time out. if( !sendTimeWarning && checkProcessTimeOut(processInstance) ){ @@ -871,12 +893,21 @@ public class MasterExecThread implements Runnable { if(!future.isDone()){ continue; } + // node monitor thread complete - activeTaskNode.remove(entry.getKey()); + task = this.processService.findTaskInstanceById(task.getId()); + if(task == null){ this.taskFailedSubmit = true; + activeTaskNode.remove(entry.getKey()); continue; } + + // node monitor thread complete + if(task.getState().typeIsFinished()){ + activeTaskNode.remove(entry.getKey()); + } + logger.info("task :{}, id:{} complete, state is {} ", task.getName(), task.getId(), task.getState().toString()); //TODO node success , post node submit diff --git a/pom.xml b/pom.xml index b17573d37f..e2b0e66902 100644 --- a/pom.xml +++ b/pom.xml @@ -712,20 +712,44 @@ **/alert/utils/FuncUtilsTest.java **/alert/utils/JSONUtilsTest.java **/alert/utils/PropertyUtilsTest.java - **/server/utils/SparkArgsUtilsTest.java - **/server/utils/FlinkArgsUtilsTest.java - **/server/utils/ParamUtilsTest.java - **/server/master/MasterExecThreadTest.java + **/alert/template/AlertTemplateFactoryTest.java + **/alert/template/impl/DefaultHTMLTemplateTest.java **/dao/mapper/AccessTokenMapperTest.java **/dao/mapper/AlertGroupMapperTest.java **/dao/mapper/AlertMapperTest.java **/dao/mapper/CommandMapperTest.java **/dao/cron/CronUtilsTest.java **/dao/utils/DagHelperTest.java - **/alert/template/AlertTemplateFactoryTest.java - **/alert/template/impl/DefaultHTMLTemplateTest.java **/server/worker/task/datax/DataxTaskTest.java **/server/utils/DataxUtilsTest.java + **/server/utils/SparkArgsUtilsTest.java + **/server/utils/FlinkArgsUtilsTest.java + **/server/utils/ParamUtilsTest.java + **/server/log/MasterLogFilterTest.java + **/server/log/SensitiveDataConverterTest.java + **/server/log/TaskLogDiscriminatorTest.java + **/server/log/TaskLogFilterTest.java + **/server/log/WorkerLogFilterTest.java + **/server/master/executor/NettyExecutorManagerTest.java + **/server/master/host/LowerWeightRoundRobinTest.java + **/server/master/register/MasterRegistryTest.java + **/server/master/AlertManagerTest.java + **/server/master/MasterCommandTest.java + **/server/master/MasterExecThreadTest.java + **/server/master/ParamsTest.java + **/server/register/ZookeeperNodeManagerTest.java + **/server/utils/DataxUtilsTest.java + **/server/utils/FlinkArgsUtilsTest.java + **/server/utils/ParamUtilsTest.java + **/server/utils/ProcessUtilsTest.java + **/server/utils/SparkArgsUtilsTest.java + **/server/worker/register/WorkerRegistryTest.java + **/server/worker/shell/ShellCommandExecutorTest.java + **/server/worker/sql/SqlExecutorTest.java + **/server/worker/task/datax/DataxTaskTest.java + **/server/worker/task/dependent/DependentTaskTest.java + **/server/worker/task/spark/SparkTaskTest.java + **/server/worker/task/EnvFileTest.java -Xmx2048m