Browse Source

refactor heartbeat logic

pull/2/head
Technoboy- 5 years ago
parent
commit
65d603b77c
  1. 46
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  2. 58
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

46
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -90,11 +90,6 @@ public class WorkerServer implements IStoppable {
@Autowired @Autowired
private AlertDao alertDao; private AlertDao alertDao;
/**
* heartbeat thread pool
*/
private ScheduledExecutorService heartbeatWorkerService;
/** /**
* task queue impl * task queue impl
*/ */
@ -155,6 +150,7 @@ public class WorkerServer implements IStoppable {
*/ */
public static void main(String[] args) { public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER); Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER);
System.setProperty("spring.profiles.active","worker");
new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args); new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args);
} }
@ -173,7 +169,7 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_REQUEST, new TaskKillProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.start(); this.nettyRemotingServer.start();
this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort()); this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval());
this.workerRegistry.registry(); this.workerRegistry.registry();
this.zkWorkerClient.init(); this.zkWorkerClient.init();
@ -184,17 +180,8 @@ public class WorkerServer implements IStoppable {
this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor"); this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
heartbeatWorkerService = ThreadUtils.newDaemonThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", Constants.DEFAUL_WORKER_HEARTBEAT_THREAD_NUM);
// heartbeat thread implement
Runnable heartBeatThread = heartBeatThread();
zkWorkerClient.setStoppable(this); zkWorkerClient.setStoppable(this);
// regular heartbeat
// delay 5 seconds, then send a heartbeat every workerConfig.getWorkerHeartbeatInterval() seconds
heartbeatWorkerService.scheduleAtFixedRate(heartBeatThread, 5, workerConfig.getWorkerHeartbeatInterval(), TimeUnit.SECONDS);
// kill process thread implement // kill process thread implement
Runnable killProcessThread = getKillProcessThread(); Runnable killProcessThread = getKillProcessThread();
@ -255,13 +242,6 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer.close(); this.nettyRemotingServer.close();
this.workerRegistry.unRegistry(); this.workerRegistry.unRegistry();
try {
heartbeatWorkerService.shutdownNow();
}catch (Exception e){
logger.warn("heartbeat service stopped exception");
}
logger.info("heartbeat service stopped");
try { try {
ThreadPoolExecutors.getInstance().shutdown(); ThreadPoolExecutors.getInstance().shutdown();
}catch (Exception e){ }catch (Exception e){
@ -298,28 +278,6 @@ public class WorkerServer implements IStoppable {
} }
} }
/**
 * Builds the heartbeat task for this worker.
 *
 * <p>Each run reads the worker znode from {@code zkWorkerClient}. If the znode is
 * empty, the failure is logged and the run ends without contacting ZooKeeper;
 * otherwise the znode is passed to {@code zkWorkerClient.heartBeatForZk} together
 * with {@code Constants.WORKER_PREFIX}.
 *
 * @return the heartbeat runnable (creating it only logs a start message; scheduling is up to the caller)
 */
private Runnable heartBeatThread(){
    logger.info("start worker heart beat thread...");
    return new Runnable() {
        @Override
        public void run() {
            // read the znode once so the check and the heartbeat use the same value
            String workerZNode = zkWorkerClient.getWorkerZNode();
            if (StringUtils.isEmpty(workerZNode)){
                logger.error("worker send heartbeat to zk failed");
                // no znode to report against: skip this run instead of sending a heartbeat for an empty path
                return;
            }
            // send heartbeat to zk
            zkWorkerClient.heartBeatForZk(workerZNode, Constants.WORKER_PREFIX);
        }
    };
}
/** /**
* kill process thread implement * kill process thread implement
* *

58
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -19,11 +19,22 @@ package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
/** /**
* worker registry * worker registry
*/ */
@ -41,14 +52,32 @@ public class WorkerRegistry {
*/ */
private final int port; private final int port;
/**
* heartbeat interval
*/
private final long heartBeatInterval;
/**
* heartbeat executor
*/
private final ScheduledExecutorService heartBeatExecutor;
/**
* worker start time
*/
private final String startTime;
/** /**
* construct * construct
* @param zookeeperRegistryCenter zookeeperRegistryCenter * @param zookeeperRegistryCenter zookeeperRegistryCenter
* @param port port * @param port port
*/ */
public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){ public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){
this.zookeeperRegistryCenter = zookeeperRegistryCenter; this.zookeeperRegistryCenter = zookeeperRegistryCenter;
this.port = port; this.port = port;
this.heartBeatInterval = heartBeatInterval;
this.startTime = DateUtils.dateToString(new Date());
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
} }
/** /**
@ -71,7 +100,9 @@ public class WorkerRegistry {
} }
} }
}); });
logger.info("worker node : {} registry to ZK successfully.", address); this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS);
logger.info("worker node : {} registry to ZK successfully with heartBeatInterval : {}s", address, heartBeatInterval);
} }
/** /**
@ -81,6 +112,7 @@ public class WorkerRegistry {
String address = getLocalAddress(); String address = getLocalAddress();
String localNodePath = getWorkerPath(); String localNodePath = getWorkerPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath); zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
this.heartBeatExecutor.shutdownNow();
logger.info("worker node : {} unRegistry to ZK.", address); logger.info("worker node : {} unRegistry to ZK.", address);
} }
@ -101,4 +133,26 @@ public class WorkerRegistry {
private String getLocalAddress(){ private String getLocalAddress(){
return Constants.LOCAL_ADDRESS + ":" + port; return Constants.LOCAL_ADDRESS + ":" + port;
} }
/**
 * Heartbeat task.
 *
 * <p>Each run builds a comma-separated record and persists it at the path returned by
 * {@code getWorkerPath()}. Fields, in order: {@code OSUtils.cpuUsage()},
 * {@code OSUtils.memoryUsage()}, {@code OSUtils.loadAverage()}, the worker start time
 * and the current time.
 */
class HeartBeatTask implements Runnable{

    @Override
    public void run() {
        try {
            String heartBeatInfo = new StringBuilder(100)
                    .append(OSUtils.cpuUsage()).append(COMMA)
                    .append(OSUtils.memoryUsage()).append(COMMA)
                    .append(OSUtils.loadAverage()).append(COMMA)
                    .append(startTime).append(COMMA)
                    .append(DateUtils.dateToString(new Date()))
                    .toString();
            String nodePath = getWorkerPath();
            zookeeperRegistryCenter.getZookeeperCachedOperator().persist(nodePath, heartBeatInfo);
        } catch (Throwable ex){
            // a task that throws is not run again by scheduleAtFixedRate, so log and keep the schedule alive
            logger.error("error write worker heartbeat info", ex);
        }
    }
}
} }

Loading…
Cancel
Save