From 052f9d10bf9507ab68c32ca03441a8e6bf3db991 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Mon, 23 Mar 2020 14:44:17 +0800 Subject: [PATCH] UT Coverage rate test (#2276) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * add TaskInstanceCacheManager receive Worker report result * TaskInstance setExecutePath * add TaskInstanceCacheManager to receive Worker Task result report * TaskInstanceCacheManager add remove method * add license * add dispatcht task method * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * 1,worker TaskPros use TaskExecutionContext replase 2,Master kill Task , KillTaskProcessor modify * worker remove db * ShellTask modify * master persistence processId and appIds * master persistence processId and appIds * master add kill task logic * master add kill task logic * master add kill task logic * javadoc error modify * remove chinese log * executeDirectly method add Override * remote module modify * TaskKillResponseProcessor command type modify * create buildKillCommand * host add host:port format * host add host:port format * TaskAckProcessor modify * TaskAckProcessor modify * task prioriry refator * remove ITaskQueue * task prioriry refator * remove ITaskQueue * TaskPriority refactor * remove logs * WorkerServer refactor * MasterSchedulerService modify * WorkerConfig listen port modify * modify master and worker listen port * cancelTaskInstance set TaskExecutionContext host,logPath,executePath * cancelTaskInstance set TaskExecutionContext host,logPath,executePath * Encapsulate the parameters required by sqltask * 1,Encapsulate the parameters required by sqltask 2,SQLTask optimization * AbstractTask modify * ProcedureTask optimization * MasterSchedulerService modify * TaskUpdateQueueConsumer modify * test * DataxTask process run debug * DataxTask process run debug * add protobuf dependency,MR、Spark task etc need this * TaskUpdateQueueConsumer modify * TaskExecutionContextBuilder set TaskInstance workgroup * WorkerGroupService queryAllGroup modify query available work group * 1,get workergroup from zk modify 2,SpringConnectionFactory repeat load modify * master and worker register ip use OSUtils.getHost() * ProcessInstance host set ip:port format * worker fault tolerance modify * Constants and .env modify * master fault tolerant bug modify * UT add pom.xml Co-authored-by: qiaozhanwei --- .../dolphinscheduler/common/Constants.java | 2 +- .../master/runner/MasterExecThread.java | 37 +++++++++++++++++-- pom.xml | 36 +++++++++++++++--- 3 files changed, 65 insertions(+), 10 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 2aded0f943..163607265b 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -121,7 +121,7 @@ public final class Constants { /** * MasterServer directory registered in zookeeper */ - public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "/nodes/masters"; + public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "/nodes/master"; /** * WorkerServer directory registered in zookeeper diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index df1eac39ac..db7cdbbcd2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -725,7 +725,7 @@ public class MasterExecThread implements Runnable { ProcessInstance instance = processService.findProcessInstanceById(processInstance.getId()); ExecutionStatus state = instance.getState(); - if(activeTaskNode.size() > 0){ + if(activeTaskNode.size() > 0 || haveRetryTaskStandBy()){ return runningState(state); } // process failure @@ -768,6 +768,24 @@ public class MasterExecThread implements Runnable { return state; } + /** + * whether standby task list have retry tasks + * @return + */ + private boolean haveRetryTaskStandBy() { + + boolean result = false; + + for(String taskName : readyToSubmitTaskList.keySet()){ + TaskInstance task = readyToSubmitTaskList.get(taskName); + if(task.getState().typeIsFailure()){ + result = true; + break; + } + } + return result; + } + /** * whether complement end * @return Boolean whether is complement end @@ -856,7 +874,11 @@ public class MasterExecThread implements Runnable { // submit start node submitPostNode(null); boolean sendTimeWarning = false; - while(!processInstance.IsProcessInstanceStop()){ + while(Stopper.isRunning()){ + + if(processInstance.IsProcessInstanceStop()){ + break; + } // send warning email if process time out. if( !sendTimeWarning && checkProcessTimeOut(processInstance) ){ @@ -871,12 +893,21 @@ public class MasterExecThread implements Runnable { if(!future.isDone()){ continue; } + // node monitor thread complete - activeTaskNode.remove(entry.getKey()); + task = this.processService.findTaskInstanceById(task.getId()); + if(task == null){ this.taskFailedSubmit = true; + activeTaskNode.remove(entry.getKey()); continue; } + + // node monitor thread complete + if(task.getState().typeIsFinished()){ + activeTaskNode.remove(entry.getKey()); + } + logger.info("task :{}, id:{} complete, state is {} ", task.getName(), task.getId(), task.getState().toString()); //TODO node success , post node submit diff --git a/pom.xml b/pom.xml index b17573d37f..e2b0e66902 100644 --- a/pom.xml +++ b/pom.xml @@ -712,20 +712,44 @@ **/alert/utils/FuncUtilsTest.java **/alert/utils/JSONUtilsTest.java **/alert/utils/PropertyUtilsTest.java - **/server/utils/SparkArgsUtilsTest.java - **/server/utils/FlinkArgsUtilsTest.java - **/server/utils/ParamUtilsTest.java - **/server/master/MasterExecThreadTest.java + **/alert/template/AlertTemplateFactoryTest.java + **/alert/template/impl/DefaultHTMLTemplateTest.java **/dao/mapper/AccessTokenMapperTest.java **/dao/mapper/AlertGroupMapperTest.java **/dao/mapper/AlertMapperTest.java **/dao/mapper/CommandMapperTest.java **/dao/cron/CronUtilsTest.java **/dao/utils/DagHelperTest.java - **/alert/template/AlertTemplateFactoryTest.java - **/alert/template/impl/DefaultHTMLTemplateTest.java **/server/worker/task/datax/DataxTaskTest.java **/server/utils/DataxUtilsTest.java + **/server/utils/SparkArgsUtilsTest.java + **/server/utils/FlinkArgsUtilsTest.java + **/server/utils/ParamUtilsTest.java + **/server/log/MasterLogFilterTest.java + **/server/log/SensitiveDataConverterTest.java + **/server/log/TaskLogDiscriminatorTest.java + **/server/log/TaskLogFilterTest.java + **/server/log/WorkerLogFilterTest.java + **/server/master/executor/NettyExecutorManagerTest.java + **/server/master/host/LowerWeightRoundRobinTest.java + **/server/master/register/MasterRegistryTest.java + **/server/master/AlertManagerTest.java + **/server/master/MasterCommandTest.java + **/server/master/MasterExecThreadTest.java + **/server/master/ParamsTest.java + **/server/register/ZookeeperNodeManagerTest.java + **/server/utils/DataxUtilsTest.java + **/server/utils/FlinkArgsUtilsTest.java + **/server/utils/ParamUtilsTest.java + **/server/utils/ProcessUtilsTest.java + **/server/utils/SparkArgsUtilsTest.java + **/server/worker/register/WorkerRegistryTest.java + **/server/worker/shell/ShellCommandExecutorTest.java + **/server/worker/sql/SqlExecutorTest.java + **/server/worker/task/datax/DataxTaskTest.java + **/server/worker/task/dependent/DependentTaskTest.java + **/server/worker/task/spark/SparkTaskTest.java + **/server/worker/task/EnvFileTest.java -Xmx2048m