Browse Source

[Fix-9980] [Server] fix heartBeatTaskCount bug (#11232)

* fix heartBeat bug

* modify class name

* fix conflict

Co-authored-by: JinyLeeChina <jiny.li@foxmail.com>
3.1.0-release
JinYong Li 2 years ago committed by GitHub
parent
commit
3f2ca7bca3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 70
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
  2. 32
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
  3. 18
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java
  4. 17
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java

70
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Master heart beat task
*/
public class MasterHeartBeatTask implements Runnable {
private final Logger logger = LoggerFactory.getLogger(MasterHeartBeatTask.class);
private final Set<String> heartBeatPaths;
private final RegistryClient registryClient;
private final HeartBeat heartBeat;
private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
public MasterHeartBeatTask(long startupTime,
double maxCpuloadAvg,
double reservedMemory,
Set<String> heartBeatPaths,
RegistryClient registryClient) {
this.heartBeatPaths = heartBeatPaths;
this.registryClient = registryClient;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
}
public String getHeartBeatInfo() {
return this.heartBeat.encodeHeartBeat();
}
@Override
public void run() {
try {
if (!ServerLifeCycleManager.isRunning()) {
return;
}
for (String heartBeatPath : heartBeatPaths) {
registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
}
heartBeatErrorTimes.set(0);
} catch (Throwable ex) {
logger.error("HeartBeat task execute failed, errorTimes: {}", heartBeatErrorTimes.get(), ex);
}
}
}

32
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -17,8 +17,9 @@
package org.apache.dolphinscheduler.server.master.registry;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@ -27,24 +28,25 @@ import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.service.FailoverService;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.apache.commons.lang3.StringUtils;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.collect.Sets;
/**
* <p>DolphinScheduler master register client, used to connect to registry and hand the registry events.
* <p>When the Master node startup, it will register in registry center. And schedule a {@link HeartBeatTask} to update its metadata in registry.
* <p>When the Master node startup, it will register in registry center. And schedule a {@link MasterHeartBeatTask} to update its metadata in registry.
*/
@Component
public class MasterRegistryClient implements AutoCloseable {
@ -166,11 +168,11 @@ public class MasterRegistryClient implements AutoCloseable {
logger.info("Master node : {} registering to registry center", masterConfig.getMasterAddress());
String localNodePath = masterConfig.getMasterRegistryNodePath();
Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
masterConfig.getMaxCpuLoadAvg(),
masterConfig.getReservedMemory(),
Sets.newHashSet(localNodePath),
registryClient);
MasterHeartBeatTask heartBeatTask = new MasterHeartBeatTask(startupTime,
masterConfig.getMaxCpuLoadAvg(),
masterConfig.getReservedMemory(),
Sets.newHashSet(localNodePath),
registryClient);
// remove before persist
registryClient.remove(localNodePath);

18
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java → dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.registry;
package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
@ -29,9 +29,9 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* Heart beat task
*/
public class HeartBeatTask implements Runnable {
public class WorkerHeartBeatTask implements Runnable {
private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
private final Logger logger = LoggerFactory.getLogger(WorkerHeartBeatTask.class);
private final Set<String> heartBeatPaths;
private final RegistryClient registryClient;
@ -40,17 +40,7 @@ public class HeartBeatTask implements Runnable {
private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
public HeartBeatTask(long startupTime,
double maxCpuloadAvg,
double reservedMemory,
Set<String> heartBeatPaths,
RegistryClient registryClient) {
this.heartBeatPaths = heartBeatPaths;
this.registryClient = registryClient;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
}
public HeartBeatTask(long startupTime,
public WorkerHeartBeatTask(long startupTime,
double maxCpuloadAvg,
double reservedMemory,
int hostWeight,

17
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java

@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
@ -112,14 +111,14 @@ public class WorkerRegistryClient implements AutoCloseable {
Set<String> workerZkPaths = getWorkerZkPaths();
long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
workerConfig.getMaxCpuLoadAvg(),
workerConfig.getReservedMemory(),
workerConfig.getHostWeight(),
workerZkPaths,
registryClient,
workerConfig.getExecThreads(),
workerManagerThread.getThreadPoolQueueSize());
WorkerHeartBeatTask heartBeatTask = new WorkerHeartBeatTask(startupTime,
workerConfig.getMaxCpuLoadAvg(),
workerConfig.getReservedMemory(),
workerConfig.getHostWeight(),
workerZkPaths,
registryClient,
workerConfig.getExecThreads(),
workerManagerThread.getThreadPoolQueueSize());
for (String workerZKPath : workerZkPaths) {
// remove before persist

Loading…
Cancel
Save