Browse Source

[DS-6499][WorkerServer] report busy state when worker is overload (#6512)

* [DS-6499][WorkerServer] report busy state when worker is overload
3.0.0/version-upgrade
wind 3 years ago committed by GitHub
parent
commit
50f1766903
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 248
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
  3. 135
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
  4. 76
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java
  5. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
  6. 32
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
  7. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
  8. 81
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
  9. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  10. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  11. 22
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
  12. 14
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
  13. 15
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -521,8 +521,7 @@ public final class Constants {
/**
* heartbeat for zk info length
*/
public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 10;
public static final int HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH = 11;
public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 13;
/**
* jar
@ -1029,6 +1028,7 @@ public final class Constants {
public static final int NORMAL_NODE_STATUS = 0;
public static final int ABNORMAL_NODE_STATUS = 1;
public static final int BUSY_NODE_STATUE = 2;
public static final String START_TIME = "start time";
public static final String END_TIME = "end time";

248
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java

@ -0,0 +1,248 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HeartBeat {
private static final Logger logger = LoggerFactory.getLogger(HeartBeat.class);
public static final String COMMA = ",";
private long startupTime;
private long reportTime;
private double cpuUsage;
private double memoryUsage;
private double loadAverage;
private double availablePhysicalMemorySize;
private double maxCpuloadAvg;
private double reservedMemory;
private int serverStatus;
private int processId;
private int workerHostWeight; // worker host weight
private int workerWaitingTaskCount; // worker waiting task count
private int workerExecThreadCount; // worker thread pool thread count
public long getStartupTime() {
return startupTime;
}
public void setStartupTime(long startupTime) {
this.startupTime = startupTime;
}
public long getReportTime() {
return reportTime;
}
public void setReportTime(long reportTime) {
this.reportTime = reportTime;
}
public double getCpuUsage() {
return cpuUsage;
}
public void setCpuUsage(double cpuUsage) {
this.cpuUsage = cpuUsage;
}
public double getMemoryUsage() {
return memoryUsage;
}
public void setMemoryUsage(double memoryUsage) {
this.memoryUsage = memoryUsage;
}
public double getLoadAverage() {
return loadAverage;
}
public void setLoadAverage(double loadAverage) {
this.loadAverage = loadAverage;
}
public double getAvailablePhysicalMemorySize() {
return availablePhysicalMemorySize;
}
public void setAvailablePhysicalMemorySize(double availablePhysicalMemorySize) {
this.availablePhysicalMemorySize = availablePhysicalMemorySize;
}
public double getMaxCpuloadAvg() {
return maxCpuloadAvg;
}
public void setMaxCpuloadAvg(double maxCpuloadAvg) {
this.maxCpuloadAvg = maxCpuloadAvg;
}
public double getReservedMemory() {
return reservedMemory;
}
public void setReservedMemory(double reservedMemory) {
this.reservedMemory = reservedMemory;
}
public int getServerStatus() {
return serverStatus;
}
public void setServerStatus(int serverStatus) {
this.serverStatus = serverStatus;
}
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public int getWorkerHostWeight() {
return workerHostWeight;
}
public void setWorkerHostWeight(int workerHostWeight) {
this.workerHostWeight = workerHostWeight;
}
public int getWorkerWaitingTaskCount() {
return workerWaitingTaskCount;
}
public void setWorkerWaitingTaskCount(int workerWaitingTaskCount) {
this.workerWaitingTaskCount = workerWaitingTaskCount;
}
public int getWorkerExecThreadCount() {
return workerExecThreadCount;
}
public void setWorkerExecThreadCount(int workerExecThreadCount) {
this.workerExecThreadCount = workerExecThreadCount;
}
public HeartBeat() {
this.reportTime = System.currentTimeMillis();
this.serverStatus = Constants.NORMAL_NODE_STATUS;
}
public HeartBeat(long startupTime, double maxCpuloadAvg, double reservedMemory) {
this.reportTime = System.currentTimeMillis();
this.serverStatus = Constants.NORMAL_NODE_STATUS;
this.startupTime = startupTime;
this.maxCpuloadAvg = maxCpuloadAvg;
this.reservedMemory = reservedMemory;
}
public HeartBeat(long startupTime, double maxCpuloadAvg, double reservedMemory, int hostWeight, int workerExecThreadCount) {
this.reportTime = System.currentTimeMillis();
this.serverStatus = Constants.NORMAL_NODE_STATUS;
this.startupTime = startupTime;
this.maxCpuloadAvg = maxCpuloadAvg;
this.reservedMemory = reservedMemory;
this.workerHostWeight = hostWeight;
this.workerExecThreadCount = workerExecThreadCount;
}
/**
* fill system info
*/
private void fillSystemInfo() {
this.cpuUsage = OSUtils.cpuUsage();
this.loadAverage = OSUtils.loadAverage();
this.availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
this.memoryUsage = OSUtils.memoryUsage();
this.processId = OSUtils.getProcessID();
}
/**
* update server state
*/
public void updateServerState() {
if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
this.serverStatus = Constants.ABNORMAL_NODE_STATUS;
} else if (workerWaitingTaskCount > workerExecThreadCount) {
logger.warn("current waiting task count {} is large than worker thread count {}, worker is busy", workerWaitingTaskCount, workerExecThreadCount);
this.serverStatus = Constants.BUSY_NODE_STATUE;
} else {
this.serverStatus = Constants.NORMAL_NODE_STATUS;
}
}
/**
* encode heartbeat
*/
public String encodeHeartBeat() {
this.fillSystemInfo();
this.updateServerState();
StringBuilder builder = new StringBuilder(100);
builder.append(cpuUsage).append(COMMA);
builder.append(memoryUsage).append(COMMA);
builder.append(loadAverage).append(COMMA);
builder.append(availablePhysicalMemorySize).append(Constants.COMMA);
builder.append(maxCpuloadAvg).append(Constants.COMMA);
builder.append(reservedMemory).append(Constants.COMMA);
builder.append(startupTime).append(Constants.COMMA);
builder.append(reportTime).append(Constants.COMMA);
builder.append(serverStatus).append(COMMA);
builder.append(processId).append(COMMA);
builder.append(workerHostWeight).append(COMMA);
builder.append(workerExecThreadCount).append(COMMA);
builder.append(workerWaitingTaskCount);
return builder.toString();
}
/**
* decode heartbeat
*/
public static HeartBeat decodeHeartBeat(String heartBeatInfo) {
String[] parts = heartBeatInfo.split(Constants.COMMA);
if (parts.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) {
return null;
}
HeartBeat heartBeat = new HeartBeat();
heartBeat.cpuUsage = Double.parseDouble(parts[0]);
heartBeat.memoryUsage = Double.parseDouble(parts[1]);
heartBeat.loadAverage = Double.parseDouble(parts[2]);
heartBeat.availablePhysicalMemorySize = Double.parseDouble(parts[3]);
heartBeat.maxCpuloadAvg = Double.parseDouble(parts[4]);
heartBeat.reservedMemory = Double.parseDouble(parts[5]);
heartBeat.startupTime = Long.parseLong(parts[6]);
heartBeat.reportTime = Long.parseLong(parts[7]);
heartBeat.serverStatus = Integer.parseInt(parts[8]);
heartBeat.processId = Integer.parseInt(parts[9]);
heartBeat.workerHostWeight = Integer.parseInt(parts[10]);
heartBeat.workerExecThreadCount = Integer.parseInt(parts[11]);
heartBeat.workerWaitingTaskCount = Integer.parseInt(parts[12]);
return heartBeat;
}
}

135
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java

@ -1,135 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.commons.lang.StringUtils;
/**
* heartbeat for ZK reigster res info
*/
public class ResInfo {
/**
* cpuUsage
*/
private double cpuUsage;
/**
* memoryUsage
*/
private double memoryUsage;
/**
* loadAverage
*/
private double loadAverage;
public ResInfo(double cpuUsage, double memoryUsage) {
this.cpuUsage = cpuUsage;
this.memoryUsage = memoryUsage;
}
public ResInfo(double cpuUsage, double memoryUsage, double loadAverage) {
this(cpuUsage,memoryUsage);
this.loadAverage = loadAverage;
}
public double getCpuUsage() {
return cpuUsage;
}
public void setCpuUsage(double cpuUsage) {
this.cpuUsage = cpuUsage;
}
public double getMemoryUsage() {
return memoryUsage;
}
public void setMemoryUsage(double memoryUsage) {
this.memoryUsage = memoryUsage;
}
public double getLoadAverage() {
return loadAverage;
}
public void setLoadAverage(double loadAverage) {
this.loadAverage = loadAverage;
}
/**
* get CPU and memory usage
* @param cpuUsage cpu usage
* @param memoryUsage memory usage
* @param loadAverage load average
* @return cpu and memory usage
*/
public static String getResInfoJson(double cpuUsage, double memoryUsage, double loadAverage) {
ResInfo resInfo = new ResInfo(cpuUsage,memoryUsage,loadAverage);
return JSONUtils.toJsonString(resInfo);
}
/**
* parse heartbeat info for zk
* @param heartBeatInfo heartbeat info
* @return heartbeat info to Server
*/
public static Server parseHeartbeatForRegistryInfo(String heartBeatInfo) {
if (!isValidHeartbeatForRegistryInfo(heartBeatInfo)) {
return null;
}
String[] parts = heartBeatInfo.split(Constants.COMMA);
Server server = new Server();
server.setResInfo(getResInfoJson(Double.parseDouble(parts[0]),
Double.parseDouble(parts[1]),
Double.parseDouble(parts[2])));
server.setCreateTime(DateUtils.stringToDate(parts[6]));
server.setLastHeartbeatTime(DateUtils.stringToDate(parts[7]));
//set process id
server.setId(Integer.parseInt(parts[9]));
return server;
}
/**
* is valid heartbeat info for zk
* @param heartBeatInfo heartbeat info
* @return heartbeat info is valid
*/
public static boolean isValidHeartbeatForRegistryInfo(String heartBeatInfo) {
if (!StringUtils.isEmpty(heartBeatInfo)) {
String[] parts = heartBeatInfo.split(Constants.COMMA);
return parts.length == Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH
|| parts.length == Constants.HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH;
}
return false;
}
/**
* is new heartbeat info for zk with weight
* @param parts heartbeat info parts
* @return heartbeat info is new with weight
*/
public static boolean isNewHeartbeatWithWeight(String[] parts) {
return parts.length == Constants.HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH;
}
}

76
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import static org.junit.Assert.assertEquals;
import org.apache.dolphinscheduler.common.Constants;
import org.junit.Test;
/**
* NetUtilsTest
*/
public class HeartBeatTest {
@Test
public void testAbnormalState() {
long startupTime = System.currentTimeMillis();
double loadAverage = 100;
double reservedMemory = 100;
HeartBeat heartBeat = new HeartBeat(startupTime, loadAverage, reservedMemory);
heartBeat.updateServerState();
assertEquals(Constants.ABNORMAL_NODE_STATUS, heartBeat.getServerStatus());
}
@Test
public void testBusyState() {
long startupTime = System.currentTimeMillis();
double loadAverage = 0;
double reservedMemory = 0;
int hostWeight = 1;
int taskCount = 200;
int workerThreadCount = 199;
HeartBeat heartBeat = new HeartBeat(startupTime, loadAverage, reservedMemory, hostWeight, workerThreadCount);
heartBeat.setWorkerWaitingTaskCount(taskCount);
heartBeat.updateServerState();
assertEquals(Constants.BUSY_NODE_STATUE, heartBeat.getServerStatus());
}
@Test
public void testDecodeHeartBeat() throws Exception {
String heartBeatInfo = "0.35,0.58,3.09,6.47,5.0,1.0,1634033006749,1634033006857,1,29732,1,199,200";
HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
double delta = 0.001;
assertEquals(0.35, heartBeat.getCpuUsage(), delta);
assertEquals(0.58, heartBeat.getMemoryUsage(), delta);
assertEquals(3.09, heartBeat.getLoadAverage(), delta);
assertEquals(6.47, heartBeat.getAvailablePhysicalMemorySize(), delta);
assertEquals(5.0, heartBeat.getMaxCpuloadAvg(), delta);
assertEquals(1.0, heartBeat.getReservedMemory(), delta);
assertEquals(1634033006749L, heartBeat.getStartupTime());
assertEquals(1634033006857L, heartBeat.getReportTime());
assertEquals(1, heartBeat.getServerStatus());
assertEquals(29732, heartBeat.getProcessId());
assertEquals(199, heartBeat.getWorkerExecThreadCount());
assertEquals(200, heartBeat.getWorkerWaitingTaskCount());
}
}

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java

@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
@ -48,6 +48,7 @@ public abstract class CommonHostManager implements HostManager {
/**
* select host
*
* @param context context
* @return host
*/
@ -87,12 +88,12 @@ public abstract class CommonHostManager implements HostManager {
return hostWorkers;
}
protected int getWorkerHostWeightFromHeartbeat(String heartbeat) {
protected int getWorkerHostWeightFromHeartbeat(String heartBeatInfo) {
int hostWeight = Constants.DEFAULT_WORKER_HOST_WEIGHT;
if (!StringUtils.isEmpty(heartbeat)) {
String[] parts = heartbeat.split(Constants.COMMA);
if (ResInfo.isNewHeartbeatWithWeight(parts)) {
hostWeight = Integer.parseInt(parts[10]);
if (!StringUtils.isEmpty(heartBeatInfo)) {
HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
if (heartBeat != null) {
hostWeight = heartBeat.getWorkerHostWeight();
}
}
return hostWeight;

32
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java

@ -19,8 +19,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@ -79,7 +78,7 @@ public class LowerWeightHostManager extends CommonHostManager {
this.workerHostWeightsMap = new ConcurrentHashMap<>();
this.lock = new ReentrantLock();
this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor"));
this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),0, 5, TimeUnit.SECONDS);
this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(), 0, 5, TimeUnit.SECONDS);
}
@PreDestroy
@ -89,6 +88,7 @@ public class LowerWeightHostManager extends CommonHostManager {
/**
* select host
*
* @param context context
* @return host
*/
@ -153,24 +153,24 @@ public class LowerWeightHostManager extends CommonHostManager {
}
}
public HostWeight getHostWeight(String addr, String workerGroup, String heartbeat) {
if (ResInfo.isValidHeartbeatForRegistryInfo(heartbeat)) {
String[] parts = heartbeat.split(Constants.COMMA);
int status = Integer.parseInt(parts[8]);
if (status == Constants.ABNORMAL_NODE_STATUS) {
logger.warn("worker {} current cpu load average {} is too high or available memory {}G is too low",
addr, Double.parseDouble(parts[2]), Double.parseDouble(parts[3]));
public HostWeight getHostWeight(String addr, String workerGroup, String heartBeatInfo) {
HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
if (heartBeat == null) {
return null;
}
double cpu = Double.parseDouble(parts[0]);
double memory = Double.parseDouble(parts[1]);
double loadAverage = Double.parseDouble(parts[2]);
long startTime = DateUtils.stringToDate(parts[6]).getTime();
int weight = getWorkerHostWeightFromHeartbeat(heartbeat);
return new HostWeight(HostWorker.of(addr, weight, workerGroup), cpu, memory, loadAverage, startTime);
if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) {
logger.warn("worker {} current cpu load average {} is too high or available memory {}G is too low",
addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize());
return null;
}
if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) {
logger.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}",
addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount());
return null;
}
return new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup),
heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(), heartBeat.getStartupTime());
}
}
}

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -96,14 +95,14 @@ public class MasterRegistryClient {
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps;
/**
* master start time
* master startup time, ms
*/
private String startTime;
private long startupTime;
private String localNodePath;
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
this.startTime = DateUtils.dateToString(new Date());
this.startupTime = System.currentTimeMillis();
this.registryClient = RegistryClient.getInstance();
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
this.processInstanceExecMaps = processInstanceExecMaps;
@ -364,14 +363,14 @@ public class MasterRegistryClient {
String address = NetUtils.getAddr(masterConfig.getListenPort());
localNodePath = getMasterPath();
int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
masterConfig.getMasterMaxCpuloadAvg(),
masterConfig.getMasterReservedMemory(),
Sets.newHashSet(getMasterPath()),
Constants.MASTER_TYPE,
registryClient);
registryClient.persistEphemeral(localNodePath, heartBeatTask.heartBeatInfo());
registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
registryClient.addConnectionStateListener(new MasterRegistryConnectStateListener());
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);

81
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java

@ -17,15 +17,12 @@
package org.apache.dolphinscheduler.server.registry;
import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Date;
import java.util.Set;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,42 +33,43 @@ public class HeartBeatTask implements Runnable {
private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
private String startTime;
private double maxCpuloadAvg;
private double reservedMemory;
private int hostWeight; // worker host weight
private Set<String> heartBeatPaths;
private String serverType;
private RegistryClient registryClient;
private WorkerManagerThread workerManagerThread;
private String serverType;
private HeartBeat heartBeat;
public HeartBeatTask(String startTime,
public HeartBeatTask(long startupTime,
double maxCpuloadAvg,
double reservedMemory,
Set<String> heartBeatPaths,
String serverType,
RegistryClient registryClient) {
this.startTime = startTime;
this.maxCpuloadAvg = maxCpuloadAvg;
this.reservedMemory = reservedMemory;
this.heartBeatPaths = heartBeatPaths;
this.serverType = serverType;
this.registryClient = registryClient;
this.serverType = serverType;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
}
public HeartBeatTask(String startTime,
public HeartBeatTask(long startupTime,
double maxCpuloadAvg,
double reservedMemory,
int hostWeight,
Set<String> heartBeatPaths,
String serverType,
RegistryClient registryClient) {
this.startTime = startTime;
this.maxCpuloadAvg = maxCpuloadAvg;
this.reservedMemory = reservedMemory;
this.hostWeight = hostWeight;
RegistryClient registryClient,
int workerThreadCount,
WorkerManagerThread workerManagerThread
) {
this.heartBeatPaths = heartBeatPaths;
this.serverType = serverType;
this.registryClient = registryClient;
this.workerManagerThread = workerManagerThread;
this.serverType = serverType;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount);
}
public String getHeartBeatInfo() {
return this.heartBeat.encodeHeartBeat();
}
@Override
@ -85,41 +83,16 @@ public class HeartBeatTask implements Runnable {
}
}
if (workerManagerThread != null) {
// update waiting task count
heartBeat.setWorkerWaitingTaskCount(workerManagerThread.getThreadPoolQueueSize());
}
for (String heartBeatPath : heartBeatPaths) {
registryClient.update(heartBeatPath, heartBeatInfo());
registryClient.update(heartBeatPath, heartBeat.encodeHeartBeat());
}
} catch (Throwable ex) {
logger.error("error write heartbeat info", ex);
}
}
public String heartBeatInfo() {
double loadAverage = OSUtils.loadAverage();
double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
int status = Constants.NORMAL_NODE_STATUS;
if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
status = Constants.ABNORMAL_NODE_STATUS;
}
StringBuilder builder = new StringBuilder(100);
builder.append(OSUtils.cpuUsage()).append(COMMA);
builder.append(OSUtils.memoryUsage()).append(COMMA);
builder.append(OSUtils.loadAverage()).append(COMMA);
builder.append(OSUtils.availablePhysicalMemorySize()).append(Constants.COMMA);
builder.append(maxCpuloadAvg).append(Constants.COMMA);
builder.append(reservedMemory).append(Constants.COMMA);
builder.append(startTime).append(Constants.COMMA);
builder.append(DateUtils.dateToString(new Date())).append(Constants.COMMA);
builder.append(status).append(COMMA);
// save process id
builder.append(OSUtils.getProcessID());
// worker host weight
if (Constants.WORKER_TYPE.equals(serverType)) {
builder.append(Constants.COMMA).append(hostWeight);
}
return builder.toString();
}
}

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -80,12 +80,6 @@ public class WorkerServer implements IStoppable {
*/
private NettyRemotingServer nettyRemotingServer;
/**
* worker registry
*/
@Autowired
private WorkerRegistryClient workerRegistryClient;
/**
* worker config
*/
@ -110,6 +104,12 @@ public class WorkerServer implements IStoppable {
@Autowired
private WorkerManagerThread workerManagerThread;
/**
* worker registry
*/
@Autowired
private WorkerRegistryClient workerRegistryClient;
private TaskPluginManager taskPluginManager;
/**

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -172,7 +172,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
// submit task to manager
if (!workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager))) {
logger.info("submit task to manager error, queue is full, queue size is {}", workerManager.getQueueSize());
logger.info("submit task to manager error, queue is full, queue size is {}", workerManager.getDelayQueueSize());
}
}

22
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java

@ -24,16 +24,15 @@ import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.lang.StringUtils;
import java.util.Date;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
@ -63,6 +62,12 @@ public class WorkerRegistryClient {
@Autowired
private WorkerConfig workerConfig;
/**
* worker manager
*/
@Autowired
private WorkerManagerThread workerManagerThread;
/**
* heartbeat executor
*/
@ -71,16 +76,16 @@ public class WorkerRegistryClient {
private RegistryClient registryClient;
/**
* worker start time
* worker startup time, ms
*/
private String startTime;
private long startupTime;
private Set<String> workerGroups;
@PostConstruct
public void initWorkRegistry() {
this.workerGroups = workerConfig.getWorkerGroups();
this.startTime = DateUtils.dateToString(new Date());
this.startupTime = System.currentTimeMillis();
this.registryClient = RegistryClient.getInstance();
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
}
@ -98,13 +103,16 @@ public class WorkerRegistryClient {
logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath);
}
HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
workerConfig.getWorkerMaxCpuloadAvg(),
workerConfig.getWorkerReservedMemory(),
workerConfig.getHostWeight(),
workerZkPaths,
Constants.WORKER_TYPE,
registryClient);
registryClient,
workerConfig.getWorkerExecThreads(),
workerManagerThread
);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval);

14
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java

@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -73,14 +74,23 @@ public class WorkerManagerThread implements Runnable {
}
/**
* get queue size
* get delay queue size
*
* @return queue size
*/
public int getQueueSize() {
public int getDelayQueueSize() {
return workerExecuteQueue.size();
}
/**
* get thread pool queue size
*
* @return queue size
*/
public int getThreadPoolQueueSize() {
return ((ThreadPoolExecutor) workerExecService).getQueue().size();
}
/**
* Kill tasks that have not been executed, like delay task
* then send Response to Master, update the execution status of task instance

15
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java

@ -31,12 +31,14 @@ import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -93,10 +95,17 @@ public class RegistryClient extends RegistryCenter {
List<Server> serverList = new ArrayList<>();
for (Map.Entry<String, String> entry : serverMaps.entrySet()) {
Server server = ResInfo.parseHeartbeatForRegistryInfo(entry.getValue());
if (server == null) {
HeartBeat heartBeat = HeartBeat.decodeHeartBeat(entry.getValue());
if (heartBeat == null) {
continue;
}
Server server = new Server();
server.setResInfo(JSONUtils.toJsonString(heartBeat));
server.setCreateTime(new Date(heartBeat.getStartupTime()));
server.setLastHeartbeatTime(new Date(heartBeat.getReportTime()));
server.setId(heartBeat.getProcessId());
String key = entry.getKey();
server.setZkDirectory(parentPath + "/" + key);
// set host and port

Loading…
Cancel
Save