From 4eebde835594e0a7f719eaf1d3d288bbbdf9e3f0 Mon Sep 17 00:00:00 2001 From: Tboy Date: Fri, 6 Mar 2020 18:18:49 +0800 Subject: [PATCH] Refactor worker (#2103) * refactor kill logic * refactor ExecutionContext * refactor worker --- .../server/master/MasterServer.java | 13 +-- .../server/worker/WorkerServer.java | 27 +---- .../server/zk/ZKMasterClient.java | 4 - .../server/zk/ZKWorkerClient.java | 103 ------------------ .../service/zk/AbstractZKClient.java | 17 --- 5 files changed, 8 insertions(+), 156 deletions(-) delete mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 12fe25b30d..292bfaea2c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -54,7 +54,7 @@ import java.util.concurrent.ExecutorService; * master server */ @ComponentScan("org.apache.dolphinscheduler") -public class MasterServer implements IStoppable { +public class MasterServer { /** * logger of MasterServer @@ -142,8 +142,6 @@ public class MasterServer implements IStoppable { masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); - zkMasterClient.setStoppable(this); - // master scheduler thread MasterSchedulerThread masterSchedulerThread = new MasterSchedulerThread( zkMasterClient, @@ -180,7 +178,7 @@ public class MasterServer implements IStoppable { zkMasterClient.getAlertDao().sendServerStopedAlert( 1, OSUtils.getHost(), "Master-Server"); } - stop("shutdownhook"); + close("shutdownhook"); } })); } @@ -190,8 +188,7 @@ public class MasterServer implements IStoppable { * gracefully stop * @param cause why stopping */ - @Override - public synchronized void stop(String cause) { + public void close(String cause) { try { //execute only once @@ -225,10 +222,10 @@ public class MasterServer implements IStoppable { try { ThreadPoolExecutors.getInstance().shutdown(); }catch (Exception e){ - logger.warn("threadpool service stopped exception:{}",e.getMessage()); + logger.warn("threadPool service stopped exception:{}",e.getMessage()); } - logger.info("threadpool service stopped"); + logger.info("threadPool service stopped"); try { masterSchedulerService.shutdownNow(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index fb35f472aa..ff8ff005ff 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.worker; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; import org.apache.dolphinscheduler.common.thread.ThreadUtils; @@ -29,9 +28,7 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; -import org.apache.dolphinscheduler.server.zk.ZKWorkerClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.queue.TaskQueueFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -47,20 +44,13 @@ import java.util.concurrent.ExecutorService; * worker server */ @ComponentScan("org.apache.dolphinscheduler") -public class WorkerServer implements IStoppable { +public class WorkerServer { /** * logger */ private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class); - /** - * zk worker client - */ - @Autowired - private ZKWorkerClient zkWorkerClient = null; - - /** * fetch task executor service @@ -130,21 +120,16 @@ public class WorkerServer implements IStoppable { this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), workerConfig.getWorkerGroup()); this.workerRegistry.registry(); - this.zkWorkerClient.init(); - - this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor"); - zkWorkerClient.setStoppable(this); - /** * register hooks, which are called before the process exits */ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { - stop("shutdownHook"); + close("shutdownHook"); } })); @@ -156,8 +141,7 @@ public class WorkerServer implements IStoppable { } } - @Override - public synchronized void stop(String cause) { + public void close(String cause) { try { //execute only once @@ -195,11 +179,6 @@ public class WorkerServer implements IStoppable { } logger.info("worker fetch task service stopped"); - try{ - zkWorkerClient.close(); - }catch (Exception e){ - logger.warn("zookeeper service stopped exception:{}",e.getMessage()); - } latch.countDown(); logger.info("zookeeper service stopped"); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 77d2139aab..7fc91dc9e2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -245,10 +245,6 @@ public class ZKMasterClient extends AbstractZKClient { logger.info("master node added : {}", path); break; case NODE_REMOVED: - String serverHost = getHostByEventDataPath(path); - if (checkServerSelfDead(serverHost, ZKNodeType.MASTER)) { - return; - } removeZKNodePath(path, ZKNodeType.MASTER, true); break; default: diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java deleted file mode 100644 index a1d70f8343..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.server.zk; - -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.ZKNodeType; -import org.apache.commons.lang.StringUtils; -import org.apache.curator.framework.CuratorFramework; -import org.apache.dolphinscheduler.service.zk.AbstractZKClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - - -/** - * zookeeper worker client - * single instance - */ -@Component -public class ZKWorkerClient extends AbstractZKClient { - - /** - * logger - */ - private static final Logger logger = LoggerFactory.getLogger(ZKWorkerClient.class); - - - /** - * worker znode - */ - private String workerZNode = null; - - - /** - * init - */ - public void init(){ - - logger.info("initialize worker client..."); - // init system znode - this.initSystemZNode(); - - } - - /** - * handle path events that this class cares about - * @param client zkClient - * @param event path event - * @param path zk path - */ - @Override - protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { - if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){ - handleWorkerEvent(event,path); - } - } - - /** - * monitor worker - * @param event event - * @param path path - */ - public void handleWorkerEvent(TreeCacheEvent event, String path){ - switch (event.getType()) { - case NODE_ADDED: - logger.info("worker node added : {}", path); - break; - case NODE_REMOVED: - //find myself dead - String serverHost = getHostByEventDataPath(path); - if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){ - return; - } - break; - default: - break; - } - } - - /** - * get worker znode - * @return worker zookeeper node - */ - public String getWorkerZNode() { - return workerZNode; - } - -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java index 6e887f80d7..24bf25984b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java @@ -369,23 +369,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { } } - /** - * server self dead, stop all threads - * @param serverHost server host - * @param zkNodeType zookeeper node type - * @return true if server dead and stop all threads - */ - protected boolean checkServerSelfDead(String serverHost, ZKNodeType zkNodeType) { - if (serverHost.equals(OSUtils.getHost())) { - logger.error("{} server({}) of myself dead , stopping...", - zkNodeType.toString(), serverHost); - stoppable.stop(String.format(" %s server %s of myself dead , stopping...", - zkNodeType.toString(), serverHost)); - return true; - } - return false; - } - /** * get host ip, string format: masterParentPath/ip * @param path path