|
|
|
@ -20,11 +20,11 @@ package org.apache.dolphinscheduler.server.master.dispatch.host;
|
|
|
|
|
import org.apache.dolphinscheduler.common.Constants; |
|
|
|
|
import org.apache.dolphinscheduler.common.utils.HeartBeat; |
|
|
|
|
import org.apache.dolphinscheduler.remote.utils.Host; |
|
|
|
|
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.registry.WorkerInfoChangeListener; |
|
|
|
|
import org.apache.dolphinscheduler.spi.utils.StringUtils; |
|
|
|
|
|
|
|
|
|
import org.apache.commons.collections.CollectionUtils; |
|
|
|
@ -36,14 +36,10 @@ import java.util.Map;
|
|
|
|
|
import java.util.Optional; |
|
|
|
|
import java.util.Set; |
|
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
|
import java.util.concurrent.Executors; |
|
|
|
|
import java.util.concurrent.ScheduledExecutorService; |
|
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
|
import java.util.concurrent.locks.Lock; |
|
|
|
|
import java.util.concurrent.locks.ReentrantLock; |
|
|
|
|
|
|
|
|
|
import javax.annotation.PostConstruct; |
|
|
|
|
import javax.annotation.PreDestroy; |
|
|
|
|
|
|
|
|
|
import org.slf4j.Logger; |
|
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
@ -70,23 +66,12 @@ public class LowerWeightHostManager extends CommonHostManager {
|
|
|
|
|
*/ |
|
|
|
|
private Lock lock; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* executor service |
|
|
|
|
*/ |
|
|
|
|
private ScheduledExecutorService executorService; |
|
|
|
|
|
|
|
|
|
@PostConstruct |
|
|
|
|
public void init() { |
|
|
|
|
this.selector = new LowerWeightRoundRobin(); |
|
|
|
|
this.workerHostWeightsMap = new ConcurrentHashMap<>(); |
|
|
|
|
this.lock = new ReentrantLock(); |
|
|
|
|
this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor")); |
|
|
|
|
this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(), 0, 1, TimeUnit.SECONDS); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@PreDestroy |
|
|
|
|
public void close() { |
|
|
|
|
this.executorService.shutdownNow(); |
|
|
|
|
serverNodeManager.addWorkerInfoChangeListener(new WorkerWeightListener()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -109,6 +94,69 @@ public class LowerWeightHostManager extends CommonHostManager {
|
|
|
|
|
throw new UnsupportedOperationException("not support"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private class WorkerWeightListener implements WorkerInfoChangeListener { |
|
|
|
|
@Override |
|
|
|
|
public void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo) { |
|
|
|
|
syncWorkerResources(workerGroups, workerNodeInfo); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Sync worker resource. |
|
|
|
|
* |
|
|
|
|
* @param workerGroupNodes worker group nodes, key is worker group, value is worker group nodes. |
|
|
|
|
* @param workerNodeInfoMap worker node info map, key is worker node, value is worker info. |
|
|
|
|
*/ |
|
|
|
|
private void syncWorkerResources(final Map<String, Set<String>> workerGroupNodes, |
|
|
|
|
final Map<String, String> workerNodeInfoMap) { |
|
|
|
|
try { |
|
|
|
|
Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>(); |
|
|
|
|
for (Map.Entry<String, Set<String>> entry : workerGroupNodes.entrySet()) { |
|
|
|
|
String workerGroup = entry.getKey(); |
|
|
|
|
Set<String> nodes = entry.getValue(); |
|
|
|
|
Set<HostWeight> hostWeights = new HashSet<>(nodes.size()); |
|
|
|
|
for (String node : nodes) { |
|
|
|
|
String heartbeat = workerNodeInfoMap.getOrDefault(node, null); |
|
|
|
|
Optional<HostWeight> hostWeightOpt = getHostWeight(node, workerGroup, heartbeat); |
|
|
|
|
hostWeightOpt.ifPresent(hostWeights::add); |
|
|
|
|
} |
|
|
|
|
if (!hostWeights.isEmpty()) { |
|
|
|
|
workerHostWeights.put(workerGroup, hostWeights); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
syncWorkerHostWeight(workerHostWeights); |
|
|
|
|
} catch (Throwable ex) { |
|
|
|
|
logger.error("Sync worker resource error", ex); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private Optional<HostWeight> getHostWeight(String addr, String workerGroup, String heartBeatInfo) { |
|
|
|
|
if (StringUtils.isEmpty(heartBeatInfo)) { |
|
|
|
|
logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup); |
|
|
|
|
return Optional.empty(); |
|
|
|
|
} |
|
|
|
|
HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo); |
|
|
|
|
if (heartBeat == null) { |
|
|
|
|
return Optional.empty(); |
|
|
|
|
} |
|
|
|
|
if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) { |
|
|
|
|
logger.warn("worker {} current cpu load average {} is too high or available memory {}G is too low", |
|
|
|
|
addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize()); |
|
|
|
|
return Optional.empty(); |
|
|
|
|
} |
|
|
|
|
if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) { |
|
|
|
|
logger.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}", |
|
|
|
|
addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount()); |
|
|
|
|
return Optional.empty(); |
|
|
|
|
} |
|
|
|
|
return Optional.of( |
|
|
|
|
new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup), |
|
|
|
|
heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(), |
|
|
|
|
heartBeat.getStartupTime())); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights) { |
|
|
|
|
lock.lock(); |
|
|
|
|
try { |
|
|
|
@ -128,58 +176,4 @@ public class LowerWeightHostManager extends CommonHostManager {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
class RefreshResourceTask implements Runnable { |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void run() { |
|
|
|
|
try { |
|
|
|
|
Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>(); |
|
|
|
|
Map<String, Set<String>> workerGroupNodes = serverNodeManager.getWorkerGroupNodes(); |
|
|
|
|
for (Map.Entry<String, Set<String>> entry : workerGroupNodes.entrySet()) { |
|
|
|
|
String workerGroup = entry.getKey(); |
|
|
|
|
Set<String> nodes = entry.getValue(); |
|
|
|
|
Set<HostWeight> hostWeights = new HashSet<>(nodes.size()); |
|
|
|
|
for (String node : nodes) { |
|
|
|
|
String heartbeat = serverNodeManager.getWorkerNodeInfo(node); |
|
|
|
|
Optional<HostWeight> hostWeightOpt = getHostWeight(node, workerGroup, heartbeat); |
|
|
|
|
if (hostWeightOpt.isPresent()) { |
|
|
|
|
hostWeights.add(hostWeightOpt.get()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (!hostWeights.isEmpty()) { |
|
|
|
|
workerHostWeights.put(workerGroup, hostWeights); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
syncWorkerHostWeight(workerHostWeights); |
|
|
|
|
} catch (Throwable ex) { |
|
|
|
|
logger.error("RefreshResourceTask error", ex); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public Optional<HostWeight> getHostWeight(String addr, String workerGroup, String heartBeatInfo) { |
|
|
|
|
if (StringUtils.isEmpty(heartBeatInfo)) { |
|
|
|
|
logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup); |
|
|
|
|
return Optional.empty(); |
|
|
|
|
} |
|
|
|
|
HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo); |
|
|
|
|
if (heartBeat == null) { |
|
|
|
|
return Optional.empty(); |
|
|
|
|
} |
|
|
|
|
if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) { |
|
|
|
|
logger.warn("worker {} current cpu load average {} is too high or available memory {}G is too low", |
|
|
|
|
addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize()); |
|
|
|
|
return Optional.empty(); |
|
|
|
|
} |
|
|
|
|
if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) { |
|
|
|
|
logger.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}", |
|
|
|
|
addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount()); |
|
|
|
|
return Optional.empty(); |
|
|
|
|
} |
|
|
|
|
return Optional.of( |
|
|
|
|
new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup), |
|
|
|
|
heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(), |
|
|
|
|
heartBeat.getStartupTime())); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|