From 8c545ffa9bda86f5212fc30aa7cc02b9ffd85783 Mon Sep 17 00:00:00 2001 From: Tboy Date: Thu, 27 Feb 2020 09:54:40 +0800 Subject: [PATCH] Refactor worker (#2026) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Refactor worker (#10) * Refactor worker (#2000) * Refactor worker (#2) * Refactor worker (#1993) * Refactor worker (#1) * add TaskResponseProcessor (#1983) * 1, master persistent task 2. extract master and worker communication model (#1992) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error Co-authored-by: qiaozhanwei * updates Co-authored-by: qiaozhanwei * TaskExecutionContext create modify (#1994) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify Co-authored-by: qiaozhanwei * updates * add- register processor Co-authored-by: qiaozhanwei * buildAckCommand taskInstanceId not set modify (#2002) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify (#2004) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment (#2006) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type (#2012) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * Refactor worker (#2018) * Refactor worker (#7) * Refactor worker (#2000) * Refactor worker (#2) * Refactor worker (#1993) * Refactor worker (#1) * add TaskResponseProcessor (#1983) * 1, master persistent task 2. extract master and worker communication model (#1992) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error Co-authored-by: qiaozhanwei * updates Co-authored-by: qiaozhanwei * TaskExecutionContext create modify (#1994) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify Co-authored-by: qiaozhanwei * updates * add- register processor Co-authored-by: qiaozhanwei * buildAckCommand taskInstanceId not set modify (#2002) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify (#2004) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment (#2006) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type (#2012) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type Co-authored-by: qiaozhanwei * Refactor worker (#8) * Refactor worker (#2000) * Refactor worker (#2) * Refactor worker (#1993) * Refactor worker (#1) * add TaskResponseProcessor (#1983) * 1, master persistent task 2. extract master and worker communication model (#1992) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error Co-authored-by: qiaozhanwei * updates Co-authored-by: qiaozhanwei * TaskExecutionContext create modify (#1994) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify Co-authored-by: qiaozhanwei * updates * add- register processor Co-authored-by: qiaozhanwei * buildAckCommand taskInstanceId not set modify (#2002) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify (#2004) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment (#2006) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type (#2012) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type Co-authored-by: qiaozhanwei * add kill command Co-authored-by: qiaozhanwei * add TaskInstanceCacheManager receive Worker report result,modify master polling db transfrom to cache (#2021) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * add TaskInstanceCacheManager receive Worker report result * TaskInstance setExecutePath * add TaskInstanceCacheManager to receive Worker Task result report * TaskInstanceCacheManager add remove method * add license * add dispatcht task method * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access Co-authored-by: qiaozhanwei * refactor heartbeat logic Co-authored-by: qiaozhanwei --- .../server/worker/WorkerServer.java | 46 +-------------- .../worker/registry/WorkerRegistry.java | 58 ++++++++++++++++++- 2 files changed, 58 insertions(+), 46 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index c2af7b12a6..f53f187437 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -90,11 +90,6 @@ public class WorkerServer implements IStoppable { @Autowired private AlertDao alertDao; - /** - * heartbeat thread pool - */ - private ScheduledExecutorService heartbeatWorkerService; - /** * task queue impl */ @@ -155,6 +150,7 @@ public class WorkerServer implements IStoppable { */ public static void main(String[] args) { Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER); + System.setProperty("spring.profiles.active","worker"); new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args); } @@ -173,7 +169,7 @@ public class WorkerServer implements IStoppable { this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_REQUEST, new TaskKillProcessor()); this.nettyRemotingServer.start(); - this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort()); + this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval()); this.workerRegistry.registry(); this.zkWorkerClient.init(); @@ -184,17 +180,8 @@ public class WorkerServer implements IStoppable { this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor"); - heartbeatWorkerService = ThreadUtils.newDaemonThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", Constants.DEFAUL_WORKER_HEARTBEAT_THREAD_NUM); - - // heartbeat thread implement - Runnable heartBeatThread = heartBeatThread(); - zkWorkerClient.setStoppable(this); - // regular heartbeat - // delay 5 seconds, send heartbeat every 30 seconds - heartbeatWorkerService.scheduleAtFixedRate(heartBeatThread, 5, workerConfig.getWorkerHeartbeatInterval(), TimeUnit.SECONDS); - // kill process thread implement Runnable killProcessThread = getKillProcessThread(); @@ -255,13 +242,6 @@ public class WorkerServer implements IStoppable { this.nettyRemotingServer.close(); this.workerRegistry.unRegistry(); - try { - heartbeatWorkerService.shutdownNow(); - }catch (Exception e){ - logger.warn("heartbeat service stopped exception"); - } - logger.info("heartbeat service stopped"); - try { ThreadPoolExecutors.getInstance().shutdown(); }catch (Exception e){ @@ -298,28 +278,6 @@ public class WorkerServer implements IStoppable { } } - /** - * heartbeat thread implement - * - * @return - */ - private Runnable heartBeatThread(){ - logger.info("start worker heart beat thread..."); - Runnable heartBeatThread = new Runnable() { - @Override - public void run() { - // send heartbeat to zk - if (StringUtils.isEmpty(zkWorkerClient.getWorkerZNode())){ - logger.error("worker send heartbeat to zk failed"); - } - - zkWorkerClient.heartBeatForZk(zkWorkerClient.getWorkerZNode() , Constants.WORKER_PREFIX); - } - }; - return heartBeatThread; - } - - /** * kill process thread implement * diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index a0f4e664b5..b6f6896d66 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -19,11 +19,22 @@ package org.apache.dolphinscheduler.server.worker.registry; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.remote.utils.Constants; +import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Date; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; + + /** * worker registry */ @@ -41,14 +52,32 @@ public class WorkerRegistry { */ private final int port; + /** + * heartbeat interval + */ + private final long heartBeatInterval; + + /** + * heartbeat executor + */ + private final ScheduledExecutorService heartBeatExecutor; + + /** + * worker start time + */ + private final String startTime; + /** * construct * @param zookeeperRegistryCenter zookeeperRegistryCenter * @param port port */ - public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){ + public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){ this.zookeeperRegistryCenter = zookeeperRegistryCenter; this.port = port; + this.heartBeatInterval = heartBeatInterval; + this.startTime = DateUtils.dateToString(new Date()); + this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); } /** @@ -71,7 +100,9 @@ public class WorkerRegistry { } } }); - logger.info("worker node : {} registry to ZK successfully.", address); + this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS); + logger.info("worker node : {} registry to ZK successfully with heartBeatInterval : {}s", address, heartBeatInterval); + } /** @@ -81,6 +112,7 @@ public class WorkerRegistry { String address = getLocalAddress(); String localNodePath = getWorkerPath(); zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath); + this.heartBeatExecutor.shutdownNow(); logger.info("worker node : {} unRegistry to ZK.", address); } @@ -101,4 +133,26 @@ public class WorkerRegistry { private String getLocalAddress(){ return Constants.LOCAL_ADDRESS + ":" + port; } + + /** + * hear beat task + */ + class HeartBeatTask implements Runnable{ + + @Override + public void run() { + try { + StringBuilder builder = new StringBuilder(100); + builder.append(OSUtils.cpuUsage()).append(COMMA); + builder.append(OSUtils.memoryUsage()).append(COMMA); + builder.append(OSUtils.loadAverage()).append(COMMA); + builder.append(startTime).append(COMMA); + builder.append(DateUtils.dateToString(new Date())); + String workerPath = getWorkerPath(); + zookeeperRegistryCenter.getZookeeperCachedOperator().persist(workerPath, builder.toString()); + } catch (Throwable ex){ + logger.error("error write worker heartbeat info", ex); + } + } + } }