From 043df7e1896cc220eb5f578c44d5302ee6ba77af Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Fri, 6 Mar 2020 18:58:53 +0800 Subject: [PATCH] WorkerServer refactor (#2106) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * add TaskInstanceCacheManager receive Worker report result * TaskInstance setExecutePath * add TaskInstanceCacheManager to receive Worker Task result report * TaskInstanceCacheManager add remove method * add license * add dispatcht task method * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * 1,worker TaskPros use TaskExecutionContext replase 2,Master kill Task , KillTaskProcessor modify * worker remove db * ShellTask modify * master persistence processId and appIds * master persistence processId and appIds * master add kill task logic * master add kill task logic * master add kill task logic * javadoc error modify * remove chinese log * executeDirectly method add Override * remote module modify * TaskKillResponseProcessor command type modify * create buildKillCommand * host add host:port format * host add host:port format * TaskAckProcessor modify * TaskAckProcessor modify * task prioriry refator * remove ITaskQueue * task prioriry refator * remove ITaskQueue * TaskPriority refactor * remove logs * WorkerServer refactor Co-authored-by: qiaozhanwei --- .../server/worker/WorkerServer.java | 33 ------------------- 1 file changed, 33 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index ff8ff005ff..7ec0c42cae 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -52,16 +52,6 @@ public class WorkerServer { private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class); - /** - * fetch task executor service - */ - private ExecutorService fetchTaskExecutorService; - - /** - * CountDownLatch latch - */ - private CountDownLatch latch; - /** * worker config */ @@ -120,9 +110,6 @@ public class WorkerServer { this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), workerConfig.getWorkerGroup()); this.workerRegistry.registry(); - - this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor"); - /** * register hooks, which are called before the process exits */ @@ -132,13 +119,6 @@ public class WorkerServer { close("shutdownHook"); } })); - - //let the main thread await - latch = new CountDownLatch(1); - try { - latch.await(); - } catch (InterruptedException ignore) { - } } public void close(String cause) { @@ -169,19 +149,6 @@ public class WorkerServer { }catch (Exception e){ logger.warn("threadPool service stopped exception:{}",e.getMessage()); } - - logger.info("threadPool service stopped"); - - try { - fetchTaskExecutorService.shutdownNow(); - }catch (Exception e){ - logger.warn("worker fetch task service stopped exception:{}",e.getMessage()); - } - logger.info("worker fetch task service stopped"); - - latch.countDown(); - logger.info("zookeeper service stopped"); - } catch (Exception e) { logger.error("worker server stop exception ", e); System.exit(-1);