diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java index af62033a74..3851f77e94 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java @@ -482,18 +482,18 @@ public class OSUtils { /** * check memory and cpu usage * - * @param systemCpuLoad systemCpuLoad - * @param systemReservedMemory systemReservedMemory + * @param maxCpuloadAvg maxCpuloadAvg + * @param reservedMemory reservedMemory * @return check memory and cpu usage */ - public static Boolean checkResource(double systemCpuLoad, double systemReservedMemory) { + public static Boolean checkResource(double maxCpuloadAvg, double reservedMemory) { // system load average double loadAverage = loadAverage(); // system available physical memory double availablePhysicalMemorySize = availablePhysicalMemorySize(); - - if (loadAverage > systemCpuLoad || availablePhysicalMemorySize < systemReservedMemory) { - logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize, loadAverage); + if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) { + logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G", + loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory); return false; } else { return true; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 69058a49b8..97729c176f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -172,7 +172,7 @@ public class TaskPriorityQueueConsumer extends Thread { result = dispatcher.dispatch(executionContext); } } catch (ExecuteException e) { - logger.error("dispatch error", e); + logger.error("dispatch error: {}", e.getMessage()); } return result; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java index 34c8c8e285..475c5b9b70 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java @@ -26,12 +26,13 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce import org.apache.dolphinscheduler.server.master.dispatch.executor.ExecutorManager; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager; + +import java.util.concurrent.ConcurrentHashMap; + import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.util.concurrent.ConcurrentHashMap; - /** * executor dispatcher */ @@ -84,8 +85,8 @@ public class ExecutorDispatcher implements InitializingBean { Host host = hostManager.select(context); if (StringUtils.isEmpty(host.getAddress())) { - throw new ExecuteException(String.format("fail to execute : %s due to no suitable worker , " + - "current task need to %s worker group execute", + throw new ExecuteException(String.format("fail to execute : %s due to no suitable worker, " + + "current task needs worker group %s to execute", context.getCommand(),context.getWorkerGroup())); } context.setHost(host); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java index 8b7d37c4e3..7679c2dd27 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java @@ -158,8 +158,8 @@ public class LowerWeightHostManager extends CommonHostManager { String[] parts = heartbeat.split(Constants.COMMA); int status = Integer.parseInt(parts[8]); if (status == Constants.ABNORMAL_NODE_STATUS) { - logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", - Double.parseDouble(parts[3]), Double.parseDouble(parts[2])); + logger.warn("worker {} current cpu load average {} is too high or available memory {}G is too low", + addr, Double.parseDouble(parts[2]), Double.parseDouble(parts[3])); return null; } double cpu = Double.parseDouble(parts[0]); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java index 460cc9fcdf..123130286f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java @@ -89,14 +89,12 @@ public class HeartBeatTask implements Runnable { } } - double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); double loadAverage = OSUtils.loadAverage(); - + double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); int status = Constants.NORMAL_NODE_STATUS; - - if (availablePhysicalMemorySize < reservedMemory - || loadAverage > maxCpuloadAvg) { - logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize, loadAverage); + if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) { + logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G", + loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory); status = Constants.ABNORMAL_NODE_STATUS; }