From 67e7f88d8b7f65696d36fcb3c85c2d0b7836cae8 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 31 Aug 2022 16:20:23 +0800 Subject: [PATCH] Refactor heart beat task, use json to serialize/deserialize (#11702) * Refactor heart beat task, use json to serialize/deserialize --- .../service/impl/WorkerGroupServiceImpl.java | 10 +- .../common/model/BaseHeartBeatTask.java | 81 ++++++ .../common/model/HeartBeat.java | 21 ++ .../common/model/MasterHeartBeat.java | 39 +++ .../dolphinscheduler/common/model/Server.java | 92 +----- .../common/model/WorkerHeartBeat.java | 47 ++++ .../common/utils/HeartBeat.java | 261 ------------------ .../common/utils/JSONUtils.java | 46 ++- .../common/utils/HeartBeatTest.java | 77 ------ .../server/master/config/MasterConfig.java | 7 +- .../dispatch/host/CommonHostManager.java | 24 +- .../dispatch/host/LowerWeightHostManager.java | 39 ++- .../master/registry/MasterHeartBeatTask.java | 70 ----- .../master/registry/MasterRegistryClient.java | 56 ++-- .../master/registry/ServerNodeManager.java | 160 +++++------ .../registry/WorkerInfoChangeListener.java | 4 +- .../master/task/MasterHeartBeatTask.java | 71 +++++ .../host/RoundRobinHostManagerTest.java | 2 + .../registry/MasterRegistryClientTest.java | 8 + .../service/registry/RegistryClient.java | 36 ++- .../server/worker/WorkerServer.java | 4 + .../server/worker/config/WorkerConfig.java | 18 ++ .../worker/registry/WorkerHeartBeatTask.java | 79 ------ .../worker/registry/WorkerRegistryClient.java | 110 ++------ .../worker/task/WorkerHeartBeatTask.java | 107 +++++++ 25 files changed, 583 insertions(+), 886 deletions(-) create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java create mode 100644 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java delete mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java delete mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java delete mode 100644 dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java create mode 100644 dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index eee031942c..732ecd8e7b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -28,7 +28,9 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.UserType; -import org.apache.dolphinscheduler.common.utils.HeartBeat; +import org.apache.dolphinscheduler.common.model.HeartBeat; +import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.User; import 
org.apache.dolphinscheduler.dao.entity.WorkerGroup; @@ -318,9 +320,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro wg.setName(workerGroup); if (isPaging) { String registeredValue = registryClient.get(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.iterator().next()); - HeartBeat heartBeat = HeartBeat.decodeHeartBeat(registeredValue); - wg.setCreateTime(new Date(heartBeat.getStartupTime())); - wg.setUpdateTime(new Date(heartBeat.getReportTime())); + WorkerHeartBeat workerHeartBeat = JSONUtils.parseObject(registeredValue, WorkerHeartBeat.class); + wg.setCreateTime(new Date(workerHeartBeat.getStartupTime())); + wg.setUpdateTime(new Date(workerHeartBeat.getReportTime())); wg.setSystemDefault(true); if (workerGroupsMap != null && workerGroupsMap.containsKey(workerGroup)) { wg.setDescription(workerGroupsMap.get(workerGroup).getDescription()); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java new file mode 100644 index 0000000000..557f7ec86b --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.model; + +import lombok.extern.slf4j.Slf4j; +import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; +import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; + +@Slf4j +public abstract class BaseHeartBeatTask extends BaseDaemonThread { + + private final String threadName; + private final long heartBeatInterval; + + protected boolean runningFlag; + + public BaseHeartBeatTask(String threadName, long heartBeatInterval) { + super(threadName); + this.threadName = threadName; + this.heartBeatInterval = heartBeatInterval; + this.runningFlag = true; + } + + @Override + public synchronized void start() { + log.info("Starting {}", threadName); + super.start(); + log.info("Started {}, heartBeatInterval: {}", threadName, heartBeatInterval); + } + + @Override + public void run() { + while (runningFlag) { + try { + if (!ServerLifeCycleManager.isRunning()) { + log.info("The current server status is {}, will not write heartBeatInfo into registry", ServerLifeCycleManager.getServerStatus()); + continue; + } + T heartBeat = getHeartBeat(); + writeHeartBeat(heartBeat); + } catch (Exception ex) { + log.error("{} task execute failed", threadName, ex); + } finally { + try { + Thread.sleep(heartBeatInterval); + } catch (InterruptedException e) { + handleInterruptException(e); + } + } + } + } + + public void shutdown() { + log.warn("{} task finished", threadName); + runningFlag = false; + } + + private void handleInterruptException(InterruptedException ex) { + log.warn("{} has been interrupted", 
threadName, ex); + Thread.currentThread().interrupt(); + } + + public abstract T getHeartBeat(); + + public abstract void writeHeartBeat(T heartBeat); +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java new file mode 100644 index 0000000000..1a1d1610e4 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.model; + +public interface HeartBeat { +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java new file mode 100644 index 0000000000..95ece3522e --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class MasterHeartBeat implements HeartBeat { + private long startupTime; + private long reportTime; + private double cpuUsage; + private double memoryUsage; + private double loadAverage; + private double availablePhysicalMemorySize; + private double maxCpuloadAvg; + private double reservedMemory; + private int processId; +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java index 4bd4648e93..743979eed7 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java @@ -17,31 +17,19 @@ package org.apache.dolphinscheduler.common.model; +import lombok.Data; + import java.util.Date; -/** - * server - */ +@Data public class Server { - /** - * id - */ private int id; - /** - * host - */ private String host; - /** - * port - */ private int port; - /** - * master directory in 
zookeeper - */ private String zkDirectory; /** @@ -49,82 +37,8 @@ public class Server { */ private String resInfo; - /** - * create time - */ private Date createTime; - /** - * laster heart beat time - */ private Date lastHeartbeatTime; - public int getId() { - return id; - } - - public void setId(int id) { - this.id = id; - } - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public Date getCreateTime() { - return createTime; - } - - public void setCreateTime(Date createTime) { - this.createTime = createTime; - } - - public String getZkDirectory() { - return zkDirectory; - } - - public void setZkDirectory(String zkDirectory) { - this.zkDirectory = zkDirectory; - } - - public Date getLastHeartbeatTime() { - return lastHeartbeatTime; - } - - public void setLastHeartbeatTime(Date lastHeartbeatTime) { - this.lastHeartbeatTime = lastHeartbeatTime; - } - - public String getResInfo() { - return resInfo; - } - - public void setResInfo(String resInfo) { - this.resInfo = resInfo; - } - - @Override - public String toString() { - return "MasterServer{" + - "id=" + id + - ", host='" + host + '\'' + - ", port=" + port + - ", zkDirectory='" + zkDirectory + '\'' + - ", resInfo='" + resInfo + '\'' + - ", createTime=" + createTime + - ", lastHeartbeatTime=" + lastHeartbeatTime + - '}'; - } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java new file mode 100644 index 0000000000..4bb765d180 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkerHeartBeat implements HeartBeat { + + private long startupTime; + private long reportTime; + private double cpuUsage; + private double memoryUsage; + private double loadAverage; + private double availablePhysicalMemorySize; + private double maxCpuloadAvg; + private double reservedMemory; + private int serverStatus; + private int processId; + + private int workerHostWeight; // worker host weight + private int workerWaitingTaskCount; // worker waiting task count + private int workerExecThreadCount; // worker thread pool thread count + + +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java deleted file mode 100644 index ecfa814cc6..0000000000 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.common.utils; - -import org.apache.dolphinscheduler.common.Constants; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class HeartBeat { - - private static final Logger logger = LoggerFactory.getLogger(HeartBeat.class); - - private long startupTime; - private long reportTime; - private double cpuUsage; - private double memoryUsage; - private double loadAverage; - private double availablePhysicalMemorySize; - private double maxCpuloadAvg; - private double reservedMemory; - private int serverStatus; - private int processId; - - private int workerHostWeight; // worker host weight - private int workerWaitingTaskCount; // worker waiting task count - private int workerExecThreadCount; // worker thread pool thread count - - private double diskAvailable; - - public double getDiskAvailable() { - return diskAvailable; - } - - public void setDiskAvailable(double diskAvailable) { - this.diskAvailable = diskAvailable; - } - - public long getStartupTime() { - return startupTime; - } - - public void setStartupTime(long startupTime) { - this.startupTime = startupTime; - } - - public long getReportTime() { - return reportTime; - } - - public void setReportTime(long reportTime) { - this.reportTime = reportTime; - } - - public double 
getCpuUsage() { - return cpuUsage; - } - - public void setCpuUsage(double cpuUsage) { - this.cpuUsage = cpuUsage; - } - - public double getMemoryUsage() { - return memoryUsage; - } - - public void setMemoryUsage(double memoryUsage) { - this.memoryUsage = memoryUsage; - } - - public double getLoadAverage() { - return loadAverage; - } - - public void setLoadAverage(double loadAverage) { - this.loadAverage = loadAverage; - } - - public double getAvailablePhysicalMemorySize() { - return availablePhysicalMemorySize; - } - - public void setAvailablePhysicalMemorySize(double availablePhysicalMemorySize) { - this.availablePhysicalMemorySize = availablePhysicalMemorySize; - } - - public double getMaxCpuloadAvg() { - return maxCpuloadAvg; - } - - public void setMaxCpuloadAvg(double maxCpuloadAvg) { - this.maxCpuloadAvg = maxCpuloadAvg; - } - - public double getReservedMemory() { - return reservedMemory; - } - - public void setReservedMemory(double reservedMemory) { - this.reservedMemory = reservedMemory; - } - - public int getServerStatus() { - return serverStatus; - } - - public void setServerStatus(int serverStatus) { - this.serverStatus = serverStatus; - } - - public int getProcessId() { - return processId; - } - - public void setProcessId(int processId) { - this.processId = processId; - } - - public int getWorkerHostWeight() { - return workerHostWeight; - } - - public void setWorkerHostWeight(int workerHostWeight) { - this.workerHostWeight = workerHostWeight; - } - - public int getWorkerWaitingTaskCount() { - return workerWaitingTaskCount; - } - - public void setWorkerWaitingTaskCount(int workerWaitingTaskCount) { - this.workerWaitingTaskCount = workerWaitingTaskCount; - } - - public int getWorkerExecThreadCount() { - return workerExecThreadCount; - } - - public void setWorkerExecThreadCount(int workerExecThreadCount) { - this.workerExecThreadCount = workerExecThreadCount; - } - - public HeartBeat() { - this.reportTime = System.currentTimeMillis(); - this.serverStatus = 
Constants.NORMAL_NODE_STATUS; - } - - public HeartBeat(long startupTime, double maxCpuloadAvg, double reservedMemory) { - this.reportTime = System.currentTimeMillis(); - this.serverStatus = Constants.NORMAL_NODE_STATUS; - this.startupTime = startupTime; - this.maxCpuloadAvg = maxCpuloadAvg; - this.reservedMemory = reservedMemory; - } - - public HeartBeat(long startupTime, double maxCpuloadAvg, double reservedMemory, int hostWeight, int workerExecThreadCount) { - this.reportTime = System.currentTimeMillis(); - this.serverStatus = Constants.NORMAL_NODE_STATUS; - this.startupTime = startupTime; - this.maxCpuloadAvg = maxCpuloadAvg; - this.reservedMemory = reservedMemory; - this.workerHostWeight = hostWeight; - this.workerExecThreadCount = workerExecThreadCount; - } - - /** - * fill system info - */ - private void fillSystemInfo() { - this.cpuUsage = OSUtils.cpuUsage(); - this.loadAverage = OSUtils.loadAverage(); - this.availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); - this.memoryUsage = OSUtils.memoryUsage(); - this.diskAvailable = OSUtils.diskAvailable(); - this.processId = OSUtils.getProcessID(); - } - - /** - * update server state - */ - public void updateServerState() { - this.reportTime = System.currentTimeMillis(); - if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) { - logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G", - loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory); - this.serverStatus = Constants.ABNORMAL_NODE_STATUS; - } else if (workerWaitingTaskCount > workerExecThreadCount) { - logger.warn("current waiting task count {} is large than worker thread count {}, worker is busy", workerWaitingTaskCount, workerExecThreadCount); - this.serverStatus = Constants.BUSY_NODE_STATUE; - } else { - this.serverStatus = Constants.NORMAL_NODE_STATUS; - } - } - - /** - * encode heartbeat - */ - public String 
encodeHeartBeat() { - this.fillSystemInfo(); - this.updateServerState(); - - StringBuilder builder = new StringBuilder(100); - builder.append(cpuUsage).append(Constants.COMMA); - builder.append(memoryUsage).append(Constants.COMMA); - builder.append(loadAverage).append(Constants.COMMA); - builder.append(availablePhysicalMemorySize).append(Constants.COMMA); - builder.append(maxCpuloadAvg).append(Constants.COMMA); - builder.append(reservedMemory).append(Constants.COMMA); - builder.append(startupTime).append(Constants.COMMA); - builder.append(reportTime).append(Constants.COMMA); - builder.append(serverStatus).append(Constants.COMMA); - builder.append(processId).append(Constants.COMMA); - builder.append(workerHostWeight).append(Constants.COMMA); - builder.append(workerExecThreadCount).append(Constants.COMMA); - builder.append(workerWaitingTaskCount).append(Constants.COMMA); - builder.append(diskAvailable); - - return builder.toString(); - } - - /** - * decode heartbeat - */ - public static HeartBeat decodeHeartBeat(String heartBeatInfo) { - String[] parts = heartBeatInfo.split(Constants.COMMA); - if (parts.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) { - return null; - } - HeartBeat heartBeat = new HeartBeat(); - heartBeat.cpuUsage = Double.parseDouble(parts[0]); - heartBeat.memoryUsage = Double.parseDouble(parts[1]); - heartBeat.loadAverage = Double.parseDouble(parts[2]); - heartBeat.availablePhysicalMemorySize = Double.parseDouble(parts[3]); - heartBeat.maxCpuloadAvg = Double.parseDouble(parts[4]); - heartBeat.reservedMemory = Double.parseDouble(parts[5]); - heartBeat.startupTime = Long.parseLong(parts[6]); - heartBeat.reportTime = Long.parseLong(parts[7]); - heartBeat.serverStatus = Integer.parseInt(parts[8]); - heartBeat.processId = Integer.parseInt(parts[9]); - heartBeat.workerHostWeight = Integer.parseInt(parts[10]); - heartBeat.workerExecThreadCount = Integer.parseInt(parts[11]); - heartBeat.workerWaitingTaskCount = Integer.parseInt(parts[12]); - 
heartBeat.diskAvailable = Double.parseDouble(parts[13]); - return heartBeat; - } -} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java index 1a09ae11ef..b94e34e510 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java @@ -17,28 +17,6 @@ package org.apache.dolphinscheduler.common.utils; -import static java.nio.charset.StandardCharsets.UTF_8; - -import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT; -import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; -import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL; -import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS; - -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.spi.utils.StringUtils; - -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.TimeZone; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; @@ -55,6 +33,26 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; import com.fasterxml.jackson.databind.type.CollectionType; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.spi.utils.StringUtils; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TimeZone; + +import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT; +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; +import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL; +import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS; +import static java.nio.charset.StandardCharsets.UTF_8; /** * json utils @@ -130,7 +128,7 @@ public class JSONUtils { * @return an object of type T from the string * classOfT */ - public static T parseObject(String json, Class clazz) { + public static @Nullable T parseObject(String json, Class clazz) { if (StringUtils.isEmpty(json)) { return null; } @@ -138,7 +136,7 @@ public class JSONUtils { try { return objectMapper.readValue(json, clazz); } catch (Exception e) { - logger.error("parse object exception!", e); + logger.error("Parse object exception, jsonStr: {}, class: {}", json, clazz, e); } return null; } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java deleted file mode 100644 index c207f6bfb5..0000000000 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.common.utils; - -import static org.junit.Assert.assertEquals; - -import org.apache.dolphinscheduler.common.Constants; - -import org.junit.Test; - -/** - * NetUtilsTest - */ -public class HeartBeatTest { - - @Test - public void testAbnormalState() { - long startupTime = System.currentTimeMillis(); - double loadAverage = 100; - double reservedMemory = 100; - HeartBeat heartBeat = new HeartBeat(startupTime, loadAverage, reservedMemory); - heartBeat.updateServerState(); - assertEquals(Constants.ABNORMAL_NODE_STATUS, heartBeat.getServerStatus()); - } - - @Test - public void testBusyState() { - long startupTime = System.currentTimeMillis(); - double loadAverage = 0; - double reservedMemory = 0; - int hostWeight = 1; - int taskCount = 200; - int workerThreadCount = 199; - HeartBeat heartBeat = new HeartBeat(startupTime, loadAverage, reservedMemory, hostWeight, workerThreadCount); - - heartBeat.setWorkerWaitingTaskCount(taskCount); - heartBeat.updateServerState(); - assertEquals(Constants.BUSY_NODE_STATUE, heartBeat.getServerStatus()); - } - - @Test - public void testDecodeHeartBeat() throws Exception { - String heartBeatInfo = "0.35,0.58,3.09,6.47,5.0,1.0,1634033006749,1634033006857,1,29732,1,199,200,65.86"; - HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo); - - double delta = 0.001; - assertEquals(0.35, heartBeat.getCpuUsage(), delta); - 
assertEquals(0.58, heartBeat.getMemoryUsage(), delta); - assertEquals(3.09, heartBeat.getLoadAverage(), delta); - assertEquals(6.47, heartBeat.getAvailablePhysicalMemorySize(), delta); - assertEquals(5.0, heartBeat.getMaxCpuloadAvg(), delta); - assertEquals(1.0, heartBeat.getReservedMemory(), delta); - assertEquals(1634033006749L, heartBeat.getStartupTime()); - assertEquals(1634033006857L, heartBeat.getReportTime()); - assertEquals(1, heartBeat.getServerStatus()); - assertEquals(29732, heartBeat.getProcessId()); - assertEquals(199, heartBeat.getWorkerExecThreadCount()); - assertEquals(200, heartBeat.getWorkerWaitingTaskCount()); - assertEquals(65.86, heartBeat.getDiskAvailable(), delta); - } - -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index 26e2cbe02a..b0426f477a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -95,7 +95,7 @@ public class MasterConfig implements Validator { private String masterAddress; // /nodes/master/ip:listenPort - private String masterRegistryNodePath; + private String masterRegistryPath; @Override public boolean supports(Class clazz) { @@ -139,8 +139,7 @@ public class MasterConfig implements Validator { masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2); } masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort())); - masterConfig - .setMasterRegistryNodePath(REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterConfig.getMasterAddress()); + masterConfig.setMasterRegistryPath(REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterConfig.getMasterAddress()); printConfig(); } @@ -161,6 +160,6 @@ public class MasterConfig implements Validator { 
logger.info("Master config: killYarnJobWhenTaskFailover -> {} ", killYarnJobWhenTaskFailover); logger.info("Master config: registryDisconnectStrategy -> {} ", registryDisconnectStrategy); logger.info("Master config: masterAddress -> {} ", masterAddress); - logger.info("Master config: masterRegistryNodePath -> {} ", masterRegistryNodePath); + logger.info("Master config: masterRegistryPath -> {} ", masterRegistryPath); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java index 4051068c53..bbf84a3d74 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java @@ -17,8 +17,8 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.utils.HeartBeat; +import org.apache.commons.collections.CollectionUtils; +import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; @@ -27,14 +27,13 @@ import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; -import org.springframework.beans.factory.annotation.Autowired; - /** * common host manager */ @@ -80,23 +79,10 @@ public abstract class 
CommonHostManager implements HostManager { Set nodes = serverNodeManager.getWorkerGroupNodes(workerGroup); if (CollectionUtils.isNotEmpty(nodes)) { for (String node : nodes) { - String heartbeat = serverNodeManager.getWorkerNodeInfo(node); - int hostWeight = getWorkerHostWeightFromHeartbeat(heartbeat); - hostWorkers.add(HostWorker.of(node, hostWeight, workerGroup)); + WorkerHeartBeat workerNodeInfo = serverNodeManager.getWorkerNodeInfo(node); + hostWorkers.add(HostWorker.of(node, workerNodeInfo.getWorkerHostWeight(), workerGroup)); } } return hostWorkers; } - - protected int getWorkerHostWeightFromHeartbeat(String heartBeatInfo) { - int hostWeight = Constants.DEFAULT_WORKER_HOST_WEIGHT; - if (!StringUtils.isEmpty(heartBeatInfo)) { - HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo); - if (heartBeat != null) { - hostWeight = heartBeat.getWorkerHostWeight(); - } - } - return hostWeight; - } - } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java index 5002484144..3ad46ddd53 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java @@ -17,18 +17,19 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; +import org.apache.commons.collections.CollectionUtils; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.utils.HeartBeat; +import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import 
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin; import org.apache.dolphinscheduler.server.master.registry.WorkerInfoChangeListener; -import org.apache.dolphinscheduler.spi.utils.StringUtils; - -import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import javax.annotation.PostConstruct; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -39,11 +40,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import javax.annotation.PostConstruct; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * lower weight host manager */ @@ -97,7 +93,7 @@ public class LowerWeightHostManager extends CommonHostManager { private class WorkerWeightListener implements WorkerInfoChangeListener { @Override - public void notify(Map> workerGroups, Map workerNodeInfo) { + public void notify(Map> workerGroups, Map workerNodeInfo) { syncWorkerResources(workerGroups, workerNodeInfo); } } @@ -109,7 +105,7 @@ public class LowerWeightHostManager extends CommonHostManager { * @param workerNodeInfoMap worker node info map, key is worker node, value is worker info. 
*/ private void syncWorkerResources(final Map> workerGroupNodes, - final Map workerNodeInfoMap) { + final Map workerNodeInfoMap) { try { Map> workerHostWeights = new HashMap<>(); for (Map.Entry> entry : workerGroupNodes.entrySet()) { @@ -117,7 +113,7 @@ public class LowerWeightHostManager extends CommonHostManager { Set nodes = entry.getValue(); Set hostWeights = new HashSet<>(nodes.size()); for (String node : nodes) { - String heartbeat = workerNodeInfoMap.getOrDefault(node, null); + WorkerHeartBeat heartbeat = workerNodeInfoMap.getOrDefault(node, null); Optional hostWeightOpt = getHostWeight(node, workerGroup, heartbeat); hostWeightOpt.ifPresent(hostWeights::add); } @@ -131,13 +127,9 @@ public class LowerWeightHostManager extends CommonHostManager { } } - private Optional getHostWeight(String addr, String workerGroup, String heartBeatInfo) { - if (StringUtils.isEmpty(heartBeatInfo)) { - logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup); - return Optional.empty(); - } - HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo); + public Optional getHostWeight(String addr, String workerGroup, WorkerHeartBeat heartBeat) { if (heartBeat == null) { + logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup); return Optional.empty(); } if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) { @@ -151,12 +143,15 @@ public class LowerWeightHostManager extends CommonHostManager { return Optional.empty(); } return Optional.of( - new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup), - heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(), - heartBeat.getWorkerWaitingTaskCount(), heartBeat.getStartupTime())); + new HostWeight( + HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup), + heartBeat.getCpuUsage(), + heartBeat.getMemoryUsage(), + heartBeat.getLoadAverage(), + heartBeat.getWorkerWaitingTaskCount(), + 
heartBeat.getStartupTime())); } - private void syncWorkerHostWeight(Map> workerHostWeights) { lock.lock(); try { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java deleted file mode 100644 index 5ca7c87f1b..0000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.dolphinscheduler.server.master.registry; - -import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; -import org.apache.dolphinscheduler.common.utils.HeartBeat; -import org.apache.dolphinscheduler.service.registry.RegistryClient; - -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Master heart beat task - */ -public class MasterHeartBeatTask implements Runnable { - - private final Logger logger = LoggerFactory.getLogger(MasterHeartBeatTask.class); - - private final Set heartBeatPaths; - private final RegistryClient registryClient; - private final HeartBeat heartBeat; - private final AtomicInteger heartBeatErrorTimes = new AtomicInteger(); - - public MasterHeartBeatTask(long startupTime, - double maxCpuloadAvg, - double reservedMemory, - Set heartBeatPaths, - RegistryClient registryClient) { - this.heartBeatPaths = heartBeatPaths; - this.registryClient = registryClient; - this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory); - } - - public String getHeartBeatInfo() { - return this.heartBeat.encodeHeartBeat(); - } - - @Override - public void run() { - try { - if (!ServerLifeCycleManager.isRunning()) { - return; - } - for (String heartBeatPath : heartBeatPaths) { - registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat()); - } - heartBeatErrorTimes.set(0); - } catch (Throwable ex) { - logger.error("HeartBeat task execute failed, errorTimes: {}", heartBeatErrorTimes.get(), ex); - } - } -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index 99a65e9fce..ac8d60e20e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -17,36 +17,28 @@ package org.apache.dolphinscheduler.server.master.registry; -import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE; -import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; - +import org.apache.commons.lang3.StringUtils; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.registry.api.RegistryException; -import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.service.FailoverService; +import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask; import org.apache.dolphinscheduler.service.registry.RegistryClient; - -import org.apache.commons.lang3.StringUtils; - -import java.time.Duration; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import com.google.common.collect.Sets; +import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE; +import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; /** *

DolphinScheduler master register client, used to connect to registry and hand the registry events. - *

When the Master node startup, it will register in registry center. And schedule a {@link MasterHeartBeatTask} to update its metadata in registry. + *

When the Master node startup, it will register in registry center. And start a {@link MasterHeartBeatTask} to update its metadata in registry. */ @Component public class MasterRegistryClient implements AutoCloseable { @@ -65,18 +57,11 @@ public class MasterRegistryClient implements AutoCloseable { @Autowired private MasterConnectStrategy masterConnectStrategy; - private ScheduledExecutorService heartBeatExecutor; - - /** - * master startup time, ms - */ - private long startupTime; + private MasterHeartBeatTask masterHeartBeatTask; public void start() { try { - this.startupTime = System.currentTimeMillis(); - this.heartBeatExecutor = - Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); + this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, registryClient); // master registry registry(); registryClient.addConnectionStateListener( @@ -166,17 +151,11 @@ public class MasterRegistryClient implements AutoCloseable { */ void registry() { logger.info("Master node : {} registering to registry center", masterConfig.getMasterAddress()); - String localNodePath = masterConfig.getMasterRegistryNodePath(); - Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval(); - MasterHeartBeatTask heartBeatTask = new MasterHeartBeatTask(startupTime, - masterConfig.getMaxCpuLoadAvg(), - masterConfig.getReservedMemory(), - Sets.newHashSet(localNodePath), - registryClient); + String masterRegistryPath = masterConfig.getMasterRegistryPath(); // remove before persist - registryClient.remove(localNodePath); - registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo()); + registryClient.remove(masterRegistryPath); + registryClient.persistEphemeral(masterRegistryPath, JSONUtils.toJsonString(masterHeartBeatTask.getHeartBeat())); while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) { logger.warn("The current master server node:{} cannot find in registry", NetUtils.getHost()); @@ -186,19 +165,18 
@@ public class MasterRegistryClient implements AutoCloseable { // sleep 1s, waiting master failover remove ThreadUtils.sleep(SLEEP_TIME_MILLIS); - this.heartBeatExecutor.scheduleWithFixedDelay(heartBeatTask, 0L, masterHeartbeatInterval.getSeconds(), - TimeUnit.SECONDS); - logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s", - masterConfig.getMasterAddress(), masterHeartbeatInterval); + masterHeartBeatTask.start(); + logger.info("Master node : {} registered to registry center successfully", masterConfig.getMasterAddress()); } public void deregister() { try { - registryClient.remove(masterConfig.getMasterRegistryNodePath()); + registryClient.remove(masterConfig.getMasterRegistryPath()); logger.info("Master node : {} unRegistry to register center.", masterConfig.getMasterAddress()); - heartBeatExecutor.shutdown(); - logger.info("MasterServer heartbeat executor shutdown"); + if (masterHeartBeatTask != null) { + masterHeartBeatTask.shutdown(); + } registryClient.close(); } catch (Exception e) { logger.error("MasterServer remove registry path exception ", e); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index 58df904fc5..af96da3f2f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -17,13 +17,13 @@ package org.apache.dolphinscheduler.server.master.registry; -import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; -import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; - +import org.apache.commons.collections.CollectionUtils; +import 
org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; -import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; @@ -34,10 +34,13 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue; import org.apache.dolphinscheduler.service.registry.RegistryClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; - +import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -52,14 +55,10 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; -import javax.annotation.PreDestroy; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; +import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; +import static 
org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; /** * server node manager @@ -69,23 +68,19 @@ public class ServerNodeManager implements InitializingBean { private final Logger logger = LoggerFactory.getLogger(ServerNodeManager.class); - /** - * master lock - */ private final Lock masterLock = new ReentrantLock(); - /** - * worker group lock - */ - private final Lock workerGroupLock = new ReentrantLock(); + private final ReentrantReadWriteLock workerGroupLock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = workerGroupLock.readLock(); + private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = workerGroupLock.writeLock(); - /** - * worker node info lock - */ - private final Lock workerNodeInfoLock = new ReentrantLock(); + + private final ReentrantReadWriteLock workerNodeInfoLock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock.ReadLock workerNodeInfoReadLock = workerNodeInfoLock.readLock(); + private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = workerNodeInfoLock.writeLock(); /** - * worker group nodes + * worker group nodes, workerGroup -> ips */ private final ConcurrentHashMap> workerGroupNodes = new ConcurrentHashMap<>(); @@ -94,10 +89,7 @@ public class ServerNodeManager implements InitializingBean { */ private final Set masterNodes = new HashSet<>(); - /** - * worker node info - */ - private final Map workerNodeInfo = new HashMap<>(); + private final Map workerNodeInfo = new HashMap<>(); /** * executor service @@ -108,7 +100,7 @@ public class ServerNodeManager implements InitializingBean { private RegistryClient registryClient; /** - * eg : /node/worker/group/127.0.0.1:xxx + * eg : /dolphinscheduler/node/worker/group/127.0.0.1:xxx */ private static final int WORKER_LISTENER_CHECK_LENGTH = 5; @@ -244,26 +236,29 @@ public class ServerNodeManager implements InitializingBean { final String data = event.data(); if 
(registryClient.isWorkerPath(path)) { try { + String[] parts = path.split("/"); + if (parts.length < WORKER_LISTENER_CHECK_LENGTH) { + throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path)); + } + final String workerGroupName = parts[parts.length - 2]; + final String workerAddress = parts[parts.length - 1]; + if (type == Type.ADD) { logger.info("worker group node : {} added.", path); - String group = parseGroup(path); - Collection currentNodes = registryClient.getWorkerGroupNodesDirectly(group); + Collection currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName); logger.info("currentNodes : {}", currentNodes); - syncWorkerGroupNodes(group, currentNodes); + syncWorkerGroupNodes(workerGroupName, currentNodes); } else if (type == Type.REMOVE) { logger.info("worker group node : {} down.", path); - String group = parseGroup(path); - Collection currentNodes = registryClient.getWorkerGroupNodesDirectly(group); - syncWorkerGroupNodes(group, currentNodes); + Collection currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName); + syncWorkerGroupNodes(workerGroupName, currentNodes); alertDao.sendServerStoppedAlert(1, path, "WORKER"); } else if (type == Type.UPDATE) { logger.debug("worker group node : {} update, data: {}", path, data); - String group = parseGroup(path); - Collection currentNodes = registryClient.getWorkerGroupNodesDirectly(group); - syncWorkerGroupNodes(group, currentNodes); + Collection currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName); + syncWorkerGroupNodes(workerGroupName, currentNodes); - String node = parseNode(path); - syncSingleWorkerNodeInfo(node, data); + syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class)); } notifyWorkerInfoChangeListeners(); } catch (IllegalArgumentException ex) { @@ -274,22 +269,6 @@ public class ServerNodeManager implements InitializingBean { } } - - private String parseGroup(String 
path) { - String[] parts = path.split("/"); - if (parts.length < WORKER_LISTENER_CHECK_LENGTH) { - throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path)); - } - return parts[parts.length - 2]; - } - - private String parseNode(String path) { - String[] parts = path.split("/"); - if (parts.length < WORKER_LISTENER_CHECK_LENGTH) { - throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path)); - } - return parts[parts.length - 1]; - } } class MasterDataListener implements SubscribeListener { @@ -333,20 +312,6 @@ public class ServerNodeManager implements InitializingBean { } - /** - * get master nodes - * - * @return master nodes - */ - public Set getMasterNodes() { - masterLock.lock(); - try { - return Collections.unmodifiableSet(masterNodes); - } finally { - masterLock.unlock(); - } - } - /** * sync master nodes * @@ -355,18 +320,17 @@ public class ServerNodeManager implements InitializingBean { private void syncMasterNodes(Collection nodes, List masterNodes) { masterLock.lock(); try { - String addr = NetUtils.getAddr(NetUtils.getHost(), masterConfig.getListenPort()); this.masterNodes.addAll(nodes); this.masterPriorityQueue.clear(); this.masterPriorityQueue.putList(masterNodes); - int index = masterPriorityQueue.getIndex(addr); + int index = masterPriorityQueue.getIndex(masterConfig.getMasterAddress()); if (index >= 0) { MASTER_SIZE = nodes.size(); MASTER_SLOT = index; } else { - logger.warn("current addr:{} is not in active master list", addr); + logger.warn("current addr:{} is not in active master list", masterConfig.getMasterAddress()); } - logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, addr); + logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, masterConfig.getMasterAddress()); } finally { masterLock.unlock(); } @@ -379,19 +343,24 @@ public class ServerNodeManager implements 
InitializingBean { * @param nodes worker nodes */ private void syncWorkerGroupNodes(String workerGroup, Collection nodes) { - workerGroupLock.lock(); + workerGroupWriteLock.lock(); try { Set workerNodes = workerGroupNodes.getOrDefault(workerGroup, new HashSet<>()); workerNodes.clear(); workerNodes.addAll(nodes); workerGroupNodes.put(workerGroup, workerNodes); } finally { - workerGroupLock.unlock(); + workerGroupWriteLock.unlock(); } } public Map> getWorkerGroupNodes() { - return Collections.unmodifiableMap(workerGroupNodes); + workerGroupReadLock.lock(); + try { + return Collections.unmodifiableMap(workerGroupNodes); + } finally { + workerGroupReadLock.unlock(); + } } /** @@ -401,7 +370,7 @@ public class ServerNodeManager implements InitializingBean { * @return worker nodes */ public Set getWorkerGroupNodes(String workerGroup) { - workerGroupLock.lock(); + workerGroupReadLock.lock(); try { if (StringUtils.isEmpty(workerGroup)) { workerGroup = Constants.DEFAULT_WORKER_GROUP; @@ -412,16 +381,11 @@ public class ServerNodeManager implements InitializingBean { } return nodes; } finally { - workerGroupLock.unlock(); + workerGroupReadLock.unlock(); } } - /** - * get worker node info - * - * @return worker node info - */ - public Map getWorkerNodeInfo() { + public Map getWorkerNodeInfo() { return Collections.unmodifiableMap(workerNodeInfo); } @@ -431,12 +395,12 @@ public class ServerNodeManager implements InitializingBean { * @param workerNode worker node * @return worker node info */ - public String getWorkerNodeInfo(String workerNode) { - workerNodeInfoLock.lock(); + public WorkerHeartBeat getWorkerNodeInfo(String workerNode) { + workerNodeInfoReadLock.lock(); try { return workerNodeInfo.getOrDefault(workerNode, null); } finally { - workerNodeInfoLock.unlock(); + workerNodeInfoReadLock.unlock(); } } @@ -446,24 +410,26 @@ public class ServerNodeManager implements InitializingBean { * @param newWorkerNodeInfo new worker node info */ private void syncAllWorkerNodeInfo(Map 
newWorkerNodeInfo) { - workerNodeInfoLock.lock(); + workerNodeInfoWriteLock.lock(); try { workerNodeInfo.clear(); - workerNodeInfo.putAll(newWorkerNodeInfo); + for (Map.Entry entry : newWorkerNodeInfo.entrySet()) { + workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class)); + } } finally { - workerNodeInfoLock.unlock(); + workerNodeInfoWriteLock.unlock(); } } /** * sync single worker node info */ - private void syncSingleWorkerNodeInfo(String node, String info) { - workerNodeInfoLock.lock(); + private void syncSingleWorkerNodeInfo(String node, WorkerHeartBeat info) { + workerNodeInfoWriteLock.lock(); try { workerNodeInfo.put(node, info); } finally { - workerNodeInfoLock.unlock(); + workerNodeInfoWriteLock.unlock(); } } @@ -478,7 +444,7 @@ public class ServerNodeManager implements InitializingBean { private void notifyWorkerInfoChangeListeners() { Map> workerGroupNodes = getWorkerGroupNodes(); - Map workerNodeInfo = getWorkerNodeInfo(); + Map workerNodeInfo = getWorkerNodeInfo(); for (WorkerInfoChangeListener listener : workerInfoChangeListeners) { listener.notify(workerGroupNodes, workerNodeInfo); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java index f885a6fba0..9efc3517b8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.server.master.registry; +import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; + import java.util.Map; import java.util.Set; @@ -31,6 +33,6 @@ public interface WorkerInfoChangeListener { * @param workerGroups worker groups map, key is 
worker group name, value is worker address. * @param workerNodeInfo worker node info map, key is worker address, value is worker info. */ - void notify(Map> workerGroups, Map workerNodeInfo); + void notify(Map> workerGroups, Map workerNodeInfo); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java new file mode 100644 index 0000000000..53b90b7370 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.task; + +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; +import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask; +import org.apache.dolphinscheduler.common.model.MasterHeartBeat; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.service.registry.RegistryClient; + +@Slf4j +public class MasterHeartBeatTask extends BaseHeartBeatTask { + + private final MasterConfig masterConfig; + + private final RegistryClient registryClient; + + private final String heartBeatPath; + + private final int processId; + + public MasterHeartBeatTask(@NonNull MasterConfig masterConfig, + @NonNull RegistryClient registryClient) { + super("MasterHeartBeatTask", masterConfig.getHeartbeatInterval().toMillis()); + this.masterConfig = masterConfig; + this.registryClient = registryClient; + this.heartBeatPath = masterConfig.getMasterRegistryPath(); + this.processId = OSUtils.getProcessID(); + } + + @Override + public MasterHeartBeat getHeartBeat() { + return MasterHeartBeat.builder() + .startupTime(ServerLifeCycleManager.getServerStartupTime()) + .reportTime(System.currentTimeMillis()) + .cpuUsage(OSUtils.cpuUsage()) + .loadAverage(OSUtils.loadAverage()) + .availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize()) + .maxCpuloadAvg(masterConfig.getMaxCpuLoadAvg()) + .reservedMemory(masterConfig.getReservedMemory()) + .processId(processId) + .build(); + } + + @Override + public void writeHeartBeat(MasterHeartBeat masterHeartBeat) { + String masterHeartBeatJson = JSONUtils.toJsonString(masterHeartBeat); + registryClient.persistEphemeral(heartBeatPath, masterHeartBeatJson); + log.info("Success write master heartBeatInfo into registry, masterRegistryPath: {}, 
heartBeatInfo: {}", + heartBeatPath, masterHeartBeatJson); + } +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java index c8121d494b..c0c554e43b 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; +import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.ExecutionContextTestUtils; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; @@ -55,6 +56,7 @@ public class RoundRobinHostManagerTest { @Test public void testSelectWithResult() { Mockito.when(serverNodeManager.getWorkerGroupNodes("default")).thenReturn(Sets.newHashSet("192.168.1.1:22")); + Mockito.when(serverNodeManager.getWorkerNodeInfo("192.168.1.1:22")).thenReturn(new WorkerHeartBeat()); ExecutionContext context = ExecutionContextTestUtils.getExecutionContext(10000); Host host = roundRobinHostManager.select(context); Assert.assertTrue(!Strings.isNullOrEmpty(host.getAddress())); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java index cc17d22df7..b71e267216 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java +++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.registry.api.ConnectionState; import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.registry.RegistryClient; @@ -69,6 +70,12 @@ public class MasterRegistryClientTest { @Mock private ProcessService processService; + @Mock + private MasterConnectStrategy masterConnectStrategy; + + @Mock + private MasterHeartBeatTask masterHeartBeatTask; + @Mock private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager; @@ -81,6 +88,7 @@ public class MasterRegistryClientTest { }); ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient); + ReflectionTestUtils.setField(masterRegistryClient, "masterHeartBeatTask", masterHeartBeatTask); ProcessInstance processInstance = new ProcessInstance(); processInstance.setId(1); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java index a6c6218fee..508ba293aa 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java @@ -19,11 +19,13 @@ package org.apache.dolphinscheduler.service.registry; import com.google.common.base.Strings; import lombok.NonNull; +import org.apache.commons.lang3.StringUtils; import org.apache.dolphinscheduler.common.Constants; 
import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.common.model.MasterHeartBeat; import org.apache.dolphinscheduler.common.model.Server; -import org.apache.dolphinscheduler.common.utils.HeartBeat; +import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.registry.api.ConnectionListener; import org.apache.dolphinscheduler.registry.api.Registry; @@ -94,21 +96,33 @@ public class RegistryClient { List serverList = new ArrayList<>(); for (Map.Entry entry : serverMaps.entrySet()) { - HeartBeat heartBeat = HeartBeat.decodeHeartBeat(entry.getValue()); - if (heartBeat == null) { + String serverPath = entry.getKey(); + String heartBeatJson = entry.getValue(); + if (StringUtils.isEmpty(heartBeatJson)) { + logger.error("The heartBeatJson is empty, serverPath: {}", serverPath); continue; } - Server server = new Server(); - server.setResInfo(JSONUtils.toJsonString(heartBeat)); - server.setCreateTime(new Date(heartBeat.getStartupTime())); - server.setLastHeartbeatTime(new Date(heartBeat.getReportTime())); - server.setId(heartBeat.getProcessId()); + switch (nodeType) { + case MASTER: + MasterHeartBeat masterHeartBeat = JSONUtils.parseObject(heartBeatJson, MasterHeartBeat.class); + server.setCreateTime(new Date(masterHeartBeat.getStartupTime())); + server.setLastHeartbeatTime(new Date(masterHeartBeat.getReportTime())); + server.setId(masterHeartBeat.getProcessId()); + break; + case WORKER: + WorkerHeartBeat workerHeartBeat = JSONUtils.parseObject(heartBeatJson, WorkerHeartBeat.class); + server.setCreateTime(new Date(workerHeartBeat.getStartupTime())); + server.setLastHeartbeatTime(new Date(workerHeartBeat.getReportTime())); + server.setId(workerHeartBeat.getProcessId()); + break; + } - String key = entry.getKey(); - server.setZkDirectory(parentPath + "/" + key); + 
server.setResInfo(heartBeatJson); + // todo: add host, port in heartBeat Info, so that we don't need to parse this again + server.setZkDirectory(parentPath + "/" + serverPath); // set host and port - String[] hostAndPort = key.split(COLON); + String[] hostAndPort = serverPath.split(COLON); String[] hosts = hostAndPort[0].split(DIVISION_STRING); // fetch the last one server.setHost(hosts[hosts.length - 1]); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index f29f93eed3..555b5bc770 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient; @@ -95,6 +96,9 @@ public class WorkerServer implements IStoppable { @Autowired private MessageRetryRunner messageRetryRunner; + @Autowired + private WorkerConfig workerConfig; + /** * worker server startup, not use web service * diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index f50e6f2d03..a7e6927415 100644 --- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.config; import com.google.common.collect.Sets; import lombok.Data; +import org.apache.commons.collections4.CollectionUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties; import org.slf4j.Logger; @@ -31,6 +32,9 @@ import org.springframework.validation.annotation.Validated; import java.time.Duration; import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; @Data @Validated @@ -57,6 +61,7 @@ public class WorkerConfig implements Validator { * This field doesn't need to set at config file, it will be calculated by workerIp:listenPort */ private String workerAddress; + private Set workerGroupRegistryPaths; @Override public boolean supports(Class clazz) { @@ -76,6 +81,18 @@ public class WorkerConfig implements Validator { workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2); } workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort())); + + workerConfig.setGroups(workerConfig.getGroups().stream().map(String::trim).collect(Collectors.toSet())); + if (CollectionUtils.isEmpty(workerConfig.getGroups())) { + errors.rejectValue("groups", null, "should not be empty"); + } + + Set workerRegistryPaths = workerConfig.getGroups() + .stream() + .map(workerGroup -> REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup + "/" + workerConfig.getWorkerAddress()) + .collect(Collectors.toSet()); + + workerConfig.setWorkerGroupRegistryPaths(workerRegistryPaths); printConfig(); } @@ -93,5 +110,6 @@ public class WorkerConfig implements Validator { logger.info("Worker config: 
alertListenPort -> {}", alertListenPort); logger.info("Worker config: registryDisconnectStrategy -> {}", registryDisconnectStrategy); logger.info("Worker config: workerAddress -> {}", registryDisconnectStrategy); + logger.info("Worker config: workerGroupRegistryPaths: {}", workerGroupRegistryPaths); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java deleted file mode 100644 index 84506753be..0000000000 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.dolphinscheduler.server.worker.registry; - -import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; -import org.apache.dolphinscheduler.common.utils.HeartBeat; -import org.apache.dolphinscheduler.service.registry.RegistryClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Heart beat task - */ -public class WorkerHeartBeatTask implements Runnable { - - private final Logger logger = LoggerFactory.getLogger(WorkerHeartBeatTask.class); - - private final Set heartBeatPaths; - private final RegistryClient registryClient; - private int workerWaitingTaskCount; - private final HeartBeat heartBeat; - - private final AtomicInteger heartBeatErrorTimes = new AtomicInteger(); - - public WorkerHeartBeatTask(long startupTime, - double maxCpuloadAvg, - double reservedMemory, - int hostWeight, - Set heartBeatPaths, - RegistryClient registryClient, - int workerThreadCount, - int workerWaitingTaskCount) { - this.heartBeatPaths = heartBeatPaths; - this.registryClient = registryClient; - this.workerWaitingTaskCount = workerWaitingTaskCount; - this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount); - } - - public String getHeartBeatInfo() { - return this.heartBeat.encodeHeartBeat(); - } - - @Override - public void run() { - try { - if (!ServerLifeCycleManager.isRunning()) { - return; - } - heartBeat.setStartupTime(ServerLifeCycleManager.getServerStartupTime()); - // update waiting task count - heartBeat.setWorkerWaitingTaskCount(workerWaitingTaskCount); - - for (String heartBeatPath : heartBeatPaths) { - registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat()); - } - heartBeatErrorTimes.set(0); - } catch (Throwable ex) { - logger.error("HeartBeat task execute failed, errorTimes: {}", heartBeatErrorTimes.get(), ex); - } - } -} diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java index 5c3f7bf507..d2147d1559 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java @@ -17,80 +17,52 @@ package org.apache.dolphinscheduler.server.worker.registry; -import com.google.common.base.Strings; -import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.registry.api.RegistryException; -import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; +import org.apache.dolphinscheduler.server.worker.task.WorkerHeartBeatTask; import org.apache.dolphinscheduler.service.registry.RegistryClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.io.IOException; -import java.util.Set; -import java.util.StringJoiner; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import static 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; -import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; -import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; + import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; -/** - * worker registry - */ +@Slf4j @Service public class WorkerRegistryClient implements AutoCloseable { - private final Logger logger = LoggerFactory.getLogger(WorkerRegistryClient.class); - - /** - * worker config - */ @Autowired private WorkerConfig workerConfig; - /** - * worker manager - */ @Autowired private WorkerManagerThread workerManagerThread; - /** - * heartbeat executor - */ - private ScheduledExecutorService heartBeatExecutor; - @Autowired private RegistryClient registryClient; @Autowired private WorkerConnectStrategy workerConnectStrategy; - /** - * worker startup time, ms - */ - private long startupTime; + private WorkerHeartBeatTask workerHeartBeatTask; - private Set workerGroups; @PostConstruct public void initWorkRegistry() { - this.workerGroups = workerConfig.getGroups(); - this.startupTime = System.currentTimeMillis(); - this.heartBeatExecutor = - Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); + this.workerHeartBeatTask = new WorkerHeartBeatTask( + workerConfig, + registryClient, + () -> workerManagerThread.getWaitSubmitQueueSize()); } public void start() { @@ -107,24 +79,13 @@ public class WorkerRegistryClient implements AutoCloseable { * registry */ private void registry() { - String address = NetUtils.getAddr(workerConfig.getListenPort()); - Set workerZkPaths = getWorkerZkPaths(); - long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds(); - - WorkerHeartBeatTask heartBeatTask = new WorkerHeartBeatTask(startupTime, - workerConfig.getMaxCpuLoadAvg(), - workerConfig.getReservedMemory(), - workerConfig.getHostWeight(), - workerZkPaths, - registryClient, - 
workerConfig.getExecThreads(), - workerManagerThread.getThreadPoolQueueSize()); - - for (String workerZKPath : workerZkPaths) { + WorkerHeartBeat workerHeartBeat = workerHeartBeatTask.getHeartBeat(); + + for (String workerZKPath : workerConfig.getWorkerGroupRegistryPaths()) { // remove before persist registryClient.remove(workerZKPath); - registryClient.persistEphemeral(workerZKPath, heartBeatTask.getHeartBeatInfo()); - logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath); + registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat)); + log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(), workerZKPath); } while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.WORKER)) { @@ -134,37 +95,9 @@ public class WorkerRegistryClient implements AutoCloseable { // sleep 1s, waiting master failover remove ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); - this.heartBeatExecutor.scheduleWithFixedDelay(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, - TimeUnit.SECONDS); - logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval); - } - - /** - * get worker path - */ - public Set getWorkerZkPaths() { - Set workerPaths = Sets.newHashSet(); - String address = getLocalAddress(); - - for (String workGroup : this.workerGroups) { - StringJoiner workerPathJoiner = new StringJoiner(SINGLE_SLASH); - workerPathJoiner.add(REGISTRY_DOLPHINSCHEDULER_WORKERS); - if (Strings.isNullOrEmpty(workGroup)) { - workGroup = DEFAULT_WORKER_GROUP; - } - // trim and lower case is need - workerPathJoiner.add(workGroup.trim().toLowerCase()); - workerPathJoiner.add(address); - workerPaths.add(workerPathJoiner.toString()); - } - return workerPaths; - } - /** - * get local address - */ - private String getLocalAddress() { - return NetUtils.getAddr(workerConfig.getListenPort()); + workerHeartBeatTask.start(); + log.info("Worker node: {} registry 
finished", workerConfig.getWorkerAddress()); } public void setRegistryStoppable(IStoppable stoppable) { @@ -173,12 +106,11 @@ public class WorkerRegistryClient implements AutoCloseable { @Override public void close() throws IOException { - if (heartBeatExecutor != null) { - heartBeatExecutor.shutdownNow(); - logger.info("Heartbeat executor shutdown"); + if (workerHeartBeatTask != null) { + workerHeartBeatTask.shutdown(); } registryClient.close(); - logger.info("registry client closed"); + log.info("Worker registry client closed"); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java new file mode 100644 index 0000000000..672135613a --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.dolphinscheduler.server.worker.task;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import java.util.function.Supplier;
+
+/**
+ * Heartbeat task of a worker: builds a {@link WorkerHeartBeat} snapshot from the
+ * operating-system metrics and the {@link WorkerConfig}, and writes it as JSON to
+ * every worker-group registry path of this worker.
+ *
+ * <p>The task name and the heartbeat interval (in milliseconds) are handed to
+ * {@link BaseHeartBeatTask}; scheduling itself is implemented there and is not
+ * visible in this class.
+ */
+@Slf4j
+public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {
+
+    private final WorkerConfig workerConfig;
+    private final RegistryClient registryClient;
+
+    /** Supplies the current number of tasks waiting to be executed by this worker. */
+    private final Supplier<Integer> workerWaitingTaskCount;
+
+    /** Process id of this worker, read once at construction time. */
+    private final int processId;
+
+    /**
+     * Creates the heartbeat task.
+     *
+     * @param workerConfig           worker configuration; provides the heartbeat interval, the
+     *                               resource thresholds and the registry paths to write to
+     * @param registryClient         client used to persist the heartbeat into the registry
+     * @param workerWaitingTaskCount supplier of the current waiting task count, evaluated on
+     *                               every heartbeat
+     */
+    public WorkerHeartBeatTask(@NonNull WorkerConfig workerConfig,
+                               @NonNull RegistryClient registryClient,
+                               @NonNull Supplier<Integer> workerWaitingTaskCount) {
+        super("WorkerHeartBeatTask", workerConfig.getHeartbeatInterval().toMillis());
+        this.workerConfig = workerConfig;
+        this.registryClient = registryClient;
+        this.workerWaitingTaskCount = workerWaitingTaskCount;
+        this.processId = OSUtils.getProcessID();
+    }
+
+    /**
+     * Builds a fresh heartbeat from the current system metrics and configuration.
+     *
+     * @return the heartbeat describing the worker at the moment of the call
+     */
+    @Override
+    public WorkerHeartBeat getHeartBeat() {
+        double loadAverage = OSUtils.loadAverage();
+        double cpuUsage = OSUtils.cpuUsage();
+        int maxCpuLoadAvg = workerConfig.getMaxCpuLoadAvg();
+        double reservedMemory = workerConfig.getReservedMemory();
+        double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
+        int execThreads = workerConfig.getExecThreads();
+        // Sample the supplier exactly once: the reported count and the status derived from it
+        // must describe the same instant, otherwise a heartbeat could contradict itself.
+        int waitingTaskCount = this.workerWaitingTaskCount.get();
+        int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, availablePhysicalMemorySize,
+                reservedMemory, execThreads, waitingTaskCount);
+
+        return WorkerHeartBeat.builder()
+                .startupTime(ServerLifeCycleManager.getServerStartupTime())
+                .reportTime(System.currentTimeMillis())
+                .cpuUsage(cpuUsage)
+                .loadAverage(loadAverage)
+                .availablePhysicalMemorySize(availablePhysicalMemorySize)
+                .maxCpuloadAvg(maxCpuLoadAvg)
+                .reservedMemory(reservedMemory)
+                .processId(processId)
+                .workerHostWeight(workerConfig.getHostWeight())
+                .workerWaitingTaskCount(waitingTaskCount)
+                .workerExecThreadCount(execThreads)
+                .serverStatus(serverStatus)
+                .build();
+    }
+
+    /**
+     * Serializes the heartbeat to JSON and persists it as an ephemeral node under each
+     * worker-group registry path of this worker.
+     *
+     * @param workerHeartBeat the heartbeat to publish
+     */
+    @Override
+    public void writeHeartBeat(WorkerHeartBeat workerHeartBeat) {
+        // Serialize once; the same payload is written to every group path.
+        String workerHeartBeatJson = JSONUtils.toJsonString(workerHeartBeat);
+        for (String workerGroupRegistryPath : workerConfig.getWorkerGroupRegistryPaths()) {
+            registryClient.persistEphemeral(workerGroupRegistryPath, workerHeartBeatJson);
+        }
+        log.info("Success write worker group heartBeatInfo into registry, workGroupPath: {} workerHeartBeatInfo: {}",
+                workerConfig.getWorkerGroupRegistryPaths(), workerHeartBeatJson);
+    }
+
+    /**
+     * Classifies the worker state from its load, memory and queue figures.
+     *
+     * <p>Resource pressure takes precedence over queue pressure: the abnormal check is
+     * evaluated first, the busy check only when resources are within limits.
+     *
+     * @param loadAverage                 current load average
+     * @param maxCpuloadAvg               configured upper bound for the load average
+     * @param availablePhysicalMemorySize currently available physical memory (the log message
+     *                                    prints it with a "G" suffix)
+     * @param reservedMemory              configured amount of memory that must stay available,
+     *                                    compared directly with {@code availablePhysicalMemorySize}
+     * @param workerExecThreadCount       configured number of execution threads
+     * @param workerWaitingTaskCount      number of tasks currently waiting
+     * @return {@link Constants#ABNORMAL_NODE_STATUS} when the load average exceeds the maximum or
+     *         the available memory is below the reserved amount;
+     *         {@link Constants#BUSY_NODE_STATUE} when more tasks are waiting than there are
+     *         execution threads; {@link Constants#NORMAL_NODE_STATUS} otherwise
+     */
+    public int getServerStatus(double loadAverage,
+                               double maxCpuloadAvg,
+                               double availablePhysicalMemorySize,
+                               double reservedMemory,
+                               int workerExecThreadCount,
+                               int workerWaitingTaskCount) {
+        if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
+            log.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
+                    loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
+            return Constants.ABNORMAL_NODE_STATUS;
+        } else if (workerWaitingTaskCount > workerExecThreadCount) {
+            log.warn("current waiting task count {} is large than worker thread count {}, worker is busy", workerWaitingTaskCount, workerExecThreadCount);
+            return Constants.BUSY_NODE_STATUE;
+        } else {
+            return Constants.NORMAL_NODE_STATUS;
+        }
+    }
+}