From 3f2ca7bca3e612b2ce5d22324d22aa33fac04ea2 Mon Sep 17 00:00:00 2001 From: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com> Date: Tue, 23 Aug 2022 11:30:13 +0800 Subject: [PATCH] [Fix-9980] [Server] fix heartBeatTaskCount bug (#11232) * fix heartBeat bug * modify class name * fix conflict Co-authored-by: JinyLeeChina --- .../master/registry/MasterHeartBeatTask.java | 70 +++++++++++++++++++ .../master/registry/MasterRegistryClient.java | 32 +++++---- .../worker/registry/WorkerHeartBeatTask.java | 18 ++--- .../worker/registry/WorkerRegistryClient.java | 17 +++-- 4 files changed, 99 insertions(+), 38 deletions(-) create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java rename dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java => dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java (81%) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java new file mode 100644 index 0000000000..5ca7c87f1b --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.registry; + +import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; +import org.apache.dolphinscheduler.common.utils.HeartBeat; +import org.apache.dolphinscheduler.service.registry.RegistryClient; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Master heart beat task + */ +public class MasterHeartBeatTask implements Runnable { + + private final Logger logger = LoggerFactory.getLogger(MasterHeartBeatTask.class); + + private final Set<String> heartBeatPaths; + private final RegistryClient registryClient; + private final HeartBeat heartBeat; + private final AtomicInteger heartBeatErrorTimes = new AtomicInteger(); + + public MasterHeartBeatTask(long startupTime, + double maxCpuloadAvg, + double reservedMemory, + Set<String> heartBeatPaths, + RegistryClient registryClient) { + this.heartBeatPaths = heartBeatPaths; + this.registryClient = registryClient; + this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory); + } + + public String getHeartBeatInfo() { + return this.heartBeat.encodeHeartBeat(); + } + + @Override + public void run() { + try { + if (!ServerLifeCycleManager.isRunning()) { + return; + } + for (String heartBeatPath : heartBeatPaths) { + registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat()); + } + heartBeatErrorTimes.set(0); + } catch (Throwable ex) { + logger.error("HeartBeat task execute failed, errorTimes: {}", heartBeatErrorTimes.get(), ex);
+ } + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index 63938bd928..99a65e9fce 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -17,8 +17,9 @@ package org.apache.dolphinscheduler.server.master.registry; -import com.google.common.collect.Sets; -import org.apache.commons.lang3.StringUtils; +import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE; +import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; + import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.thread.ThreadUtils; @@ -27,24 +28,25 @@ import org.apache.dolphinscheduler.registry.api.RegistryException; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.service.FailoverService; -import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.service.registry.RegistryClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; + +import org.apache.commons.lang3.StringUtils; import java.time.Duration; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE; -import static 
org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.google.common.collect.Sets; /** *

<p>DolphinScheduler master register client, used to connect to registry and hand the registry events. - *

<p>When the Master node startup, it will register in registry center. And schedule a {@link HeartBeatTask} to update its metadata in registry. + *

<p>When the Master node startup, it will register in registry center. And schedule a {@link MasterHeartBeatTask} to update its metadata in registry. */ @Component public class MasterRegistryClient implements AutoCloseable { @@ -166,11 +168,11 @@ public class MasterRegistryClient implements AutoCloseable { logger.info("Master node : {} registering to registry center", masterConfig.getMasterAddress()); String localNodePath = masterConfig.getMasterRegistryNodePath(); Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval(); - HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, - masterConfig.getMaxCpuLoadAvg(), - masterConfig.getReservedMemory(), - Sets.newHashSet(localNodePath), - registryClient); + MasterHeartBeatTask heartBeatTask = new MasterHeartBeatTask(startupTime, + masterConfig.getMaxCpuLoadAvg(), + masterConfig.getReservedMemory(), + Sets.newHashSet(localNodePath), + registryClient); // remove before persist registryClient.remove(localNodePath); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java similarity index 81% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java index 495d48e342..84506753be 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java @@ -15,7 +15,7 @@ * limitations under the License.
*/ -package org.apache.dolphinscheduler.server.registry; +package org.apache.dolphinscheduler.server.worker.registry; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.utils.HeartBeat; @@ -29,9 +29,9 @@ import java.util.concurrent.atomic.AtomicInteger; /** * Heart beat task */ -public class HeartBeatTask implements Runnable { +public class WorkerHeartBeatTask implements Runnable { - private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class); + private final Logger logger = LoggerFactory.getLogger(WorkerHeartBeatTask.class); private final Set<String> heartBeatPaths; private final RegistryClient registryClient; @@ -40,17 +40,7 @@ public class HeartBeatTask implements Runnable { private final AtomicInteger heartBeatErrorTimes = new AtomicInteger(); - public HeartBeatTask(long startupTime, - double maxCpuloadAvg, - double reservedMemory, - Set<String> heartBeatPaths, - RegistryClient registryClient) { - this.heartBeatPaths = heartBeatPaths; - this.registryClient = registryClient; - this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory); - } - - public HeartBeatTask(long startupTime, + public WorkerHeartBeatTask(long startupTime, double maxCpuloadAvg, double reservedMemory, int hostWeight, diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java index 225439b33d..5c3f7bf507 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java @@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import
org.apache.dolphinscheduler.registry.api.RegistryException; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; -import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.registry.RegistryClient; @@ -112,14 +111,14 @@ public class WorkerRegistryClient implements AutoCloseable { Set<String> workerZkPaths = getWorkerZkPaths(); long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds(); - HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, - workerConfig.getMaxCpuLoadAvg(), - workerConfig.getReservedMemory(), - workerConfig.getHostWeight(), - workerZkPaths, - registryClient, - workerConfig.getExecThreads(), - workerManagerThread.getThreadPoolQueueSize()); + WorkerHeartBeatTask heartBeatTask = new WorkerHeartBeatTask(startupTime, + workerConfig.getMaxCpuLoadAvg(), + workerConfig.getReservedMemory(), + workerConfig.getHostWeight(), + workerZkPaths, + registryClient, + workerConfig.getExecThreads(), + workerManagerThread.getThreadPoolQueueSize()); for (String workerZKPath : workerZkPaths) { // remove before persist