Browse Source

[Improvement][Server] Optimize log display (#5311)

pull/3/MERGE
Shiwen Cheng 4 years ago committed by GitHub
parent
commit
83519bdfc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
  2. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  3. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
  4. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
  5. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java

12
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java

@ -482,18 +482,18 @@ public class OSUtils {
/** /**
* check memory and cpu usage * check memory and cpu usage
* *
* @param systemCpuLoad systemCpuLoad * @param maxCpuloadAvg maxCpuloadAvg
* @param systemReservedMemory systemReservedMemory * @param reservedMemory reservedMemory
* @return check memory and cpu usage * @return check memory and cpu usage
*/ */
public static Boolean checkResource(double systemCpuLoad, double systemReservedMemory) { public static Boolean checkResource(double maxCpuloadAvg, double reservedMemory) {
// system load average // system load average
double loadAverage = loadAverage(); double loadAverage = loadAverage();
// system available physical memory // system available physical memory
double availablePhysicalMemorySize = availablePhysicalMemorySize(); double availablePhysicalMemorySize = availablePhysicalMemorySize();
if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
if (loadAverage > systemCpuLoad || availablePhysicalMemorySize < systemReservedMemory) { logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize, loadAverage); loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
return false; return false;
} else { } else {
return true; return true;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -172,7 +172,7 @@ public class TaskPriorityQueueConsumer extends Thread {
result = dispatcher.dispatch(executionContext); result = dispatcher.dispatch(executionContext);
} }
} catch (ExecuteException e) { } catch (ExecuteException e) {
logger.error("dispatch error", e); logger.error("dispatch error: {}", e.getMessage());
} }
return result; return result;
} }

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java

@ -26,12 +26,13 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
import org.apache.dolphinscheduler.server.master.dispatch.executor.ExecutorManager; import org.apache.dolphinscheduler.server.master.dispatch.executor.ExecutorManager;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager; import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* executor dispatcher * executor dispatcher
*/ */
@ -84,8 +85,8 @@ public class ExecutorDispatcher implements InitializingBean {
Host host = hostManager.select(context); Host host = hostManager.select(context);
if (StringUtils.isEmpty(host.getAddress())) { if (StringUtils.isEmpty(host.getAddress())) {
throw new ExecuteException(String.format("fail to execute : %s due to no suitable worker , " + throw new ExecuteException(String.format("fail to execute : %s due to no suitable worker, "
"current task need to %s worker group execute", + "current task needs worker group %s to execute",
context.getCommand(),context.getWorkerGroup())); context.getCommand(),context.getWorkerGroup()));
} }
context.setHost(host); context.setHost(host);

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java

@ -158,8 +158,8 @@ public class LowerWeightHostManager extends CommonHostManager {
String[] parts = heartbeat.split(Constants.COMMA); String[] parts = heartbeat.split(Constants.COMMA);
int status = Integer.parseInt(parts[8]); int status = Integer.parseInt(parts[8]);
if (status == Constants.ABNORMAL_NODE_STATUS) { if (status == Constants.ABNORMAL_NODE_STATUS) {
logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", logger.warn("worker {} current cpu load average {} is too high or available memory {}G is too low",
Double.parseDouble(parts[3]), Double.parseDouble(parts[2])); addr, Double.parseDouble(parts[2]), Double.parseDouble(parts[3]));
return null; return null;
} }
double cpu = Double.parseDouble(parts[0]); double cpu = Double.parseDouble(parts[0]);

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java

@ -89,14 +89,12 @@ public class HeartBeatTask implements Runnable {
} }
} }
double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
double loadAverage = OSUtils.loadAverage(); double loadAverage = OSUtils.loadAverage();
double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
int status = Constants.NORMAL_NODE_STATUS; int status = Constants.NORMAL_NODE_STATUS;
if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
if (availablePhysicalMemorySize < reservedMemory logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
|| loadAverage > maxCpuloadAvg) { loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize, loadAverage);
status = Constants.ABNORMAL_NODE_STATUS; status = Constants.ABNORMAL_NODE_STATUS;
} }

Loading…
Cancel
Save