From 68cb81fdf5e62c95444f22837545c3c3b8604ba3 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Sat, 28 Mar 2020 22:34:47 +0800 Subject: [PATCH] =?UTF-8?q?1=EF=BC=8Cadd=20UT=20in=20pom=202=EF=BC=8Crefac?= =?UTF-8?q?tor=20TaskUpdateQueue=20(#2326)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * add TaskInstanceCacheManager receive Worker report result * TaskInstance setExecutePath * add TaskInstanceCacheManager to receive Worker Task result report * TaskInstanceCacheManager add remove method * add license * add dispatcht task method * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * 1,worker TaskPros use TaskExecutionContext replase 2,Master kill Task , KillTaskProcessor modify * worker remove db * ShellTask modify * master persistence processId and appIds * master persistence processId and appIds * master add kill task logic * master add kill task logic * master add kill task logic * javadoc error modify * remove chinese log * executeDirectly method add Override * remote module modify * TaskKillResponseProcessor command type modify * create buildKillCommand * host add host:port format * host add host:port format * TaskAckProcessor modify * TaskAckProcessor modify * task prioriry refator * remove ITaskQueue * task prioriry refator * remove ITaskQueue * TaskPriority refactor * remove logs * WorkerServer refactor * MasterSchedulerService modify * WorkerConfig listen port modify * modify master and worker listen port * cancelTaskInstance set TaskExecutionContext host,logPath,executePath * cancelTaskInstance set TaskExecutionContext host,logPath,executePath * Encapsulate the parameters required by sqltask * 1,Encapsulate the parameters required by sqltask 2,SQLTask optimization * AbstractTask modify * ProcedureTask optimization * MasterSchedulerService modify * TaskUpdateQueueConsumer modify * test * DataxTask process run debug * DataxTask process run debug * add protobuf dependency,MR、Spark task etc need this * TaskUpdateQueueConsumer modify * TaskExecutionContextBuilder set TaskInstance workgroup * WorkerGroupService queryAllGroup modify query available work group * 1,get workergroup from zk modify 2,SpringConnectionFactory repeat load modify * master and worker register ip use OSUtils.getHost() * ProcessInstance host set ip:port format * worker fault tolerance modify * Constants and .env modify * master fault tolerant bug modify * UT add pom.xml * timing online modify * when taskResponse is faster than taskAck to db,task state will error add async queue and new a thread reslove this problem * TaskExecutionContext set host * 1,TaskManager refactor 2, api start load server dolphinschedule-daemon.sh modify * 1,TaskManager refactor 2, api start load server dolphinschedule-daemon.sh modify * add UT in pom.xml * revert dolphinscheduler-daemon.sh * ZookeeperRegister use common.properties zookeeperRoot path * api start exclude org.apache.dolphinscheduler.server.* * ZookeeperRegister use common.properties zookeeperRoot path * 1,api start load server filter 2,SHELL task exitStatusCode modify * java doc error modify * java doc error modify * remove todo * add UT in pom * 1,add UT in pom 2,refactor TaskUpdateQueue Co-authored-by: qiaozhanwei --- .../controller/ProcessInstanceController.java | 1 - .../api/service/DataAnalysisServiceTest.java | 2 - ...er.java => TaskPriorityQueueConsumer.java} | 107 +++++++++++------- .../runner/MasterBaseTaskExecThread.java | 8 +- ...pdateQueue.java => TaskPriorityQueue.java} | 2 +- ...ueImpl.java => TaskPriorityQueueImpl.java} | 4 +- .../service/queue/TaskQueueFactory.java | 55 --------- .../test/java/queue/TaskUpdateQueueTest.java | 6 +- pom.xml | 23 ++-- 9 files changed, 85 insertions(+), 123 deletions(-) rename dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/{TaskUpdateQueueConsumer.java => TaskPriorityQueueConsumer.java} (69%) rename dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/{TaskUpdateQueue.java => TaskPriorityQueue.java} (97%) rename dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/{TaskUpdateQueueImpl.java => TaskPriorityQueueImpl.java} (96%) delete mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java index b6533ad25c..2fd332f6f8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java @@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.User; import io.swagger.annotations.*; -import org.apache.dolphinscheduler.service.queue.TaskQueueFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java index 10220e2d31..35cc6ae9a6 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java @@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.queue.TaskQueueFactory; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -46,7 +45,6 @@ import java.util.List; import java.util.Map; @RunWith(PowerMockRunner.class) -@PrepareForTest({TaskQueueFactory.class}) public class DataAnalysisServiceTest { @InjectMocks diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java similarity index 69% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 077b1efbd2..4aaf901638 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -40,7 +40,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue; +import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -53,18 +53,18 @@ import java.util.List; * TaskUpdateQueue consumer */ @Component -public class TaskUpdateQueueConsumer extends Thread{ +public class TaskPriorityQueueConsumer extends Thread{ /** * logger of TaskUpdateQueueConsumer */ - private static final Logger logger = LoggerFactory.getLogger(TaskUpdateQueueConsumer.class); + private static final Logger logger = LoggerFactory.getLogger(TaskPriorityQueueConsumer.class); /** * taskUpdateQueue */ @Autowired - private TaskUpdateQueue taskUpdateQueue; + private TaskPriorityQueue taskUpdateQueue; /** * processService @@ -155,52 +155,19 @@ public class TaskUpdateQueueConsumer extends Thread{ TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class); // SQL task if (taskType == TaskType.SQL){ - SqlParameters sqlParameters = JSONObject.parseObject(taskNode.getParams(), SqlParameters.class); - int datasourceId = sqlParameters.getDatasource(); - DataSource datasource = processService.findDataSourceById(datasourceId); - sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams()); - - // whether udf type - boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType()) - && StringUtils.isNotEmpty(sqlParameters.getUdfs()); - - if (udfTypeFlag){ - String[] udfFunIds = sqlParameters.getUdfs().split(","); - int[] udfFunIdsArray = new int[udfFunIds.length]; - for(int i = 0 ; i < udfFunIds.length;i++){ - udfFunIdsArray[i]=Integer.parseInt(udfFunIds[i]); - } - - List udfFuncList = processService.queryUdfFunListByids(udfFunIdsArray); - sqlTaskExecutionContext.setUdfFuncList(udfFuncList); - } + setSQLTaskRelation(sqlTaskExecutionContext, taskNode); } // DATAX task if (taskType == TaskType.DATAX){ - DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class); - - DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource()); - DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget()); - - - dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource()); - dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); - dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); - - dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget()); - dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); - dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); + setDataxTaskRelation(dataxTaskExecutionContext, taskNode); } // procedure task if (taskType == TaskType.PROCEDURE){ - ProcedureParameters procedureParameters = JSONObject.parseObject(taskNode.getParams(), ProcedureParameters.class); - int datasourceId = procedureParameters.getDatasource(); - DataSource datasource = processService.findDataSourceById(datasourceId); - procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams()); + setProcedureTaskRelation(procedureTaskExecutionContext, taskNode); } @@ -215,6 +182,66 @@ public class TaskUpdateQueueConsumer extends Thread{ .create(); } + /** + * set procedure task relation + * @param procedureTaskExecutionContext procedureTaskExecutionContext + * @param taskNode taskNode + */ + private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskNode taskNode) { + ProcedureParameters procedureParameters = JSONObject.parseObject(taskNode.getParams(), ProcedureParameters.class); + int datasourceId = procedureParameters.getDatasource(); + DataSource datasource = processService.findDataSourceById(datasourceId); + procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams()); + } + + /** + * set datax task relation + * @param dataxTaskExecutionContext dataxTaskExecutionContext + * @param taskNode taskNode + */ + private void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) { + DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class); + + DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource()); + DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget()); + + + dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource()); + dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); + dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); + + dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget()); + dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); + dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); + } + + /** + * set SQL task relation + * @param sqlTaskExecutionContext sqlTaskExecutionContext + * @param taskNode taskNode + */ + private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskNode taskNode) { + SqlParameters sqlParameters = JSONObject.parseObject(taskNode.getParams(), SqlParameters.class); + int datasourceId = sqlParameters.getDatasource(); + DataSource datasource = processService.findDataSourceById(datasourceId); + sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams()); + + // whether udf type + boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType()) + && StringUtils.isNotEmpty(sqlParameters.getUdfs()); + + if (udfTypeFlag){ + String[] udfFunIds = sqlParameters.getUdfs().split(","); + int[] udfFunIdsArray = new int[udfFunIds.length]; + for(int i = 0 ; i < udfFunIds.length;i++){ + udfFunIdsArray[i]=Integer.parseInt(udfFunIds[i]); + } + + List udfFuncList = processService.queryUdfFunListByids(udfFunIdsArray); + sqlTaskExecutionContext.setUdfFuncList(udfFuncList); + } + } + /** * get execute local path * diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index b0fd6322c3..dd7c564cbe 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -24,8 +24,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue; -import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl; +import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; +import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.dolphinscheduler.common.Constants.*; @@ -76,7 +76,7 @@ public class MasterBaseTaskExecThread implements Callable { /** * taskUpdateQueue */ - private TaskUpdateQueue taskUpdateQueue; + private TaskPriorityQueue taskUpdateQueue; /** * constructor of MasterBaseTaskExecThread * @param taskInstance task instance @@ -89,7 +89,7 @@ public class MasterBaseTaskExecThread implements Callable { this.cancel = false; this.taskInstance = taskInstance; this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class); - this.taskUpdateQueue = SpringApplicationContext.getBean(TaskUpdateQueueImpl.class); + this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class); } /** diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java similarity index 97% rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueue.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java index 48f510e09a..3ad9aef6c5 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.service.queue; -public interface TaskUpdateQueue { +public interface TaskPriorityQueue { /** * put task info diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java similarity index 96% rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java index 1b3bec76ad..0a0fb1b9b0 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.service.queue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import java.util.*; @@ -31,7 +29,7 @@ import static org.apache.dolphinscheduler.common.Constants.*; * tasks queue implementation */ @Service -public class TaskUpdateQueueImpl implements TaskUpdateQueue { +public class TaskPriorityQueueImpl implements TaskPriorityQueue { /** * queue size */ diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java deleted file mode 100644 index 3ea3195f1b..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.service.queue; - -import org.apache.commons.lang.StringUtils; -import org.apache.dolphinscheduler.common.utils.CommonUtils; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * task queue factory - */ -public class TaskQueueFactory { - - private static final Logger logger = LoggerFactory.getLogger(TaskQueueFactory.class); - - - private TaskQueueFactory(){ - - } - - - /** - * get instance (singleton) - * - * @return instance - */ - public static TaskUpdateQueue getTaskQueueInstance() { - String queueImplValue = CommonUtils.getQueueImplValue(); - if (StringUtils.isNotBlank(queueImplValue)) { - logger.info("task queue impl use zookeeper "); - return SpringApplicationContext.getBean(TaskUpdateQueueImpl.class); - }else{ - logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit "); - System.exit(-1); - } - - return null; - } -} diff --git a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java index a0e4fadcfc..ca6c083a67 100644 --- a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java +++ b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java @@ -17,8 +17,8 @@ package queue; -import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue; -import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl; +import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; +import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; import org.junit.Test; import static org.junit.Assert.*; @@ -45,7 +45,7 @@ public class TaskUpdateQueueTest { String taskInfo3 = "1_1_0_3_default"; String taskInfo4 = "1_1_0_4_default"; - TaskUpdateQueue queue = new TaskUpdateQueueImpl(); + TaskPriorityQueue queue = new TaskPriorityQueueImpl(); queue.put(taskInfo1); queue.put(taskInfo2); queue.put(taskInfo3); diff --git a/pom.xml b/pom.xml index e7b95d9cf8..1ba35b73a2 100644 --- a/pom.xml +++ b/pom.xml @@ -717,23 +717,18 @@ **/dao/mapper/AccessTokenMapperTest.java **/dao/mapper/AlertGroupMapperTest.java **/dao/mapper/AlertMapperTest.java - **/dao/mapper/CommandMapperTest.java - **/dao/cron/CronUtilsTest.java - **/dao/utils/DagHelperTest.java - **/server/worker/task/datax/DataxTaskTest.java - **/server/utils/DataxUtilsTest.java - **/server/utils/SparkArgsUtilsTest.java - **/server/utils/FlinkArgsUtilsTest.java - **/server/utils/ParamUtilsTest.java + **/dao/mapper/DataSourceMapperTest.java **/server/log/MasterLogFilterTest.java **/server/log/SensitiveDataConverterTest.java **/server/log/TaskLogDiscriminatorTest.java **/server/log/TaskLogFilterTest.java **/server/log/WorkerLogFilterTest.java - **/server/master/executor/NettyExecutorManagerTest.java - **/server/master/host/LowerWeightRoundRobinTest.java - **/server/master/host/RandomSelectorTest.java - **/server/master/host/RoundRobinSelectorTest.java + **/server/master/dispatch/executor/NettyExecutorManagerTest.java + **/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java + **/server/master/dispatch/host/assign/RandomSelectorTest.java + **/server/master/dispatch/host/assign/RoundRobinSelectorTest.java + **/server/master/dispatch/host/RoundRobinHostManagerTest.java + **/server/master/dispatch/ExecutorDispatcherTest.java **/server/master/register/MasterRegistryTest.java **/server/master/AlertManagerTest.java **/server/master/MasterCommandTest.java @@ -741,19 +736,19 @@ **/server/master/ParamsTest.java **/server/register/ZookeeperNodeManagerTest.java **/server/utils/DataxUtilsTest.java + **/server/utils/ExecutionContextTestUtils.java **/server/utils/FlinkArgsUtilsTest.java **/server/utils/ParamUtilsTest.java **/server/utils/ProcessUtilsTest.java **/server/utils/SparkArgsUtilsTest.java **/server/worker/processor/TaskCallbackServiceTest.java - **/server/worker/register/WorkerRegistryTest.java + **/server/worker/registry/WorkerRegistryTest.java **/server/worker/shell/ShellCommandExecutorTest.java **/server/worker/sql/SqlExecutorTest.java **/server/worker/task/datax/DataxTaskTest.java **/server/worker/task/dependent/DependentTaskTest.java **/server/worker/task/spark/SparkTaskTest.java **/server/worker/task/EnvFileTest.java - -Xmx2048m