Browse Source

[DS-6694][Master] Reduce the refresh resource interval of LowerWeight (#6695)

* [DS-6694][Master] Reduce the refresh resource interval of LowerWeight

* add server node update event handle

Co-authored-by: caishunfeng <534328519@qq.com>
2.0.7-release
wind 3 years ago committed by lenboo
parent
commit
7b9abf896a
  1. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
  2. 4
      dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
  3. 2
      dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
  4. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
  5. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
  6. 35
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
  7. 4
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java
  8. 2
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java

@ -184,6 +184,7 @@ public class HeartBeat {
* update server state
*/
public void updateServerState() {
this.reportTime = System.currentTimeMillis();
if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);

4
dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java

@ -148,14 +148,12 @@ public class ZookeeperRegistry implements Registry {
String dataPath = null;
switch (type) {
case NODE_ADDED:
dataPath = event.getData().getPath();
eventType = DataChangeEvent.ADD;
break;
case NODE_UPDATED:
eventType = DataChangeEvent.UPDATE;
dataPath = event.getData().getPath();
break;
case NODE_REMOVED:
eventType = DataChangeEvent.REMOVE;
@ -164,7 +162,7 @@ public class ZookeeperRegistry implements Registry {
default:
}
if (null != eventType && null != dataPath) {
ListenerManager.dataChange(path, dataPath, eventType);
ListenerManager.dataChange(path, dataPath, new String(event.getData().getData()), eventType);
}
};
treeCache.getListenable().addListener(treeCacheListener);

2
dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java

@ -115,7 +115,7 @@ public class ZookeeperRegistryTest {
class TestListener implements SubscribeListener {
@Override
public void notify(String path, DataChangeEvent dataChangeEvent) {
public void notify(String path, String data, DataChangeEvent dataChangeEvent) {
logger.info("I'm test listener");
}
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java

@ -78,7 +78,7 @@ public class LowerWeightHostManager extends CommonHostManager {
this.workerHostWeightsMap = new ConcurrentHashMap<>();
this.lock = new ReentrantLock();
this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor"));
this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(), 0, 5, TimeUnit.SECONDS);
this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(), 0, 1, TimeUnit.SECONDS);
}
@PreDestroy

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java

@ -39,9 +39,8 @@ public class MasterRegistryDataListener implements SubscribeListener {
masterRegistryClient = SpringApplicationContext.getBean(MasterRegistryClient.class);
}
@Override
public void notify(String path, DataChangeEvent event) {
public void notify(String path, String data, DataChangeEvent event) {
//monitor master
if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS + Constants.SINGLE_SLASH)) {
handleMasterEvent(event, path);

35
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

@ -190,7 +190,7 @@ public class ServerNodeManager implements InitializingBean {
public void run() {
// sync worker node info
Map<String, String> newWorkerNodeInfo = registryClient.getServerMaps(NodeType.WORKER, true);
syncWorkerNodeInfo(newWorkerNodeInfo);
syncAllWorkerNodeInfo(newWorkerNodeInfo);
// sync worker group nodes from database
List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup();
@ -218,7 +218,7 @@ public class ServerNodeManager implements InitializingBean {
class WorkerDataListener implements SubscribeListener {
@Override
public void notify(String path, DataChangeEvent dataChangeEvent) {
public void notify(String path, String data, DataChangeEvent dataChangeEvent) {
if (registryClient.isWorkerPath(path)) {
try {
if (dataChangeEvent == DataChangeEvent.ADD) {
@ -233,6 +233,14 @@ public class ServerNodeManager implements InitializingBean {
Set<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
syncWorkerGroupNodes(group, currentNodes);
alertDao.sendServerStopedAlert(1, path, "WORKER");
} else if (dataChangeEvent == DataChangeEvent.UPDATE) {
logger.debug("worker group node : {} update, data: {}", path, data);
String group = parseGroup(path);
Set<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
syncWorkerGroupNodes(group, currentNodes);
String node = parseNode(path);
syncSingleWorkerNodeInfo(node, data);
}
} catch (IllegalArgumentException ex) {
logger.warn(ex.getMessage());
@ -251,6 +259,13 @@ public class ServerNodeManager implements InitializingBean {
return parts[parts.length - 2];
}
private String parseNode(String path) {
String[] parts = path.split("/");
if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
}
return parts[parts.length - 1];
}
}
/**
@ -258,7 +273,7 @@ public class ServerNodeManager implements InitializingBean {
*/
class MasterDataListener implements SubscribeListener {
@Override
public void notify(String path, DataChangeEvent dataChangeEvent) {
public void notify(String path, String data, DataChangeEvent dataChangeEvent) {
if (registryClient.isMasterPath(path)) {
try {
if (dataChangeEvent.equals(DataChangeEvent.ADD)) {
@ -407,7 +422,7 @@ public class ServerNodeManager implements InitializingBean {
*
* @param newWorkerNodeInfo new worker node info
*/
private void syncWorkerNodeInfo(Map<String, String> newWorkerNodeInfo) {
private void syncAllWorkerNodeInfo(Map<String, String> newWorkerNodeInfo) {
workerNodeInfoLock.lock();
try {
workerNodeInfo.clear();
@ -417,6 +432,18 @@ public class ServerNodeManager implements InitializingBean {
}
}
/**
* sync single worker node info
*/
private void syncSingleWorkerNodeInfo(String node, String info) {
workerNodeInfoLock.lock();
try {
workerNodeInfo.put(node, info);
} finally {
workerNodeInfoLock.unlock();
}
}
/**
* destroy
*/

4
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java

@ -55,12 +55,12 @@ public class ListenerManager {
*
*After the data changes, it is distributed to the corresponding listener for processing
*/
public static void dataChange(String key,String path, DataChangeEvent dataChangeEvent) {
public static void dataChange(String key,String path, String data, DataChangeEvent dataChangeEvent) {
SubscribeListener notifyListener = listeners.get(key);
if (null == notifyListener) {
return;
}
notifyListener.notify(path,dataChangeEvent);
notifyListener.notify(path, data, dataChangeEvent);
}
}

2
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java

@ -25,6 +25,6 @@ public interface SubscribeListener {
/**
* Processing logic when the subscription node changes
*/
void notify(String path, DataChangeEvent dataChangeEvent);
void notify(String path, String data, DataChangeEvent dataChangeEvent);
}

Loading…
Cancel
Save