diff --git a/deploy/kubernetes/dolphinscheduler/README.md b/deploy/kubernetes/dolphinscheduler/README.md index b2e9974447..6c04c4085d 100644 --- a/deploy/kubernetes/dolphinscheduler/README.md +++ b/deploy/kubernetes/dolphinscheduler/README.md @@ -196,11 +196,14 @@ Please refer to the [Quick Start in Kubernetes](../../../docs/docs/en/guide/inst | master.env.MASTER_EXEC_THREADS | string | `"100"` | Master execute thread number to limit process instances | | master.env.MASTER_FAILOVER_INTERVAL | string | `"10m"` | Master failover interval, the unit is minute | | master.env.MASTER_HEARTBEAT_ERROR_THRESHOLD | string | `"5"` | Master heartbeat error threshold | -| master.env.MASTER_HEARTBEAT_INTERVAL | string | `"10s"` | Master heartbeat interval, the unit is second | | master.env.MASTER_HOST_SELECTOR | string | `"LowerWeight"` | Master host selector to select a suitable worker, optional values include Random, RoundRobin, LowerWeight | | master.env.MASTER_KILL_APPLICATION_WHEN_HANDLE_FAILOVER | string | `"true"` | Master kill application when handle failover | -| master.env.MASTER_MAX_CPU_LOAD_AVG | string | `"1"` | Master max cpuload avg, only higher than the system cpu load average, master server can schedule | -| master.env.MASTER_RESERVED_MEMORY | string | `"0.3"` | Master reserved memory, only lower than system available memory, master server can schedule, the unit is G | +| master.env.MASTER_MAX_HEARTBEAT_INTERVAL | string | `"10s"` | Master max heartbeat interval | +| master.env.MASTER_SERVER_LOAD_PROTECTION_ENABLED | bool | `false` | If set true, will open master overload protection | +| master.env.MASTER_SERVER_LOAD_PROTECTION_MAX_CPU_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Master max cpu usage, when the master's cpu usage is smaller then this value, master server can execute workflow. | +| master.env.MASTER_SERVER_LOAD_PROTECTION_MAX_DISK_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Master max disk usage , when the master's disk usage is smaller then this value, master server can execute workflow. | +| master.env.MASTER_SERVER_LOAD_PROTECTION_MAX_JVM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Master max JVM memory usage , when the master's jvm memory usage is smaller then this value, master server can execute workflow. | +| master.env.MASTER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Master max System memory usage , when the master's system memory usage is smaller then this value, master server can execute workflow. | | master.env.MASTER_STATE_WHEEL_INTERVAL | string | `"5s"` | master state wheel interval, the unit is second | | master.env.MASTER_TASK_COMMIT_INTERVAL | string | `"1s"` | master commit task interval, the unit is second | | master.env.MASTER_TASK_COMMIT_RETRYTIMES | string | `"5"` | Master commit task retry times | @@ -293,12 +296,17 @@ Please refer to the [Quick Start in Kubernetes](../../../docs/docs/en/guide/inst | worker.affinity | object | `{}` | Affinity is a group of affinity scheduling rules. If specified, the pod's scheduling constraints. More info: [node-affinity](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#node-affinity) | | worker.annotations | object | `{}` | You can use annotations to attach arbitrary non-identifying metadata to objects. Clients such as tools and libraries can retrieve this metadata. | | worker.enabled | bool | `true` | Enable or disable the Worker component | +| worker.env.DEFAULT_TENANT_ENABLED | bool | `false` | If set true, will use worker bootstrap user as the tenant to execute task when the tenant is `default`; | | worker.env.WORKER_EXEC_THREADS | string | `"100"` | Worker execute thread number to limit task instances | -| worker.env.WORKER_HEARTBEAT_INTERVAL | string | `"10s"` | Worker heartbeat interval, the unit is second | -| worker.env.WORKER_HEART_ERROR_THRESHOLD | string | `"5"` | Worker heartbeat error threshold | | worker.env.WORKER_HOST_WEIGHT | string | `"100"` | Worker host weight to dispatch tasks | -| worker.env.WORKER_MAX_CPU_LOAD_AVG | string | `"1"` | Worker max cpu load avg, only higher than the system cpu load average, worker server can be dispatched tasks | -| worker.env.WORKER_RESERVED_MEMORY | string | `"0.3"` | Worker reserved memory, only lower than system available memory, worker server can be dispatched tasks, the unit is G | +| worker.env.WORKER_MAX_HEARTBEAT_INTERVAL | string | `"10s"` | Worker heartbeat interval | +| worker.env.WORKER_SERVER_LOAD_PROTECTION_ENABLED | bool | `false` | If set true, will open worker overload protection | +| worker.env.WORKER_SERVER_LOAD_PROTECTION_MAX_CPU_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Worker max cpu usage, when the worker's cpu usage is smaller then this value, worker server can be dispatched tasks. | +| worker.env.WORKER_SERVER_LOAD_PROTECTION_MAX_DISK_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Worker max disk usage , when the worker's disk usage is smaller then this value, worker server can be dispatched tasks. | +| worker.env.WORKER_SERVER_LOAD_PROTECTION_MAX_JVM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Worker max jvm memory usage , when the worker's jvm memory usage is smaller then this value, worker server can be dispatched tasks. | +| worker.env.WORKER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Worker max memory usage , when the worker's memory usage is smaller then this value, worker server can be dispatched tasks. | +| worker.env.WORKER_TENANT_CONFIG_AUTO_CREATE_TENANT_ENABLED | bool | `true` | tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. | +| worker.env.WORKER_TENANT_CONFIG_DISTRIBUTED_TENANT | bool | `false` | Scenes to be used for distributed users. For example, users created by FreeIpa are stored in LDAP. This parameter only applies to Linux, When this parameter is true, worker.tenant.auto.create has no effect and will not automatically create tenants. | | worker.keda.advanced | object | `{}` | Specify HPA related options | | worker.keda.cooldownPeriod | int | `30` | How many seconds KEDA will wait before scaling to zero. Note that HPA has a separate cooldown period for scale-downs | | worker.keda.enabled | bool | `false` | Enable or disable the Keda component | diff --git a/deploy/kubernetes/dolphinscheduler/values.yaml b/deploy/kubernetes/dolphinscheduler/values.yaml index cea0e616db..6effdf15ac 100644 --- a/deploy/kubernetes/dolphinscheduler/values.yaml +++ b/deploy/kubernetes/dolphinscheduler/values.yaml @@ -496,8 +496,8 @@ master: MASTER_DISPATCH_TASK_NUM: "3" # -- Master host selector to select a suitable worker, optional values include Random, RoundRobin, LowerWeight MASTER_HOST_SELECTOR: "LowerWeight" - # -- Master heartbeat interval, the unit is second - MASTER_HEARTBEAT_INTERVAL: "10s" + # -- Master max heartbeat interval + MASTER_MAX_HEARTBEAT_INTERVAL: "10s" # -- Master heartbeat error threshold MASTER_HEARTBEAT_ERROR_THRESHOLD: "5" # -- Master commit task retry times @@ -506,10 +506,16 @@ master: MASTER_TASK_COMMIT_INTERVAL: "1s" # -- master state wheel interval, the unit is second MASTER_STATE_WHEEL_INTERVAL: "5s" - # -- Master max cpuload avg, only higher than the system cpu load average, master server can schedule - MASTER_MAX_CPU_LOAD_AVG: "1" - # -- Master reserved memory, only lower than system available memory, master server can schedule, the unit is G - MASTER_RESERVED_MEMORY: "0.3" + # -- If set true, will open master overload protection + MASTER_SERVER_LOAD_PROTECTION_ENABLED: false + # -- Master max cpu usage, when the master's cpu usage is smaller then this value, master server can execute workflow. + MASTER_SERVER_LOAD_PROTECTION_MAX_CPU_USAGE_PERCENTAGE_THRESHOLDS: 0.7 + # -- Master max JVM memory usage , when the master's jvm memory usage is smaller then this value, master server can execute workflow. + MASTER_SERVER_LOAD_PROTECTION_MAX_JVM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS: 0.7 + # -- Master max System memory usage , when the master's system memory usage is smaller then this value, master server can execute workflow. + MASTER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS: 0.7 + # -- Master max disk usage , when the master's disk usage is smaller then this value, master server can execute workflow. + MASTER_SERVER_LOAD_PROTECTION_MAX_DISK_USAGE_PERCENTAGE_THRESHOLDS: 0.7 # -- Master failover interval, the unit is minute MASTER_FAILOVER_INTERVAL: "10m" # -- Master kill application when handle failover @@ -621,18 +627,29 @@ worker: # -- `PersistentVolumeClaim` size storage: "20Gi" env: - # -- Worker max cpu load avg, only higher than the system cpu load average, worker server can be dispatched tasks - WORKER_MAX_CPU_LOAD_AVG: "1" - # -- Worker reserved memory, only lower than system available memory, worker server can be dispatched tasks, the unit is G - WORKER_RESERVED_MEMORY: "0.3" + # -- If set true, will open worker overload protection + WORKER_SERVER_LOAD_PROTECTION_ENABLED: false + # -- Worker max cpu usage, when the worker's cpu usage is smaller then this value, worker server can be dispatched tasks. + WORKER_SERVER_LOAD_PROTECTION_MAX_CPU_USAGE_PERCENTAGE_THRESHOLDS: 0.7 + # -- Worker max jvm memory usage , when the worker's jvm memory usage is smaller then this value, worker server can be dispatched tasks. + WORKER_SERVER_LOAD_PROTECTION_MAX_JVM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS: 0.7 + # -- Worker max memory usage , when the worker's memory usage is smaller then this value, worker server can be dispatched tasks. + WORKER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS: 0.7 + # -- Worker max disk usage , when the worker's disk usage is smaller then this value, worker server can be dispatched tasks. + WORKER_SERVER_LOAD_PROTECTION_MAX_DISK_USAGE_PERCENTAGE_THRESHOLDS: 0.7 # -- Worker execute thread number to limit task instances WORKER_EXEC_THREADS: "100" - # -- Worker heartbeat interval, the unit is second - WORKER_HEARTBEAT_INTERVAL: "10s" - # -- Worker heartbeat error threshold - WORKER_HEART_ERROR_THRESHOLD: "5" + # -- Worker heartbeat interval + WORKER_MAX_HEARTBEAT_INTERVAL: "10s" # -- Worker host weight to dispatch tasks WORKER_HOST_WEIGHT: "100" + # -- tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. + WORKER_TENANT_CONFIG_AUTO_CREATE_TENANT_ENABLED: true + # -- Scenes to be used for distributed users. For example, users created by FreeIpa are stored in LDAP. This parameter only applies to Linux, When this parameter is true, worker.tenant.auto.create has no effect and will not automatically create tenants. + WORKER_TENANT_CONFIG_DISTRIBUTED_TENANT: false + # -- If set true, will use worker bootstrap user as the tenant to execute task when the tenant is `default`; + DEFAULT_TENANT_ENABLED: false + keda: # -- Enable or disable the Keda component enabled: false diff --git a/docs/docs/en/architecture/configuration.md b/docs/docs/en/architecture/configuration.md index 01f925055f..f4ab1435d5 100644 --- a/docs/docs/en/architecture/configuration.md +++ b/docs/docs/en/architecture/configuration.md @@ -279,46 +279,50 @@ Location: `api-server/conf/application.yaml` Location: `master-server/conf/application.yaml` -| Parameters | Default value | Description | -|------------------------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| master.listen-port | 5678 | master listen port | -| master.fetch-command-num | 10 | the number of commands fetched by master | -| master.pre-exec-threads | 10 | master prepare execute thread number to limit handle commands in parallel | -| master.exec-threads | 100 | master execute thread number to limit process instances in parallel | -| master.dispatch-task-number | 3 | master dispatch task number per batch | -| master.host-selector | lower_weight | master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight | -| master.heartbeat-interval | 10 | master heartbeat interval, the unit is second | -| master.task-commit-retry-times | 5 | master commit task retry times | -| master.task-commit-interval | 1000 | master commit task interval, the unit is millisecond | -| master.state-wheel-interval | 5 | time to check status | -| master.max-cpu-load-avg | 1 | master max cpuload avg percentage, only higher than the system cpu load average, master server can schedule. default value 1: will use 100% cpu | -| master.reserved-memory | 0.3 | master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, only the available memory is higher than 30%, master server can schedule. | -| master.failover-interval | 10 | failover interval, the unit is minute | -| master.kill-application-when-task-failover | true | whether to kill yarn/k8s application when failover taskInstance | -| master.registry-disconnect-strategy.strategy | stop | Used when the master disconnect from registry, default value: stop. Optional values include stop, waiting | -| master.registry-disconnect-strategy.max-waiting-time | 100s | Used when the master disconnect from registry, and the disconnect strategy is waiting, this config means the master will waiting to reconnect to registry in given times, and after the waiting times, if the master still cannot connect to registry, will stop itself, if the value is 0s, the Master will wait infinitely | -| master.worker-group-refresh-interval | 10s | The interval to refresh worker group from db to memory | +| Parameters | Default value | Description | +|-----------------------------------------------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| master.listen-port | 5678 | master listen port | +| master.fetch-command-num | 10 | the number of commands fetched by master | +| master.pre-exec-threads | 10 | master prepare execute thread number to limit handle commands in parallel | +| master.exec-threads | 100 | master execute thread number to limit process instances in parallel | +| master.dispatch-task-number | 3 | master dispatch task number per batch | +| master.host-selector | lower_weight | master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight | +| master.max-heartbeat-interval | 10s | master max heartbeat interval | +| master.task-commit-retry-times | 5 | master commit task retry times | +| master.task-commit-interval | 1000 | master commit task interval, the unit is millisecond | +| master.state-wheel-interval | 5 | time to check status | +| master.server-load-protection.enabled | true | If set true, will open master overload protection | +| master.server-load-protection.max-cpu-usage-percentage-thresholds | 0.7 | Master max cpu usage, when the master's cpu usage is smaller then this value, master server can execute workflow. | +| master.server-load-protection.max-jvm-memory-usage-percentage-thresholds | 0.7 | Master max JVM memory usage , when the master's jvm memory usage is smaller then this value, master server can execute workflow. | +| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | Master max System memory usage , when the master's system memory usage is smaller then this value, master server can execute workflow. | +| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | Master max disk usage , when the master's disk usage is smaller then this value, master server can execute workflow. | +| master.failover-interval | 10 | failover interval, the unit is minute | +| master.kill-application-when-task-failover | true | whether to kill yarn/k8s application when failover taskInstance | +| master.registry-disconnect-strategy.strategy | stop | Used when the master disconnect from registry, default value: stop. Optional values include stop, waiting | +| master.registry-disconnect-strategy.max-waiting-time | 100s | Used when the master disconnect from registry, and the disconnect strategy is waiting, this config means the master will waiting to reconnect to registry in given times, and after the waiting times, if the master still cannot connect to registry, will stop itself, if the value is 0s, the Master will wait infinitely | +| master.worker-group-refresh-interval | 10s | The interval to refresh worker group from db to memory | ### Worker Server related configuration Location: `worker-server/conf/application.yaml` -| Parameters | Default value | Description | -|------------------------------------------------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| worker.listen-port | 1234 | worker-service listen port | -| worker.exec-threads | 100 | worker-service execute thread number, used to limit the number of task instances in parallel | -| worker.heartbeat-interval | 10 | worker-service heartbeat interval, the unit is second | -| worker.host-weight | 100 | worker host weight to dispatch tasks | -| worker.max-cpu-load-avg | 1 | worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value 1: will use 100% cpu. | -| worker.reserved-memory | 0.3 | worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, only the available memory is higher than 30%, worker server can receive task. | -| worker.alert-listen-host | localhost | the alert listen host of worker | -| worker.alert-listen-port | 50052 | the alert listen port of worker | -| worker.registry-disconnect-strategy.strategy | stop | Used when the worker disconnect from registry, default value: stop. Optional values include stop, waiting | -| worker.registry-disconnect-strategy.max-waiting-time | 100s | Used when the worker disconnect from registry, and the disconnect strategy is waiting, this config means the worker will waiting to reconnect to registry in given times, and after the waiting times, if the worker still cannot connect to registry, will stop itself, if the value is 0s, will wait infinitely | -| worker.task-execute-threads-full-policy | REJECT | If REJECT, when the task waiting in the worker reaches exec-threads, it will reject the received task and the Master will redispatch it; If CONTINUE, it will put the task into the worker's execution queue and wait for a free thread to start execution | -| worker.tenant-config.auto-create-tenant-enabled | true | tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. | -| worker.tenant-config.distributed-tenant-enabled | false | When this parameter is true, auto-create-tenant-enabled has no effect and will not automatically create tenants | -| worker.tenant-config.default-tenant-enabled | false | If set true, will use worker bootstrap user as the tenant to execute task when the tenant is `default`. | +| Parameters | Default value | Description | +|--------------------------------------------------------------------------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| worker.listen-port | 1234 | worker-service listen port | +| worker.exec-threads | 100 | worker-service execute thread number, used to limit the number of task instances in parallel | +| worker.max-heartbeat-interval | 10s | worker-service max heartbeat interval | +| worker.host-weight | 100 | worker host weight to dispatch tasks | +| worker.server-load-protection.enabled | true | If set true will open worker overload protection | +| worker.max-cpu-usage-percentage-thresholds.max-cpu-usage-percentage-thresholds | 0.7 | Master max cpu usage, when the master's cpu usage is smaller then this value, master server can execute workflow. | +| worker.server-load-protection.max-jvm-memory-usage-percentage-thresholds | 0.7 | Master max JVM memory usage , when the master's jvm memory usage is smaller then this value, master server can execute workflow. | +| worker.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | Master max System memory usage , when the master's system memory usage is smaller then this value, master server can execute workflow. | +| worker.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | Master max disk usage , when the master's disk usage is smaller then this value, master server can execute workflow. | +| worker.registry-disconnect-strategy.strategy | stop | Used when the worker disconnect from registry, default value: stop. Optional values include stop, waiting | +| worker.registry-disconnect-strategy.max-waiting-time | 100s | Used when the worker disconnect from registry, and the disconnect strategy is waiting, this config means the worker will waiting to reconnect to registry in given times, and after the waiting times, if the worker still cannot connect to registry, will stop itself, if the value is 0s, will wait infinitely | +| worker.task-execute-threads-full-policy | REJECT | If REJECT, when the task waiting in the worker reaches exec-threads, it will reject the received task and the Master will redispatch it; If CONTINUE, it will put the task into the worker's execution queue and wait for a free thread to start execution | +| worker.tenant-config.auto-create-tenant-enabled | true | tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. | +| worker.tenant-config.distributed-tenant-enabled | false | When this parameter is true, auto-create-tenant-enabled has no effect and will not automatically create tenants | +| worker.tenant-config.default-tenant-enabled | false | If set true, will use worker bootstrap user as the tenant to execute task when the tenant is `default`. | ### Alert Server related configuration diff --git a/docs/docs/zh/architecture/configuration.md b/docs/docs/zh/architecture/configuration.md index 719a58c7cd..65113b76a1 100644 --- a/docs/docs/zh/architecture/configuration.md +++ b/docs/docs/zh/architecture/configuration.md @@ -287,7 +287,7 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId | master.exec-threads | 100 | master工作线程数量,用于限制并行的流程实例数量 | | master.dispatch-task-number | 3 | master每个批次的派发任务数量 | | master.host-selector | lower_weight | master host选择器,用于选择合适的worker执行任务,可选值: random, round_robin, lower_weight | -| master.heartbeat-interval | 10 | master心跳间隔,单位为秒 | +| master.max-heartbeat-interval | 10s | master最大心跳间隔 | | master.task-commit-retry-times | 5 | 任务重试次数 | | master.task-commit-interval | 1000 | 任务提交间隔,单位为毫秒 | | master.state-wheel-interval | 5 | 轮询检查状态时间 | @@ -308,7 +308,7 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId |------------------------------------------------------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------| | worker.listen-port | 1234 | worker监听端口 | | worker.exec-threads | 100 | worker工作线程数量,用于限制并行的任务实例数量 | -| worker.heartbeat-interval | 10 | worker心跳间隔,单位为秒 | +| worker.max-heartbeat-interval | 10s | worker最大心跳间隔 | | worker.host-weight | 100 | 派发任务时,worker主机的权重 | | worker.tenant-auto-create | true | 租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。 | | worker.max-cpu-load-avg | 1 | worker最大cpuload均值,只有高于系统cpuload均值时,worker服务才能被派发任务. 默认值为1: 会使用100%的CPU | diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/config/AlertConfig.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/config/AlertConfig.java index b39b7ea31a..824851fd92 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/config/AlertConfig.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/config/AlertConfig.java @@ -41,7 +41,7 @@ public final class AlertConfig implements Validator { private int waitTimeout; - private Duration heartbeatInterval = Duration.ofSeconds(60); + private Duration maxHeartbeatInterval = Duration.ofSeconds(60); private String alertServerAddress; @@ -54,8 +54,8 @@ public final class AlertConfig implements Validator { public void validate(Object target, Errors errors) { AlertConfig alertConfig = (AlertConfig) target; - if (heartbeatInterval.getSeconds() <= 0) { - errors.rejectValue("heartbeat-interval", null, "should be a valid duration"); + if (maxHeartbeatInterval.getSeconds() <= 0) { + errors.rejectValue("max-heartbeat-interval", null, "should be a valid duration"); } if (StringUtils.isEmpty(alertServerAddress)) { @@ -68,6 +68,6 @@ public final class AlertConfig implements Validator { private void printConfig() { log.info("Alert config: port -> {}", port); log.info("Alert config: alertServerAddress -> {}", alertServerAddress); - log.info("Alert config: heartbeatInterval -> {}", heartbeatInterval); + log.info("Alert config: maxHeartbeatInterval -> {}", maxHeartbeatInterval); } } diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java index 150b8dbe0c..3b2d588f85 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java @@ -18,11 +18,14 @@ package org.apache.dolphinscheduler.alert.registry; import org.apache.dolphinscheduler.alert.config.AlertConfig; +import org.apache.dolphinscheduler.common.enums.ServerStatus; import org.apache.dolphinscheduler.common.model.AlertServerHeartBeat; import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; +import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; @@ -37,14 +40,18 @@ public class AlertHeartbeatTask extends BaseHeartBeatTask private final AlertConfig alertConfig; private final Integer processId; private final RegistryClient registryClient; + + private final MetricsProvider metricsProvider; private final String heartBeatPath; private final long startupTime; public AlertHeartbeatTask(AlertConfig alertConfig, + MetricsProvider metricsProvider, RegistryClient registryClient) { - super("AlertHeartbeatTask", alertConfig.getHeartbeatInterval().toMillis()); + super("AlertHeartbeatTask", alertConfig.getMaxHeartbeatInterval().toMillis()); this.startupTime = System.currentTimeMillis(); this.alertConfig = alertConfig; + this.metricsProvider = metricsProvider; this.registryClient = registryClient; this.heartBeatPath = RegistryNodeType.ALERT_SERVER.getRegistryPath() + "/" + alertConfig.getAlertServerAddress(); @@ -53,13 +60,15 @@ public class AlertHeartbeatTask extends BaseHeartBeatTask @Override public AlertServerHeartBeat getHeartBeat() { + SystemMetrics systemMetrics = metricsProvider.getSystemMetrics(); return AlertServerHeartBeat.builder() .processId(processId) .startupTime(startupTime) .reportTime(System.currentTimeMillis()) - .cpuUsage(OSUtils.cpuUsagePercentage()) - .memoryUsage(OSUtils.memoryUsagePercentage()) - .availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize()) + .cpuUsage(systemMetrics.getTotalCpuUsedPercentage()) + .memoryUsage(systemMetrics.getSystemMemoryUsedPercentage()) + .jvmMemoryUsage(systemMetrics.getJvmMemoryUsedPercentage()) + .serverStatus(ServerStatus.NORMAL) .host(NetUtils.getHost()) .port(alertConfig.getPort()) .build(); diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertRegistryClient.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertRegistryClient.java index a93742e54b..cdb5e5eca8 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertRegistryClient.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertRegistryClient.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.alert.registry; import org.apache.dolphinscheduler.alert.config.AlertConfig; +import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; @@ -36,12 +37,15 @@ public class AlertRegistryClient implements AutoCloseable { @Autowired private AlertConfig alertConfig; + @Autowired + private MetricsProvider metricsProvider; + private AlertHeartbeatTask alertHeartbeatTask; public void start() { log.info("AlertRegistryClient starting..."); registryClient.getLock(RegistryNodeType.ALERT_LOCK.getRegistryPath()); - alertHeartbeatTask = new AlertHeartbeatTask(alertConfig, registryClient); + alertHeartbeatTask = new AlertHeartbeatTask(alertConfig, metricsProvider, registryClient); alertHeartbeatTask.start(); // start heartbeat task log.info("AlertRegistryClient started..."); diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/application.yaml b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/application.yaml index 2d95552737..0b28d6c8b0 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/application.yaml +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/application.yaml @@ -80,7 +80,7 @@ alert: # Mark each alert of alert server if late after x milliseconds as failed. # Define value is (0 = infinite), and alert server would be waiting alert result. wait-timeout: 0 - heartbeat-interval: 60s + max-heartbeat-interval: 60s query_alert_threshold: 100 registry: diff --git a/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/cases/ProcessInstanceAPITest.java b/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/cases/ProcessInstanceAPITest.java index e49d1ea05e..0c939bb0fb 100644 --- a/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/cases/ProcessInstanceAPITest.java +++ b/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/cases/ProcessInstanceAPITest.java @@ -19,6 +19,9 @@ package org.apache.dolphinscheduler.api.test.cases; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + import org.apache.dolphinscheduler.api.test.core.DolphinScheduler; import org.apache.dolphinscheduler.api.test.entity.HttpResponse; import org.apache.dolphinscheduler.api.test.entity.LoginResponseData; @@ -34,6 +37,7 @@ import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.commons.collections4.CollectionUtils; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.util.EntityUtils; @@ -42,12 +46,14 @@ import java.text.SimpleDateFormat; import java.util.Date; import java.util.LinkedHashMap; import java.util.List; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.org.awaitility.Awaitility; import lombok.extern.slf4j.Slf4j; @@ -106,7 +112,7 @@ public class ProcessInstanceAPITest { // create test project HttpResponse createProjectResponse = projectPage.createProject(loginUser, "project-test"); HttpResponse queryAllProjectListResponse = projectPage.queryAllProjectList(loginUser); - Assertions.assertTrue(queryAllProjectListResponse.getBody().getSuccess()); + assertTrue(queryAllProjectListResponse.getBody().getSuccess()); projectCode = (long) ((LinkedHashMap) ((List) queryAllProjectListResponse.getBody().getData()).get(0)).get("code"); // upload test workflow definition json @@ -115,17 +121,17 @@ public class ProcessInstanceAPITest { CloseableHttpResponse importProcessDefinitionResponse = processDefinitionPage .importProcessDefinition(loginUser, projectCode, file); String data = EntityUtils.toString(importProcessDefinitionResponse.getEntity()); - Assertions.assertTrue(data.contains("\"success\":true")); + assertTrue(data.contains("\"success\":true")); // get workflow definition code HttpResponse queryAllProcessDefinitionByProjectCodeResponse = processDefinitionPage.queryAllProcessDefinitionByProjectCode(loginUser, projectCode); - Assertions.assertTrue(queryAllProcessDefinitionByProjectCodeResponse.getBody().getSuccess()); - Assertions.assertTrue(queryAllProcessDefinitionByProjectCodeResponse.getBody().getData().toString().contains("hello world")); + assertTrue(queryAllProcessDefinitionByProjectCodeResponse.getBody().getSuccess()); + assertTrue(queryAllProcessDefinitionByProjectCodeResponse.getBody().getData().toString().contains("hello world")); processDefinitionCode = (long) ((LinkedHashMap) ((LinkedHashMap) ((List) queryAllProcessDefinitionByProjectCodeResponse.getBody().getData()).get(0)).get("processDefinition")).get("code"); // release test workflow HttpResponse releaseProcessDefinitionResponse = processDefinitionPage.releaseProcessDefinition(loginUser, projectCode, processDefinitionCode, ReleaseState.ONLINE); - Assertions.assertTrue(releaseProcessDefinitionResponse.getBody().getSuccess()); + assertTrue(releaseProcessDefinitionResponse.getBody().getSuccess()); // trigger workflow instance SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @@ -133,15 +139,21 @@ public class ProcessInstanceAPITest { String scheduleTime = String.format("%s,%s", formatter.format(date), formatter.format(date)); log.info("use current time {} as scheduleTime", scheduleTime); HttpResponse startProcessInstanceResponse = executorPage.startProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime, FailureStrategy.END, WarningType.NONE); - Assertions.assertTrue(startProcessInstanceResponse.getBody().getSuccess()); - // make sure process instance has completed and successfully persisted into db - Thread.sleep(5000); + assertTrue(startProcessInstanceResponse.getBody().getSuccess()); - // query workflow instance by trigger code - triggerCode = (long) startProcessInstanceResponse.getBody().getData(); - HttpResponse queryProcessInstancesByTriggerCodeResponse = processInstancePage.queryProcessInstancesByTriggerCode(loginUser, projectCode, triggerCode); - Assertions.assertTrue(queryProcessInstancesByTriggerCodeResponse.getBody().getSuccess()); - processInstanceId = (int) ((LinkedHashMap) ((List) queryProcessInstancesByTriggerCodeResponse.getBody().getData()).get(0)).get("id"); + // make sure process instance has completed and successfully persisted into db + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> { + // query workflow instance by trigger code + triggerCode = (long) startProcessInstanceResponse.getBody().getData(); + HttpResponse queryProcessInstancesByTriggerCodeResponse = processInstancePage.queryProcessInstancesByTriggerCode(loginUser, projectCode, triggerCode); + assertTrue(queryProcessInstancesByTriggerCodeResponse.getBody().getSuccess()); + List> body = (List>) queryProcessInstancesByTriggerCodeResponse.getBody().getData(); + assertTrue(CollectionUtils.isNotEmpty(body)); + assertEquals("SUCCESS", body.get(0).get("state")); + processInstanceId = (int) body.get(0).get("id"); + }); } catch (Exception e) { log.error("failed", e); Assertions.fail(); @@ -152,34 +164,34 @@ public class ProcessInstanceAPITest { @Order(2) public void testQueryProcessInstanceList() { HttpResponse queryProcessInstanceListResponse = processInstancePage.queryProcessInstanceList(loginUser, projectCode, 1, 10); - Assertions.assertTrue(queryProcessInstanceListResponse.getBody().getSuccess()); - Assertions.assertTrue(queryProcessInstanceListResponse.getBody().getData().toString().contains("test_import")); + assertTrue(queryProcessInstanceListResponse.getBody().getSuccess()); + assertTrue(queryProcessInstanceListResponse.getBody().getData().toString().contains("test_import")); } @Test @Order(3) public void testQueryTaskListByProcessId() { HttpResponse queryTaskListByProcessIdResponse = processInstancePage.queryTaskListByProcessId(loginUser, projectCode, processInstanceId); - Assertions.assertTrue(queryTaskListByProcessIdResponse.getBody().getSuccess()); - Assertions.assertTrue(queryTaskListByProcessIdResponse.getBody().getData().toString().contains("test_import")); + assertTrue(queryTaskListByProcessIdResponse.getBody().getSuccess()); + assertTrue(queryTaskListByProcessIdResponse.getBody().getData().toString().contains("test_import")); } @Test @Order(4) public void testQueryProcessInstanceById() { HttpResponse queryProcessInstanceByIdResponse = processInstancePage.queryProcessInstanceById(loginUser, projectCode, processInstanceId); - Assertions.assertTrue(queryProcessInstanceByIdResponse.getBody().getSuccess()); - Assertions.assertTrue(queryProcessInstanceByIdResponse.getBody().getData().toString().contains("test_import")); + assertTrue(queryProcessInstanceByIdResponse.getBody().getSuccess()); + assertTrue(queryProcessInstanceByIdResponse.getBody().getData().toString().contains("test_import")); } @Test @Order(5) public void testDeleteProcessInstanceById() { HttpResponse deleteProcessInstanceByIdResponse = processInstancePage.deleteProcessInstanceById(loginUser, projectCode, processInstanceId); - Assertions.assertTrue(deleteProcessInstanceByIdResponse.getBody().getSuccess()); + assertTrue(deleteProcessInstanceByIdResponse.getBody().getSuccess()); HttpResponse queryProcessInstanceListResponse = processInstancePage.queryProcessInstanceList(loginUser, projectCode, 1, 10); - Assertions.assertTrue(queryProcessInstanceListResponse.getBody().getSuccess()); + assertTrue(queryProcessInstanceListResponse.getBody().getSuccess()); Assertions.assertFalse(queryProcessInstanceListResponse.getBody().getData().toString().contains("test_import")); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ServerStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ServerStatus.java index 16a0f0e34c..1e4f49721a 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ServerStatus.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ServerStatus.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.enums; public enum ServerStatus { - NORMAL, ABNORMAL, BUSY + NORMAL, + BUSY } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java index 9273b77a1c..7cbd83b8ce 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.common.model; +import org.apache.dolphinscheduler.common.enums.ServerStatus; + import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -33,8 +35,15 @@ public class AlertServerHeartBeat implements HeartBeat { private long reportTime; private double cpuUsage; private double memoryUsage; - private double availablePhysicalMemorySize; + private double jvmMemoryUsage; + + private ServerStatus serverStatus; private String host; private int port; + + @Override + public ServerStatus getServerStatus() { + return serverStatus; + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java index 9bdddc25ae..2ea0219084 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java @@ -23,13 +23,19 @@ import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import lombok.extern.slf4j.Slf4j; @Slf4j -public abstract class BaseHeartBeatTask extends BaseDaemonThread { +public abstract class BaseHeartBeatTask extends BaseDaemonThread { + + private static final long DEFAULT_HEARTBEAT_SCAN_INTERVAL = 1_000L; private final String threadName; private final long heartBeatInterval; protected boolean runningFlag; + protected long lastWriteTime = 0L; + + protected T lastHeartBeat = null; + public BaseHeartBeatTask(String threadName, long heartBeatInterval) { super(threadName); this.threadName = threadName; @@ -54,12 +60,18 @@ public abstract class BaseHeartBeatTask extends BaseDaemonThread { continue; } T heartBeat = getHeartBeat(); - writeHeartBeat(heartBeat); + // if first time or heartBeat status changed, write heartBeatInfo into registry + if (System.currentTimeMillis() - lastWriteTime >= heartBeatInterval + || !lastHeartBeat.getServerStatus().equals(heartBeat.getServerStatus())) { + lastHeartBeat = heartBeat; + writeHeartBeat(heartBeat); + lastWriteTime = System.currentTimeMillis(); + } } catch (Exception ex) { log.error("{} task execute failed", threadName, ex); } finally { try { - Thread.sleep(heartBeatInterval); + Thread.sleep(DEFAULT_HEARTBEAT_SCAN_INTERVAL); } catch (InterruptedException e) { handleInterruptException(e); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java index 218afa245e..3a105227aa 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java @@ -17,10 +17,14 @@ package org.apache.dolphinscheduler.common.model; +import org.apache.dolphinscheduler.common.enums.ServerStatus; + public interface HeartBeat { String getHost(); + ServerStatus getServerStatus(); + int getPort(); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java index 6386f32ae6..ecc140bcfb 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java @@ -33,13 +33,17 @@ public class MasterHeartBeat implements HeartBeat { private long startupTime; private long reportTime; private double cpuUsage; + private double jvmMemoryUsage; private double memoryUsage; - private double availablePhysicalMemorySize; - private double reservedMemory; - private double diskAvailable; - private int processId; + private double diskUsage; private ServerStatus serverStatus; + private int processId; private String host; private int port; + + @Override + public ServerStatus getServerStatus() { + return serverStatus; + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java index 396f227f47..056fc6a2c7 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java @@ -33,11 +33,9 @@ public class WorkerHeartBeat implements HeartBeat { private long startupTime; private long reportTime; private double cpuUsage; + private double jvmMemoryUsage; private double memoryUsage; - private double loadAverage; - private double availablePhysicalMemorySize; - private double reservedMemory; - private double diskAvailable; + private double diskUsage; private ServerStatus serverStatus; private int processId; @@ -45,7 +43,11 @@ public class WorkerHeartBeat implements HeartBeat { private int port; private int workerHostWeight; // worker host weight - private int workerWaitingTaskCount; // worker waiting task count - private int workerExecThreadCount; // worker thread pool thread count + private int threadPoolUsage; // worker waiting task count + + @Override + public ServerStatus getServerStatus() { + return serverStatus; + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java index 22563efe46..243161df88 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java @@ -21,23 +21,17 @@ import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.shell.ShellExecutor; import oshi.SystemInfo; -import oshi.hardware.CentralProcessor; -import oshi.hardware.GlobalMemory; import oshi.hardware.HardwareAbstractionLayer; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.SystemUtils; import java.io.BufferedReader; -import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.lang.management.ManagementFactory; -import java.lang.management.OperatingSystemMXBean; import java.lang.management.RuntimeMXBean; -import java.math.RoundingMode; -import java.text.DecimalFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -45,32 +39,17 @@ import java.util.List; import java.util.StringTokenizer; import java.util.regex.Pattern; +import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; -/** - * os utils - */ +// todo: Split to WindowsOSUtils/LinuxOSUtils/MacOSOSUtils/K8sOSUtils... @Slf4j +@UtilityClass public class OSUtils { private static final SystemInfo SI = new SystemInfo(); - public static final String TWO_DECIMAL = "0.00"; - - /** - * return -1 when the function can not get hardware env info - * e.g {@link OSUtils#cpuUsagePercentage()} - */ - public static final double NEGATIVE_ONE = -1; private static final HardwareAbstractionLayer hal = SI.getHardware(); - private static long[] prevTicks = new long[CentralProcessor.TickType.values().length]; - private static long prevTickTime = 0L; - private static volatile double cpuUsage = 0.0D; - private static final double TOTAL_MEMORY = hal.getMemory().getTotal() / 1024.0 / 1024 / 1024; - - private OSUtils() { - throw new UnsupportedOperationException("Construct OSUtils"); - } /** * Initialization regularization, solve the problem of pre-compilation performance, @@ -78,79 +57,12 @@ public class OSUtils { */ private static final Pattern PATTERN = Pattern.compile("\\s+"); - /** - * get disk usage - * Keep 2 decimal - * - * @return disk free size, unit: GB - */ - public static double diskAvailable() { - File file = new File("."); - long freeSpace = file.getFreeSpace(); // unallocated / free disk space in bytes. - - double diskAvailable = freeSpace / 1024.0 / 1024 / 1024; - - DecimalFormat df = new DecimalFormat(TWO_DECIMAL); - df.setRoundingMode(RoundingMode.HALF_UP); - return Double.parseDouble(df.format(diskAvailable)); + public static long getTotalSystemMemory() { + return hal.getMemory().getTotal(); } - /** - * get available physical or pod memory size - *

- * Keep 2 decimal - * - * @return Available physical or pod memory size, unit: G - */ - public static double availablePhysicalMemorySize() { - double availablePhysicalMemorySize; - - if (KubernetesUtils.isKubernetesMode()) { - long freeMemory = Runtime.getRuntime().freeMemory(); - availablePhysicalMemorySize = freeMemory / 1024.0 / 1024 / 1024; - } else { - GlobalMemory memory = hal.getMemory(); - availablePhysicalMemorySize = memory.getAvailable() / 1024.0 / 1024 / 1024; - } - DecimalFormat df = new DecimalFormat(TWO_DECIMAL); - df.setRoundingMode(RoundingMode.HALF_UP); - return Double.parseDouble(df.format(availablePhysicalMemorySize)); - } - - /** - * get cpu usage - * - * @return cpu usage - */ - public static double cpuUsagePercentage() { - CentralProcessor processor = hal.getProcessor(); - - // Check if > ~ 0.95 seconds since last tick count. - long now = System.currentTimeMillis(); - if (now - prevTickTime > 950) { - // Enough time has elapsed. - if (KubernetesUtils.isKubernetesMode()) { - OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean(); - cpuUsage = operatingSystemMXBean.getSystemLoadAverage(); - } else { - cpuUsage = processor.getSystemCpuLoadBetweenTicks(prevTicks); - } - - prevTickTime = System.currentTimeMillis(); - prevTicks = processor.getSystemCpuLoadTicks(); - } - - if (Double.isNaN(cpuUsage)) { - return NEGATIVE_ONE; - } - - DecimalFormat df = new DecimalFormat(TWO_DECIMAL); - df.setRoundingMode(RoundingMode.HALF_UP); - return Double.parseDouble(df.format(cpuUsage)); - } - - public static double memoryUsagePercentage() { - return (TOTAL_MEMORY - availablePhysicalMemorySize()) / TOTAL_MEMORY; + public static long getSystemAvailableMemoryUsed() { + return hal.getMemory().getAvailable(); } public static List getUserList() { @@ -433,43 +345,11 @@ public class OSUtils { return ShellExecutor.execCommand(command); } - /** - * get process id - * - * @return process id - */ public static int getProcessID() { RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); return Integer.parseInt(runtimeMXBean.getName().split("@")[0]); } - /** - * Check memory and cpu usage is overload the given thredshod. - * - * @param maxCpuLoadAvgThreshold maxCpuLoadAvg - * @param reservedMemoryThreshold reservedMemory - * @return True, if the cpu or memory exceed the given thredshod. - */ - public static Boolean isOverload(double maxCpuLoadAvgThreshold, double reservedMemoryThreshold) { - // system load average - double freeCPUPercentage = 1 - cpuUsagePercentage(); - // system available physical memory - double freeMemoryPercentage = 1 - memoryUsagePercentage(); - if (freeCPUPercentage > maxCpuLoadAvgThreshold) { - log.warn("Current cpu load average {} is too high, max.cpuLoad.avg={}", freeCPUPercentage, - maxCpuLoadAvgThreshold); - return true; - } - - if (freeMemoryPercentage < reservedMemoryThreshold) { - log.warn( - "Current available memory percentage{} is too low, reserved.memory={}", freeMemoryPercentage, - reservedMemoryThreshold); - return true; - } - return false; - } - public static Boolean isWindows() { return System.getProperty("os.name").startsWith("Windows"); } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/os/OSUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java similarity index 64% rename from dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/os/OSUtilsTest.java rename to dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java index 4c37e99dc1..1916aa4424 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/os/OSUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java @@ -14,48 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.common.os; -import org.apache.dolphinscheduler.common.utils.OSUtils; +package org.apache.dolphinscheduler.common.utils; import org.apache.commons.lang3.SystemUtils; import java.util.List; +import lombok.extern.slf4j.Slf4j; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -/** - * OSUtilsTest - */ +@Slf4j public class OSUtilsTest { - private static Logger logger = LoggerFactory.getLogger(OSUtilsTest.class); - - @Test - public void diskAvailable() { - double diskAvailable = OSUtils.diskAvailable(); - logger.info("diskAvailable : {}", diskAvailable); - Assertions.assertTrue(diskAvailable >= 0.0); - } - - @Test - public void cpuUsage() { - double cpuUsage = OSUtils.cpuUsagePercentage(); - logger.info("cpuUsage : {}", cpuUsage); - Assertions.assertTrue(cpuUsage >= 0.0); - } - - @Test - public void availablePhysicalMemorySize() { - double physicalMemorySize = OSUtils.availablePhysicalMemorySize(); - logger.info("physicalMemorySize : {}", physicalMemorySize); - Assertions.assertTrue(physicalMemorySize >= 0.0); - - } - @Test public void existTenantCodeInLinux() { if (SystemUtils.IS_OS_LINUX) { @@ -77,9 +50,31 @@ public class OSUtilsTest { Assertions.assertFalse(userList.contains("xxxtt")); } else { Assertions.assertFalse(false, "system must be linux"); - } + } + @Test + void getTotalSystemMemory() throws InterruptedException { + double totalSystemMemory = OSUtils.getTotalSystemMemory(); + Assertions.assertTrue(totalSystemMemory > 0); + // Assert that the memory is not changed + Thread.sleep(1000L); + Assertions.assertEquals(totalSystemMemory, OSUtils.getTotalSystemMemory()); } + @Test + void getSystemMemoryAvailable() { + long systemAvailableMemoryUsed = OSUtils.getSystemAvailableMemoryUsed(); + Assertions.assertTrue(systemAvailableMemoryUsed > 0); + } + + @Test + void getSystemMemoryUsedPercentage() { + long totalSystemMemory = OSUtils.getTotalSystemMemory(); + long systemMemoryAvailable = OSUtils.getSystemAvailableMemoryUsed(); + double systemAvailableMemoryUsedPercentage = + (double) (totalSystemMemory - systemMemoryAvailable) / totalSystemMemory; + + Assertions.assertTrue(systemAvailableMemoryUsedPercentage > 0); + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 7169203834..82f3bda760 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -21,9 +21,13 @@ import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; +import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.scheduler.api.SchedulerApi; +import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; +import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager; import org.apache.dolphinscheduler.server.master.rpc.MasterRpcServer; import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread; @@ -73,6 +77,12 @@ public class MasterServer implements IStoppable { @Autowired private MasterRpcServer masterRPCServer; + @Autowired + private MetricsProvider metricsProvider; + + @Autowired + private MasterSlotManager masterSlotManager; + public static void main(String[] args) { Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER); SpringApplication.run(MasterServer.class); @@ -89,6 +99,8 @@ public class MasterServer implements IStoppable { // install task plugin this.taskPluginManager.loadPlugin(); + this.masterSlotManager.start(); + // self tolerant this.masterRegistryClient.start(); this.masterRegistryClient.setRegistryStoppable(this); @@ -100,6 +112,19 @@ public class MasterServer implements IStoppable { this.schedulerApi.start(); + MasterServerMetrics.registerMasterCpuUsageGauge(() -> { + SystemMetrics systemMetrics = metricsProvider.getSystemMetrics(); + return systemMetrics.getTotalCpuUsedPercentage(); + }); + MasterServerMetrics.registerMasterMemoryAvailableGauge(() -> { + SystemMetrics systemMetrics = metricsProvider.getSystemMetrics(); + return (systemMetrics.getSystemMemoryMax() - systemMetrics.getSystemMemoryUsed()) / 1024.0 / 1024 / 1024; + }); + MasterServerMetrics.registerMasterMemoryUsageGauge(() -> { + SystemMetrics systemMetrics = metricsProvider.getSystemMetrics(); + return systemMetrics.getJvmMemoryUsedPercentage(); + }); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { if (!ServerLifeCycleManager.isStopped()) { close("MasterServer shutdownHook"); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index 553789d384..02c0dcb819 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -78,7 +78,7 @@ public class MasterConfig implements Validator { /** * Master heart beat task execute interval. */ - private Duration heartbeatInterval = Duration.ofSeconds(10); + private Duration maxHeartbeatInterval = Duration.ofSeconds(10); /** * task submit max retry times. */ @@ -91,8 +91,7 @@ public class MasterConfig implements Validator { * state wheel check interval, if this value is bigger, may increase the delay of task/processInstance. */ private Duration stateWheelInterval = Duration.ofMillis(5); - private double maxCpuLoadAvg = 1; - private double reservedMemory = 0.1; + private MasterServerLoadProtection serverLoadProtection = new MasterServerLoadProtection(); private Duration failoverInterval = Duration.ofMinutes(10); private boolean killApplicationWhenTaskFailover = true; private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties(); @@ -128,8 +127,8 @@ public class MasterConfig implements Validator { if (masterConfig.getDispatchTaskNumber() <= 0) { errors.rejectValue("dispatch-task-number", null, "should be a positive value"); } - if (masterConfig.getHeartbeatInterval().toMillis() < 0) { - errors.rejectValue("heartbeat-interval", null, "should be a valid duration"); + if (masterConfig.getMaxHeartbeatInterval().toMillis() < 0) { + errors.rejectValue("max-heartbeat-interval", null, "should be a valid duration"); } if (masterConfig.getTaskCommitRetryTimes() <= 0) { errors.rejectValue("task-commit-retry-times", null, "should be a positive value"); @@ -143,12 +142,6 @@ public class MasterConfig implements Validator { if (masterConfig.getFailoverInterval().toMillis() <= 0) { errors.rejectValue("failover-interval", null, "should be a valid duration"); } - if (masterConfig.getMaxCpuLoadAvg() <= 0) { - masterConfig.setMaxCpuLoadAvg(100); - } - if (masterConfig.getReservedMemory() <= 0) { - masterConfig.setReservedMemory(100); - } if (masterConfig.getWorkerGroupRefreshInterval().getSeconds() < 10) { errors.rejectValue("worker-group-refresh-interval", null, "should >= 10s"); @@ -171,12 +164,11 @@ public class MasterConfig implements Validator { "\n exec-threads -> " + execThreads + "\n dispatch-task-number -> " + dispatchTaskNumber + "\n host-selector -> " + hostSelector + - "\n heartbeat-interval -> " + heartbeatInterval + + "\n max-heartbeat-interval -> " + maxHeartbeatInterval + "\n task-commit-retry-times -> " + taskCommitRetryTimes + "\n task-commit-interval -> " + taskCommitInterval + "\n state-wheel-interval -> " + stateWheelInterval + - "\n max-cpu-load-avg -> " + maxCpuLoadAvg + - "\n reserved-memory -> " + reservedMemory + + "\n server-load-protection -> " + serverLoadProtection + "\n failover-interval -> " + failoverInterval + "\n kill-application-when-task-failover -> " + killApplicationWhenTaskFailover + "\n registry-disconnect-strategy -> " + registryDisconnectStrategy + diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtection.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtection.java new file mode 100644 index 0000000000..03570d691d --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtection.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.config; + +import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Data +@NoArgsConstructor +@AllArgsConstructor +public class MasterServerLoadProtection { + + private boolean enabled = true; + + private double maxCpuUsagePercentageThresholds = 0.7; + + private double maxJVMMemoryUsagePercentageThresholds = 0.7; + + private double maxSystemMemoryUsagePercentageThresholds = 0.7; + + private double maxDiskUsagePercentageThresholds = 0.7; + + public boolean isOverload(SystemMetrics systemMetrics) { + if (!enabled) { + return false; + } + if (systemMetrics.getTotalCpuUsedPercentage() > maxCpuUsagePercentageThresholds) { + log.info( + "Master OverLoad: the TotalCpuUsedPercentage: {} is over then the MaxCpuUsagePercentageThresholds {}", + systemMetrics.getTotalCpuUsedPercentage(), maxCpuUsagePercentageThresholds); + return true; + } + if (systemMetrics.getJvmMemoryUsedPercentage() > maxJVMMemoryUsagePercentageThresholds) { + log.info( + "Master OverLoad: the JvmMemoryUsedPercentage: {} is over then the MaxJVMMemoryUsagePercentageThresholds {}", + systemMetrics.getJvmMemoryUsedPercentage(), maxCpuUsagePercentageThresholds); + return true; + } + if (systemMetrics.getDiskUsedPercentage() > maxDiskUsagePercentageThresholds) { + log.info("Master OverLoad: the DiskUsedPercentage: {} is over then the MaxDiskUsagePercentageThresholds {}", + systemMetrics.getDiskUsedPercentage(), maxCpuUsagePercentageThresholds); + return true; + } + if (systemMetrics.getSystemMemoryUsedPercentage() > maxSystemMemoryUsagePercentageThresholds) { + log.info( + "Master OverLoad: the SystemMemoryUsedPercentage: {} is over then the MaxSystemMemoryUsagePercentageThresholds {}", + systemMetrics.getSystemMemoryUsedPercentage(), maxSystemMemoryUsagePercentageThresholds); + return true; + } + return false; + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java index 607b78abbc..e14cd26a9f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java @@ -130,28 +130,22 @@ public class LowerWeightHostManager extends CommonHostManager { } } - public Optional getHostWeight(String addr, String workerGroup, WorkerHeartBeat heartBeat) { + public Optional getHostWeight(String workerAddress, String workerGroup, WorkerHeartBeat heartBeat) { if (heartBeat == null) { - log.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup); - return Optional.empty(); - } - if (ServerStatus.ABNORMAL == heartBeat.getServerStatus()) { - log.warn("worker {} current cpu load average {} is too high or available memory {}G is too low", - addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize()); + log.warn("Worker {} in WorkerGroup {} have not received the heartbeat", workerAddress, workerGroup); return Optional.empty(); } if (ServerStatus.BUSY == heartBeat.getServerStatus()) { - log.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}", - addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount()); + log.warn("Worker {} in workerGroup {} is Busy, heartbeat is {}", workerAddress, workerGroup, heartBeat); return Optional.empty(); } return Optional.of( new HostWeight( - HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup), + HostWorker.of(workerAddress, heartBeat.getWorkerHostWeight(), workerGroup), heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), - heartBeat.getLoadAverage(), - heartBeat.getWorkerWaitingTaskCount(), + heartBeat.getDiskUsage(), + heartBeat.getThreadPoolUsage(), heartBeat.getStartupTime())); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java index 2cd65be1f1..147761f7df 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java @@ -20,69 +20,44 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign; import org.apache.dolphinscheduler.extract.base.utils.Constants; import org.apache.dolphinscheduler.extract.base.utils.Host; -/** - * host weight - */ +import lombok.Data; + +@Data public class HostWeight { - private final int CPU_FACTOR = 10; + private final int THREAD_USAGE_FACTOR = 10; + + private final int CPU_USAGE_FACTOR = 20; - private final int MEMORY_FACTOR = 20; + private final int MEMORY_USAGE_FACTOR = 20; - private final int LOAD_AVERAGE_FACTOR = 70; + private final int DISK_USAGE_FACTOR = 50; - private final HostWorker hostWorker; + private final Host host; private final double weight; + // if the weight is small, then is will be chosen first private double currentWeight; - private final int waitingTaskCount; - - public HostWeight(HostWorker hostWorker, double cpu, double memory, double loadAverage, int waitingTaskCount, + public HostWeight(HostWorker hostWorker, + double cpuUsage, + double memoryUsage, + double diskUsage, + double threadPoolUsage, long startTime) { - this.hostWorker = hostWorker; - this.weight = calculateWeight(cpu, memory, loadAverage, startTime); + this.host = hostWorker; + this.weight = calculateWeight(cpuUsage, memoryUsage, diskUsage, threadPoolUsage, startTime); this.currentWeight = this.weight; - this.waitingTaskCount = waitingTaskCount; - } - - public double getWeight() { - return weight; - } - - public double getCurrentWeight() { - return currentWeight; - } - - public void setCurrentWeight(double currentWeight) { - this.currentWeight = currentWeight; - } - - public HostWorker getHostWorker() { - return hostWorker; - } - - public Host getHost() { - return (Host) hostWorker; - } - - public int getWaitingTaskCount() { - return waitingTaskCount; - } - - @Override - public String toString() { - return "HostWeight{" - + "hostWorker=" + hostWorker - + ", weight=" + weight - + ", currentWeight=" + currentWeight - + ", waitingTaskCount=" + waitingTaskCount - + '}'; } - private double calculateWeight(double cpu, double memory, double loadAverage, long startTime) { - double calculatedWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR; + private double calculateWeight(double cpuUsage, + double memoryUsage, + double diskUsage, + double threadPoolUsage, + long startTime) { + double calculatedWeight = 100 - (cpuUsage * CPU_USAGE_FACTOR + memoryUsage * MEMORY_USAGE_FACTOR + + diskUsage * DISK_USAGE_FACTOR + threadPoolUsage * THREAD_USAGE_FACTOR); long uptime = System.currentTimeMillis() - startTime; if (uptime > 0 && uptime < Constants.WARM_UP_TIME) { // If the warm-up is not over, add the weight diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java index 8ddfff6da5..d03fd59ada 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java @@ -18,14 +18,6 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign; import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.stream.Collectors; - -import org.springframework.util.CollectionUtils; - -import com.google.common.collect.Lists; /** * lower weight round robin @@ -43,8 +35,7 @@ public class LowerWeightRoundRobin extends AbstractSelector { double totalWeight = 0; double lowWeight = 0; HostWeight lowerNode = null; - List weights = canAssignTaskHost(sources); - for (HostWeight hostWeight : weights) { + for (HostWeight hostWeight : sources) { totalWeight += hostWeight.getWeight(); hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight()); if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) { @@ -58,24 +49,4 @@ public class LowerWeightRoundRobin extends AbstractSelector { return lowerNode; } - private List canAssignTaskHost(Collection sources) { - if (CollectionUtils.isEmpty(sources)) { - return Collections.emptyList(); - } - List zeroWaitingTask = - sources.stream().filter(h -> h.getWaitingTaskCount() == 0).collect(Collectors.toList()); - if (!zeroWaitingTask.isEmpty()) { - return zeroWaitingTask; - } - HostWeight hostWeight = sources.stream().min(Comparator.comparing(HostWeight::getWaitingTaskCount)).get(); - List waitingTask = Lists.newArrayList(hostWeight); - List equalWaitingTask = sources.stream() - .filter(h -> !h.getHost().equals(hostWeight.getHost()) - && h.getWaitingTaskCount() == hostWeight.getWaitingTaskCount()) - .collect(Collectors.toList()); - if (!equalWaitingTask.isEmpty()) { - waitingTask.addAll(equalWaitingTask); - } - return waitingTask; - } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java index a0d153ab62..78eb0b7216 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java @@ -17,8 +17,11 @@ package org.apache.dolphinscheduler.server.master.metrics; +import java.util.function.Supplier; + import lombok.experimental.UtilityClass; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Metrics; @UtilityClass @@ -40,6 +43,24 @@ public class MasterServerMetrics { .description("Master server consume command count") .register(Metrics.globalRegistry); + public void registerMasterMemoryAvailableGauge(Supplier supplier) { + Gauge.builder("ds.master.memory.available", supplier) + .description("Master memory available") + .register(Metrics.globalRegistry); + } + + public void registerMasterCpuUsageGauge(Supplier supplier) { + Gauge.builder("ds.master.cpu.usage", supplier) + .description("worker cpu usage") + .register(Metrics.globalRegistry); + } + + public void registerMasterMemoryUsageGauge(Supplier supplier) { + Gauge.builder("ds.master.memory.usage", supplier) + .description("Master memory usage") + .register(Metrics.globalRegistry); + } + public void incMasterOverload() { masterOverloadCounter.increment(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java index aa82178813..0a141b69e8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java @@ -20,8 +20,6 @@ package org.apache.dolphinscheduler.server.master.registry; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.registry.api.ConnectionListener; import org.apache.dolphinscheduler.registry.api.ConnectionState; -import org.apache.dolphinscheduler.registry.api.RegistryClient; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -29,15 +27,9 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class MasterConnectionStateListener implements ConnectionListener { - private final MasterConfig masterConfig; - private final RegistryClient registryClient; private final MasterConnectStrategy masterConnectStrategy; - public MasterConnectionStateListener(@NonNull MasterConfig masterConfig, - @NonNull RegistryClient registryClient, - @NonNull MasterConnectStrategy masterConnectStrategy) { - this.masterConfig = masterConfig; - this.registryClient = registryClient; + public MasterConnectionStateListener(@NonNull MasterConnectStrategy masterConnectStrategy) { this.masterConnectStrategy = masterConnectStrategy; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index f2aaf417ed..054eb4770c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -20,9 +20,12 @@ package org.apache.dolphinscheduler.server.master.registry; import static org.apache.dolphinscheduler.common.constants.Constants.SLEEP_TIME_MILLIS; import org.apache.dolphinscheduler.common.IStoppable; +import org.apache.dolphinscheduler.common.enums.ServerStatus; +import org.apache.dolphinscheduler.common.model.MasterHeartBeat; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.RegistryException; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; @@ -54,6 +57,9 @@ public class MasterRegistryClient implements AutoCloseable { @Autowired private MasterConfig masterConfig; + @Autowired + private MetricsProvider metricsProvider; + @Autowired private MasterConnectStrategy masterConnectStrategy; @@ -61,11 +67,10 @@ public class MasterRegistryClient implements AutoCloseable { public void start() { try { - this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, registryClient); + this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, metricsProvider, registryClient); // master registry registry(); - registryClient.addConnectionStateListener( - new MasterConnectionStateListener(masterConfig, registryClient, masterConnectStrategy)); + registryClient.addConnectionStateListener(new MasterConnectionStateListener(masterConnectStrategy)); registryClient.subscribe(RegistryNodeType.ALL_SERVERS.getRegistryPath(), new MasterRegistryDataListener()); } catch (Exception e) { throw new RegistryException("Master registry client start up error", e); @@ -153,6 +158,13 @@ public class MasterRegistryClient implements AutoCloseable { log.info("Master node : {} registering to registry center", masterConfig.getMasterAddress()); String masterRegistryPath = masterConfig.getMasterRegistryPath(); + MasterHeartBeat heartBeat = masterHeartBeatTask.getHeartBeat(); + while (ServerStatus.BUSY.equals(heartBeat.getServerStatus())) { + log.warn("Master node is BUSY: {}", heartBeat); + heartBeat = masterHeartBeatTask.getHeartBeat(); + ThreadUtils.sleep(SLEEP_TIME_MILLIS); + } + // remove before persist registryClient.remove(masterRegistryPath); registryClient.persistEphemeral(masterRegistryPath, JSONUtils.toJsonString(masterHeartBeatTask.getHeartBeat())); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManager.java index 5fb0c74f5a..834f56c2a4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManager.java @@ -30,8 +30,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; -import javax.annotation.PostConstruct; - import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -50,8 +48,7 @@ public class MasterSlotManager { private volatile int currentSlot = 0; private volatile int totalSlot = 0; - @PostConstruct - public void init() { + public void start() { serverNodeManager.addMasterInfoChangeListener(new MasterSlotManager.SlotChangeListener()); } @@ -72,7 +69,7 @@ public class MasterSlotManager { @Override public void notify(Map masterNodeInfo) { List serverList = masterNodeInfo.values().stream() - .filter(heartBeat -> !heartBeat.getServerStatus().equals(ServerStatus.ABNORMAL)) + .filter(heartBeat -> !heartBeat.getServerStatus().equals(ServerStatus.BUSY)) .map(this::convertHeartBeatToServer).collect(Collectors.toList()); syncMasterNodes(serverList); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java index 4d84644cb0..2fddd94384 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java @@ -21,11 +21,13 @@ import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; +import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection; import org.apache.dolphinscheduler.server.master.event.WorkflowEvent; import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue; import org.apache.dolphinscheduler.server.master.event.WorkflowEventType; @@ -78,6 +80,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl @Autowired private MasterTaskExecutorBootstrap masterTaskExecutorBootstrap; + @Autowired + private MetricsProvider metricsProvider; + protected MasterSchedulerBootstrap() { super("MasterCommandLoopThread"); } @@ -102,11 +107,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl log.info("MasterSchedulerBootstrap stopped..."); } - /** - * run of MasterSchedulerService - */ @Override public void run() { + MasterServerLoadProtection serverLoadProtection = masterConfig.getServerLoadProtection(); while (!ServerLifeCycleManager.isStopped()) { try { if (!ServerLifeCycleManager.isRunning()) { @@ -115,9 +118,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl Thread.sleep(Constants.SLEEP_TIME_MILLIS); } // todo: if the workflow event queue is much, we need to handle the back pressure - boolean isOverload = - OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory()); - if (isOverload) { + SystemMetrics systemMetrics = metricsProvider.getSystemMetrics(); + if (serverLoadProtection.isOverload(systemMetrics)) { log.warn("The current server is overload, cannot consumes commands."); MasterServerMetrics.incMasterOverload(); Thread.sleep(Constants.SLEEP_TIME_MILLIS); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java index f8a9b30e28..e9b0970ed3 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java @@ -24,8 +24,11 @@ import org.apache.dolphinscheduler.common.model.MasterHeartBeat; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; +import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -35,6 +38,8 @@ public class MasterHeartBeatTask extends BaseHeartBeatTask { private final MasterConfig masterConfig; + private final MetricsProvider metricsProvider; + private final RegistryClient registryClient; private final String heartBeatPath; @@ -42,9 +47,11 @@ public class MasterHeartBeatTask extends BaseHeartBeatTask { private final int processId; public MasterHeartBeatTask(@NonNull MasterConfig masterConfig, + @NonNull MetricsProvider metricsProvider, @NonNull RegistryClient registryClient) { - super("MasterHeartBeatTask", masterConfig.getHeartbeatInterval().toMillis()); + super("MasterHeartBeatTask", masterConfig.getMaxHeartbeatInterval().toMillis()); this.masterConfig = masterConfig; + this.metricsProvider = metricsProvider; this.registryClient = registryClient; this.heartBeatPath = masterConfig.getMasterRegistryPath(); this.processId = OSUtils.getProcessID(); @@ -52,16 +59,17 @@ public class MasterHeartBeatTask extends BaseHeartBeatTask { @Override public MasterHeartBeat getHeartBeat() { + SystemMetrics systemMetrics = metricsProvider.getSystemMetrics(); + ServerStatus serverStatus = getServerStatus(systemMetrics, masterConfig.getServerLoadProtection()); return MasterHeartBeat.builder() .startupTime(ServerLifeCycleManager.getServerStartupTime()) .reportTime(System.currentTimeMillis()) - .cpuUsage(OSUtils.cpuUsagePercentage()) - .availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize()) - .reservedMemory(masterConfig.getReservedMemory()) - .memoryUsage(OSUtils.memoryUsagePercentage()) - .diskAvailable(OSUtils.diskAvailable()) + .cpuUsage(systemMetrics.getTotalCpuUsedPercentage()) + .jvmMemoryUsage(systemMetrics.getJvmMemoryUsedPercentage()) + .memoryUsage(systemMetrics.getSystemMemoryUsedPercentage()) + .diskUsage(systemMetrics.getDiskUsedPercentage()) .processId(processId) - .serverStatus(getServerStatus()) + .serverStatus(serverStatus) .host(NetUtils.getHost()) .port(masterConfig.getListenPort()) .build(); @@ -75,9 +83,8 @@ public class MasterHeartBeatTask extends BaseHeartBeatTask { heartBeatPath, masterHeartBeatJson); } - private ServerStatus getServerStatus() { - return OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory()) - ? ServerStatus.ABNORMAL - : ServerStatus.NORMAL; + private ServerStatus getServerStatus(SystemMetrics systemMetrics, + MasterServerLoadProtection masterServerLoadProtection) { + return masterServerLoadProtection.isOverload(systemMetrics) ? ServerStatus.BUSY : ServerStatus.NORMAL; } } diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml index cbf25079f3..ce7b1df7dd 100644 --- a/dolphinscheduler-master/src/main/resources/application.yaml +++ b/dolphinscheduler-master/src/main/resources/application.yaml @@ -113,16 +113,23 @@ master: # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight host-selector: lower_weight # master heartbeat interval - heartbeat-interval: 10s + max-heartbeat-interval: 10s # master commit task retry times task-commit-retry-times: 5 # master commit task interval task-commit-interval: 1s state-wheel-interval: 5s - # master max cpuload avg percentage, only higher than the system cpu load average, master server can schedule. default value 1: will use 100% cpu - max-cpu-load-avg: 1 - # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, only the available memory is higher than 30%, master server can schedule. - reserved-memory: 0.3 + server-load-protection: + # If set true, will open master overload protection + enabled: true + # Master max cpu usage, when the master's cpu usage is smaller then this value, master server can execute workflow. + max-cpu-usage-percentage-thresholds: 0.7 + # Master max JVM memory usage , when the master's jvm memory usage is smaller then this value, master server can execute workflow. + max-jvm-memory-usage-percentage-thresholds: 0.7 + # Master max System memory usage , when the master's system memory usage is smaller then this value, master server can execute workflow. + max-system-memory-usage-percentage-thresholds: 0.7 + # Master max disk usage , when the master's disk usage is smaller then this value, master server can execute workflow. + max-disk-usage-percentage-thresholds: 0.7 # failover interval, the unit is minute failover-interval: 10m # kill yarn / k8s application when failover taskInstance, default true diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionTest.java new file mode 100644 index 0000000000..90627f99d3 --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.config; + +import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class MasterServerLoadProtectionTest { + + @Test + void isOverload() { + MasterServerLoadProtection masterServerLoadProtection = new MasterServerLoadProtection(); + SystemMetrics systemMetrics = SystemMetrics.builder() + .jvmMemoryUsedPercentage(0.71) + .systemMemoryUsedPercentage(0.71) + .totalCpuUsedPercentage(0.71) + .diskUsedPercentage(0.71) + .build(); + masterServerLoadProtection.setEnabled(false); + Assertions.assertFalse(masterServerLoadProtection.isOverload(systemMetrics)); + + masterServerLoadProtection.setEnabled(true); + Assertions.assertTrue(masterServerLoadProtection.isOverload(systemMetrics)); + } +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java index 8159603544..c8b4c71fc0 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java @@ -28,11 +28,11 @@ public class LowerWeightRoundRobinTest { @Test public void testSelect() { Collection sources = new ArrayList<>(); - sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, 1, + sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 0.384, 0.1, System.currentTimeMillis() - 60 * 8 * 1000)); - sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.56, 3.24, 2, + sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.56, 0.324, 0.2, System.currentTimeMillis() - 60 * 5 * 1000)); - sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.80, 3.15, 1, + sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.80, 0.315, 0.1, System.currentTimeMillis() - 60 * 2 * 1000)); LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin(); @@ -40,30 +40,27 @@ public class LowerWeightRoundRobinTest { result = roundRobin.select(sources); Assertions.assertEquals("192.158.2.1", result.getHost().getIp()); result = roundRobin.select(sources); - Assertions.assertEquals("192.158.2.1", result.getHost().getIp()); - Assertions.assertEquals("192.158.2.1", result.getHost().getIp()); + Assertions.assertEquals("192.158.2.2", result.getHost().getIp()); result = roundRobin.select(sources); Assertions.assertEquals("192.158.2.1", result.getHost().getIp()); result = roundRobin.select(sources); - Assertions.assertEquals("192.158.2.3", result.getHost().getIp()); - Assertions.assertEquals("192.158.2.3", result.getHost().getIp()); + Assertions.assertEquals("192.158.2.2", result.getHost().getIp()); result = roundRobin.select(sources); Assertions.assertEquals("192.158.2.1", result.getHost().getIp()); result = roundRobin.select(sources); - Assertions.assertEquals("192.158.2.1", result.getHost().getIp()); - Assertions.assertEquals("192.158.2.1", result.getHost().getIp()); + Assertions.assertEquals("192.158.2.2", result.getHost().getIp()); } @Test public void testWarmUpSelect() { Collection sources = new ArrayList<>(); - sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, 0, + sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 0.384, 0, System.currentTimeMillis() - 60 * 8 * 1000)); - sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.44, 3.84, 0, + sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.44, 0.384, 0, System.currentTimeMillis() - 60 * 5 * 1000)); - sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.44, 3.84, 0, + sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.44, 0.384, 0, System.currentTimeMillis() - 60 * 3 * 1000)); - sources.add(new HostWeight(HostWorker.of("192.158.2.4:33", 100, "default"), 0.06, 0.44, 3.84, 0, + sources.add(new HostWeight(HostWorker.of("192.158.2.4:33", 100, "default"), 0.06, 0.44, 0.384, 0, System.currentTimeMillis() - 60 * 11 * 1000)); LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin(); @@ -88,11 +85,11 @@ public class LowerWeightRoundRobinTest { result = roundRobin.doSelect(sources); Assertions.assertNull(result); - sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.14, 1, + sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 0.314, 0.1, System.currentTimeMillis() - 60 * 8 * 1000)); - sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.56, 3.24, 2, + sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.56, 0.324, 0.2, System.currentTimeMillis() - 60 * 5 * 1000)); - sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.80, 3.15, 1, + sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.80, 0.315, 0.1, System.currentTimeMillis() - 60 * 2 * 1000)); result = roundRobin.doSelect(sources); Assertions.assertEquals("192.158.2.1", result.getHost().getIp()); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java index 269650741d..133ed4e4ff 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java @@ -20,6 +20,8 @@ package org.apache.dolphinscheduler.server.master.registry; import static org.mockito.BDDMockito.given; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ServerStatus; +import org.apache.dolphinscheduler.common.model.MasterHeartBeat; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -65,8 +67,10 @@ public class MasterRegistryClientTest { private MasterConfig masterConfig; @BeforeEach - public void before() throws Exception { + public void before() { given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn("127.0.0.1:8080"); + given(masterHeartBeatTask.getHeartBeat()) + .willReturn(MasterHeartBeat.builder().serverStatus(ServerStatus.NORMAL).build()); ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient); ReflectionTestUtils.setField(masterRegistryClient, "masterHeartBeatTask", masterHeartBeatTask); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManagerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManagerTest.java index 4ee75a0392..38ece75e46 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManagerTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManagerTest.java @@ -46,7 +46,7 @@ public class MasterSlotManagerTest { // on normal Master side Mockito.when(masterConfig.getMasterAddress()).thenReturn("127.0.0.1:7777"); - sendHeartBeat(ServerStatus.ABNORMAL, ServerStatus.NORMAL); + sendHeartBeat(ServerStatus.BUSY, ServerStatus.NORMAL); Assertions.assertEquals(1, masterSlotManager.getMasterSize()); Assertions.assertEquals(0, masterSlotManager.getSlot()); @@ -60,7 +60,7 @@ public class MasterSlotManagerTest { // on abnormal Master side Mockito.when(masterConfig.getMasterAddress()).thenReturn("127.0.0.1:6666"); - sendHeartBeat(ServerStatus.ABNORMAL, ServerStatus.NORMAL); + sendHeartBeat(ServerStatus.BUSY, ServerStatus.NORMAL); Assertions.assertEquals(0, masterSlotManager.getMasterSize()); Assertions.assertEquals(0, masterSlotManager.getSlot()); diff --git a/dolphinscheduler-meter/pom.xml b/dolphinscheduler-meter/pom.xml index 9fbb578f8c..af37719721 100644 --- a/dolphinscheduler-meter/pom.xml +++ b/dolphinscheduler-meter/pom.xml @@ -42,6 +42,12 @@ + + org.apache.dolphinscheduler + dolphinscheduler-common + dev-SNAPSHOT + + org.springframework.boot spring-boot-starter-actuator diff --git a/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/DefaultMetricsProvider.java b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/DefaultMetricsProvider.java new file mode 100644 index 0000000000..0ce6ceb4a4 --- /dev/null +++ b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/DefaultMetricsProvider.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.meter.metrics; + +import org.apache.dolphinscheduler.common.utils.OSUtils; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import io.micrometer.core.instrument.MeterRegistry; + +@Component +public class DefaultMetricsProvider implements MetricsProvider { + + @Autowired + private MeterRegistry meterRegistry; + + private SystemMetrics systemMetrics; + + private long lastRefreshTime = 0; + + private static final long SYSTEM_METRICS_REFRESH_INTERVAL = 1_000L; + + @Override + public SystemMetrics getSystemMetrics() { + if (System.currentTimeMillis() - lastRefreshTime < SYSTEM_METRICS_REFRESH_INTERVAL) { + return systemMetrics; + } + + double systemCpuUsage = meterRegistry.get("system.cpu.usage").gauge().value(); + double processCpuUsage = meterRegistry.get("process.cpu.usage").gauge().value(); + + double jvmMemoryUsed = meterRegistry.get("jvm.memory.used").meter().measure().iterator().next().getValue(); + double jvmMemoryMax = meterRegistry.get("jvm.memory.max").meter().measure().iterator().next().getValue(); + + long totalSystemMemory = OSUtils.getTotalSystemMemory(); + long systemMemoryAvailable = OSUtils.getSystemAvailableMemoryUsed(); + + systemMetrics = SystemMetrics.builder() + .systemCpuUsagePercentage(systemCpuUsage) + .processCpuUsagePercentage(processCpuUsage) + .totalCpuUsedPercentage(systemCpuUsage + processCpuUsage) + .jvmMemoryUsed(jvmMemoryUsed) + .jvmMemoryMax(jvmMemoryMax) + .jvmMemoryUsedPercentage(jvmMemoryUsed / jvmMemoryMax) + .systemMemoryUsed(totalSystemMemory - systemMemoryAvailable) + .systemMemoryMax(totalSystemMemory) + .systemMemoryUsedPercentage((double) (totalSystemMemory - systemMemoryAvailable) / totalSystemMemory) + .build(); + lastRefreshTime = System.currentTimeMillis(); + return systemMetrics; + } + +} diff --git a/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/MetricsProvider.java b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/MetricsProvider.java new file mode 100644 index 0000000000..5371d4daa9 --- /dev/null +++ b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/MetricsProvider.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.meter.metrics; + +public interface MetricsProvider { + + SystemMetrics getSystemMetrics(); + +} diff --git a/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/SystemMetrics.java b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/SystemMetrics.java new file mode 100644 index 0000000000..dcffafb83d --- /dev/null +++ b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/SystemMetrics.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.meter.metrics; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class SystemMetrics { + + // CPU + private double systemCpuUsagePercentage; + private double processCpuUsagePercentage; + private double totalCpuUsedPercentage; + + // JVM-Memory + // todo: get pod memory usage + private double jvmMemoryUsed; + private double jvmMemoryMax; + private double jvmMemoryUsedPercentage; + + // System-Memory + // todo: get pod cpu usage + private double systemMemoryUsed; + private double systemMemoryMax; + private double systemMemoryUsedPercentage; + + // Disk + // todo: get pod disk usage + private double diskUsed; + private double diskTotal; + private double diskUsedPercentage; + +} diff --git a/dolphinscheduler-meter/src/main/resources/grafana-demo/docker-compose.yaml b/dolphinscheduler-meter/src/main/resources/grafana-demo/docker-compose.yaml index 185803ad4f..9a7dcc4f0a 100644 --- a/dolphinscheduler-meter/src/main/resources/grafana-demo/docker-compose.yaml +++ b/dolphinscheduler-meter/src/main/resources/grafana-demo/docker-compose.yaml @@ -33,7 +33,7 @@ services: networks: [ test ] ports: # due to the DolphinScheduler frontend port is 3000, so we change the grafana default port to 3001. - - "3001:3000" + - "3000:3000" environment: GF_AUTH_ANONYMOUS_ENABLED: "true" volumes: diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index 363943587b..8e58b89568 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -182,16 +182,22 @@ master: # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight host-selector: lower_weight # master heartbeat interval - heartbeat-interval: 10s + max-heartbeat-interval: 10s # master commit task retry times task-commit-retry-times: 5 # master commit task interval task-commit-interval: 1s state-wheel-interval: 5s - # master max cpuload avg percentage, only higher than the system cpu load average, master server can schedule. default value 1: will use 100% cpu - max-cpu-load-avg: 1 - # master reserved memory, only lower than system available memory, master server can schedule. default value 0.1, only the available memory is higher than 10%, master server can schedule. - reserved-memory: 0.1 + server-load-protection: + enabled: true + # Master max cpu usage, when the master's cpu usage is smaller then this value, master server can execute workflow. + max-cpu-usage-percentage-thresholds: 0.9 + # Master max JVM memory usage , when the master's jvm memory usage is smaller then this value, master server can execute workflow. + max-jvm-memory-usage-percentage-thresholds: 0.9 + # Master max System memory usage , when the master's system memory usage is smaller then this value, master server can execute workflow. + max-system-memory-usage-percentage-thresholds: 0.9 + # Master max disk usage , when the master's disk usage is smaller then this value, master server can execute workflow. + max-disk-usage-percentage-thresholds: 0.9 # failover interval failover-interval: 10m # kill yarn/k8s application when failover taskInstance, default true @@ -204,13 +210,19 @@ worker: # worker execute thread number to limit task instances in parallel exec-threads: 10 # worker heartbeat interval - heartbeat-interval: 10s + max-heartbeat-interval: 10s # worker host weight to dispatch tasks, default value 100 host-weight: 100 - # worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value 1: will use 100% cpu. - max-cpu-load-avg: 1 - # worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.1, only the available memory is higher than 10%, worker server can receive task. - reserved-memory: 0.1 + server-load-protection: + enabled: true + # Worker max cpu usage, when the worker's cpu usage is smaller then this value, worker server can be dispatched tasks. + max-cpu-usage-percentage-thresholds: 0.9 + # Worker max JVM memory usage , when the worker's jvm memory usage is smaller then this value, worker server can be dispatched tasks. + max-jvm-memory-usage-percentage-thresholds: 0.9 + # Worker max System memory usage , when the worker's system memory usage is smaller then this value, worker server can be dispatched tasks. + max-system-memory-usage-percentage-thresholds: 0.9 + # Worker max disk usage , when the worker's disk usage is smaller then this value, worker server can be dispatched tasks. + max-disk-usage-percentage-thresholds: 0.9 task-execute-threads-full-policy: REJECT tenant-config: # tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. @@ -225,7 +237,7 @@ alert: # Mark each alert of alert server if late after x milliseconds as failed. # Define value is (0 = infinite), and alert server would be waiting alert result. wait-timeout: 0 - heartbeat-interval: 60s + max-heartbeat-interval: 60s query_alert_threshold: 100 api: diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index e8ae5381fd..5af0c2617e 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -21,11 +21,14 @@ import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; +import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; +import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer; import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor; @@ -63,6 +66,9 @@ public class WorkerServer implements IStoppable { @Autowired private MessageRetryRunner messageRetryRunner; + @Autowired + private MetricsProvider metricsProvider; + /** * worker server startup, not use web service * @@ -83,6 +89,19 @@ public class WorkerServer implements IStoppable { this.messageRetryRunner.start(); + WorkerServerMetrics.registerWorkerCpuUsageGauge(() -> { + SystemMetrics systemMetrics = metricsProvider.getSystemMetrics(); + return systemMetrics.getTotalCpuUsedPercentage(); + }); + WorkerServerMetrics.registerWorkerMemoryAvailableGauge(() -> { + SystemMetrics systemMetrics = metricsProvider.getSystemMetrics(); + return (systemMetrics.getSystemMemoryMax() - systemMetrics.getSystemMemoryUsed()) / 1024.0 / 1024 / 1024; + }); + WorkerServerMetrics.registerWorkerMemoryUsageGauge(() -> { + SystemMetrics systemMetrics = metricsProvider.getSystemMetrics(); + return systemMetrics.getJvmMemoryUsedPercentage(); + }); + /* * registry hooks, which are called before the process exits */ diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index db85d50244..d901effbd1 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -43,10 +43,9 @@ public class WorkerConfig implements Validator { private int listenPort = 1234; private int execThreads = 10; - private Duration heartbeatInterval = Duration.ofSeconds(10); + private Duration maxHeartbeatInterval = Duration.ofSeconds(10); private int hostWeight = 100; - private int maxCpuLoadAvg = -1; - private double reservedMemory = 0.1; + private WorkerServerLoadProtection serverLoadProtection = new WorkerServerLoadProtection(); private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties(); /** @@ -70,11 +69,8 @@ public class WorkerConfig implements Validator { if (workerConfig.getExecThreads() <= 0) { errors.rejectValue("exec-threads", null, "should be a positive value"); } - if (workerConfig.getHeartbeatInterval().getSeconds() <= 0) { - errors.rejectValue("heartbeat-interval", null, "shoule be a valid duration"); - } - if (workerConfig.getMaxCpuLoadAvg() <= 0) { - workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2); + if (workerConfig.getMaxHeartbeatInterval().getSeconds() <= 0) { + errors.rejectValue("max-heartbeat-interval", null, "shoule be a valid duration"); } if (StringUtils.isEmpty(workerConfig.getWorkerAddress())) { workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort())); @@ -90,11 +86,10 @@ public class WorkerConfig implements Validator { "\n****************************Worker Configuration**************************************" + "\n listen-port -> " + listenPort + "\n exec-threads -> " + execThreads + - "\n heartbeat-interval -> " + heartbeatInterval + + "\n max-heartbeat-interval -> " + maxHeartbeatInterval + "\n host-weight -> " + hostWeight + "\n tenantConfig -> " + tenantConfig + - "\n max-cpu-load-avg -> " + maxCpuLoadAvg + - "\n reserved-memory -> " + reservedMemory + + "\n server-load-protection -> " + serverLoadProtection + "\n registry-disconnect-strategy -> " + registryDisconnectStrategy + "\n task-execute-threads-full-policy: " + taskExecuteThreadsFullPolicy + "\n address -> " + workerAddress + diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtection.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtection.java new file mode 100644 index 0000000000..6e68a71bf5 --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtection.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.config; + +import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Data +@Slf4j +@NoArgsConstructor +@AllArgsConstructor +public class WorkerServerLoadProtection { + + private boolean enabled = true; + + private double maxCpuUsagePercentageThresholds = 0.7; + + private double maxJVMMemoryUsagePercentageThresholds = 0.7; + + private double maxSystemMemoryUsagePercentageThresholds = 0.7; + + private double maxDiskUsagePercentageThresholds = 0.7; + + public boolean isOverload(SystemMetrics systemMetrics) { + if (!enabled) { + return false; + } + if (systemMetrics.getTotalCpuUsedPercentage() > maxCpuUsagePercentageThresholds) { + log.info( + "Worker OverLoad: the TotalCpuUsedPercentage: {} is over then the MaxCpuUsagePercentageThresholds {}", + systemMetrics.getTotalCpuUsedPercentage(), maxCpuUsagePercentageThresholds); + return true; + } + if (systemMetrics.getJvmMemoryUsedPercentage() > maxJVMMemoryUsagePercentageThresholds) { + log.info( + "Worker OverLoad: the JvmMemoryUsedPercentage: {} is over then the maxCpuUsagePercentageThresholds {}", + systemMetrics.getJvmMemoryUsedPercentage(), maxJVMMemoryUsagePercentageThresholds); + return true; + } + if (systemMetrics.getDiskUsedPercentage() > maxDiskUsagePercentageThresholds) { + log.info("Worker OverLoad: the DiskUsedPercentage: {} is over then the MaxCpuUsagePercentageThresholds {}", + systemMetrics.getDiskUsedPercentage(), maxDiskUsagePercentageThresholds); + return true; + } + if (systemMetrics.getSystemMemoryUsedPercentage() > maxSystemMemoryUsagePercentageThresholds) { + log.info( + "Worker OverLoad: the SystemMemoryUsedPercentage: {} is over then the MaxSystemMemoryUsagePercentageThresholds {}", + systemMetrics.getSystemMemoryUsedPercentage(), maxSystemMemoryUsagePercentageThresholds); + return true; + } + return false; + } + +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java index b383af752d..8c04aa752c 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java @@ -21,11 +21,13 @@ import static org.apache.dolphinscheduler.common.constants.Constants.SLEEP_TIME_ import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.enums.ServerStatus; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.extract.base.utils.Host; +import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.RegistryException; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; @@ -55,7 +57,7 @@ public class WorkerRegistryClient implements AutoCloseable { private WorkerConfig workerConfig; @Autowired - private WorkerTaskExecutorThreadPool workerManagerThread; + private WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool; @Autowired private RegistryClient registryClient; @@ -64,14 +66,18 @@ public class WorkerRegistryClient implements AutoCloseable { @Lazy private WorkerConnectStrategy workerConnectStrategy; + @Autowired + private MetricsProvider metricsProvider; + private WorkerHeartBeatTask workerHeartBeatTask; @PostConstruct public void initWorkRegistry() { this.workerHeartBeatTask = new WorkerHeartBeatTask( workerConfig, + metricsProvider, registryClient, - () -> workerManagerThread.getWaitingTaskExecutorSize()); + workerTaskExecutorThreadPool); } public void start() { @@ -84,11 +90,13 @@ public class WorkerRegistryClient implements AutoCloseable { } } - /** - * registry - */ - private void registry() { + private void registry() throws InterruptedException { WorkerHeartBeat workerHeartBeat = workerHeartBeatTask.getHeartBeat(); + while (ServerStatus.BUSY.equals(workerHeartBeat.getServerStatus())) { + log.warn("Worker node is BUSY: {}", workerHeartBeat); + workerHeartBeat = workerHeartBeatTask.getHeartBeat(); + Thread.sleep(SLEEP_TIME_MILLIS); + } String workerZKPath = workerConfig.getWorkerRegistryPath(); // remove before persist registryClient.remove(workerZKPath); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java index 28588c0dbf..0e4bb98080 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.worker.runner; import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics; @@ -42,12 +41,8 @@ public class WorkerTaskExecutorThreadPool { ThreadUtils.newDaemonFixedThreadExecutor("WorkerTaskExecutorThreadPool", workerConfig.getExecThreads()); this.workerConfig = workerConfig; - WorkerServerMetrics.registerWorkerCpuUsageGauge(OSUtils::cpuUsagePercentage); - WorkerServerMetrics.registerWorkerMemoryAvailableGauge(OSUtils::availablePhysicalMemorySize); - WorkerServerMetrics.registerWorkerMemoryUsageGauge(OSUtils::memoryUsagePercentage); - WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge( - () -> threadPoolExecutor.getQueue().size() - threadPoolExecutor.getActiveCount()); - WorkerServerMetrics.registerWorkerActiveExecuteThreadGauge(threadPoolExecutor::getActiveCount); + WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge(this::getWaitingTaskExecutorSize); + WorkerServerMetrics.registerWorkerActiveExecuteThreadGauge(this::getRunningTaskExecutorSize); } public boolean submitWorkerTaskExecutor(WorkerTaskExecutor workerTaskExecutor) { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java index 1294163e1e..7a4535c79f 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java @@ -24,10 +24,12 @@ import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; +import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; - -import java.util.function.Supplier; +import org.apache.dolphinscheduler.server.worker.config.WorkerServerLoadProtection; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -38,43 +40,38 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask { private final WorkerConfig workerConfig; private final RegistryClient registryClient; - private final Supplier workerWaitingTaskCount; + private final MetricsProvider metricsProvider; + private final WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool; private final int processId; public WorkerHeartBeatTask(@NonNull WorkerConfig workerConfig, + @NonNull MetricsProvider metricsProvider, @NonNull RegistryClient registryClient, - @NonNull Supplier workerWaitingTaskCount) { - super("WorkerHeartBeatTask", workerConfig.getHeartbeatInterval().toMillis()); + @NonNull WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool) { + super("WorkerHeartBeatTask", workerConfig.getMaxHeartbeatInterval().toMillis()); + this.metricsProvider = metricsProvider; this.workerConfig = workerConfig; this.registryClient = registryClient; - this.workerWaitingTaskCount = workerWaitingTaskCount; + this.workerTaskExecutorThreadPool = workerTaskExecutorThreadPool; this.processId = OSUtils.getProcessID(); } @Override public WorkerHeartBeat getHeartBeat() { - double cpuUsagePercentage = OSUtils.cpuUsagePercentage(); - int maxCpuUsePercentage = workerConfig.getMaxCpuLoadAvg(); - double reservedMemory = workerConfig.getReservedMemory(); - double memoryUsagePercentage = OSUtils.memoryUsagePercentage(); - int execThreads = workerConfig.getExecThreads(); - ServerStatus serverStatus = - getServerStatus(cpuUsagePercentage, maxCpuUsePercentage, memoryUsagePercentage, reservedMemory, - execThreads, this.workerWaitingTaskCount.get()); + SystemMetrics systemMetrics = metricsProvider.getSystemMetrics(); + ServerStatus serverStatus = getServerStatus(systemMetrics, workerConfig, workerTaskExecutorThreadPool); return WorkerHeartBeat.builder() .startupTime(ServerLifeCycleManager.getServerStartupTime()) .reportTime(System.currentTimeMillis()) - .cpuUsage(cpuUsagePercentage) - .availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize()) - .memoryUsage(OSUtils.memoryUsagePercentage()) - .reservedMemory(reservedMemory) - .diskAvailable(OSUtils.diskAvailable()) + .cpuUsage(systemMetrics.getTotalCpuUsedPercentage()) + .jvmMemoryUsage(systemMetrics.getJvmMemoryUsedPercentage()) + .memoryUsage(systemMetrics.getSystemMemoryUsedPercentage()) .processId(processId) .workerHostWeight(workerConfig.getHostWeight()) - .workerWaitingTaskCount(this.workerWaitingTaskCount.get()) - .workerExecThreadCount(workerConfig.getExecThreads()) + .threadPoolUsage(workerTaskExecutorThreadPool.getRunningTaskExecutorSize() + + workerTaskExecutorThreadPool.getWaitingTaskExecutorSize()) .serverStatus(serverStatus) .host(NetUtils.getHost()) .port(workerConfig.getListenPort()) @@ -91,23 +88,13 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask { workerRegistryPath, workerHeartBeatJson); } - private ServerStatus getServerStatus(double cpuUsagePercentage, - double maxCpuUsePercentage, - double memoryUsagePercentage, - double reservedMemory, - int workerExecThreadCount, - int workerWaitingTaskCount) { - if (cpuUsagePercentage > maxCpuUsePercentage || (1 - memoryUsagePercentage) < reservedMemory) { - log.warn( - "current cpu load average {} is higher than {} or available memory {} is lower than {}", - cpuUsagePercentage, maxCpuUsePercentage, 1 - memoryUsagePercentage, reservedMemory); - return ServerStatus.ABNORMAL; - } else if (workerWaitingTaskCount > workerExecThreadCount) { - log.warn("current waiting task count {} is large than worker thread count {}, worker is busy", - workerWaitingTaskCount, workerExecThreadCount); + private ServerStatus getServerStatus(SystemMetrics systemMetrics, + WorkerConfig workerConfig, + WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool) { + if (workerTaskExecutorThreadPool.isOverload()) { return ServerStatus.BUSY; - } else { - return ServerStatus.NORMAL; } + WorkerServerLoadProtection serverLoadProtection = workerConfig.getServerLoadProtection(); + return serverLoadProtection.isOverload(systemMetrics) ? ServerStatus.BUSY : ServerStatus.NORMAL; } } diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml b/dolphinscheduler-worker/src/main/resources/application.yaml index 50a3f19917..ad0535ac76 100644 --- a/dolphinscheduler-worker/src/main/resources/application.yaml +++ b/dolphinscheduler-worker/src/main/resources/application.yaml @@ -44,13 +44,20 @@ worker: # worker execute thread number to limit task instances in parallel exec-threads: 100 # worker heartbeat interval - heartbeat-interval: 10s + max-heartbeat-interval: 10s # worker host weight to dispatch tasks, default value 100 host-weight: 100 - # worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value 1: will use 100% cpu. - max-cpu-load-avg: 1 - # worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, only the available memory is higher than 30%, worker server can receive task. - reserved-memory: 0.3 + server-load-protection: + # If set true, will open worker overload protection + enabled: true + # Worker max cpu usage, when the worker's cpu usage is smaller then this value, worker server can be dispatched tasks. + max-cpu-usage-percentage-thresholds: 0.7 + # Worker max jvm memory usage , when the worker's jvm memory usage is smaller then this value, worker server can be dispatched tasks. + max-jvm-memory-usage-percentage-thresholds: 0.7 + # Worker max System memory usage , when the master's system memory usage is smaller then this value, master server can execute workflow. + max-system-memory-usage-percentage-thresholds: 0.7 + # Worker max disk usage , when the worker's disk usage is smaller then this value, worker server can be dispatched tasks. + max-disk-usage-percentage-thresholds: 0.7 registry-disconnect-strategy: # The disconnect strategy: stop, waiting strategy: waiting diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtectionTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtectionTest.java new file mode 100644 index 0000000000..696e9c2478 --- /dev/null +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtectionTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.config; + +import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class WorkerServerLoadProtectionTest { + + @Test + void isOverload() { + WorkerServerLoadProtection workerServerLoadProtection = new WorkerServerLoadProtection(); + SystemMetrics systemMetrics = SystemMetrics.builder() + .jvmMemoryUsedPercentage(0.71) + .systemMemoryUsedPercentage(0.71) + .totalCpuUsedPercentage(0.71) + .diskUsedPercentage(0.71) + .build(); + workerServerLoadProtection.setEnabled(false); + Assertions.assertFalse(workerServerLoadProtection.isOverload(systemMetrics)); + + workerServerLoadProtection.setEnabled(true); + Assertions.assertTrue(workerServerLoadProtection.isOverload(systemMetrics)); + } +} diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java index e6043486f5..f44c759420 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java @@ -20,14 +20,15 @@ package org.apache.dolphinscheduler.server.worker.registry; import static org.mockito.BDDMockito.given; import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; +import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.config.WorkerServerLoadProtection; import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool; import java.time.Duration; -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -36,10 +37,6 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Sets; /** * worker registry test @@ -47,10 +44,6 @@ import com.google.common.collect.Sets; @ExtendWith(MockitoExtension.class) public class WorkerRegistryClientTest { - private static final Logger log = LoggerFactory.getLogger(WorkerRegistryClientTest.class); - - private static final String TEST_WORKER_GROUP = "test"; - @InjectMocks private WorkerRegistryClient workerRegistryClient; @@ -61,10 +54,7 @@ public class WorkerRegistryClientTest { private WorkerConfig workerConfig; @Mock - private Set workerGroups = Sets.newHashSet("127.0.0.1"); - - @Mock - private ScheduledExecutorService heartBeatExecutor; + private MetricsProvider metricsProvider; @Mock private WorkerTaskExecutorThreadPool workerManagerThread; @@ -72,17 +62,13 @@ public class WorkerRegistryClientTest { @Mock private WorkerConnectStrategy workerConnectStrategy; - // private static final Set workerGroups; - - static { - // workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP); - } - @Test public void testStart() { given(workerConfig.getWorkerAddress()).willReturn(NetUtils.getAddr(1234)); - given(workerConfig.getHeartbeatInterval()).willReturn(Duration.ofSeconds(1)); + given(workerConfig.getMaxHeartbeatInterval()).willReturn(Duration.ofSeconds(1)); + given(workerConfig.getServerLoadProtection()).willReturn(new WorkerServerLoadProtection()); + given(metricsProvider.getSystemMetrics()).willReturn(new SystemMetrics()); given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(RegistryNodeType.class))) .willReturn(true);