Browse Source

Refactor heart beat task, use json to serialize/deserialize (#11702)

* Refactor heart beat task, use json to serialize/deserialize
3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
67e7f88d8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
  2. 81
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java
  3. 21
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java
  4. 39
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
  5. 92
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java
  6. 47
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
  7. 261
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
  8. 46
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
  9. 77
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java
  10. 7
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  11. 24
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
  12. 39
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
  13. 70
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
  14. 56
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
  15. 160
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
  16. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
  17. 71
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
  18. 2
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
  19. 8
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
  20. 36
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
  21. 4
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  22. 18
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  23. 79
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java
  24. 110
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
  25. 107
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java

@ -28,7 +28,9 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.common.model.HeartBeat;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
@ -318,9 +320,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
wg.setName(workerGroup);
if (isPaging) {
String registeredValue = registryClient.get(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.iterator().next());
HeartBeat heartBeat = HeartBeat.decodeHeartBeat(registeredValue);
wg.setCreateTime(new Date(heartBeat.getStartupTime()));
wg.setUpdateTime(new Date(heartBeat.getReportTime()));
WorkerHeartBeat workerHeartBeat = JSONUtils.parseObject(registeredValue, WorkerHeartBeat.class);
wg.setCreateTime(new Date(workerHeartBeat.getStartupTime()));
wg.setUpdateTime(new Date(workerHeartBeat.getReportTime()));
wg.setSystemDefault(true);
if (workerGroupsMap != null && workerGroupsMap.containsKey(workerGroup)) {
wg.setDescription(workerGroupsMap.get(workerGroup).getDescription());

81
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.model;
import lombok.extern.slf4j.Slf4j;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
/**
 * Base class for a background thread that periodically produces a heartbeat
 * via {@link #getHeartBeat()} and publishes it via {@link #writeHeartBeat(Object)}.
 *
 * <p>The loop runs until {@link #shutdown()} is called or the thread is interrupted.
 * While {@code ServerLifeCycleManager.isRunning()} is false, no heartbeat is written,
 * but the loop keeps sleeping and re-checking.
 *
 * @param <T> the heartbeat payload type
 */
@Slf4j
public abstract class BaseHeartBeatTask<T> extends BaseDaemonThread {

    private final String threadName;

    /** Pause between two heartbeat attempts, passed to {@link Thread#sleep(long)} (milliseconds). */
    private final long heartBeatInterval;

    /**
     * Loop condition of {@link #run()}. Declared volatile because {@link #shutdown()} is
     * normally invoked from a different thread than the one executing {@link #run()}.
     */
    protected volatile boolean runningFlag;

    /**
     * @param threadName        name given to the thread and used in log messages
     * @param heartBeatInterval pause between heartbeats, in milliseconds
     */
    public BaseHeartBeatTask(String threadName, long heartBeatInterval) {
        super(threadName);
        this.threadName = threadName;
        this.heartBeatInterval = heartBeatInterval;
        this.runningFlag = true;
    }

    /** Starts the heartbeat thread and logs before and after the start. */
    @Override
    public synchronized void start() {
        log.info("Starting {}", threadName);
        super.start();
        log.info("Started {}, heartBeatInterval: {}", threadName, heartBeatInterval);
    }

    /**
     * Heartbeat loop: build and write one heartbeat, then sleep for the configured interval.
     * Any exception from building or writing is logged and the loop continues.
     */
    @Override
    public void run() {
        while (runningFlag) {
            try {
                if (!ServerLifeCycleManager.isRunning()) {
                    log.info("The current server status is {}, will not write heartBeatInfo into registry", ServerLifeCycleManager.getServerStatus());
                    // The finally block below still sleeps before the next check.
                    continue;
                }
                T heartBeat = getHeartBeat();
                writeHeartBeat(heartBeat);
            } catch (Exception ex) {
                log.error("{} task execute failed", threadName, ex);
            } finally {
                try {
                    Thread.sleep(heartBeatInterval);
                } catch (InterruptedException e) {
                    handleInterruptException(e);
                }
            }
        }
    }

    /** Requests the heartbeat loop to stop after the current iteration. */
    public void shutdown() {
        log.warn("{} task finished", threadName);
        runningFlag = false;
    }

    /**
     * Handles an interrupt received while sleeping: stops the loop and restores the
     * interrupt flag. Stopping is required because, with the flag restored, every
     * following {@link Thread#sleep(long)} would throw immediately and the loop would
     * spin without pausing.
     */
    private void handleInterruptException(InterruptedException ex) {
        log.warn("{} has been interrupted", threadName, ex);
        runningFlag = false;
        Thread.currentThread().interrupt();
    }

    /**
     * Builds the heartbeat to publish in the current iteration.
     *
     * @return the heartbeat payload
     */
    public abstract T getHeartBeat();

    /**
     * Publishes the given heartbeat.
     *
     * @param heartBeat the payload returned by {@link #getHeartBeat()}
     */
    public abstract void writeHeartBeat(T heartBeat);
}

21
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.model;
/**
 * Marker interface for heartbeat payloads. It declares no methods; it is implemented
 * by {@code MasterHeartBeat} and {@code WorkerHeartBeat}.
 */
public interface HeartBeat {
}

39
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Heartbeat payload describing a master server.
 *
 * <p>Accessors, builder and constructors are generated by the Lombok annotations below.
 * NOTE(review): the all-args constructor follows field declaration order, so reordering
 * fields changes its signature.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MasterHeartBeat implements HeartBeat {
// time the server started; presumably epoch milliseconds — TODO confirm against the producer
private long startupTime;
// time this heartbeat was produced; presumably epoch milliseconds — TODO confirm
private long reportTime;
// sampled CPU usage of the host
private double cpuUsage;
// sampled memory usage of the host
private double memoryUsage;
// sampled system load average
private double loadAverage;
// available physical memory; unit not visible here — TODO confirm
private double availablePhysicalMemorySize;
// configured upper bound for the load average
private double maxCpuloadAvg;
// configured amount of memory to keep free; unit not visible here — TODO confirm
private double reservedMemory;
// operating-system process id of the server
private int processId;
}

92
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java

@ -17,31 +17,19 @@
package org.apache.dolphinscheduler.common.model;
import lombok.Data;
import java.util.Date;
/**
* server
*/
@Data
public class Server {
/**
* id
*/
private int id;
/**
* host
*/
private String host;
/**
* port
*/
private int port;
/**
* master directory in zookeeper
*/
private String zkDirectory;
/**
@ -49,82 +37,8 @@ public class Server {
*/
private String resInfo;
/**
* create time
*/
private Date createTime;
/**
* last heart beat time
*/
private Date lastHeartbeatTime;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public String getZkDirectory() {
return zkDirectory;
}
public void setZkDirectory(String zkDirectory) {
this.zkDirectory = zkDirectory;
}
public Date getLastHeartbeatTime() {
return lastHeartbeatTime;
}
public void setLastHeartbeatTime(Date lastHeartbeatTime) {
this.lastHeartbeatTime = lastHeartbeatTime;
}
public String getResInfo() {
return resInfo;
}
public void setResInfo(String resInfo) {
this.resInfo = resInfo;
}
@Override
public String toString() {
return "MasterServer{" +
"id=" + id +
", host='" + host + '\'' +
", port=" + port +
", zkDirectory='" + zkDirectory + '\'' +
", resInfo='" + resInfo + '\'' +
", createTime=" + createTime +
", lastHeartbeatTime=" + lastHeartbeatTime +
'}';
}
}

47
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Heartbeat payload describing a worker server. Instances are read back from the
 * registry by parsing the stored JSON into this class.
 *
 * <p>Accessors, builder and constructors are generated by the Lombok annotations below.
 * NOTE(review): the all-args constructor follows field declaration order, so reordering
 * fields changes its signature.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WorkerHeartBeat implements HeartBeat {
// time the worker started, in epoch milliseconds (consumers wrap it in java.util.Date)
private long startupTime;
// time this heartbeat was produced, in epoch milliseconds (consumers wrap it in java.util.Date)
private long reportTime;
// sampled CPU usage of the host
private double cpuUsage;
// sampled memory usage of the host
private double memoryUsage;
// sampled system load average
private double loadAverage;
// available physical memory; presumably in GB — TODO confirm against the producer
private double availablePhysicalMemorySize;
// configured upper bound for the load average
private double maxCpuloadAvg;
// configured amount of memory to keep free; presumably in GB — TODO confirm
private double reservedMemory;
// node status code; consumers compare it with Constants.ABNORMAL_NODE_STATUS and similar constants
private int serverStatus;
// operating-system process id of the worker
private int processId;
private int workerHostWeight; // worker host weight
private int workerWaitingTaskCount; // worker waiting task count
private int workerExecThreadCount; // worker thread pool thread count
}

261
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java

@ -1,261 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Heartbeat of a server node, exchanged as a single comma-separated line.
 *
 * <p>{@link #encodeHeartBeat()} samples the current system metrics, recomputes the
 * server status and renders all fields in a fixed order; {@link #decodeHeartBeat(String)}
 * parses such a line back into an instance.
 */
public class HeartBeat {

    private static final Logger logger = LoggerFactory.getLogger(HeartBeat.class);

    private long startupTime;
    private long reportTime;
    private double cpuUsage;
    private double memoryUsage;
    private double loadAverage;
    private double availablePhysicalMemorySize;
    private double maxCpuloadAvg;
    private double reservedMemory;
    private int serverStatus;
    private int processId;
    private int workerHostWeight; // worker host weight
    private int workerWaitingTaskCount; // worker waiting task count
    private int workerExecThreadCount; // worker thread pool thread count
    private double diskAvailable;

    /** Creates a heartbeat stamped with the current time and the normal node status. */
    public HeartBeat() {
        this.reportTime = System.currentTimeMillis();
        this.serverStatus = Constants.NORMAL_NODE_STATUS;
    }

    /**
     * @param startupTime    server startup time in epoch milliseconds
     * @param maxCpuloadAvg  load-average threshold above which the node is abnormal
     * @param reservedMemory memory threshold below which the node is abnormal
     */
    public HeartBeat(long startupTime, double maxCpuloadAvg, double reservedMemory) {
        this();
        this.startupTime = startupTime;
        this.maxCpuloadAvg = maxCpuloadAvg;
        this.reservedMemory = reservedMemory;
    }

    /**
     * @param startupTime           server startup time in epoch milliseconds
     * @param maxCpuloadAvg         load-average threshold above which the node is abnormal
     * @param reservedMemory        memory threshold below which the node is abnormal
     * @param hostWeight            worker host weight
     * @param workerExecThreadCount size of the worker execution thread pool
     */
    public HeartBeat(long startupTime, double maxCpuloadAvg, double reservedMemory, int hostWeight, int workerExecThreadCount) {
        this(startupTime, maxCpuloadAvg, reservedMemory);
        this.workerHostWeight = hostWeight;
        this.workerExecThreadCount = workerExecThreadCount;
    }

    public double getDiskAvailable() {
        return diskAvailable;
    }

    public void setDiskAvailable(double diskAvailable) {
        this.diskAvailable = diskAvailable;
    }

    public long getStartupTime() {
        return startupTime;
    }

    public void setStartupTime(long startupTime) {
        this.startupTime = startupTime;
    }

    public long getReportTime() {
        return reportTime;
    }

    public void setReportTime(long reportTime) {
        this.reportTime = reportTime;
    }

    public double getCpuUsage() {
        return cpuUsage;
    }

    public void setCpuUsage(double cpuUsage) {
        this.cpuUsage = cpuUsage;
    }

    public double getMemoryUsage() {
        return memoryUsage;
    }

    public void setMemoryUsage(double memoryUsage) {
        this.memoryUsage = memoryUsage;
    }

    public double getLoadAverage() {
        return loadAverage;
    }

    public void setLoadAverage(double loadAverage) {
        this.loadAverage = loadAverage;
    }

    public double getAvailablePhysicalMemorySize() {
        return availablePhysicalMemorySize;
    }

    public void setAvailablePhysicalMemorySize(double availablePhysicalMemorySize) {
        this.availablePhysicalMemorySize = availablePhysicalMemorySize;
    }

    public double getMaxCpuloadAvg() {
        return maxCpuloadAvg;
    }

    public void setMaxCpuloadAvg(double maxCpuloadAvg) {
        this.maxCpuloadAvg = maxCpuloadAvg;
    }

    public double getReservedMemory() {
        return reservedMemory;
    }

    public void setReservedMemory(double reservedMemory) {
        this.reservedMemory = reservedMemory;
    }

    public int getServerStatus() {
        return serverStatus;
    }

    public void setServerStatus(int serverStatus) {
        this.serverStatus = serverStatus;
    }

    public int getProcessId() {
        return processId;
    }

    public void setProcessId(int processId) {
        this.processId = processId;
    }

    public int getWorkerHostWeight() {
        return workerHostWeight;
    }

    public void setWorkerHostWeight(int workerHostWeight) {
        this.workerHostWeight = workerHostWeight;
    }

    public int getWorkerWaitingTaskCount() {
        return workerWaitingTaskCount;
    }

    public void setWorkerWaitingTaskCount(int workerWaitingTaskCount) {
        this.workerWaitingTaskCount = workerWaitingTaskCount;
    }

    public int getWorkerExecThreadCount() {
        return workerExecThreadCount;
    }

    public void setWorkerExecThreadCount(int workerExecThreadCount) {
        this.workerExecThreadCount = workerExecThreadCount;
    }

    /**
     * fill system info
     */
    private void fillSystemInfo() {
        this.cpuUsage = OSUtils.cpuUsage();
        this.loadAverage = OSUtils.loadAverage();
        this.availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
        this.memoryUsage = OSUtils.memoryUsage();
        this.diskAvailable = OSUtils.diskAvailable();
        this.processId = OSUtils.getProcessID();
    }

    /**
     * Refreshes the report time and derives the server status from the current fields:
     * abnormal when the load average exceeds its limit or available memory falls below
     * the reserved amount, busy when more tasks are waiting than there are worker
     * threads, normal otherwise.
     */
    public void updateServerState() {
        this.reportTime = System.currentTimeMillis();
        if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
            logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
                    loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
            this.serverStatus = Constants.ABNORMAL_NODE_STATUS;
        } else if (workerWaitingTaskCount > workerExecThreadCount) {
            logger.warn("current waiting task count {} is large than worker thread count {}, worker is busy", workerWaitingTaskCount, workerExecThreadCount);
            this.serverStatus = Constants.BUSY_NODE_STATUE;
        } else {
            this.serverStatus = Constants.NORMAL_NODE_STATUS;
        }
    }

    /**
     * Samples the system metrics, updates the server status and renders this heartbeat
     * as a comma-separated line. The field order must match {@link #decodeHeartBeat(String)}.
     *
     * @return the encoded heartbeat
     */
    public String encodeHeartBeat() {
        this.fillSystemInfo();
        this.updateServerState();

        StringBuilder builder = new StringBuilder(100);
        builder.append(cpuUsage).append(Constants.COMMA);
        builder.append(memoryUsage).append(Constants.COMMA);
        builder.append(loadAverage).append(Constants.COMMA);
        builder.append(availablePhysicalMemorySize).append(Constants.COMMA);
        builder.append(maxCpuloadAvg).append(Constants.COMMA);
        builder.append(reservedMemory).append(Constants.COMMA);
        builder.append(startupTime).append(Constants.COMMA);
        builder.append(reportTime).append(Constants.COMMA);
        builder.append(serverStatus).append(Constants.COMMA);
        builder.append(processId).append(Constants.COMMA);
        builder.append(workerHostWeight).append(Constants.COMMA);
        builder.append(workerExecThreadCount).append(Constants.COMMA);
        builder.append(workerWaitingTaskCount).append(Constants.COMMA);
        builder.append(diskAvailable);

        return builder.toString();
    }

    /**
     * Parses a line produced by {@link #encodeHeartBeat()}.
     *
     * @param heartBeatInfo the comma-separated heartbeat line, may be null
     * @return the decoded heartbeat, or {@code null} when the input is null, does not
     *         contain the expected number of fields, or contains a non-numeric field
     */
    public static HeartBeat decodeHeartBeat(String heartBeatInfo) {
        if (heartBeatInfo == null) {
            return null;
        }
        String[] parts = heartBeatInfo.split(Constants.COMMA);
        if (parts.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) {
            return null;
        }
        try {
            HeartBeat heartBeat = new HeartBeat();
            heartBeat.cpuUsage = Double.parseDouble(parts[0]);
            heartBeat.memoryUsage = Double.parseDouble(parts[1]);
            heartBeat.loadAverage = Double.parseDouble(parts[2]);
            heartBeat.availablePhysicalMemorySize = Double.parseDouble(parts[3]);
            heartBeat.maxCpuloadAvg = Double.parseDouble(parts[4]);
            heartBeat.reservedMemory = Double.parseDouble(parts[5]);
            heartBeat.startupTime = Long.parseLong(parts[6]);
            heartBeat.reportTime = Long.parseLong(parts[7]);
            heartBeat.serverStatus = Integer.parseInt(parts[8]);
            heartBeat.processId = Integer.parseInt(parts[9]);
            heartBeat.workerHostWeight = Integer.parseInt(parts[10]);
            heartBeat.workerExecThreadCount = Integer.parseInt(parts[11]);
            heartBeat.workerWaitingTaskCount = Integer.parseInt(parts[12]);
            heartBeat.diskAvailable = Double.parseDouble(parts[13]);
            return heartBeat;
        } catch (NumberFormatException e) {
            // A corrupted payload is treated like any other invalid payload: report and return null.
            logger.warn("Failed to decode heartbeat info: {}", heartBeatInfo, e);
            return null;
        }
    }
}

46
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java

@ -17,28 +17,6 @@
package org.apache.dolphinscheduler.common.utils;
import static java.nio.charset.StandardCharsets.UTF_8;
import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
@ -55,6 +33,26 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.fasterxml.jackson.databind.type.CollectionType;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TimeZone;
import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
* json utils
@ -130,7 +128,7 @@ public class JSONUtils {
* @return an object of type T from the string
* classOfT
*/
public static <T> T parseObject(String json, Class<T> clazz) {
public static @Nullable <T> T parseObject(String json, Class<T> clazz) {
if (StringUtils.isEmpty(json)) {
return null;
}
@ -138,7 +136,7 @@ public class JSONUtils {
try {
return objectMapper.readValue(json, clazz);
} catch (Exception e) {
logger.error("parse object exception!", e);
logger.error("Parse object exception, jsonStr: {}, class: {}", json, clazz, e);
}
return null;
}

77
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java

@ -1,77 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import static org.junit.Assert.assertEquals;
import org.apache.dolphinscheduler.common.Constants;
import org.junit.Test;
/**
 * Unit tests for {@link HeartBeat}: server status derivation and decoding of the
 * comma-separated heartbeat line.
 */
public class HeartBeatTest {

    /** A node whose available memory is below the reserved amount must be reported as abnormal. */
    @Test
    public void testAbnormalState() {
        long startupTime = System.currentTimeMillis();
        double loadAverage = 100;
        double reservedMemory = 100;
        HeartBeat heartBeat = new HeartBeat(startupTime, loadAverage, reservedMemory);
        heartBeat.updateServerState();
        assertEquals(Constants.ABNORMAL_NODE_STATUS, heartBeat.getServerStatus());
    }

    /** A worker with more waiting tasks than execution threads must be reported as busy. */
    @Test
    public void testBusyState() {
        long startupTime = System.currentTimeMillis();
        double loadAverage = 0;
        double reservedMemory = 0;
        int hostWeight = 1;
        int taskCount = 200;
        int workerThreadCount = 199;
        HeartBeat heartBeat = new HeartBeat(startupTime, loadAverage, reservedMemory, hostWeight, workerThreadCount);

        heartBeat.setWorkerWaitingTaskCount(taskCount);
        heartBeat.updateServerState();
        assertEquals(Constants.BUSY_NODE_STATUE, heartBeat.getServerStatus());
    }

    /** Every field of an encoded heartbeat line must be restored by decoding. */
    @Test
    public void testDecodeHeartBeat() throws Exception {
        String heartBeatInfo = "0.35,0.58,3.09,6.47,5.0,1.0,1634033006749,1634033006857,1,29732,1,199,200,65.86";
        HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);

        double delta = 0.001;
        assertEquals(0.35, heartBeat.getCpuUsage(), delta);
        assertEquals(0.58, heartBeat.getMemoryUsage(), delta);
        assertEquals(3.09, heartBeat.getLoadAverage(), delta);
        assertEquals(6.47, heartBeat.getAvailablePhysicalMemorySize(), delta);
        assertEquals(5.0, heartBeat.getMaxCpuloadAvg(), delta);
        assertEquals(1.0, heartBeat.getReservedMemory(), delta);
        assertEquals(1634033006749L, heartBeat.getStartupTime());
        assertEquals(1634033006857L, heartBeat.getReportTime());
        assertEquals(1, heartBeat.getServerStatus());
        assertEquals(29732, heartBeat.getProcessId());
        // The host weight (11th field) was previously left unchecked.
        assertEquals(1, heartBeat.getWorkerHostWeight());
        assertEquals(199, heartBeat.getWorkerExecThreadCount());
        assertEquals(200, heartBeat.getWorkerWaitingTaskCount());
        assertEquals(65.86, heartBeat.getDiskAvailable(), delta);
    }
}

7
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -95,7 +95,7 @@ public class MasterConfig implements Validator {
private String masterAddress;
// /nodes/master/ip:listenPort
private String masterRegistryNodePath;
private String masterRegistryPath;
@Override
public boolean supports(Class<?> clazz) {
@ -139,8 +139,7 @@ public class MasterConfig implements Validator {
masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
masterConfig
.setMasterRegistryNodePath(REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterConfig.getMasterAddress());
masterConfig.setMasterRegistryPath(REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterConfig.getMasterAddress());
printConfig();
}
@ -161,6 +160,6 @@ public class MasterConfig implements Validator {
logger.info("Master config: killYarnJobWhenTaskFailover -> {} ", killYarnJobWhenTaskFailover);
logger.info("Master config: registryDisconnectStrategy -> {} ", registryDisconnectStrategy);
logger.info("Master config: masterAddress -> {} ", masterAddress);
logger.info("Master config: masterRegistryNodePath -> {} ", masterRegistryNodePath);
logger.info("Master config: masterRegistryPath -> {} ", masterRegistryPath);
}
}

24
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java

@ -17,8 +17,8 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
@ -27,14 +27,13 @@ import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
/**
* common host manager
*/
@ -80,23 +79,10 @@ public abstract class CommonHostManager implements HostManager {
Set<String> nodes = serverNodeManager.getWorkerGroupNodes(workerGroup);
if (CollectionUtils.isNotEmpty(nodes)) {
for (String node : nodes) {
String heartbeat = serverNodeManager.getWorkerNodeInfo(node);
int hostWeight = getWorkerHostWeightFromHeartbeat(heartbeat);
hostWorkers.add(HostWorker.of(node, hostWeight, workerGroup));
WorkerHeartBeat workerNodeInfo = serverNodeManager.getWorkerNodeInfo(node);
hostWorkers.add(HostWorker.of(node, workerNodeInfo.getWorkerHostWeight(), workerGroup));
}
}
return hostWorkers;
}
/**
 * Extracts the worker host weight from an encoded heartbeat line.
 *
 * @param heartBeatInfo the encoded heartbeat, may be null or empty
 * @return the decoded host weight, or {@code Constants.DEFAULT_WORKER_HOST_WEIGHT} when
 *         the input is empty or cannot be decoded
 */
protected int getWorkerHostWeightFromHeartbeat(String heartBeatInfo) {
    if (StringUtils.isEmpty(heartBeatInfo)) {
        return Constants.DEFAULT_WORKER_HOST_WEIGHT;
    }
    HeartBeat decoded = HeartBeat.decodeHeartBeat(heartBeatInfo);
    return decoded == null ? Constants.DEFAULT_WORKER_HOST_WEIGHT : decoded.getWorkerHostWeight();
}
}

39
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java

@ -17,18 +17,19 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
import org.apache.dolphinscheduler.server.master.registry.WorkerInfoChangeListener;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@ -39,11 +40,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* lower weight host manager
*/
@ -97,7 +93,7 @@ public class LowerWeightHostManager extends CommonHostManager {
// Listener that forwards every worker-info change notification to syncWorkerResources.
// The two notify signatures below are the pre-change (String values) and post-change
// (WorkerHeartBeat values) forms of the same method as rendered by this diff view.
private class WorkerWeightListener implements WorkerInfoChangeListener {
@Override
public void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo) {
public void notify(Map<String, Set<String>> workerGroups, Map<String, WorkerHeartBeat> workerNodeInfo) {
// Delegate unchanged; all weight computation happens in syncWorkerResources.
syncWorkerResources(workerGroups, workerNodeInfo);
}
}
@ -109,7 +105,7 @@ public class LowerWeightHostManager extends CommonHostManager {
* @param workerNodeInfoMap worker node info map, key is worker node, value is worker info.
*/
private void syncWorkerResources(final Map<String, Set<String>> workerGroupNodes,
final Map<String, String> workerNodeInfoMap) {
final Map<String, WorkerHeartBeat> workerNodeInfoMap) {
try {
Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
for (Map.Entry<String, Set<String>> entry : workerGroupNodes.entrySet()) {
@ -117,7 +113,7 @@ public class LowerWeightHostManager extends CommonHostManager {
Set<String> nodes = entry.getValue();
Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
for (String node : nodes) {
String heartbeat = workerNodeInfoMap.getOrDefault(node, null);
WorkerHeartBeat heartbeat = workerNodeInfoMap.getOrDefault(node, null);
Optional<HostWeight> hostWeightOpt = getHostWeight(node, workerGroup, heartbeat);
hostWeightOpt.ifPresent(hostWeights::add);
}
@ -131,13 +127,9 @@ public class LowerWeightHostManager extends CommonHostManager {
}
}
private Optional<HostWeight> getHostWeight(String addr, String workerGroup, String heartBeatInfo) {
if (StringUtils.isEmpty(heartBeatInfo)) {
logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup);
return Optional.empty();
}
HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
public Optional<HostWeight> getHostWeight(String addr, String workerGroup, WorkerHeartBeat heartBeat) {
if (heartBeat == null) {
logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup);
return Optional.empty();
}
if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) {
@ -151,12 +143,15 @@ public class LowerWeightHostManager extends CommonHostManager {
return Optional.empty();
}
return Optional.of(
new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup),
heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(),
heartBeat.getWorkerWaitingTaskCount(), heartBeat.getStartupTime()));
new HostWeight(
HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup),
heartBeat.getCpuUsage(),
heartBeat.getMemoryUsage(),
heartBeat.getLoadAverage(),
heartBeat.getWorkerWaitingTaskCount(),
heartBeat.getStartupTime()));
}
private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights) {
lock.lock();
try {

70
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java

@ -1,70 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Master heart beat task.
 *
 * <p>Each run encodes the current {@link HeartBeat} and persists it as an ephemeral node under every
 * configured heart beat path. Nothing is written while the server is not in the running state.
 * Consecutive failures are counted so the error log shows how many runs in a row have failed; the
 * counter is reset after the first successful run.
 */
public class MasterHeartBeatTask implements Runnable {

    private final Logger logger = LoggerFactory.getLogger(MasterHeartBeatTask.class);

    /** Registry paths the heart beat is written to. */
    private final Set<String> heartBeatPaths;

    private final RegistryClient registryClient;

    private final HeartBeat heartBeat;

    /** Number of consecutive failed runs; reset to 0 after a successful run. */
    private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();

    /**
     * @param startupTime    startup time passed to the {@link HeartBeat}
     * @param maxCpuloadAvg  max cpu load average passed to the {@link HeartBeat}
     * @param reservedMemory reserved memory passed to the {@link HeartBeat}
     * @param heartBeatPaths registry paths to write the heart beat to
     * @param registryClient client used to persist the heart beat
     */
    public MasterHeartBeatTask(long startupTime,
                               double maxCpuloadAvg,
                               double reservedMemory,
                               Set<String> heartBeatPaths,
                               RegistryClient registryClient) {
        this.heartBeatPaths = heartBeatPaths;
        this.registryClient = registryClient;
        this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
    }

    /**
     * @return the encoded heart beat as produced by {@link HeartBeat#encodeHeartBeat()}
     */
    public String getHeartBeatInfo() {
        return this.heartBeat.encodeHeartBeat();
    }

    @Override
    public void run() {
        try {
            if (!ServerLifeCycleManager.isRunning()) {
                return;
            }
            for (String heartBeatPath : heartBeatPaths) {
                registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
            }
            heartBeatErrorTimes.set(0);
        } catch (Throwable ex) {
            // Deliberately broad: if this task is run by a ScheduledExecutorService, an exception
            // escaping run() would suppress all subsequent executions.
            // Count the failure before logging; previously the counter was never incremented, so
            // the reported errorTimes was always 0.
            logger.error("HeartBeat task execute failed, errorTimes: {}", heartBeatErrorTimes.incrementAndGet(), ex);
        }
    }
}

56
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -17,36 +17,28 @@
package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.service.FailoverService;
import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.lang3.StringUtils;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.collect.Sets;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
/**
* <p>DolphinScheduler master register client, used to connect to the registry and handle the registry events.
* <p>When the Master node startup, it will register in registry center. And schedule a {@link MasterHeartBeatTask} to update its metadata in registry.
* <p>When the Master node starts up, it registers itself in the registry center and starts a {@link MasterHeartBeatTask} to update its metadata in the registry.
*/
@Component
public class MasterRegistryClient implements AutoCloseable {
@ -65,18 +57,11 @@ public class MasterRegistryClient implements AutoCloseable {
@Autowired
private MasterConnectStrategy masterConnectStrategy;
private ScheduledExecutorService heartBeatExecutor;
/**
* master startup time, ms
*/
private long startupTime;
private MasterHeartBeatTask masterHeartBeatTask;
public void start() {
try {
this.startupTime = System.currentTimeMillis();
this.heartBeatExecutor =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, registryClient);
// master registry
registry();
registryClient.addConnectionStateListener(
@ -166,17 +151,11 @@ public class MasterRegistryClient implements AutoCloseable {
*/
void registry() {
logger.info("Master node : {} registering to registry center", masterConfig.getMasterAddress());
String localNodePath = masterConfig.getMasterRegistryNodePath();
Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
MasterHeartBeatTask heartBeatTask = new MasterHeartBeatTask(startupTime,
masterConfig.getMaxCpuLoadAvg(),
masterConfig.getReservedMemory(),
Sets.newHashSet(localNodePath),
registryClient);
String masterRegistryPath = masterConfig.getMasterRegistryPath();
// remove before persist
registryClient.remove(localNodePath);
registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
registryClient.remove(masterRegistryPath);
registryClient.persistEphemeral(masterRegistryPath, JSONUtils.toJsonString(masterHeartBeatTask.getHeartBeat()));
while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
logger.warn("The current master server node:{} cannot find in registry", NetUtils.getHost());
@ -186,19 +165,18 @@ public class MasterRegistryClient implements AutoCloseable {
// sleep 1s, waiting master failover remove
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
this.heartBeatExecutor.scheduleWithFixedDelay(heartBeatTask, 0L, masterHeartbeatInterval.getSeconds(),
TimeUnit.SECONDS);
logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s",
masterConfig.getMasterAddress(), masterHeartbeatInterval);
masterHeartBeatTask.start();
logger.info("Master node : {} registered to registry center successfully", masterConfig.getMasterAddress());
}
public void deregister() {
try {
registryClient.remove(masterConfig.getMasterRegistryNodePath());
registryClient.remove(masterConfig.getMasterRegistryPath());
logger.info("Master node : {} unRegistry to register center.", masterConfig.getMasterAddress());
heartBeatExecutor.shutdown();
logger.info("MasterServer heartbeat executor shutdown");
if (masterHeartBeatTask != null) {
masterHeartBeatTask.shutdown();
}
registryClient.close();
} catch (Exception e) {
logger.error("MasterServer remove registry path exception ", e);

160
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

@ -17,13 +17,13 @@
package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
@ -34,10 +34,13 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -52,14 +55,10 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
/**
* server node manager
@ -69,23 +68,19 @@ public class ServerNodeManager implements InitializingBean {
private final Logger logger = LoggerFactory.getLogger(ServerNodeManager.class);
/**
* master lock
*/
private final Lock masterLock = new ReentrantLock();
/**
* worker group lock
*/
private final Lock workerGroupLock = new ReentrantLock();
private final ReentrantReadWriteLock workerGroupLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = workerGroupLock.readLock();
private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = workerGroupLock.writeLock();
/**
* worker node info lock
*/
private final Lock workerNodeInfoLock = new ReentrantLock();
private final ReentrantReadWriteLock workerNodeInfoLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock workerNodeInfoReadLock = workerNodeInfoLock.readLock();
private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = workerNodeInfoLock.writeLock();
/**
* worker group nodes
* worker group nodes, workerGroup -> ips
*/
private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();
@ -94,10 +89,7 @@ public class ServerNodeManager implements InitializingBean {
*/
private final Set<String> masterNodes = new HashSet<>();
/**
* worker node info
*/
private final Map<String, String> workerNodeInfo = new HashMap<>();
private final Map<String, WorkerHeartBeat> workerNodeInfo = new HashMap<>();
/**
* executor service
@ -108,7 +100,7 @@ public class ServerNodeManager implements InitializingBean {
private RegistryClient registryClient;
/**
* eg : /node/worker/group/127.0.0.1:xxx
* eg : /dolphinscheduler/node/worker/group/127.0.0.1:xxx
*/
private static final int WORKER_LISTENER_CHECK_LENGTH = 5;
@ -244,26 +236,29 @@ public class ServerNodeManager implements InitializingBean {
final String data = event.data();
if (registryClient.isWorkerPath(path)) {
try {
String[] parts = path.split("/");
if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
}
final String workerGroupName = parts[parts.length - 2];
final String workerAddress = parts[parts.length - 1];
if (type == Type.ADD) {
logger.info("worker group node : {} added.", path);
String group = parseGroup(path);
Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
logger.info("currentNodes : {}", currentNodes);
syncWorkerGroupNodes(group, currentNodes);
syncWorkerGroupNodes(workerGroupName, currentNodes);
} else if (type == Type.REMOVE) {
logger.info("worker group node : {} down.", path);
String group = parseGroup(path);
Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
syncWorkerGroupNodes(group, currentNodes);
Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
syncWorkerGroupNodes(workerGroupName, currentNodes);
alertDao.sendServerStoppedAlert(1, path, "WORKER");
} else if (type == Type.UPDATE) {
logger.debug("worker group node : {} update, data: {}", path, data);
String group = parseGroup(path);
Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
syncWorkerGroupNodes(group, currentNodes);
Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
syncWorkerGroupNodes(workerGroupName, currentNodes);
String node = parseNode(path);
syncSingleWorkerNodeInfo(node, data);
syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class));
}
notifyWorkerInfoChangeListeners();
} catch (IllegalArgumentException ex) {
@ -274,22 +269,6 @@ public class ServerNodeManager implements InitializingBean {
}
}
private String parseGroup(String path) {
String[] parts = path.split("/");
if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
}
return parts[parts.length - 2];
}
private String parseNode(String path) {
String[] parts = path.split("/");
if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
}
return parts[parts.length - 1];
}
}
class MasterDataListener implements SubscribeListener {
@ -333,20 +312,6 @@ public class ServerNodeManager implements InitializingBean {
}
/**
* get master nodes
*
* @return master nodes
*/
public Set<String> getMasterNodes() {
masterLock.lock();
try {
return Collections.unmodifiableSet(masterNodes);
} finally {
masterLock.unlock();
}
}
/**
* sync master nodes
*
@ -355,18 +320,17 @@ public class ServerNodeManager implements InitializingBean {
private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {
masterLock.lock();
try {
String addr = NetUtils.getAddr(NetUtils.getHost(), masterConfig.getListenPort());
this.masterNodes.addAll(nodes);
this.masterPriorityQueue.clear();
this.masterPriorityQueue.putList(masterNodes);
int index = masterPriorityQueue.getIndex(addr);
int index = masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
if (index >= 0) {
MASTER_SIZE = nodes.size();
MASTER_SLOT = index;
} else {
logger.warn("current addr:{} is not in active master list", addr);
logger.warn("current addr:{} is not in active master list", masterConfig.getMasterAddress());
}
logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, addr);
logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, masterConfig.getMasterAddress());
} finally {
masterLock.unlock();
}
@ -379,19 +343,24 @@ public class ServerNodeManager implements InitializingBean {
* @param nodes worker nodes
*/
private void syncWorkerGroupNodes(String workerGroup, Collection<String> nodes) {
// The two lock()/unlock() pairs are the pre-change (plain lock) and post-change (write lock)
// lines of the same statement as rendered by this diff view.
workerGroupLock.lock();
workerGroupWriteLock.lock();
try {
// Reuse the set already registered for this group (or start with an empty one) and replace
// its contents with the given nodes.
// NOTE(review): the existing set is cleared and refilled in place, so any caller still holding
// that set (e.g. through the map view returned by getWorkerGroupNodes()) observes the
// intermediate empty state — confirm this is acceptable.
Set<String> workerNodes = workerGroupNodes.getOrDefault(workerGroup, new HashSet<>());
workerNodes.clear();
workerNodes.addAll(nodes);
workerGroupNodes.put(workerGroup, workerNodes);
} finally {
workerGroupLock.unlock();
workerGroupWriteLock.unlock();
}
}
/**
 * Returns a read-only view of the worker group map (worker group name to node addresses).
 * The first return line is the pre-change body; the locked variant below it is the post-change
 * body as rendered by this diff view.
 *
 * @return unmodifiable view backed by the internal workerGroupNodes map
 */
public Map<String, Set<String>> getWorkerGroupNodes() {
return Collections.unmodifiableMap(workerGroupNodes);
workerGroupReadLock.lock();
try {
// NOTE(review): Collections.unmodifiableMap wraps the live map instead of copying it, so the
// read lock only covers creation of the wrapper; the caller reads the map and its value sets
// after the lock is released. Confirm whether a snapshot copy is needed here.
return Collections.unmodifiableMap(workerGroupNodes);
} finally {
workerGroupReadLock.unlock();
}
}
/**
@ -401,7 +370,7 @@ public class ServerNodeManager implements InitializingBean {
* @return worker nodes
*/
public Set<String> getWorkerGroupNodes(String workerGroup) {
workerGroupLock.lock();
workerGroupReadLock.lock();
try {
if (StringUtils.isEmpty(workerGroup)) {
workerGroup = Constants.DEFAULT_WORKER_GROUP;
@ -412,16 +381,11 @@ public class ServerNodeManager implements InitializingBean {
}
return nodes;
} finally {
workerGroupLock.unlock();
workerGroupReadLock.unlock();
}
}
/**
* get worker node info
*
* @return worker node info
*/
// Returns a read-only view of the worker node info map, keyed by worker node.
// The two signature lines are the pre-change (String values) and post-change (WorkerHeartBeat
// values) forms as rendered by this diff view.
// NOTE(review): unlike getWorkerNodeInfo(String), no workerNodeInfo read lock is taken here, and
// the returned map is a view over the internal HashMap rather than a copy, so callers can
// iterate it while syncAllWorkerNodeInfo/syncSingleWorkerNodeInfo are writing — confirm whether
// a locked snapshot copy is required.
public Map<String, String> getWorkerNodeInfo() {
public Map<String, WorkerHeartBeat> getWorkerNodeInfo() {
return Collections.unmodifiableMap(workerNodeInfo);
}
@ -431,12 +395,12 @@ public class ServerNodeManager implements InitializingBean {
* @param workerNode worker node
* @return worker node info
*/
// Pre-change signature and lock call, followed by their post-change replacements
// (WorkerHeartBeat return type, read lock) as rendered by this diff view.
public String getWorkerNodeInfo(String workerNode) {
workerNodeInfoLock.lock();
public WorkerHeartBeat getWorkerNodeInfo(String workerNode) {
workerNodeInfoReadLock.lock();
try {
// Yields null when nothing is stored for workerNode.
return workerNodeInfo.getOrDefault(workerNode, null);
} finally {
workerNodeInfoLock.unlock();
workerNodeInfoReadLock.unlock();
}
}
@ -446,24 +410,26 @@ public class ServerNodeManager implements InitializingBean {
* @param newWorkerNodeInfo new worker node info
*/
private void syncAllWorkerNodeInfo(Map<String, String> newWorkerNodeInfo) {
// Pre-change lock call followed by its post-change (write lock) replacement.
workerNodeInfoLock.lock();
workerNodeInfoWriteLock.lock();
try {
// Drop every cached entry, then rebuild the cache from the supplied map.
workerNodeInfo.clear();
// Pre-change: the raw string values were stored directly.
workerNodeInfo.putAll(newWorkerNodeInfo);
// Post-change: each value is parsed from JSON into a WorkerHeartBeat before being stored.
for (Map.Entry<String, String> entry : newWorkerNodeInfo.entrySet()) {
// NOTE(review): whatever JSONUtils.parseObject yields for empty or malformed input is stored
// as-is — confirm whether null values can end up in the map.
workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
}
} finally {
workerNodeInfoLock.unlock();
workerNodeInfoWriteLock.unlock();
}
}
/**
* sync single worker node info
*/
// Pre-change signature and lock call (String info, plain lock), followed by their post-change
// replacements (WorkerHeartBeat info, write lock) as rendered by this diff view.
private void syncSingleWorkerNodeInfo(String node, String info) {
workerNodeInfoLock.lock();
private void syncSingleWorkerNodeInfo(String node, WorkerHeartBeat info) {
workerNodeInfoWriteLock.lock();
try {
// Insert or overwrite the entry for this node; other entries are left untouched.
workerNodeInfo.put(node, info);
} finally {
workerNodeInfoLock.unlock();
workerNodeInfoWriteLock.unlock();
}
}
@ -478,7 +444,7 @@ public class ServerNodeManager implements InitializingBean {
private void notifyWorkerInfoChangeListeners() {
Map<String, Set<String>> workerGroupNodes = getWorkerGroupNodes();
Map<String, String> workerNodeInfo = getWorkerNodeInfo();
Map<String, WorkerHeartBeat> workerNodeInfo = getWorkerNodeInfo();
for (WorkerInfoChangeListener listener : workerInfoChangeListeners) {
listener.notify(workerGroupNodes, workerNodeInfo);
}

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import java.util.Map;
import java.util.Set;
@ -31,6 +33,6 @@ public interface WorkerInfoChangeListener {
* @param workerGroups worker groups map, key is worker group name, value is worker address.
* @param workerNodeInfo worker node info map, key is worker address, value is worker info.
*/
void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo);
void notify(Map<String, Set<String>> workerGroups, Map<String, WorkerHeartBeat> workerNodeInfo);
}

71
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.task;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
/**
 * Heart beat task of the master server.
 *
 * <p>{@link #getHeartBeat()} builds a {@link MasterHeartBeat} from the server startup time, the
 * current time, OS metrics and the master configuration. {@link #writeHeartBeat(MasterHeartBeat)}
 * serializes it to JSON and hands it to {@link RegistryClient#persistEphemeral} under the master
 * registry path.
 */
@Slf4j
public class MasterHeartBeatTask extends BaseHeartBeatTask<MasterHeartBeat> {

    private final MasterConfig masterConfig;

    private final RegistryClient registryClient;

    /** Registry path the heart beat is written to, taken from the master config. */
    private final String heartBeatPath;

    /** Process id of this master, looked up once at construction time. */
    private final int processId;

    /**
     * @param masterConfig   master configuration; supplies the heart beat interval, registry path and thresholds
     * @param registryClient client used to write the heart beat
     */
    public MasterHeartBeatTask(@NonNull MasterConfig masterConfig,
                               @NonNull RegistryClient registryClient) {
        super("MasterHeartBeatTask", masterConfig.getHeartbeatInterval().toMillis());
        this.processId = OSUtils.getProcessID();
        this.heartBeatPath = masterConfig.getMasterRegistryPath();
        this.registryClient = registryClient;
        this.masterConfig = masterConfig;
    }

    /**
     * Builds a fresh heart beat from the current system state.
     *
     * @return newly built master heart beat
     */
    @Override
    public MasterHeartBeat getHeartBeat() {
        return MasterHeartBeat.builder()
                .startupTime(ServerLifeCycleManager.getServerStartupTime())
                .reportTime(System.currentTimeMillis())
                .cpuUsage(OSUtils.cpuUsage())
                .loadAverage(OSUtils.loadAverage())
                .availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize())
                .maxCpuloadAvg(masterConfig.getMaxCpuLoadAvg())
                .reservedMemory(masterConfig.getReservedMemory())
                .processId(processId)
                .build();
    }

    /**
     * Serializes the heart beat to JSON and writes it to the master registry path.
     *
     * @param masterHeartBeat heart beat to write
     */
    @Override
    public void writeHeartBeat(MasterHeartBeat masterHeartBeat) {
        final String heartBeatJson = JSONUtils.toJsonString(masterHeartBeat);
        registryClient.persistEphemeral(heartBeatPath, heartBeatJson);
        log.info("Success write master heartBeatInfo into registry, masterRegistryPath: {}, heartBeatInfo: {}",
                heartBeatPath, heartBeatJson);
    }
}

2
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutionContextTestUtils;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@ -55,6 +56,7 @@ public class RoundRobinHostManagerTest {
@Test
public void testSelectWithResult() {
Mockito.when(serverNodeManager.getWorkerGroupNodes("default")).thenReturn(Sets.newHashSet("192.168.1.1:22"));
Mockito.when(serverNodeManager.getWorkerNodeInfo("192.168.1.1:22")).thenReturn(new WorkerHeartBeat());
ExecutionContext context = ExecutionContextTestUtils.getExecutionContext(10000);
Host host = roundRobinHostManager.select(context);
Assert.assertTrue(!Strings.isNullOrEmpty(host.getAddress()));

8
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java

@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
@ -69,6 +70,12 @@ public class MasterRegistryClientTest {
@Mock
private ProcessService processService;
@Mock
private MasterConnectStrategy masterConnectStrategy;
@Mock
private MasterHeartBeatTask masterHeartBeatTask;
@Mock
private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager;
@ -81,6 +88,7 @@ public class MasterRegistryClientTest {
});
ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient);
ReflectionTestUtils.setField(masterRegistryClient, "masterHeartBeatTask", masterHeartBeatTask);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);

36
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java

@ -19,11 +19,13 @@ package org.apache.dolphinscheduler.service.registry;
import com.google.common.base.Strings;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.Registry;
@ -94,21 +96,33 @@ public class RegistryClient {
List<Server> serverList = new ArrayList<>();
for (Map.Entry<String, String> entry : serverMaps.entrySet()) {
HeartBeat heartBeat = HeartBeat.decodeHeartBeat(entry.getValue());
if (heartBeat == null) {
String serverPath = entry.getKey();
String heartBeatJson = entry.getValue();
if (StringUtils.isEmpty(heartBeatJson)) {
logger.error("The heartBeatJson is empty, serverPath: {}", serverPath);
continue;
}
Server server = new Server();
server.setResInfo(JSONUtils.toJsonString(heartBeat));
server.setCreateTime(new Date(heartBeat.getStartupTime()));
server.setLastHeartbeatTime(new Date(heartBeat.getReportTime()));
server.setId(heartBeat.getProcessId());
switch (nodeType) {
case MASTER:
MasterHeartBeat masterHeartBeat = JSONUtils.parseObject(heartBeatJson, MasterHeartBeat.class);
server.setCreateTime(new Date(masterHeartBeat.getStartupTime()));
server.setLastHeartbeatTime(new Date(masterHeartBeat.getReportTime()));
server.setId(masterHeartBeat.getProcessId());
break;
case WORKER:
WorkerHeartBeat workerHeartBeat = JSONUtils.parseObject(heartBeatJson, WorkerHeartBeat.class);
server.setCreateTime(new Date(workerHeartBeat.getStartupTime()));
server.setLastHeartbeatTime(new Date(workerHeartBeat.getReportTime()));
server.setId(workerHeartBeat.getProcessId());
break;
}
String key = entry.getKey();
server.setZkDirectory(parentPath + "/" + key);
server.setResInfo(heartBeatJson);
// todo: add host, port in heartBeat Info, so that we don't need to parse this again
server.setZkDirectory(parentPath + "/" + serverPath);
// set host and port
String[] hostAndPort = key.split(COLON);
String[] hostAndPort = serverPath.split(COLON);
String[] hosts = hostAndPort[0].split(DIVISION_STRING);
// fetch the last one
server.setHost(hosts[hosts.length - 1]);

4
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
@ -95,6 +96,9 @@ public class WorkerServer implements IStoppable {
@Autowired
private MessageRetryRunner messageRetryRunner;
@Autowired
private WorkerConfig workerConfig;
/**
* worker server startup, not use web service
*

18
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.config;
import com.google.common.collect.Sets;
import lombok.Data;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
import org.slf4j.Logger;
@ -31,6 +32,9 @@ import org.springframework.validation.annotation.Validated;
import java.time.Duration;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
@Data
@Validated
@ -57,6 +61,7 @@ public class WorkerConfig implements Validator {
* This field doesn't need to set at config file, it will be calculated by workerIp:listenPort
*/
private String workerAddress;
private Set<String> workerGroupRegistryPaths;
@Override
public boolean supports(Class<?> clazz) {
@ -76,6 +81,18 @@ public class WorkerConfig implements Validator {
workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort()));
workerConfig.setGroups(workerConfig.getGroups().stream().map(String::trim).collect(Collectors.toSet()));
if (CollectionUtils.isEmpty(workerConfig.getGroups())) {
errors.rejectValue("groups", null, "should not be empty");
}
Set<String> workerRegistryPaths = workerConfig.getGroups()
.stream()
.map(workerGroup -> REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup + "/" + workerConfig.getWorkerAddress())
.collect(Collectors.toSet());
workerConfig.setWorkerGroupRegistryPaths(workerRegistryPaths);
printConfig();
}
@ -93,5 +110,6 @@ public class WorkerConfig implements Validator {
logger.info("Worker config: alertListenPort -> {}", alertListenPort);
logger.info("Worker config: registryDisconnectStrategy -> {}", registryDisconnectStrategy);
logger.info("Worker config: workerAddress -> {}", registryDisconnectStrategy);
logger.info("Worker config: workerGroupRegistryPaths: {}", workerGroupRegistryPaths);
}
}

79
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java

@ -1,79 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Periodic worker heartbeat task.
 *
 * <p>Each run refreshes the startup time and the waiting task count on the wrapped
 * {@link HeartBeat} and writes its encoded form to every configured registry path.
 * Failures are logged together with the number of consecutive failed runs; the
 * counter is reset after the next successful run.
 */
public class WorkerHeartBeatTask implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(WorkerHeartBeatTask.class);

    /** Registry paths the encoded heartbeat is written to on every run. */
    private final Set<String> heartBeatPaths;

    private final RegistryClient registryClient;

    /** Waiting task count captured at construction time; it is not updated afterwards in this class. */
    private int workerWaitingTaskCount;

    private final HeartBeat heartBeat;

    /** Consecutive failed runs; reset to 0 after a successful run. */
    private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();

    /**
     * Creates the task and the {@link HeartBeat} it reports.
     *
     * @param startupTime            startup time handed to the heartbeat
     * @param maxCpuloadAvg          max cpu load average handed to the heartbeat
     * @param reservedMemory         reserved memory handed to the heartbeat
     * @param hostWeight             host weight handed to the heartbeat
     * @param heartBeatPaths         registry paths to write the heartbeat to
     * @param registryClient         client used to write to the registry
     * @param workerThreadCount      worker thread count handed to the heartbeat
     * @param workerWaitingTaskCount waiting task count reported on every run
     */
    public WorkerHeartBeatTask(long startupTime,
                               double maxCpuloadAvg,
                               double reservedMemory,
                               int hostWeight,
                               Set<String> heartBeatPaths,
                               RegistryClient registryClient,
                               int workerThreadCount,
                               int workerWaitingTaskCount) {
        this.heartBeatPaths = heartBeatPaths;
        this.registryClient = registryClient;
        this.workerWaitingTaskCount = workerWaitingTaskCount;
        this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount);
    }

    /**
     * @return the current heartbeat in its encoded string form
     */
    public String getHeartBeatInfo() {
        return this.heartBeat.encodeHeartBeat();
    }

    /**
     * Writes the heartbeat to every registry path; does nothing while the server is not running.
     */
    @Override
    public void run() {
        try {
            if (!ServerLifeCycleManager.isRunning()) {
                return;
            }
            heartBeat.setStartupTime(ServerLifeCycleManager.getServerStartupTime());
            // update waiting task count
            heartBeat.setWorkerWaitingTaskCount(workerWaitingTaskCount);
            for (String heartBeatPath : heartBeatPaths) {
                registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
            }
            heartBeatErrorTimes.set(0);
        } catch (Throwable ex) {
            // Nothing is rethrown, so one failed run never propagates to whoever executes this
            // Runnable. Count the failure so the logged errorTimes is not always 0.
            logger.error("HeartBeat task execute failed, errorTimes: {}", heartBeatErrorTimes.incrementAndGet(), ex);
        }
    }
}

110
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java

@ -17,80 +17,52 @@
package org.apache.dolphinscheduler.server.worker.registry;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.task.WorkerHeartBeatTask;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
/**
* worker registry
*/
@Slf4j
@Service
public class WorkerRegistryClient implements AutoCloseable {
private final Logger logger = LoggerFactory.getLogger(WorkerRegistryClient.class);
/**
* worker config
*/
@Autowired
private WorkerConfig workerConfig;
/**
* worker manager
*/
@Autowired
private WorkerManagerThread workerManagerThread;
/**
* heartbeat executor
*/
private ScheduledExecutorService heartBeatExecutor;
@Autowired
private RegistryClient registryClient;
@Autowired
private WorkerConnectStrategy workerConnectStrategy;
/**
* worker startup time, ms
*/
private long startupTime;
private WorkerHeartBeatTask workerHeartBeatTask;
private Set<String> workerGroups;
@PostConstruct
public void initWorkRegistry() {
this.workerGroups = workerConfig.getGroups();
this.startupTime = System.currentTimeMillis();
this.heartBeatExecutor =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
this.workerHeartBeatTask = new WorkerHeartBeatTask(
workerConfig,
registryClient,
() -> workerManagerThread.getWaitSubmitQueueSize());
}
public void start() {
@ -107,24 +79,13 @@ public class WorkerRegistryClient implements AutoCloseable {
* registry
*/
private void registry() {
String address = NetUtils.getAddr(workerConfig.getListenPort());
Set<String> workerZkPaths = getWorkerZkPaths();
long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds();
WorkerHeartBeatTask heartBeatTask = new WorkerHeartBeatTask(startupTime,
workerConfig.getMaxCpuLoadAvg(),
workerConfig.getReservedMemory(),
workerConfig.getHostWeight(),
workerZkPaths,
registryClient,
workerConfig.getExecThreads(),
workerManagerThread.getThreadPoolQueueSize());
for (String workerZKPath : workerZkPaths) {
WorkerHeartBeat workerHeartBeat = workerHeartBeatTask.getHeartBeat();
for (String workerZKPath : workerConfig.getWorkerGroupRegistryPaths()) {
// remove before persist
registryClient.remove(workerZKPath);
registryClient.persistEphemeral(workerZKPath, heartBeatTask.getHeartBeatInfo());
logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath);
registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat));
log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(), workerZKPath);
}
while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.WORKER)) {
@ -134,37 +95,9 @@ public class WorkerRegistryClient implements AutoCloseable {
// sleep 1s, waiting master failover remove
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
this.heartBeatExecutor.scheduleWithFixedDelay(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval,
TimeUnit.SECONDS);
logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval);
}
/**
 * Builds one registry path per configured worker group, in the form
 * {@code REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + group + "/" + address}.
 *
 * <p>A null or empty group name is replaced by {@code DEFAULT_WORKER_GROUP}; the group
 * name is trimmed and lower-cased before it is placed in the path.
 *
 * @return the set of registry paths for this worker
 */
public Set<String> getWorkerZkPaths() {
    Set<String> registryPaths = Sets.newHashSet();
    final String localAddress = getLocalAddress();
    for (String configuredGroup : this.workerGroups) {
        // The null/empty check runs on the raw value, before trimming.
        String groupName = Strings.isNullOrEmpty(configuredGroup) ? DEFAULT_WORKER_GROUP : configuredGroup;
        // trim and lower case is needed
        registryPaths.add(String.join(SINGLE_SLASH,
                REGISTRY_DOLPHINSCHEDULER_WORKERS,
                groupName.trim().toLowerCase(),
                localAddress));
    }
    return registryPaths;
}
/**
* get local address
*/
private String getLocalAddress() {
return NetUtils.getAddr(workerConfig.getListenPort());
workerHeartBeatTask.start();
log.info("Worker node: {} registry finished", workerConfig.getWorkerAddress());
}
public void setRegistryStoppable(IStoppable stoppable) {
@ -173,12 +106,11 @@ public class WorkerRegistryClient implements AutoCloseable {
@Override
public void close() throws IOException {
if (heartBeatExecutor != null) {
heartBeatExecutor.shutdownNow();
logger.info("Heartbeat executor shutdown");
if (workerHeartBeatTask != null) {
workerHeartBeatTask.shutdown();
}
registryClient.close();
logger.info("registry client closed");
log.info("Worker registry client closed");
}
}

107
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.function.Supplier;
/**
 * Worker heartbeat task: samples the current load of this worker, derives a server
 * status from it and writes the result as JSON to every worker group registry path
 * listed in {@link WorkerConfig#getWorkerGroupRegistryPaths()}.
 *
 * <p>Scheduling is inherited from {@link BaseHeartBeatTask}, which receives the task name
 * and the configured heartbeat interval in milliseconds.
 */
@Slf4j
public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {

    private final WorkerConfig workerConfig;

    private final RegistryClient registryClient;

    /** Supplies the current number of tasks waiting on this worker; queried once per heartbeat. */
    private final Supplier<Integer> workerWaitingTaskCount;

    /** Process id of this worker, read once at construction. */
    private final int processId;

    /**
     * @param workerConfig           worker configuration (interval, thresholds, registry paths)
     * @param registryClient         client used to write the heartbeat to the registry
     * @param workerWaitingTaskCount supplier of the current waiting task count
     */
    public WorkerHeartBeatTask(@NonNull WorkerConfig workerConfig,
                               @NonNull RegistryClient registryClient,
                               @NonNull Supplier<Integer> workerWaitingTaskCount) {
        super("WorkerHeartBeatTask", workerConfig.getHeartbeatInterval().toMillis());
        this.workerConfig = workerConfig;
        this.registryClient = registryClient;
        this.workerWaitingTaskCount = workerWaitingTaskCount;
        this.processId = OSUtils.getProcessID();
    }

    /**
     * Samples the current metrics and assembles a heartbeat from them.
     *
     * <p>Every value is read exactly once, so the reported {@code serverStatus} is always
     * derived from the same waiting task count and exec thread count that are reported
     * alongside it.
     *
     * @return a freshly built heartbeat
     */
    @Override
    public WorkerHeartBeat getHeartBeat() {
        double loadAverage = OSUtils.loadAverage();
        double cpuUsage = OSUtils.cpuUsage();
        int maxCpuLoadAvg = workerConfig.getMaxCpuLoadAvg();
        double reservedMemory = workerConfig.getReservedMemory();
        double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
        int execThreads = workerConfig.getExecThreads();
        // Single snapshot: the supplier may return a different value on every call.
        int waitingTaskCount = this.workerWaitingTaskCount.get();
        int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, availablePhysicalMemorySize,
                reservedMemory, execThreads, waitingTaskCount);

        return WorkerHeartBeat.builder()
                .startupTime(ServerLifeCycleManager.getServerStartupTime())
                .reportTime(System.currentTimeMillis())
                .cpuUsage(cpuUsage)
                .loadAverage(loadAverage)
                .availablePhysicalMemorySize(availablePhysicalMemorySize)
                .maxCpuloadAvg(maxCpuLoadAvg)
                .reservedMemory(reservedMemory)
                .processId(processId)
                .workerHostWeight(workerConfig.getHostWeight())
                .workerWaitingTaskCount(waitingTaskCount)
                .workerExecThreadCount(execThreads)
                .serverStatus(serverStatus)
                .build();
    }

    /**
     * Serializes the heartbeat to JSON once and writes it to every worker group registry path.
     *
     * @param workerHeartBeat heartbeat to publish
     */
    @Override
    public void writeHeartBeat(WorkerHeartBeat workerHeartBeat) {
        String workerHeartBeatJson = JSONUtils.toJsonString(workerHeartBeat);
        for (String workerGroupRegistryPath : workerConfig.getWorkerGroupRegistryPaths()) {
            registryClient.persistEphemeral(workerGroupRegistryPath, workerHeartBeatJson);
        }
        log.info("Success write worker group heartBeatInfo into registry, workGroupPath: {} workerHeartBeatInfo: {}",
                workerConfig.getWorkerGroupRegistryPaths(), workerHeartBeatJson);
    }

    /**
     * Derives the server status from the sampled metrics.
     *
     * @param loadAverage                 current cpu load average
     * @param maxCpuloadAvg               configured upper bound for the load average
     * @param availablePhysicalMemorySize currently available physical memory
     * @param reservedMemory              configured lower bound for the available memory
     * @param workerExecThreadCount       configured number of exec threads
     * @param workerWaitingTaskCount      number of tasks currently waiting
     * @return {@code Constants.ABNORMAL_NODE_STATUS} when the load average exceeds its bound or the
     *         available memory is below the reserved amount; otherwise {@code Constants.BUSY_NODE_STATUE}
     *         when more tasks are waiting than there are exec threads; otherwise
     *         {@code Constants.NORMAL_NODE_STATUS}
     */
    public int getServerStatus(double loadAverage,
                               double maxCpuloadAvg,
                               double availablePhysicalMemorySize,
                               double reservedMemory,
                               int workerExecThreadCount,
                               int workerWaitingTaskCount) {
        if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
            log.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
                    loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
            return Constants.ABNORMAL_NODE_STATUS;
        } else if (workerWaitingTaskCount > workerExecThreadCount) {
            log.warn("current waiting task count {} is large than worker thread count {}, worker is busy", workerWaitingTaskCount, workerExecThreadCount);
            return Constants.BUSY_NODE_STATUE;
        } else {
            return Constants.NORMAL_NODE_STATUS;
        }
    }
}
Loading…
Cancel
Save