Browse Source

Enable set ServerLoadProtection for Master/Worker (#15439)

3.2.1-prepare
Wenjun Ruan 11 months ago committed by GitHub
parent
commit
7b9c9e0eb2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 22
      deploy/kubernetes/dolphinscheduler/README.md
  2. 45
      deploy/kubernetes/dolphinscheduler/values.yaml
  3. 74
      docs/docs/en/architecture/configuration.md
  4. 4
      docs/docs/zh/architecture/configuration.md
  5. 8
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/config/AlertConfig.java
  6. 17
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java
  7. 6
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertRegistryClient.java
  8. 2
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/application.yaml
  9. 54
      dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/cases/ProcessInstanceAPITest.java
  10. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ServerStatus.java
  11. 11
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java
  12. 18
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java
  13. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java
  14. 12
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
  15. 14
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
  16. 134
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
  17. 59
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java
  18. 25
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  19. 20
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  20. 73
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtection.java
  21. 18
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
  22. 73
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
  23. 31
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
  24. 21
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java
  25. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
  26. 18
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
  27. 7
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManager.java
  28. 16
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
  29. 29
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
  30. 17
      dolphinscheduler-master/src/main/resources/application.yaml
  31. 42
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionTest.java
  32. 29
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
  33. 6
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
  34. 4
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManagerTest.java
  35. 6
      dolphinscheduler-meter/pom.xml
  36. 69
      dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/DefaultMetricsProvider.java
  37. 24
      dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/MetricsProvider.java
  38. 54
      dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/SystemMetrics.java
  39. 2
      dolphinscheduler-meter/src/main/resources/grafana-demo/docker-compose.yaml
  40. 34
      dolphinscheduler-standalone-server/src/main/resources/application.yaml
  41. 19
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  42. 17
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  43. 73
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtection.java
  44. 20
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
  45. 9
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
  46. 61
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
  47. 17
      dolphinscheduler-worker/src/main/resources/application.yaml
  48. 42
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtectionTest.java
  49. 28
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java

22
deploy/kubernetes/dolphinscheduler/README.md

@ -196,11 +196,14 @@ Please refer to the [Quick Start in Kubernetes](../../../docs/docs/en/guide/inst
| master.env.MASTER_EXEC_THREADS | string | `"100"` | Master execute thread number to limit process instances | | master.env.MASTER_EXEC_THREADS | string | `"100"` | Master execute thread number to limit process instances |
| master.env.MASTER_FAILOVER_INTERVAL | string | `"10m"` | Master failover interval, the unit is minute | | master.env.MASTER_FAILOVER_INTERVAL | string | `"10m"` | Master failover interval, the unit is minute |
| master.env.MASTER_HEARTBEAT_ERROR_THRESHOLD | string | `"5"` | Master heartbeat error threshold | | master.env.MASTER_HEARTBEAT_ERROR_THRESHOLD | string | `"5"` | Master heartbeat error threshold |
| master.env.MASTER_HEARTBEAT_INTERVAL | string | `"10s"` | Master heartbeat interval, the unit is second |
| master.env.MASTER_HOST_SELECTOR | string | `"LowerWeight"` | Master host selector to select a suitable worker, optional values include Random, RoundRobin, LowerWeight | | master.env.MASTER_HOST_SELECTOR | string | `"LowerWeight"` | Master host selector to select a suitable worker, optional values include Random, RoundRobin, LowerWeight |
| master.env.MASTER_KILL_APPLICATION_WHEN_HANDLE_FAILOVER | string | `"true"` | Master kill application when handle failover | | master.env.MASTER_KILL_APPLICATION_WHEN_HANDLE_FAILOVER | string | `"true"` | Master kill application when handle failover |
| master.env.MASTER_MAX_CPU_LOAD_AVG | string | `"1"` | Master max cpuload avg, only higher than the system cpu load average, master server can schedule | | master.env.MASTER_MAX_HEARTBEAT_INTERVAL | string | `"10s"` | Master max heartbeat interval |
| master.env.MASTER_RESERVED_MEMORY | string | `"0.3"` | Master reserved memory, only lower than system available memory, master server can schedule, the unit is G | | master.env.MASTER_SERVER_LOAD_PROTECTION_ENABLED | bool | `false` | If set true, will open master overload protection |
| master.env.MASTER_SERVER_LOAD_PROTECTION_MAX_CPU_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Master max cpu usage, when the master's cpu usage is smaller than this value, master server can execute workflow. |
| master.env.MASTER_SERVER_LOAD_PROTECTION_MAX_DISK_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Master max disk usage, when the master's disk usage is smaller than this value, master server can execute workflow. |
| master.env.MASTER_SERVER_LOAD_PROTECTION_MAX_JVM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Master max JVM memory usage, when the master's jvm memory usage is smaller than this value, master server can execute workflow. |
| master.env.MASTER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Master max System memory usage, when the master's system memory usage is smaller than this value, master server can execute workflow. |
| master.env.MASTER_STATE_WHEEL_INTERVAL | string | `"5s"` | master state wheel interval, the unit is second | | master.env.MASTER_STATE_WHEEL_INTERVAL | string | `"5s"` | master state wheel interval, the unit is second |
| master.env.MASTER_TASK_COMMIT_INTERVAL | string | `"1s"` | master commit task interval, the unit is second | | master.env.MASTER_TASK_COMMIT_INTERVAL | string | `"1s"` | master commit task interval, the unit is second |
| master.env.MASTER_TASK_COMMIT_RETRYTIMES | string | `"5"` | Master commit task retry times | | master.env.MASTER_TASK_COMMIT_RETRYTIMES | string | `"5"` | Master commit task retry times |
@ -293,12 +296,17 @@ Please refer to the [Quick Start in Kubernetes](../../../docs/docs/en/guide/inst
| worker.affinity | object | `{}` | Affinity is a group of affinity scheduling rules. If specified, the pod's scheduling constraints. More info: [node-affinity](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#node-affinity) | | worker.affinity | object | `{}` | Affinity is a group of affinity scheduling rules. If specified, the pod's scheduling constraints. More info: [node-affinity](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#node-affinity) |
| worker.annotations | object | `{}` | You can use annotations to attach arbitrary non-identifying metadata to objects. Clients such as tools and libraries can retrieve this metadata. | | worker.annotations | object | `{}` | You can use annotations to attach arbitrary non-identifying metadata to objects. Clients such as tools and libraries can retrieve this metadata. |
| worker.enabled | bool | `true` | Enable or disable the Worker component | | worker.enabled | bool | `true` | Enable or disable the Worker component |
| worker.env.DEFAULT_TENANT_ENABLED | bool | `false` | If set true, will use worker bootstrap user as the tenant to execute task when the tenant is `default`; |
| worker.env.WORKER_EXEC_THREADS | string | `"100"` | Worker execute thread number to limit task instances | | worker.env.WORKER_EXEC_THREADS | string | `"100"` | Worker execute thread number to limit task instances |
| worker.env.WORKER_HEARTBEAT_INTERVAL | string | `"10s"` | Worker heartbeat interval, the unit is second |
| worker.env.WORKER_HEART_ERROR_THRESHOLD | string | `"5"` | Worker heartbeat error threshold |
| worker.env.WORKER_HOST_WEIGHT | string | `"100"` | Worker host weight to dispatch tasks | | worker.env.WORKER_HOST_WEIGHT | string | `"100"` | Worker host weight to dispatch tasks |
| worker.env.WORKER_MAX_CPU_LOAD_AVG | string | `"1"` | Worker max cpu load avg, only higher than the system cpu load average, worker server can be dispatched tasks | | worker.env.WORKER_MAX_HEARTBEAT_INTERVAL | string | `"10s"` | Worker heartbeat interval |
| worker.env.WORKER_RESERVED_MEMORY | string | `"0.3"` | Worker reserved memory, only lower than system available memory, worker server can be dispatched tasks, the unit is G | | worker.env.WORKER_SERVER_LOAD_PROTECTION_ENABLED | bool | `false` | If set true, will open worker overload protection |
| worker.env.WORKER_SERVER_LOAD_PROTECTION_MAX_CPU_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Worker max cpu usage, when the worker's cpu usage is smaller than this value, worker server can be dispatched tasks. |
| worker.env.WORKER_SERVER_LOAD_PROTECTION_MAX_DISK_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Worker max disk usage, when the worker's disk usage is smaller than this value, worker server can be dispatched tasks. |
| worker.env.WORKER_SERVER_LOAD_PROTECTION_MAX_JVM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Worker max jvm memory usage, when the worker's jvm memory usage is smaller than this value, worker server can be dispatched tasks. |
| worker.env.WORKER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Worker max memory usage, when the worker's memory usage is smaller than this value, worker server can be dispatched tasks. |
| worker.env.WORKER_TENANT_CONFIG_AUTO_CREATE_TENANT_ENABLED | bool | `true` | tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. |
| worker.env.WORKER_TENANT_CONFIG_DISTRIBUTED_TENANT | bool | `false` | Scenes to be used for distributed users. For example, users created by FreeIpa are stored in LDAP. This parameter only applies to Linux, When this parameter is true, worker.tenant.auto.create has no effect and will not automatically create tenants. |
| worker.keda.advanced | object | `{}` | Specify HPA related options | | worker.keda.advanced | object | `{}` | Specify HPA related options |
| worker.keda.cooldownPeriod | int | `30` | How many seconds KEDA will wait before scaling to zero. Note that HPA has a separate cooldown period for scale-downs | | worker.keda.cooldownPeriod | int | `30` | How many seconds KEDA will wait before scaling to zero. Note that HPA has a separate cooldown period for scale-downs |
| worker.keda.enabled | bool | `false` | Enable or disable the Keda component | | worker.keda.enabled | bool | `false` | Enable or disable the Keda component |

45
deploy/kubernetes/dolphinscheduler/values.yaml

@ -496,8 +496,8 @@ master:
MASTER_DISPATCH_TASK_NUM: "3" MASTER_DISPATCH_TASK_NUM: "3"
# -- Master host selector to select a suitable worker, optional values include Random, RoundRobin, LowerWeight # -- Master host selector to select a suitable worker, optional values include Random, RoundRobin, LowerWeight
MASTER_HOST_SELECTOR: "LowerWeight" MASTER_HOST_SELECTOR: "LowerWeight"
# -- Master heartbeat interval, the unit is second # -- Master max heartbeat interval
MASTER_HEARTBEAT_INTERVAL: "10s" MASTER_MAX_HEARTBEAT_INTERVAL: "10s"
# -- Master heartbeat error threshold # -- Master heartbeat error threshold
MASTER_HEARTBEAT_ERROR_THRESHOLD: "5" MASTER_HEARTBEAT_ERROR_THRESHOLD: "5"
# -- Master commit task retry times # -- Master commit task retry times
@ -506,10 +506,16 @@ master:
MASTER_TASK_COMMIT_INTERVAL: "1s" MASTER_TASK_COMMIT_INTERVAL: "1s"
# -- master state wheel interval, the unit is second # -- master state wheel interval, the unit is second
MASTER_STATE_WHEEL_INTERVAL: "5s" MASTER_STATE_WHEEL_INTERVAL: "5s"
# -- Master max cpuload avg, only higher than the system cpu load average, master server can schedule # -- If set true, will open master overload protection
MASTER_MAX_CPU_LOAD_AVG: "1" MASTER_SERVER_LOAD_PROTECTION_ENABLED: false
# -- Master reserved memory, only lower than system available memory, master server can schedule, the unit is G # -- Master max cpu usage, when the master's cpu usage is smaller than this value, master server can execute workflow.
MASTER_RESERVED_MEMORY: "0.3" MASTER_SERVER_LOAD_PROTECTION_MAX_CPU_USAGE_PERCENTAGE_THRESHOLDS: 0.7
# -- Master max JVM memory usage, when the master's jvm memory usage is smaller than this value, master server can execute workflow.
MASTER_SERVER_LOAD_PROTECTION_MAX_JVM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS: 0.7
# -- Master max System memory usage, when the master's system memory usage is smaller than this value, master server can execute workflow.
MASTER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS: 0.7
# -- Master max disk usage, when the master's disk usage is smaller than this value, master server can execute workflow.
MASTER_SERVER_LOAD_PROTECTION_MAX_DISK_USAGE_PERCENTAGE_THRESHOLDS: 0.7
# -- Master failover interval, the unit is minute # -- Master failover interval, the unit is minute
MASTER_FAILOVER_INTERVAL: "10m" MASTER_FAILOVER_INTERVAL: "10m"
# -- Master kill application when handle failover # -- Master kill application when handle failover
@ -621,18 +627,29 @@ worker:
# -- `PersistentVolumeClaim` size # -- `PersistentVolumeClaim` size
storage: "20Gi" storage: "20Gi"
env: env:
# -- Worker max cpu load avg, only higher than the system cpu load average, worker server can be dispatched tasks # -- If set true, will open worker overload protection
WORKER_MAX_CPU_LOAD_AVG: "1" WORKER_SERVER_LOAD_PROTECTION_ENABLED: false
# -- Worker reserved memory, only lower than system available memory, worker server can be dispatched tasks, the unit is G # -- Worker max cpu usage, when the worker's cpu usage is smaller than this value, worker server can be dispatched tasks.
WORKER_RESERVED_MEMORY: "0.3" WORKER_SERVER_LOAD_PROTECTION_MAX_CPU_USAGE_PERCENTAGE_THRESHOLDS: 0.7
# -- Worker max jvm memory usage, when the worker's jvm memory usage is smaller than this value, worker server can be dispatched tasks.
WORKER_SERVER_LOAD_PROTECTION_MAX_JVM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS: 0.7
# -- Worker max memory usage, when the worker's memory usage is smaller than this value, worker server can be dispatched tasks.
WORKER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS: 0.7
# -- Worker max disk usage, when the worker's disk usage is smaller than this value, worker server can be dispatched tasks.
WORKER_SERVER_LOAD_PROTECTION_MAX_DISK_USAGE_PERCENTAGE_THRESHOLDS: 0.7
# -- Worker execute thread number to limit task instances # -- Worker execute thread number to limit task instances
WORKER_EXEC_THREADS: "100" WORKER_EXEC_THREADS: "100"
# -- Worker heartbeat interval, the unit is second # -- Worker heartbeat interval
WORKER_HEARTBEAT_INTERVAL: "10s" WORKER_MAX_HEARTBEAT_INTERVAL: "10s"
# -- Worker heartbeat error threshold
WORKER_HEART_ERROR_THRESHOLD: "5"
# -- Worker host weight to dispatch tasks # -- Worker host weight to dispatch tasks
WORKER_HOST_WEIGHT: "100" WORKER_HOST_WEIGHT: "100"
# -- tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true.
WORKER_TENANT_CONFIG_AUTO_CREATE_TENANT_ENABLED: true
# -- Scenes to be used for distributed users. For example, users created by FreeIpa are stored in LDAP. This parameter only applies to Linux, When this parameter is true, worker.tenant.auto.create has no effect and will not automatically create tenants.
WORKER_TENANT_CONFIG_DISTRIBUTED_TENANT: false
# -- If set true, will use worker bootstrap user as the tenant to execute task when the tenant is `default`;
DEFAULT_TENANT_ENABLED: false
keda: keda:
# -- Enable or disable the Keda component # -- Enable or disable the Keda component
enabled: false enabled: false

74
docs/docs/en/architecture/configuration.md

@ -279,46 +279,50 @@ Location: `api-server/conf/application.yaml`
Location: `master-server/conf/application.yaml` Location: `master-server/conf/application.yaml`
| Parameters | Default value | Description | | Parameters | Default value | Description |
|------------------------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |-----------------------------------------------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| master.listen-port | 5678 | master listen port | | master.listen-port | 5678 | master listen port |
| master.fetch-command-num | 10 | the number of commands fetched by master | | master.fetch-command-num | 10 | the number of commands fetched by master |
| master.pre-exec-threads | 10 | master prepare execute thread number to limit handle commands in parallel | | master.pre-exec-threads | 10 | master prepare execute thread number to limit handle commands in parallel |
| master.exec-threads | 100 | master execute thread number to limit process instances in parallel | | master.exec-threads | 100 | master execute thread number to limit process instances in parallel |
| master.dispatch-task-number | 3 | master dispatch task number per batch | | master.dispatch-task-number | 3 | master dispatch task number per batch |
| master.host-selector | lower_weight | master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight | | master.host-selector | lower_weight | master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight |
| master.heartbeat-interval | 10 | master heartbeat interval, the unit is second | | master.max-heartbeat-interval | 10s | master max heartbeat interval |
| master.task-commit-retry-times | 5 | master commit task retry times | | master.task-commit-retry-times | 5 | master commit task retry times |
| master.task-commit-interval | 1000 | master commit task interval, the unit is millisecond | | master.task-commit-interval | 1000 | master commit task interval, the unit is millisecond |
| master.state-wheel-interval | 5 | time to check status | | master.state-wheel-interval | 5 | time to check status |
| master.max-cpu-load-avg | 1 | master max cpuload avg percentage, only higher than the system cpu load average, master server can schedule. default value 1: will use 100% cpu | | master.server-load-protection.enabled | true | If set true, will open master overload protection |
| master.reserved-memory | 0.3 | master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, only the available memory is higher than 30%, master server can schedule. | | master.server-load-protection.max-cpu-usage-percentage-thresholds | 0.7 | Master max cpu usage, when the master's cpu usage is smaller than this value, master server can execute workflow. |
| master.failover-interval | 10 | failover interval, the unit is minute | | master.server-load-protection.max-jvm-memory-usage-percentage-thresholds | 0.7 | Master max JVM memory usage, when the master's jvm memory usage is smaller than this value, master server can execute workflow. |
| master.kill-application-when-task-failover | true | whether to kill yarn/k8s application when failover taskInstance | | master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | Master max System memory usage, when the master's system memory usage is smaller than this value, master server can execute workflow. |
| master.registry-disconnect-strategy.strategy | stop | Used when the master disconnect from registry, default value: stop. Optional values include stop, waiting | | master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | Master max disk usage, when the master's disk usage is smaller than this value, master server can execute workflow. |
| master.registry-disconnect-strategy.max-waiting-time | 100s | Used when the master disconnect from registry, and the disconnect strategy is waiting, this config means the master will waiting to reconnect to registry in given times, and after the waiting times, if the master still cannot connect to registry, will stop itself, if the value is 0s, the Master will wait infinitely | | master.failover-interval | 10 | failover interval, the unit is minute |
| master.worker-group-refresh-interval | 10s | The interval to refresh worker group from db to memory | | master.kill-application-when-task-failover | true | whether to kill yarn/k8s application when failover taskInstance |
| master.registry-disconnect-strategy.strategy | stop | Used when the master disconnect from registry, default value: stop. Optional values include stop, waiting |
| master.registry-disconnect-strategy.max-waiting-time | 100s | Used when the master disconnect from registry, and the disconnect strategy is waiting, this config means the master will waiting to reconnect to registry in given times, and after the waiting times, if the master still cannot connect to registry, will stop itself, if the value is 0s, the Master will wait infinitely |
| master.worker-group-refresh-interval | 10s | The interval to refresh worker group from db to memory |
### Worker Server related configuration ### Worker Server related configuration
Location: `worker-server/conf/application.yaml` Location: `worker-server/conf/application.yaml`
| Parameters | Default value | Description | | Parameters | Default value | Description |
|------------------------------------------------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |--------------------------------------------------------------------------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| worker.listen-port | 1234 | worker-service listen port | | worker.listen-port | 1234 | worker-service listen port |
| worker.exec-threads | 100 | worker-service execute thread number, used to limit the number of task instances in parallel | | worker.exec-threads | 100 | worker-service execute thread number, used to limit the number of task instances in parallel |
| worker.heartbeat-interval | 10 | worker-service heartbeat interval, the unit is second | | worker.max-heartbeat-interval | 10s | worker-service max heartbeat interval |
| worker.host-weight | 100 | worker host weight to dispatch tasks | | worker.host-weight | 100 | worker host weight to dispatch tasks |
| worker.max-cpu-load-avg | 1 | worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value 1: will use 100% cpu. | | worker.server-load-protection.enabled | true | If set true, will open worker overload protection |
| worker.reserved-memory | 0.3 | worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, only the available memory is higher than 30%, worker server can receive task. | | worker.server-load-protection.max-cpu-usage-percentage-thresholds | 0.7 | Worker max cpu usage, when the worker's cpu usage is smaller than this value, worker server can be dispatched tasks. |
| worker.alert-listen-host | localhost | the alert listen host of worker | | worker.server-load-protection.max-jvm-memory-usage-percentage-thresholds | 0.7 | Worker max JVM memory usage, when the worker's jvm memory usage is smaller than this value, worker server can be dispatched tasks. |
| worker.alert-listen-port | 50052 | the alert listen port of worker | | worker.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | Worker max System memory usage, when the worker's system memory usage is smaller than this value, worker server can be dispatched tasks. |
| worker.registry-disconnect-strategy.strategy | stop | Used when the worker disconnect from registry, default value: stop. Optional values include stop, waiting | | worker.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | Worker max disk usage, when the worker's disk usage is smaller than this value, worker server can be dispatched tasks. |
| worker.registry-disconnect-strategy.max-waiting-time | 100s | Used when the worker disconnect from registry, and the disconnect strategy is waiting, this config means the worker will waiting to reconnect to registry in given times, and after the waiting times, if the worker still cannot connect to registry, will stop itself, if the value is 0s, will wait infinitely | | worker.registry-disconnect-strategy.strategy | stop | Used when the worker disconnect from registry, default value: stop. Optional values include stop, waiting |
| worker.task-execute-threads-full-policy | REJECT | If REJECT, when the task waiting in the worker reaches exec-threads, it will reject the received task and the Master will redispatch it; If CONTINUE, it will put the task into the worker's execution queue and wait for a free thread to start execution | | worker.registry-disconnect-strategy.max-waiting-time | 100s | Used when the worker disconnect from registry, and the disconnect strategy is waiting, this config means the worker will waiting to reconnect to registry in given times, and after the waiting times, if the worker still cannot connect to registry, will stop itself, if the value is 0s, will wait infinitely |
| worker.tenant-config.auto-create-tenant-enabled | true | tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. | | worker.task-execute-threads-full-policy | REJECT | If REJECT, when the task waiting in the worker reaches exec-threads, it will reject the received task and the Master will redispatch it; If CONTINUE, it will put the task into the worker's execution queue and wait for a free thread to start execution |
| worker.tenant-config.distributed-tenant-enabled | false | When this parameter is true, auto-create-tenant-enabled has no effect and will not automatically create tenants | | worker.tenant-config.auto-create-tenant-enabled | true | tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. |
| worker.tenant-config.default-tenant-enabled | false | If set true, will use worker bootstrap user as the tenant to execute task when the tenant is `default`. | | worker.tenant-config.distributed-tenant-enabled | false | When this parameter is true, auto-create-tenant-enabled has no effect and will not automatically create tenants |
| worker.tenant-config.default-tenant-enabled | false | If set true, will use worker bootstrap user as the tenant to execute task when the tenant is `default`. |
### Alert Server related configuration ### Alert Server related configuration

4
docs/docs/zh/architecture/configuration.md

@ -287,7 +287,7 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId
| master.exec-threads | 100 | master工作线程数量,用于限制并行的流程实例数量 | | master.exec-threads | 100 | master工作线程数量,用于限制并行的流程实例数量 |
| master.dispatch-task-number | 3 | master每个批次的派发任务数量 | | master.dispatch-task-number | 3 | master每个批次的派发任务数量 |
| master.host-selector | lower_weight | master host选择器,用于选择合适的worker执行任务,可选值: random, round_robin, lower_weight | | master.host-selector | lower_weight | master host选择器,用于选择合适的worker执行任务,可选值: random, round_robin, lower_weight |
| master.heartbeat-interval | 10 | master心跳间隔,单位为秒 | | master.max-heartbeat-interval | 10s | master最大心跳间隔 |
| master.task-commit-retry-times | 5 | 任务重试次数 | | master.task-commit-retry-times | 5 | 任务重试次数 |
| master.task-commit-interval | 1000 | 任务提交间隔,单位为毫秒 | | master.task-commit-interval | 1000 | 任务提交间隔,单位为毫秒 |
| master.state-wheel-interval | 5 | 轮询检查状态时间 | | master.state-wheel-interval | 5 | 轮询检查状态时间 |
@ -308,7 +308,7 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId
|------------------------------------------------------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------| |------------------------------------------------------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------|
| worker.listen-port | 1234 | worker监听端口 | | worker.listen-port | 1234 | worker监听端口 |
| worker.exec-threads | 100 | worker工作线程数量,用于限制并行的任务实例数量 | | worker.exec-threads | 100 | worker工作线程数量,用于限制并行的任务实例数量 |
| worker.heartbeat-interval | 10 | worker心跳间隔,单位为秒 | | worker.max-heartbeat-interval | 10s | worker最大心跳间隔 |
| worker.host-weight | 100 | 派发任务时,worker主机的权重 | | worker.host-weight | 100 | 派发任务时,worker主机的权重 |
| worker.tenant-auto-create | true | 租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。 | | worker.tenant-auto-create | true | 租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。 |
| worker.max-cpu-load-avg | 1 | worker最大cpuload均值,只有高于系统cpuload均值时,worker服务才能被派发任务. 默认值为1: 会使用100%的CPU | | worker.max-cpu-load-avg | 1 | worker最大cpuload均值,只有高于系统cpuload均值时,worker服务才能被派发任务. 默认值为1: 会使用100%的CPU |

8
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/config/AlertConfig.java

@ -41,7 +41,7 @@ public final class AlertConfig implements Validator {
private int waitTimeout; private int waitTimeout;
private Duration heartbeatInterval = Duration.ofSeconds(60); private Duration maxHeartbeatInterval = Duration.ofSeconds(60);
private String alertServerAddress; private String alertServerAddress;
@ -54,8 +54,8 @@ public final class AlertConfig implements Validator {
public void validate(Object target, Errors errors) { public void validate(Object target, Errors errors) {
AlertConfig alertConfig = (AlertConfig) target; AlertConfig alertConfig = (AlertConfig) target;
if (heartbeatInterval.getSeconds() <= 0) { if (maxHeartbeatInterval.getSeconds() <= 0) {
errors.rejectValue("heartbeat-interval", null, "should be a valid duration"); errors.rejectValue("max-heartbeat-interval", null, "should be a valid duration");
} }
if (StringUtils.isEmpty(alertServerAddress)) { if (StringUtils.isEmpty(alertServerAddress)) {
@ -68,6 +68,6 @@ public final class AlertConfig implements Validator {
private void printConfig() { private void printConfig() {
log.info("Alert config: port -> {}", port); log.info("Alert config: port -> {}", port);
log.info("Alert config: alertServerAddress -> {}", alertServerAddress); log.info("Alert config: alertServerAddress -> {}", alertServerAddress);
log.info("Alert config: heartbeatInterval -> {}", heartbeatInterval); log.info("Alert config: maxHeartbeatInterval -> {}", maxHeartbeatInterval);
} }
} }

17
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java

@ -18,11 +18,14 @@
package org.apache.dolphinscheduler.alert.registry; package org.apache.dolphinscheduler.alert.registry;
import org.apache.dolphinscheduler.alert.config.AlertConfig; import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.common.enums.ServerStatus;
import org.apache.dolphinscheduler.common.model.AlertServerHeartBeat; import org.apache.dolphinscheduler.common.model.AlertServerHeartBeat;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask; import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
@ -37,14 +40,18 @@ public class AlertHeartbeatTask extends BaseHeartBeatTask<AlertServerHeartBeat>
private final AlertConfig alertConfig; private final AlertConfig alertConfig;
private final Integer processId; private final Integer processId;
private final RegistryClient registryClient; private final RegistryClient registryClient;
private final MetricsProvider metricsProvider;
private final String heartBeatPath; private final String heartBeatPath;
private final long startupTime; private final long startupTime;
public AlertHeartbeatTask(AlertConfig alertConfig, public AlertHeartbeatTask(AlertConfig alertConfig,
MetricsProvider metricsProvider,
RegistryClient registryClient) { RegistryClient registryClient) {
super("AlertHeartbeatTask", alertConfig.getHeartbeatInterval().toMillis()); super("AlertHeartbeatTask", alertConfig.getMaxHeartbeatInterval().toMillis());
this.startupTime = System.currentTimeMillis(); this.startupTime = System.currentTimeMillis();
this.alertConfig = alertConfig; this.alertConfig = alertConfig;
this.metricsProvider = metricsProvider;
this.registryClient = registryClient; this.registryClient = registryClient;
this.heartBeatPath = this.heartBeatPath =
RegistryNodeType.ALERT_SERVER.getRegistryPath() + "/" + alertConfig.getAlertServerAddress(); RegistryNodeType.ALERT_SERVER.getRegistryPath() + "/" + alertConfig.getAlertServerAddress();
@ -53,13 +60,15 @@ public class AlertHeartbeatTask extends BaseHeartBeatTask<AlertServerHeartBeat>
@Override @Override
public AlertServerHeartBeat getHeartBeat() { public AlertServerHeartBeat getHeartBeat() {
SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
return AlertServerHeartBeat.builder() return AlertServerHeartBeat.builder()
.processId(processId) .processId(processId)
.startupTime(startupTime) .startupTime(startupTime)
.reportTime(System.currentTimeMillis()) .reportTime(System.currentTimeMillis())
.cpuUsage(OSUtils.cpuUsagePercentage()) .cpuUsage(systemMetrics.getTotalCpuUsedPercentage())
.memoryUsage(OSUtils.memoryUsagePercentage()) .memoryUsage(systemMetrics.getSystemMemoryUsedPercentage())
.availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize()) .jvmMemoryUsage(systemMetrics.getJvmMemoryUsedPercentage())
.serverStatus(ServerStatus.NORMAL)
.host(NetUtils.getHost()) .host(NetUtils.getHost())
.port(alertConfig.getPort()) .port(alertConfig.getPort())
.build(); .build();

6
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertRegistryClient.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.alert.registry; package org.apache.dolphinscheduler.alert.registry;
import org.apache.dolphinscheduler.alert.config.AlertConfig; import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
@ -36,12 +37,15 @@ public class AlertRegistryClient implements AutoCloseable {
@Autowired @Autowired
private AlertConfig alertConfig; private AlertConfig alertConfig;
@Autowired
private MetricsProvider metricsProvider;
private AlertHeartbeatTask alertHeartbeatTask; private AlertHeartbeatTask alertHeartbeatTask;
public void start() { public void start() {
log.info("AlertRegistryClient starting..."); log.info("AlertRegistryClient starting...");
registryClient.getLock(RegistryNodeType.ALERT_LOCK.getRegistryPath()); registryClient.getLock(RegistryNodeType.ALERT_LOCK.getRegistryPath());
alertHeartbeatTask = new AlertHeartbeatTask(alertConfig, registryClient); alertHeartbeatTask = new AlertHeartbeatTask(alertConfig, metricsProvider, registryClient);
alertHeartbeatTask.start(); alertHeartbeatTask.start();
// start heartbeat task // start heartbeat task
log.info("AlertRegistryClient started..."); log.info("AlertRegistryClient started...");

2
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/application.yaml

@ -80,7 +80,7 @@ alert:
# Mark each alert of alert server if late after x milliseconds as failed. # Mark each alert of alert server if late after x milliseconds as failed.
# Define value is (0 = infinite), and alert server would be waiting alert result. # Define value is (0 = infinite), and alert server would be waiting alert result.
wait-timeout: 0 wait-timeout: 0
heartbeat-interval: 60s max-heartbeat-interval: 60s
query_alert_threshold: 100 query_alert_threshold: 100
registry: registry:

54
dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/cases/ProcessInstanceAPITest.java

@ -19,6 +19,9 @@
package org.apache.dolphinscheduler.api.test.cases; package org.apache.dolphinscheduler.api.test.cases;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.apache.dolphinscheduler.api.test.core.DolphinScheduler; import org.apache.dolphinscheduler.api.test.core.DolphinScheduler;
import org.apache.dolphinscheduler.api.test.entity.HttpResponse; import org.apache.dolphinscheduler.api.test.entity.HttpResponse;
import org.apache.dolphinscheduler.api.test.entity.LoginResponseData; import org.apache.dolphinscheduler.api.test.entity.LoginResponseData;
@ -34,6 +37,7 @@ import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
@ -42,12 +46,14 @@ import java.text.SimpleDateFormat;
import java.util.Date; import java.util.Date;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -106,7 +112,7 @@ public class ProcessInstanceAPITest {
// create test project // create test project
HttpResponse createProjectResponse = projectPage.createProject(loginUser, "project-test"); HttpResponse createProjectResponse = projectPage.createProject(loginUser, "project-test");
HttpResponse queryAllProjectListResponse = projectPage.queryAllProjectList(loginUser); HttpResponse queryAllProjectListResponse = projectPage.queryAllProjectList(loginUser);
Assertions.assertTrue(queryAllProjectListResponse.getBody().getSuccess()); assertTrue(queryAllProjectListResponse.getBody().getSuccess());
projectCode = (long) ((LinkedHashMap<String, Object>) ((List<LinkedHashMap>) queryAllProjectListResponse.getBody().getData()).get(0)).get("code"); projectCode = (long) ((LinkedHashMap<String, Object>) ((List<LinkedHashMap>) queryAllProjectListResponse.getBody().getData()).get(0)).get("code");
// upload test workflow definition json // upload test workflow definition json
@ -115,17 +121,17 @@ public class ProcessInstanceAPITest {
CloseableHttpResponse importProcessDefinitionResponse = processDefinitionPage CloseableHttpResponse importProcessDefinitionResponse = processDefinitionPage
.importProcessDefinition(loginUser, projectCode, file); .importProcessDefinition(loginUser, projectCode, file);
String data = EntityUtils.toString(importProcessDefinitionResponse.getEntity()); String data = EntityUtils.toString(importProcessDefinitionResponse.getEntity());
Assertions.assertTrue(data.contains("\"success\":true")); assertTrue(data.contains("\"success\":true"));
// get workflow definition code // get workflow definition code
HttpResponse queryAllProcessDefinitionByProjectCodeResponse = processDefinitionPage.queryAllProcessDefinitionByProjectCode(loginUser, projectCode); HttpResponse queryAllProcessDefinitionByProjectCodeResponse = processDefinitionPage.queryAllProcessDefinitionByProjectCode(loginUser, projectCode);
Assertions.assertTrue(queryAllProcessDefinitionByProjectCodeResponse.getBody().getSuccess()); assertTrue(queryAllProcessDefinitionByProjectCodeResponse.getBody().getSuccess());
Assertions.assertTrue(queryAllProcessDefinitionByProjectCodeResponse.getBody().getData().toString().contains("hello world")); assertTrue(queryAllProcessDefinitionByProjectCodeResponse.getBody().getData().toString().contains("hello world"));
processDefinitionCode = (long) ((LinkedHashMap<String, Object>) ((LinkedHashMap<String, Object>) ((List<LinkedHashMap>) queryAllProcessDefinitionByProjectCodeResponse.getBody().getData()).get(0)).get("processDefinition")).get("code"); processDefinitionCode = (long) ((LinkedHashMap<String, Object>) ((LinkedHashMap<String, Object>) ((List<LinkedHashMap>) queryAllProcessDefinitionByProjectCodeResponse.getBody().getData()).get(0)).get("processDefinition")).get("code");
// release test workflow // release test workflow
HttpResponse releaseProcessDefinitionResponse = processDefinitionPage.releaseProcessDefinition(loginUser, projectCode, processDefinitionCode, ReleaseState.ONLINE); HttpResponse releaseProcessDefinitionResponse = processDefinitionPage.releaseProcessDefinition(loginUser, projectCode, processDefinitionCode, ReleaseState.ONLINE);
Assertions.assertTrue(releaseProcessDefinitionResponse.getBody().getSuccess()); assertTrue(releaseProcessDefinitionResponse.getBody().getSuccess());
// trigger workflow instance // trigger workflow instance
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@ -133,15 +139,21 @@ public class ProcessInstanceAPITest {
String scheduleTime = String.format("%s,%s", formatter.format(date), formatter.format(date)); String scheduleTime = String.format("%s,%s", formatter.format(date), formatter.format(date));
log.info("use current time {} as scheduleTime", scheduleTime); log.info("use current time {} as scheduleTime", scheduleTime);
HttpResponse startProcessInstanceResponse = executorPage.startProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime, FailureStrategy.END, WarningType.NONE); HttpResponse startProcessInstanceResponse = executorPage.startProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime, FailureStrategy.END, WarningType.NONE);
Assertions.assertTrue(startProcessInstanceResponse.getBody().getSuccess()); assertTrue(startProcessInstanceResponse.getBody().getSuccess());
// make sure process instance has completed and successfully persisted into db
Thread.sleep(5000);
// query workflow instance by trigger code // make sure process instance has completed and successfully persisted into db
triggerCode = (long) startProcessInstanceResponse.getBody().getData(); Awaitility.await()
HttpResponse queryProcessInstancesByTriggerCodeResponse = processInstancePage.queryProcessInstancesByTriggerCode(loginUser, projectCode, triggerCode); .atMost(30, TimeUnit.SECONDS)
Assertions.assertTrue(queryProcessInstancesByTriggerCodeResponse.getBody().getSuccess()); .untilAsserted(() -> {
processInstanceId = (int) ((LinkedHashMap<String, Object>) ((List<LinkedHashMap>) queryProcessInstancesByTriggerCodeResponse.getBody().getData()).get(0)).get("id"); // query workflow instance by trigger code
triggerCode = (long) startProcessInstanceResponse.getBody().getData();
HttpResponse queryProcessInstancesByTriggerCodeResponse = processInstancePage.queryProcessInstancesByTriggerCode(loginUser, projectCode, triggerCode);
assertTrue(queryProcessInstancesByTriggerCodeResponse.getBody().getSuccess());
List<LinkedHashMap<String, Object>> body = (List<LinkedHashMap<String, Object>>) queryProcessInstancesByTriggerCodeResponse.getBody().getData();
assertTrue(CollectionUtils.isNotEmpty(body));
assertEquals("SUCCESS", body.get(0).get("state"));
processInstanceId = (int) body.get(0).get("id");
});
} catch (Exception e) { } catch (Exception e) {
log.error("failed", e); log.error("failed", e);
Assertions.fail(); Assertions.fail();
@ -152,34 +164,34 @@ public class ProcessInstanceAPITest {
@Order(2) @Order(2)
public void testQueryProcessInstanceList() { public void testQueryProcessInstanceList() {
HttpResponse queryProcessInstanceListResponse = processInstancePage.queryProcessInstanceList(loginUser, projectCode, 1, 10); HttpResponse queryProcessInstanceListResponse = processInstancePage.queryProcessInstanceList(loginUser, projectCode, 1, 10);
Assertions.assertTrue(queryProcessInstanceListResponse.getBody().getSuccess()); assertTrue(queryProcessInstanceListResponse.getBody().getSuccess());
Assertions.assertTrue(queryProcessInstanceListResponse.getBody().getData().toString().contains("test_import")); assertTrue(queryProcessInstanceListResponse.getBody().getData().toString().contains("test_import"));
} }
@Test @Test
@Order(3) @Order(3)
public void testQueryTaskListByProcessId() { public void testQueryTaskListByProcessId() {
HttpResponse queryTaskListByProcessIdResponse = processInstancePage.queryTaskListByProcessId(loginUser, projectCode, processInstanceId); HttpResponse queryTaskListByProcessIdResponse = processInstancePage.queryTaskListByProcessId(loginUser, projectCode, processInstanceId);
Assertions.assertTrue(queryTaskListByProcessIdResponse.getBody().getSuccess()); assertTrue(queryTaskListByProcessIdResponse.getBody().getSuccess());
Assertions.assertTrue(queryTaskListByProcessIdResponse.getBody().getData().toString().contains("test_import")); assertTrue(queryTaskListByProcessIdResponse.getBody().getData().toString().contains("test_import"));
} }
@Test @Test
@Order(4) @Order(4)
public void testQueryProcessInstanceById() { public void testQueryProcessInstanceById() {
HttpResponse queryProcessInstanceByIdResponse = processInstancePage.queryProcessInstanceById(loginUser, projectCode, processInstanceId); HttpResponse queryProcessInstanceByIdResponse = processInstancePage.queryProcessInstanceById(loginUser, projectCode, processInstanceId);
Assertions.assertTrue(queryProcessInstanceByIdResponse.getBody().getSuccess()); assertTrue(queryProcessInstanceByIdResponse.getBody().getSuccess());
Assertions.assertTrue(queryProcessInstanceByIdResponse.getBody().getData().toString().contains("test_import")); assertTrue(queryProcessInstanceByIdResponse.getBody().getData().toString().contains("test_import"));
} }
@Test @Test
@Order(5) @Order(5)
public void testDeleteProcessInstanceById() { public void testDeleteProcessInstanceById() {
HttpResponse deleteProcessInstanceByIdResponse = processInstancePage.deleteProcessInstanceById(loginUser, projectCode, processInstanceId); HttpResponse deleteProcessInstanceByIdResponse = processInstancePage.deleteProcessInstanceById(loginUser, projectCode, processInstanceId);
Assertions.assertTrue(deleteProcessInstanceByIdResponse.getBody().getSuccess()); assertTrue(deleteProcessInstanceByIdResponse.getBody().getSuccess());
HttpResponse queryProcessInstanceListResponse = processInstancePage.queryProcessInstanceList(loginUser, projectCode, 1, 10); HttpResponse queryProcessInstanceListResponse = processInstancePage.queryProcessInstanceList(loginUser, projectCode, 1, 10);
Assertions.assertTrue(queryProcessInstanceListResponse.getBody().getSuccess()); assertTrue(queryProcessInstanceListResponse.getBody().getSuccess());
Assertions.assertFalse(queryProcessInstanceListResponse.getBody().getData().toString().contains("test_import")); Assertions.assertFalse(queryProcessInstanceListResponse.getBody().getData().toString().contains("test_import"));
} }

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ServerStatus.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.enums;
public enum ServerStatus { public enum ServerStatus {
NORMAL, ABNORMAL, BUSY NORMAL,
BUSY
} }

11
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.common.model; package org.apache.dolphinscheduler.common.model;
import org.apache.dolphinscheduler.common.enums.ServerStatus;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
@ -33,8 +35,15 @@ public class AlertServerHeartBeat implements HeartBeat {
private long reportTime; private long reportTime;
private double cpuUsage; private double cpuUsage;
private double memoryUsage; private double memoryUsage;
private double availablePhysicalMemorySize; private double jvmMemoryUsage;
private ServerStatus serverStatus;
private String host; private String host;
private int port; private int port;
@Override
public ServerStatus getServerStatus() {
return serverStatus;
}
} }

18
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java

@ -23,13 +23,19 @@ import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public abstract class BaseHeartBeatTask<T> extends BaseDaemonThread { public abstract class BaseHeartBeatTask<T extends HeartBeat> extends BaseDaemonThread {
private static final long DEFAULT_HEARTBEAT_SCAN_INTERVAL = 1_000L;
private final String threadName; private final String threadName;
private final long heartBeatInterval; private final long heartBeatInterval;
protected boolean runningFlag; protected boolean runningFlag;
protected long lastWriteTime = 0L;
protected T lastHeartBeat = null;
public BaseHeartBeatTask(String threadName, long heartBeatInterval) { public BaseHeartBeatTask(String threadName, long heartBeatInterval) {
super(threadName); super(threadName);
this.threadName = threadName; this.threadName = threadName;
@ -54,12 +60,18 @@ public abstract class BaseHeartBeatTask<T> extends BaseDaemonThread {
continue; continue;
} }
T heartBeat = getHeartBeat(); T heartBeat = getHeartBeat();
writeHeartBeat(heartBeat); // if first time or heartBeat status changed, write heartBeatInfo into registry
if (System.currentTimeMillis() - lastWriteTime >= heartBeatInterval
|| !lastHeartBeat.getServerStatus().equals(heartBeat.getServerStatus())) {
lastHeartBeat = heartBeat;
writeHeartBeat(heartBeat);
lastWriteTime = System.currentTimeMillis();
}
} catch (Exception ex) { } catch (Exception ex) {
log.error("{} task execute failed", threadName, ex); log.error("{} task execute failed", threadName, ex);
} finally { } finally {
try { try {
Thread.sleep(heartBeatInterval); Thread.sleep(DEFAULT_HEARTBEAT_SCAN_INTERVAL);
} catch (InterruptedException e) { } catch (InterruptedException e) {
handleInterruptException(e); handleInterruptException(e);
} }

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java

@ -17,10 +17,14 @@
package org.apache.dolphinscheduler.common.model; package org.apache.dolphinscheduler.common.model;
import org.apache.dolphinscheduler.common.enums.ServerStatus;
public interface HeartBeat { public interface HeartBeat {
String getHost(); String getHost();
ServerStatus getServerStatus();
int getPort(); int getPort();
} }

12
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java

@ -33,13 +33,17 @@ public class MasterHeartBeat implements HeartBeat {
private long startupTime; private long startupTime;
private long reportTime; private long reportTime;
private double cpuUsage; private double cpuUsage;
private double jvmMemoryUsage;
private double memoryUsage; private double memoryUsage;
private double availablePhysicalMemorySize; private double diskUsage;
private double reservedMemory;
private double diskAvailable;
private int processId;
private ServerStatus serverStatus; private ServerStatus serverStatus;
private int processId;
private String host; private String host;
private int port; private int port;
@Override
public ServerStatus getServerStatus() {
return serverStatus;
}
} }

14
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java

@ -33,11 +33,9 @@ public class WorkerHeartBeat implements HeartBeat {
private long startupTime; private long startupTime;
private long reportTime; private long reportTime;
private double cpuUsage; private double cpuUsage;
private double jvmMemoryUsage;
private double memoryUsage; private double memoryUsage;
private double loadAverage; private double diskUsage;
private double availablePhysicalMemorySize;
private double reservedMemory;
private double diskAvailable;
private ServerStatus serverStatus; private ServerStatus serverStatus;
private int processId; private int processId;
@ -45,7 +43,11 @@ public class WorkerHeartBeat implements HeartBeat {
private int port; private int port;
private int workerHostWeight; // worker host weight private int workerHostWeight; // worker host weight
private int workerWaitingTaskCount; // worker waiting task count private int threadPoolUsage; // worker waiting task count
private int workerExecThreadCount; // worker thread pool thread count
@Override
public ServerStatus getServerStatus() {
return serverStatus;
}
} }

134
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java

@ -21,23 +21,17 @@ import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.shell.ShellExecutor; import org.apache.dolphinscheduler.common.shell.ShellExecutor;
import oshi.SystemInfo; import oshi.SystemInfo;
import oshi.hardware.CentralProcessor;
import oshi.hardware.GlobalMemory;
import oshi.hardware.HardwareAbstractionLayer; import oshi.hardware.HardwareAbstractionLayer;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils; import org.apache.commons.lang3.SystemUtils;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean; import java.lang.management.RuntimeMXBean;
import java.math.RoundingMode;
import java.text.DecimalFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -45,32 +39,17 @@ import java.util.List;
import java.util.StringTokenizer; import java.util.StringTokenizer;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/** // todo: Split to WindowsOSUtils/LinuxOSUtils/MacOSOSUtils/K8sOSUtils...
* os utils
*/
@Slf4j @Slf4j
@UtilityClass
public class OSUtils { public class OSUtils {
private static final SystemInfo SI = new SystemInfo(); private static final SystemInfo SI = new SystemInfo();
public static final String TWO_DECIMAL = "0.00";
/**
* return -1 when the function can not get hardware env info
* e.g {@link OSUtils#cpuUsagePercentage()}
*/
public static final double NEGATIVE_ONE = -1;
private static final HardwareAbstractionLayer hal = SI.getHardware(); private static final HardwareAbstractionLayer hal = SI.getHardware();
private static long[] prevTicks = new long[CentralProcessor.TickType.values().length];
private static long prevTickTime = 0L;
private static volatile double cpuUsage = 0.0D;
private static final double TOTAL_MEMORY = hal.getMemory().getTotal() / 1024.0 / 1024 / 1024;
private OSUtils() {
throw new UnsupportedOperationException("Construct OSUtils");
}
/** /**
* Initialization regularization, solve the problem of pre-compilation performance, * Initialization regularization, solve the problem of pre-compilation performance,
@ -78,79 +57,12 @@ public class OSUtils {
*/ */
private static final Pattern PATTERN = Pattern.compile("\\s+"); private static final Pattern PATTERN = Pattern.compile("\\s+");
/** public static long getTotalSystemMemory() {
* get disk usage return hal.getMemory().getTotal();
* Keep 2 decimal
*
* @return disk free size, unit: GB
*/
public static double diskAvailable() {
File file = new File(".");
long freeSpace = file.getFreeSpace(); // unallocated / free disk space in bytes.
double diskAvailable = freeSpace / 1024.0 / 1024 / 1024;
DecimalFormat df = new DecimalFormat(TWO_DECIMAL);
df.setRoundingMode(RoundingMode.HALF_UP);
return Double.parseDouble(df.format(diskAvailable));
} }
/** public static long getSystemAvailableMemoryUsed() {
* get available physical or pod memory size return hal.getMemory().getAvailable();
* <p>
* Keep 2 decimal
*
* @return Available physical or pod memory size, unit: G
*/
public static double availablePhysicalMemorySize() {
double availablePhysicalMemorySize;
if (KubernetesUtils.isKubernetesMode()) {
long freeMemory = Runtime.getRuntime().freeMemory();
availablePhysicalMemorySize = freeMemory / 1024.0 / 1024 / 1024;
} else {
GlobalMemory memory = hal.getMemory();
availablePhysicalMemorySize = memory.getAvailable() / 1024.0 / 1024 / 1024;
}
DecimalFormat df = new DecimalFormat(TWO_DECIMAL);
df.setRoundingMode(RoundingMode.HALF_UP);
return Double.parseDouble(df.format(availablePhysicalMemorySize));
}
/**
* get cpu usage
*
* @return cpu usage
*/
public static double cpuUsagePercentage() {
CentralProcessor processor = hal.getProcessor();
// Check if > ~ 0.95 seconds since last tick count.
long now = System.currentTimeMillis();
if (now - prevTickTime > 950) {
// Enough time has elapsed.
if (KubernetesUtils.isKubernetesMode()) {
OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean();
cpuUsage = operatingSystemMXBean.getSystemLoadAverage();
} else {
cpuUsage = processor.getSystemCpuLoadBetweenTicks(prevTicks);
}
prevTickTime = System.currentTimeMillis();
prevTicks = processor.getSystemCpuLoadTicks();
}
if (Double.isNaN(cpuUsage)) {
return NEGATIVE_ONE;
}
DecimalFormat df = new DecimalFormat(TWO_DECIMAL);
df.setRoundingMode(RoundingMode.HALF_UP);
return Double.parseDouble(df.format(cpuUsage));
}
public static double memoryUsagePercentage() {
return (TOTAL_MEMORY - availablePhysicalMemorySize()) / TOTAL_MEMORY;
} }
public static List<String> getUserList() { public static List<String> getUserList() {
@ -433,43 +345,11 @@ public class OSUtils {
return ShellExecutor.execCommand(command); return ShellExecutor.execCommand(command);
} }
/**
* get process id
*
* @return process id
*/
public static int getProcessID() { public static int getProcessID() {
RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
return Integer.parseInt(runtimeMXBean.getName().split("@")[0]); return Integer.parseInt(runtimeMXBean.getName().split("@")[0]);
} }
/**
* Check whether memory and cpu usage overload the given thresholds.
*
* @param maxCpuLoadAvgThreshold maxCpuLoadAvg
* @param reservedMemoryThreshold reservedMemory
* @return True, if the cpu or memory exceed the given threshold.
*/
public static Boolean isOverload(double maxCpuLoadAvgThreshold, double reservedMemoryThreshold) {
// system load average
double freeCPUPercentage = 1 - cpuUsagePercentage();
// system available physical memory
double freeMemoryPercentage = 1 - memoryUsagePercentage();
if (freeCPUPercentage > maxCpuLoadAvgThreshold) {
log.warn("Current cpu load average {} is too high, max.cpuLoad.avg={}", freeCPUPercentage,
maxCpuLoadAvgThreshold);
return true;
}
if (freeMemoryPercentage < reservedMemoryThreshold) {
log.warn(
"Current available memory percentage{} is too low, reserved.memory={}", freeMemoryPercentage,
reservedMemoryThreshold);
return true;
}
return false;
}
public static Boolean isWindows() { public static Boolean isWindows() {
return System.getProperty("os.name").startsWith("Windows"); return System.getProperty("os.name").startsWith("Windows");
} }

59
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/os/OSUtilsTest.java → dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java

@ -14,48 +14,21 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.common.os;
import org.apache.dolphinscheduler.common.utils.OSUtils; package org.apache.dolphinscheduler.common.utils;
import org.apache.commons.lang3.SystemUtils; import org.apache.commons.lang3.SystemUtils;
import java.util.List; import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** @Slf4j
* OSUtilsTest
*/
public class OSUtilsTest { public class OSUtilsTest {
private static Logger logger = LoggerFactory.getLogger(OSUtilsTest.class);
@Test
public void diskAvailable() {
double diskAvailable = OSUtils.diskAvailable();
logger.info("diskAvailable : {}", diskAvailable);
Assertions.assertTrue(diskAvailable >= 0.0);
}
@Test
public void cpuUsage() {
double cpuUsage = OSUtils.cpuUsagePercentage();
logger.info("cpuUsage : {}", cpuUsage);
Assertions.assertTrue(cpuUsage >= 0.0);
}
@Test
public void availablePhysicalMemorySize() {
double physicalMemorySize = OSUtils.availablePhysicalMemorySize();
logger.info("physicalMemorySize : {}", physicalMemorySize);
Assertions.assertTrue(physicalMemorySize >= 0.0);
}
@Test @Test
public void existTenantCodeInLinux() { public void existTenantCodeInLinux() {
if (SystemUtils.IS_OS_LINUX) { if (SystemUtils.IS_OS_LINUX) {
@ -77,9 +50,31 @@ public class OSUtilsTest {
Assertions.assertFalse(userList.contains("xxxtt")); Assertions.assertFalse(userList.contains("xxxtt"));
} else { } else {
Assertions.assertFalse(false, "system must be linux"); Assertions.assertFalse(false, "system must be linux");
} }
}
@Test
void getTotalSystemMemory() throws InterruptedException {
double totalSystemMemory = OSUtils.getTotalSystemMemory();
Assertions.assertTrue(totalSystemMemory > 0);
// Assert that the memory is not changed
Thread.sleep(1000L);
Assertions.assertEquals(totalSystemMemory, OSUtils.getTotalSystemMemory());
} }
@Test
void getSystemMemoryAvailable() {
long systemAvailableMemoryUsed = OSUtils.getSystemAvailableMemoryUsed();
Assertions.assertTrue(systemAvailableMemoryUsed > 0);
}
@Test
void getSystemMemoryUsedPercentage() {
long totalSystemMemory = OSUtils.getTotalSystemMemory();
long systemMemoryAvailable = OSUtils.getSystemAvailableMemoryUsed();
double systemAvailableMemoryUsedPercentage =
(double) (totalSystemMemory - systemMemoryAvailable) / totalSystemMemory;
Assertions.assertTrue(systemAvailableMemoryUsedPercentage > 0);
}
} }

25
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -21,9 +21,13 @@ import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi; import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager;
import org.apache.dolphinscheduler.server.master.rpc.MasterRpcServer; import org.apache.dolphinscheduler.server.master.rpc.MasterRpcServer;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread; import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
@ -73,6 +77,12 @@ public class MasterServer implements IStoppable {
@Autowired @Autowired
private MasterRpcServer masterRPCServer; private MasterRpcServer masterRPCServer;
@Autowired
private MetricsProvider metricsProvider;
@Autowired
private MasterSlotManager masterSlotManager;
public static void main(String[] args) { public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER); Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
SpringApplication.run(MasterServer.class); SpringApplication.run(MasterServer.class);
@ -89,6 +99,8 @@ public class MasterServer implements IStoppable {
// install task plugin // install task plugin
this.taskPluginManager.loadPlugin(); this.taskPluginManager.loadPlugin();
this.masterSlotManager.start();
// self tolerant // self tolerant
this.masterRegistryClient.start(); this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this); this.masterRegistryClient.setRegistryStoppable(this);
@ -100,6 +112,19 @@ public class MasterServer implements IStoppable {
this.schedulerApi.start(); this.schedulerApi.start();
MasterServerMetrics.registerMasterCpuUsageGauge(() -> {
SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
return systemMetrics.getTotalCpuUsedPercentage();
});
MasterServerMetrics.registerMasterMemoryAvailableGauge(() -> {
SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
return (systemMetrics.getSystemMemoryMax() - systemMetrics.getSystemMemoryUsed()) / 1024.0 / 1024 / 1024;
});
MasterServerMetrics.registerMasterMemoryUsageGauge(() -> {
SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
return systemMetrics.getJvmMemoryUsedPercentage();
});
Runtime.getRuntime().addShutdownHook(new Thread(() -> { Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (!ServerLifeCycleManager.isStopped()) { if (!ServerLifeCycleManager.isStopped()) {
close("MasterServer shutdownHook"); close("MasterServer shutdownHook");

20
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -78,7 +78,7 @@ public class MasterConfig implements Validator {
/** /**
* Master heart beat task execute interval. * Master heart beat task execute interval.
*/ */
private Duration heartbeatInterval = Duration.ofSeconds(10); private Duration maxHeartbeatInterval = Duration.ofSeconds(10);
/** /**
* task submit max retry times. * task submit max retry times.
*/ */
@ -91,8 +91,7 @@ public class MasterConfig implements Validator {
* state wheel check interval, if this value is bigger, may increase the delay of task/processInstance. * state wheel check interval, if this value is bigger, may increase the delay of task/processInstance.
*/ */
private Duration stateWheelInterval = Duration.ofMillis(5); private Duration stateWheelInterval = Duration.ofMillis(5);
private double maxCpuLoadAvg = 1; private MasterServerLoadProtection serverLoadProtection = new MasterServerLoadProtection();
private double reservedMemory = 0.1;
private Duration failoverInterval = Duration.ofMinutes(10); private Duration failoverInterval = Duration.ofMinutes(10);
private boolean killApplicationWhenTaskFailover = true; private boolean killApplicationWhenTaskFailover = true;
private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties(); private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties();
@ -128,8 +127,8 @@ public class MasterConfig implements Validator {
if (masterConfig.getDispatchTaskNumber() <= 0) { if (masterConfig.getDispatchTaskNumber() <= 0) {
errors.rejectValue("dispatch-task-number", null, "should be a positive value"); errors.rejectValue("dispatch-task-number", null, "should be a positive value");
} }
if (masterConfig.getHeartbeatInterval().toMillis() < 0) { if (masterConfig.getMaxHeartbeatInterval().toMillis() < 0) {
errors.rejectValue("heartbeat-interval", null, "should be a valid duration"); errors.rejectValue("max-heartbeat-interval", null, "should be a valid duration");
} }
if (masterConfig.getTaskCommitRetryTimes() <= 0) { if (masterConfig.getTaskCommitRetryTimes() <= 0) {
errors.rejectValue("task-commit-retry-times", null, "should be a positive value"); errors.rejectValue("task-commit-retry-times", null, "should be a positive value");
@ -143,12 +142,6 @@ public class MasterConfig implements Validator {
if (masterConfig.getFailoverInterval().toMillis() <= 0) { if (masterConfig.getFailoverInterval().toMillis() <= 0) {
errors.rejectValue("failover-interval", null, "should be a valid duration"); errors.rejectValue("failover-interval", null, "should be a valid duration");
} }
if (masterConfig.getMaxCpuLoadAvg() <= 0) {
masterConfig.setMaxCpuLoadAvg(100);
}
if (masterConfig.getReservedMemory() <= 0) {
masterConfig.setReservedMemory(100);
}
if (masterConfig.getWorkerGroupRefreshInterval().getSeconds() < 10) { if (masterConfig.getWorkerGroupRefreshInterval().getSeconds() < 10) {
errors.rejectValue("worker-group-refresh-interval", null, "should >= 10s"); errors.rejectValue("worker-group-refresh-interval", null, "should >= 10s");
@ -171,12 +164,11 @@ public class MasterConfig implements Validator {
"\n exec-threads -> " + execThreads + "\n exec-threads -> " + execThreads +
"\n dispatch-task-number -> " + dispatchTaskNumber + "\n dispatch-task-number -> " + dispatchTaskNumber +
"\n host-selector -> " + hostSelector + "\n host-selector -> " + hostSelector +
"\n heartbeat-interval -> " + heartbeatInterval + "\n max-heartbeat-interval -> " + maxHeartbeatInterval +
"\n task-commit-retry-times -> " + taskCommitRetryTimes + "\n task-commit-retry-times -> " + taskCommitRetryTimes +
"\n task-commit-interval -> " + taskCommitInterval + "\n task-commit-interval -> " + taskCommitInterval +
"\n state-wheel-interval -> " + stateWheelInterval + "\n state-wheel-interval -> " + stateWheelInterval +
"\n max-cpu-load-avg -> " + maxCpuLoadAvg + "\n server-load-protection -> " + serverLoadProtection +
"\n reserved-memory -> " + reservedMemory +
"\n failover-interval -> " + failoverInterval + "\n failover-interval -> " + failoverInterval +
"\n kill-application-when-task-failover -> " + killApplicationWhenTaskFailover + "\n kill-application-when-task-failover -> " + killApplicationWhenTaskFailover +
"\n registry-disconnect-strategy -> " + registryDisconnectStrategy + "\n registry-disconnect-strategy -> " + registryDisconnectStrategy +

73
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtection.java

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.config;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MasterServerLoadProtection {

    /** Whether server-load protection is active; when false, isOverload always returns false. */
    private boolean enabled = true;

    /** Max allowed total CPU usage, as a fraction in [0, 1]. */
    private double maxCpuUsagePercentageThresholds = 0.7;

    /** Max allowed JVM heap usage, as a fraction in [0, 1]. */
    private double maxJVMMemoryUsagePercentageThresholds = 0.7;

    /** Max allowed system memory usage, as a fraction in [0, 1]. */
    private double maxSystemMemoryUsagePercentageThresholds = 0.7;

    /** Max allowed disk usage, as a fraction in [0, 1]. */
    private double maxDiskUsagePercentageThresholds = 0.7;

    /**
     * Check whether the master is overloaded against the configured thresholds.
     *
     * @param systemMetrics current system metrics snapshot (CPU, JVM memory, system memory, disk)
     * @return true if protection is enabled and any metric exceeds its threshold, otherwise false
     */
    public boolean isOverload(SystemMetrics systemMetrics) {
        if (!enabled) {
            return false;
        }
        if (systemMetrics.getTotalCpuUsedPercentage() > maxCpuUsagePercentageThresholds) {
            log.info(
                    "Master OverLoad: the TotalCpuUsedPercentage: {} is over then the MaxCpuUsagePercentageThresholds {}",
                    systemMetrics.getTotalCpuUsedPercentage(), maxCpuUsagePercentageThresholds);
            return true;
        }
        if (systemMetrics.getJvmMemoryUsedPercentage() > maxJVMMemoryUsagePercentageThresholds) {
            // BUGFIX: previously logged maxCpuUsagePercentageThresholds here, which made the
            // overload diagnostics misleading; log the JVM-memory threshold that was compared.
            log.info(
                    "Master OverLoad: the JvmMemoryUsedPercentage: {} is over then the MaxJVMMemoryUsagePercentageThresholds {}",
                    systemMetrics.getJvmMemoryUsedPercentage(), maxJVMMemoryUsagePercentageThresholds);
            return true;
        }
        if (systemMetrics.getDiskUsedPercentage() > maxDiskUsagePercentageThresholds) {
            // BUGFIX: previously logged maxCpuUsagePercentageThresholds here; log the disk threshold.
            log.info("Master OverLoad: the DiskUsedPercentage: {} is over then the MaxDiskUsagePercentageThresholds {}",
                    systemMetrics.getDiskUsedPercentage(), maxDiskUsagePercentageThresholds);
            return true;
        }
        if (systemMetrics.getSystemMemoryUsedPercentage() > maxSystemMemoryUsagePercentageThresholds) {
            log.info(
                    "Master OverLoad: the SystemMemoryUsedPercentage: {} is over then the MaxSystemMemoryUsagePercentageThresholds {}",
                    systemMetrics.getSystemMemoryUsedPercentage(), maxSystemMemoryUsagePercentageThresholds);
            return true;
        }
        return false;
    }
}

18
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java

@ -130,28 +130,22 @@ public class LowerWeightHostManager extends CommonHostManager {
} }
} }
public Optional<HostWeight> getHostWeight(String addr, String workerGroup, WorkerHeartBeat heartBeat) { public Optional<HostWeight> getHostWeight(String workerAddress, String workerGroup, WorkerHeartBeat heartBeat) {
if (heartBeat == null) { if (heartBeat == null) {
log.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup); log.warn("Worker {} in WorkerGroup {} have not received the heartbeat", workerAddress, workerGroup);
return Optional.empty();
}
if (ServerStatus.ABNORMAL == heartBeat.getServerStatus()) {
log.warn("worker {} current cpu load average {} is too high or available memory {}G is too low",
addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize());
return Optional.empty(); return Optional.empty();
} }
if (ServerStatus.BUSY == heartBeat.getServerStatus()) { if (ServerStatus.BUSY == heartBeat.getServerStatus()) {
log.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}", log.warn("Worker {} in workerGroup {} is Busy, heartbeat is {}", workerAddress, workerGroup, heartBeat);
addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount());
return Optional.empty(); return Optional.empty();
} }
return Optional.of( return Optional.of(
new HostWeight( new HostWeight(
HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup), HostWorker.of(workerAddress, heartBeat.getWorkerHostWeight(), workerGroup),
heartBeat.getCpuUsage(), heartBeat.getCpuUsage(),
heartBeat.getMemoryUsage(), heartBeat.getMemoryUsage(),
heartBeat.getLoadAverage(), heartBeat.getDiskUsage(),
heartBeat.getWorkerWaitingTaskCount(), heartBeat.getThreadPoolUsage(),
heartBeat.getStartupTime())); heartBeat.getStartupTime()));
} }

73
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java

@ -20,69 +20,44 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.extract.base.utils.Constants; import org.apache.dolphinscheduler.extract.base.utils.Constants;
import org.apache.dolphinscheduler.extract.base.utils.Host; import org.apache.dolphinscheduler.extract.base.utils.Host;
/** import lombok.Data;
* host weight
*/ @Data
public class HostWeight { public class HostWeight {
private final int CPU_FACTOR = 10; private final int THREAD_USAGE_FACTOR = 10;
private final int CPU_USAGE_FACTOR = 20;
private final int MEMORY_FACTOR = 20; private final int MEMORY_USAGE_FACTOR = 20;
private final int LOAD_AVERAGE_FACTOR = 70; private final int DISK_USAGE_FACTOR = 50;
private final HostWorker hostWorker; private final Host host;
private final double weight; private final double weight;
// if the weight is small, then is will be chosen first
private double currentWeight; private double currentWeight;
private final int waitingTaskCount; public HostWeight(HostWorker hostWorker,
double cpuUsage,
public HostWeight(HostWorker hostWorker, double cpu, double memory, double loadAverage, int waitingTaskCount, double memoryUsage,
double diskUsage,
double threadPoolUsage,
long startTime) { long startTime) {
this.hostWorker = hostWorker; this.host = hostWorker;
this.weight = calculateWeight(cpu, memory, loadAverage, startTime); this.weight = calculateWeight(cpuUsage, memoryUsage, diskUsage, threadPoolUsage, startTime);
this.currentWeight = this.weight; this.currentWeight = this.weight;
this.waitingTaskCount = waitingTaskCount;
}
public double getWeight() {
return weight;
}
public double getCurrentWeight() {
return currentWeight;
}
public void setCurrentWeight(double currentWeight) {
this.currentWeight = currentWeight;
}
public HostWorker getHostWorker() {
return hostWorker;
}
public Host getHost() {
return (Host) hostWorker;
}
public int getWaitingTaskCount() {
return waitingTaskCount;
}
@Override
public String toString() {
return "HostWeight{"
+ "hostWorker=" + hostWorker
+ ", weight=" + weight
+ ", currentWeight=" + currentWeight
+ ", waitingTaskCount=" + waitingTaskCount
+ '}';
} }
private double calculateWeight(double cpu, double memory, double loadAverage, long startTime) { private double calculateWeight(double cpuUsage,
double calculatedWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR; double memoryUsage,
double diskUsage,
double threadPoolUsage,
long startTime) {
double calculatedWeight = 100 - (cpuUsage * CPU_USAGE_FACTOR + memoryUsage * MEMORY_USAGE_FACTOR
+ diskUsage * DISK_USAGE_FACTOR + threadPoolUsage * THREAD_USAGE_FACTOR);
long uptime = System.currentTimeMillis() - startTime; long uptime = System.currentTimeMillis() - startTime;
if (uptime > 0 && uptime < Constants.WARM_UP_TIME) { if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
// If the warm-up is not over, add the weight // If the warm-up is not over, add the weight

31
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java

@ -18,14 +18,6 @@
package org.apache.dolphinscheduler.server.master.dispatch.host.assign; package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import org.springframework.util.CollectionUtils;
import com.google.common.collect.Lists;
/** /**
* lower weight round robin * lower weight round robin
@ -43,8 +35,7 @@ public class LowerWeightRoundRobin extends AbstractSelector<HostWeight> {
double totalWeight = 0; double totalWeight = 0;
double lowWeight = 0; double lowWeight = 0;
HostWeight lowerNode = null; HostWeight lowerNode = null;
List<HostWeight> weights = canAssignTaskHost(sources); for (HostWeight hostWeight : sources) {
for (HostWeight hostWeight : weights) {
totalWeight += hostWeight.getWeight(); totalWeight += hostWeight.getWeight();
hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight()); hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) { if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) {
@ -58,24 +49,4 @@ public class LowerWeightRoundRobin extends AbstractSelector<HostWeight> {
return lowerNode; return lowerNode;
} }
private List<HostWeight> canAssignTaskHost(Collection<HostWeight> sources) {
if (CollectionUtils.isEmpty(sources)) {
return Collections.emptyList();
}
List<HostWeight> zeroWaitingTask =
sources.stream().filter(h -> h.getWaitingTaskCount() == 0).collect(Collectors.toList());
if (!zeroWaitingTask.isEmpty()) {
return zeroWaitingTask;
}
HostWeight hostWeight = sources.stream().min(Comparator.comparing(HostWeight::getWaitingTaskCount)).get();
List<HostWeight> waitingTask = Lists.newArrayList(hostWeight);
List<HostWeight> equalWaitingTask = sources.stream()
.filter(h -> !h.getHost().equals(hostWeight.getHost())
&& h.getWaitingTaskCount() == hostWeight.getWaitingTaskCount())
.collect(Collectors.toList());
if (!equalWaitingTask.isEmpty()) {
waitingTask.addAll(equalWaitingTask);
}
return waitingTask;
}
} }

21
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java

@ -17,8 +17,11 @@
package org.apache.dolphinscheduler.server.master.metrics; package org.apache.dolphinscheduler.server.master.metrics;
import java.util.function.Supplier;
import lombok.experimental.UtilityClass; import lombok.experimental.UtilityClass;
import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Metrics;
@UtilityClass @UtilityClass
@ -40,6 +43,24 @@ public class MasterServerMetrics {
.description("Master server consume command count") .description("Master server consume command count")
.register(Metrics.globalRegistry); .register(Metrics.globalRegistry);
public void registerMasterMemoryAvailableGauge(Supplier<Number> supplier) {
Gauge.builder("ds.master.memory.available", supplier)
.description("Master memory available")
.register(Metrics.globalRegistry);
}
public void registerMasterCpuUsageGauge(Supplier<Number> supplier) {
Gauge.builder("ds.master.cpu.usage", supplier)
.description("worker cpu usage")
.register(Metrics.globalRegistry);
}
public void registerMasterMemoryUsageGauge(Supplier<Number> supplier) {
Gauge.builder("ds.master.memory.usage", supplier)
.description("Master memory usage")
.register(Metrics.globalRegistry);
}
public void incMasterOverload() { public void incMasterOverload() {
masterOverloadCounter.increment(); masterOverloadCounter.increment();
} }

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java

@ -20,8 +20,6 @@ package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.registry.api.ConnectionListener; import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState; import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import lombok.NonNull; import lombok.NonNull;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -29,15 +27,9 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public class MasterConnectionStateListener implements ConnectionListener { public class MasterConnectionStateListener implements ConnectionListener {
private final MasterConfig masterConfig;
private final RegistryClient registryClient;
private final MasterConnectStrategy masterConnectStrategy; private final MasterConnectStrategy masterConnectStrategy;
public MasterConnectionStateListener(@NonNull MasterConfig masterConfig, public MasterConnectionStateListener(@NonNull MasterConnectStrategy masterConnectStrategy) {
@NonNull RegistryClient registryClient,
@NonNull MasterConnectStrategy masterConnectStrategy) {
this.masterConfig = masterConfig;
this.registryClient = registryClient;
this.masterConnectStrategy = masterConnectStrategy; this.masterConnectStrategy = masterConnectStrategy;
} }

18
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -20,9 +20,12 @@ package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.constants.Constants.SLEEP_TIME_MILLIS; import static org.apache.dolphinscheduler.common.constants.Constants.SLEEP_TIME_MILLIS;
import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.ServerStatus;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.RegistryException; import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
@ -54,6 +57,9 @@ public class MasterRegistryClient implements AutoCloseable {
@Autowired @Autowired
private MasterConfig masterConfig; private MasterConfig masterConfig;
@Autowired
private MetricsProvider metricsProvider;
@Autowired @Autowired
private MasterConnectStrategy masterConnectStrategy; private MasterConnectStrategy masterConnectStrategy;
@ -61,11 +67,10 @@ public class MasterRegistryClient implements AutoCloseable {
public void start() { public void start() {
try { try {
this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, registryClient); this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, metricsProvider, registryClient);
// master registry // master registry
registry(); registry();
registryClient.addConnectionStateListener( registryClient.addConnectionStateListener(new MasterConnectionStateListener(masterConnectStrategy));
new MasterConnectionStateListener(masterConfig, registryClient, masterConnectStrategy));
registryClient.subscribe(RegistryNodeType.ALL_SERVERS.getRegistryPath(), new MasterRegistryDataListener()); registryClient.subscribe(RegistryNodeType.ALL_SERVERS.getRegistryPath(), new MasterRegistryDataListener());
} catch (Exception e) { } catch (Exception e) {
throw new RegistryException("Master registry client start up error", e); throw new RegistryException("Master registry client start up error", e);
@ -153,6 +158,13 @@ public class MasterRegistryClient implements AutoCloseable {
log.info("Master node : {} registering to registry center", masterConfig.getMasterAddress()); log.info("Master node : {} registering to registry center", masterConfig.getMasterAddress());
String masterRegistryPath = masterConfig.getMasterRegistryPath(); String masterRegistryPath = masterConfig.getMasterRegistryPath();
MasterHeartBeat heartBeat = masterHeartBeatTask.getHeartBeat();
while (ServerStatus.BUSY.equals(heartBeat.getServerStatus())) {
log.warn("Master node is BUSY: {}", heartBeat);
heartBeat = masterHeartBeatTask.getHeartBeat();
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
// remove before persist // remove before persist
registryClient.remove(masterRegistryPath); registryClient.remove(masterRegistryPath);
registryClient.persistEphemeral(masterRegistryPath, JSONUtils.toJsonString(masterHeartBeatTask.getHeartBeat())); registryClient.persistEphemeral(masterRegistryPath, JSONUtils.toJsonString(masterHeartBeatTask.getHeartBeat()));

7
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManager.java

@ -30,8 +30,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -50,8 +48,7 @@ public class MasterSlotManager {
private volatile int currentSlot = 0; private volatile int currentSlot = 0;
private volatile int totalSlot = 0; private volatile int totalSlot = 0;
@PostConstruct public void start() {
public void init() {
serverNodeManager.addMasterInfoChangeListener(new MasterSlotManager.SlotChangeListener()); serverNodeManager.addMasterInfoChangeListener(new MasterSlotManager.SlotChangeListener());
} }
@ -72,7 +69,7 @@ public class MasterSlotManager {
@Override @Override
public void notify(Map<String, MasterHeartBeat> masterNodeInfo) { public void notify(Map<String, MasterHeartBeat> masterNodeInfo) {
List<Server> serverList = masterNodeInfo.values().stream() List<Server> serverList = masterNodeInfo.values().stream()
.filter(heartBeat -> !heartBeat.getServerStatus().equals(ServerStatus.ABNORMAL)) .filter(heartBeat -> !heartBeat.getServerStatus().equals(ServerStatus.BUSY))
.map(this::convertHeartBeatToServer).collect(Collectors.toList()); .map(this::convertHeartBeatToServer).collect(Collectors.toList());
syncMasterNodes(serverList); syncMasterNodes(serverList);
} }

16
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java

@ -21,11 +21,13 @@ import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection;
import org.apache.dolphinscheduler.server.master.event.WorkflowEvent; import org.apache.dolphinscheduler.server.master.event.WorkflowEvent;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue; import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventType; import org.apache.dolphinscheduler.server.master.event.WorkflowEventType;
@ -78,6 +80,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
@Autowired @Autowired
private MasterTaskExecutorBootstrap masterTaskExecutorBootstrap; private MasterTaskExecutorBootstrap masterTaskExecutorBootstrap;
@Autowired
private MetricsProvider metricsProvider;
protected MasterSchedulerBootstrap() { protected MasterSchedulerBootstrap() {
super("MasterCommandLoopThread"); super("MasterCommandLoopThread");
} }
@ -102,11 +107,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
log.info("MasterSchedulerBootstrap stopped..."); log.info("MasterSchedulerBootstrap stopped...");
} }
/**
* run of MasterSchedulerService
*/
@Override @Override
public void run() { public void run() {
MasterServerLoadProtection serverLoadProtection = masterConfig.getServerLoadProtection();
while (!ServerLifeCycleManager.isStopped()) { while (!ServerLifeCycleManager.isStopped()) {
try { try {
if (!ServerLifeCycleManager.isRunning()) { if (!ServerLifeCycleManager.isRunning()) {
@ -115,9 +118,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} }
// todo: if the workflow event queue is much, we need to handle the back pressure // todo: if the workflow event queue is much, we need to handle the back pressure
boolean isOverload = SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory()); if (serverLoadProtection.isOverload(systemMetrics)) {
if (isOverload) {
log.warn("The current server is overload, cannot consumes commands."); log.warn("The current server is overload, cannot consumes commands.");
MasterServerMetrics.incMasterOverload(); MasterServerMetrics.incMasterOverload();
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);

29
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java

@ -24,8 +24,11 @@ import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection;
import lombok.NonNull; import lombok.NonNull;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -35,6 +38,8 @@ public class MasterHeartBeatTask extends BaseHeartBeatTask<MasterHeartBeat> {
private final MasterConfig masterConfig; private final MasterConfig masterConfig;
private final MetricsProvider metricsProvider;
private final RegistryClient registryClient; private final RegistryClient registryClient;
private final String heartBeatPath; private final String heartBeatPath;
@ -42,9 +47,11 @@ public class MasterHeartBeatTask extends BaseHeartBeatTask<MasterHeartBeat> {
private final int processId; private final int processId;
public MasterHeartBeatTask(@NonNull MasterConfig masterConfig, public MasterHeartBeatTask(@NonNull MasterConfig masterConfig,
@NonNull MetricsProvider metricsProvider,
@NonNull RegistryClient registryClient) { @NonNull RegistryClient registryClient) {
super("MasterHeartBeatTask", masterConfig.getHeartbeatInterval().toMillis()); super("MasterHeartBeatTask", masterConfig.getMaxHeartbeatInterval().toMillis());
this.masterConfig = masterConfig; this.masterConfig = masterConfig;
this.metricsProvider = metricsProvider;
this.registryClient = registryClient; this.registryClient = registryClient;
this.heartBeatPath = masterConfig.getMasterRegistryPath(); this.heartBeatPath = masterConfig.getMasterRegistryPath();
this.processId = OSUtils.getProcessID(); this.processId = OSUtils.getProcessID();
@ -52,16 +59,17 @@ public class MasterHeartBeatTask extends BaseHeartBeatTask<MasterHeartBeat> {
@Override @Override
public MasterHeartBeat getHeartBeat() { public MasterHeartBeat getHeartBeat() {
SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
ServerStatus serverStatus = getServerStatus(systemMetrics, masterConfig.getServerLoadProtection());
return MasterHeartBeat.builder() return MasterHeartBeat.builder()
.startupTime(ServerLifeCycleManager.getServerStartupTime()) .startupTime(ServerLifeCycleManager.getServerStartupTime())
.reportTime(System.currentTimeMillis()) .reportTime(System.currentTimeMillis())
.cpuUsage(OSUtils.cpuUsagePercentage()) .cpuUsage(systemMetrics.getTotalCpuUsedPercentage())
.availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize()) .jvmMemoryUsage(systemMetrics.getJvmMemoryUsedPercentage())
.reservedMemory(masterConfig.getReservedMemory()) .memoryUsage(systemMetrics.getSystemMemoryUsedPercentage())
.memoryUsage(OSUtils.memoryUsagePercentage()) .diskUsage(systemMetrics.getDiskUsedPercentage())
.diskAvailable(OSUtils.diskAvailable())
.processId(processId) .processId(processId)
.serverStatus(getServerStatus()) .serverStatus(serverStatus)
.host(NetUtils.getHost()) .host(NetUtils.getHost())
.port(masterConfig.getListenPort()) .port(masterConfig.getListenPort())
.build(); .build();
@ -75,9 +83,8 @@ public class MasterHeartBeatTask extends BaseHeartBeatTask<MasterHeartBeat> {
heartBeatPath, masterHeartBeatJson); heartBeatPath, masterHeartBeatJson);
} }
private ServerStatus getServerStatus() { private ServerStatus getServerStatus(SystemMetrics systemMetrics,
return OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory()) MasterServerLoadProtection masterServerLoadProtection) {
? ServerStatus.ABNORMAL return masterServerLoadProtection.isOverload(systemMetrics) ? ServerStatus.BUSY : ServerStatus.NORMAL;
: ServerStatus.NORMAL;
} }
} }

17
dolphinscheduler-master/src/main/resources/application.yaml

@ -113,16 +113,23 @@ master:
# master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
host-selector: lower_weight host-selector: lower_weight
# master heartbeat interval # master heartbeat interval
heartbeat-interval: 10s max-heartbeat-interval: 10s
# master commit task retry times # master commit task retry times
task-commit-retry-times: 5 task-commit-retry-times: 5
# master commit task interval # master commit task interval
task-commit-interval: 1s task-commit-interval: 1s
state-wheel-interval: 5s state-wheel-interval: 5s
# master max cpuload avg percentage, only higher than the system cpu load average, master server can schedule. default value 1: will use 100% cpu server-load-protection:
max-cpu-load-avg: 1 # If set true, will open master overload protection
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, only the available memory is higher than 30%, master server can schedule. enabled: true
reserved-memory: 0.3 # Master max cpu usage, when the master's cpu usage is smaller then this value, master server can execute workflow.
max-cpu-usage-percentage-thresholds: 0.7
# Master max JVM memory usage , when the master's jvm memory usage is smaller then this value, master server can execute workflow.
max-jvm-memory-usage-percentage-thresholds: 0.7
# Master max System memory usage , when the master's system memory usage is smaller then this value, master server can execute workflow.
max-system-memory-usage-percentage-thresholds: 0.7
# Master max disk usage , when the master's disk usage is smaller then this value, master server can execute workflow.
max-disk-usage-percentage-thresholds: 0.7
# failover interval, the unit is minute # failover interval, the unit is minute
failover-interval: 10m failover-interval: 10m
# kill yarn / k8s application when failover taskInstance, default true # kill yarn / k8s application when failover taskInstance, default true

42
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionTest.java

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.config;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
class MasterServerLoadProtectionTest {

    @Test
    void isOverload() {
        // Metrics sitting just above the 0.7 default thresholds on every dimension.
        SystemMetrics overloadedMetrics = SystemMetrics.builder()
                .totalCpuUsedPercentage(0.71)
                .jvmMemoryUsedPercentage(0.71)
                .systemMemoryUsedPercentage(0.71)
                .diskUsedPercentage(0.71)
                .build();
        MasterServerLoadProtection protection = new MasterServerLoadProtection();

        // With protection disabled, overload is never reported regardless of metrics.
        protection.setEnabled(false);
        Assertions.assertFalse(protection.isOverload(overloadedMetrics));

        // With protection enabled, the same metrics must trip the overload check.
        protection.setEnabled(true);
        Assertions.assertTrue(protection.isOverload(overloadedMetrics));
    }
}

29
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java

@ -28,11 +28,11 @@ public class LowerWeightRoundRobinTest {
@Test @Test
public void testSelect() { public void testSelect() {
Collection<HostWeight> sources = new ArrayList<>(); Collection<HostWeight> sources = new ArrayList<>();
sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, 1, sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 0.384, 0.1,
System.currentTimeMillis() - 60 * 8 * 1000)); System.currentTimeMillis() - 60 * 8 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.56, 3.24, 2, sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.56, 0.324, 0.2,
System.currentTimeMillis() - 60 * 5 * 1000)); System.currentTimeMillis() - 60 * 5 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.80, 3.15, 1, sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.80, 0.315, 0.1,
System.currentTimeMillis() - 60 * 2 * 1000)); System.currentTimeMillis() - 60 * 2 * 1000));
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin(); LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
@ -40,30 +40,27 @@ public class LowerWeightRoundRobinTest {
result = roundRobin.select(sources); result = roundRobin.select(sources);
Assertions.assertEquals("192.158.2.1", result.getHost().getIp()); Assertions.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources); result = roundRobin.select(sources);
Assertions.assertEquals("192.158.2.1", result.getHost().getIp()); Assertions.assertEquals("192.158.2.2", result.getHost().getIp());
Assertions.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources); result = roundRobin.select(sources);
Assertions.assertEquals("192.158.2.1", result.getHost().getIp()); Assertions.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources); result = roundRobin.select(sources);
Assertions.assertEquals("192.158.2.3", result.getHost().getIp()); Assertions.assertEquals("192.158.2.2", result.getHost().getIp());
Assertions.assertEquals("192.158.2.3", result.getHost().getIp());
result = roundRobin.select(sources); result = roundRobin.select(sources);
Assertions.assertEquals("192.158.2.1", result.getHost().getIp()); Assertions.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources); result = roundRobin.select(sources);
Assertions.assertEquals("192.158.2.1", result.getHost().getIp()); Assertions.assertEquals("192.158.2.2", result.getHost().getIp());
Assertions.assertEquals("192.158.2.1", result.getHost().getIp());
} }
@Test @Test
public void testWarmUpSelect() { public void testWarmUpSelect() {
Collection<HostWeight> sources = new ArrayList<>(); Collection<HostWeight> sources = new ArrayList<>();
sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, 0, sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 0.384, 0,
System.currentTimeMillis() - 60 * 8 * 1000)); System.currentTimeMillis() - 60 * 8 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.44, 3.84, 0, sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.44, 0.384, 0,
System.currentTimeMillis() - 60 * 5 * 1000)); System.currentTimeMillis() - 60 * 5 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.44, 3.84, 0, sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.44, 0.384, 0,
System.currentTimeMillis() - 60 * 3 * 1000)); System.currentTimeMillis() - 60 * 3 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.4:33", 100, "default"), 0.06, 0.44, 3.84, 0, sources.add(new HostWeight(HostWorker.of("192.158.2.4:33", 100, "default"), 0.06, 0.44, 0.384, 0,
System.currentTimeMillis() - 60 * 11 * 1000)); System.currentTimeMillis() - 60 * 11 * 1000));
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin(); LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
@ -88,11 +85,11 @@ public class LowerWeightRoundRobinTest {
result = roundRobin.doSelect(sources); result = roundRobin.doSelect(sources);
Assertions.assertNull(result); Assertions.assertNull(result);
sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.14, 1, sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 0.314, 0.1,
System.currentTimeMillis() - 60 * 8 * 1000)); System.currentTimeMillis() - 60 * 8 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.56, 3.24, 2, sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.56, 0.324, 0.2,
System.currentTimeMillis() - 60 * 5 * 1000)); System.currentTimeMillis() - 60 * 5 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.80, 3.15, 1, sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.80, 0.315, 0.1,
System.currentTimeMillis() - 60 * 2 * 1000)); System.currentTimeMillis() - 60 * 2 * 1000));
result = roundRobin.doSelect(sources); result = roundRobin.doSelect(sources);
Assertions.assertEquals("192.158.2.1", result.getHost().getIp()); Assertions.assertEquals("192.158.2.1", result.getHost().getIp());

6
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java

@ -20,6 +20,8 @@ package org.apache.dolphinscheduler.server.master.registry;
import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.given;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ServerStatus;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -65,8 +67,10 @@ public class MasterRegistryClientTest {
private MasterConfig masterConfig; private MasterConfig masterConfig;
@BeforeEach @BeforeEach
public void before() throws Exception { public void before() {
given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn("127.0.0.1:8080"); given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn("127.0.0.1:8080");
given(masterHeartBeatTask.getHeartBeat())
.willReturn(MasterHeartBeat.builder().serverStatus(ServerStatus.NORMAL).build());
ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient); ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient);
ReflectionTestUtils.setField(masterRegistryClient, "masterHeartBeatTask", masterHeartBeatTask); ReflectionTestUtils.setField(masterRegistryClient, "masterHeartBeatTask", masterHeartBeatTask);

4
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManagerTest.java

@ -46,7 +46,7 @@ public class MasterSlotManagerTest {
// on normal Master side // on normal Master side
Mockito.when(masterConfig.getMasterAddress()).thenReturn("127.0.0.1:7777"); Mockito.when(masterConfig.getMasterAddress()).thenReturn("127.0.0.1:7777");
sendHeartBeat(ServerStatus.ABNORMAL, ServerStatus.NORMAL); sendHeartBeat(ServerStatus.BUSY, ServerStatus.NORMAL);
Assertions.assertEquals(1, masterSlotManager.getMasterSize()); Assertions.assertEquals(1, masterSlotManager.getMasterSize());
Assertions.assertEquals(0, masterSlotManager.getSlot()); Assertions.assertEquals(0, masterSlotManager.getSlot());
@ -60,7 +60,7 @@ public class MasterSlotManagerTest {
// on abnormal Master side // on abnormal Master side
Mockito.when(masterConfig.getMasterAddress()).thenReturn("127.0.0.1:6666"); Mockito.when(masterConfig.getMasterAddress()).thenReturn("127.0.0.1:6666");
sendHeartBeat(ServerStatus.ABNORMAL, ServerStatus.NORMAL); sendHeartBeat(ServerStatus.BUSY, ServerStatus.NORMAL);
Assertions.assertEquals(0, masterSlotManager.getMasterSize()); Assertions.assertEquals(0, masterSlotManager.getMasterSize());
Assertions.assertEquals(0, masterSlotManager.getSlot()); Assertions.assertEquals(0, masterSlotManager.getSlot());

6
dolphinscheduler-meter/pom.xml

@ -42,6 +42,12 @@
</dependencyManagement> </dependencyManagement>
<dependencies> <dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-common</artifactId>
<version>dev-SNAPSHOT</version>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId> <artifactId>spring-boot-starter-actuator</artifactId>

69
dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/DefaultMetricsProvider.java

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.meter.metrics;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import io.micrometer.core.instrument.MeterRegistry;
@Component
public class DefaultMetricsProvider implements MetricsProvider {

    @Autowired
    private MeterRegistry meterRegistry;

    // Cached snapshot shared by every caller of this singleton bean; volatile so a
    // concurrent reader never observes a half-published reference. A refresh race
    // can at worst compute the snapshot twice, which is harmless.
    private volatile SystemMetrics systemMetrics;

    private volatile long lastRefreshTime = 0;

    // Minimum interval between two gauge reads; keeps heartbeat-driven callers
    // from hitting the MeterRegistry on every invocation.
    private static final long SYSTEM_METRICS_REFRESH_INTERVAL = 1_000L;

    /**
     * Returns a snapshot of CPU / JVM-memory / system-memory usage, refreshed at
     * most once per {@code SYSTEM_METRICS_REFRESH_INTERVAL} milliseconds.
     * <p>
     * The disk-related fields of {@link SystemMetrics} are not populated here and
     * keep their default value of 0.
     */
    @Override
    public SystemMetrics getSystemMetrics() {
        if (System.currentTimeMillis() - lastRefreshTime < SYSTEM_METRICS_REFRESH_INTERVAL) {
            return systemMetrics;
        }

        double systemCpuUsage = meterRegistry.get("system.cpu.usage").gauge().value();
        double processCpuUsage = meterRegistry.get("process.cpu.usage").gauge().value();
        double jvmMemoryUsed = meterRegistry.get("jvm.memory.used").meter().measure().iterator().next().getValue();
        double jvmMemoryMax = meterRegistry.get("jvm.memory.max").meter().measure().iterator().next().getValue();

        long totalSystemMemory = OSUtils.getTotalSystemMemory();
        long systemMemoryAvailable = OSUtils.getSystemAvailableMemoryUsed();

        systemMetrics = SystemMetrics.builder()
                .systemCpuUsagePercentage(systemCpuUsage)
                .processCpuUsagePercentage(processCpuUsage)
                // NOTE(review): micrometer's system CPU gauge normally already includes
                // this process's CPU, so this sum may over-count — confirm intent.
                .totalCpuUsedPercentage(systemCpuUsage + processCpuUsage)
                .jvmMemoryUsed(jvmMemoryUsed)
                .jvmMemoryMax(jvmMemoryMax)
                .jvmMemoryUsedPercentage(jvmMemoryUsed / jvmMemoryMax)
                .systemMemoryUsed(totalSystemMemory - systemMemoryAvailable)
                .systemMemoryMax(totalSystemMemory)
                .systemMemoryUsedPercentage((double) (totalSystemMemory - systemMemoryAvailable) / totalSystemMemory)
                .build();
        lastRefreshTime = System.currentTimeMillis();
        return systemMetrics;
    }
}

24
dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/MetricsProvider.java

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.meter.metrics;
/**
 * Supplies a snapshot of the current system resource metrics (CPU, memory,
 * disk usage) for server load-protection and heartbeat reporting.
 */
public interface MetricsProvider {

    /**
     * Returns the current {@link SystemMetrics} snapshot. Implementations may
     * return a cached, slightly stale snapshot — TODO confirm per implementation.
     */
    SystemMetrics getSystemMetrics();
}

54
dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/SystemMetrics.java

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.meter.metrics;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Snapshot of host/process resource usage (CPU, JVM memory, system memory,
 * disk) produced by a {@link MetricsProvider}. Plain Lombok data holder;
 * usage-percentage fields are presumably fractions in [0, 1] — confirm
 * against the provider populating them.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SystemMetrics {
    // CPU
    private double systemCpuUsagePercentage;
    private double processCpuUsagePercentage;
    // NOTE(review): presumably systemCpuUsagePercentage + processCpuUsagePercentage
    // combined — confirm against the provider.
    private double totalCpuUsedPercentage;
    // JVM-Memory
    // todo: get pod memory usage
    private double jvmMemoryUsed;
    private double jvmMemoryMax;
    private double jvmMemoryUsedPercentage;
    // System-Memory
    // todo: get pod cpu usage
    private double systemMemoryUsed;
    private double systemMemoryMax;
    private double systemMemoryUsedPercentage;
    // Disk
    // todo: get pod disk usage
    private double diskUsed;
    private double diskTotal;
    private double diskUsedPercentage;
}

2
dolphinscheduler-meter/src/main/resources/grafana-demo/docker-compose.yaml

@ -33,7 +33,7 @@ services:
networks: [ test ] networks: [ test ]
ports: ports:
# due to the DolphinScheduler frontend port is 3000, so we change the grafana default port to 3001. # due to the DolphinScheduler frontend port is 3000, so we change the grafana default port to 3001.
- "3001:3000" - "3000:3000"
environment: environment:
GF_AUTH_ANONYMOUS_ENABLED: "true" GF_AUTH_ANONYMOUS_ENABLED: "true"
volumes: volumes:

34
dolphinscheduler-standalone-server/src/main/resources/application.yaml

@ -182,16 +182,22 @@ master:
# master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
host-selector: lower_weight host-selector: lower_weight
# master heartbeat interval # master heartbeat interval
heartbeat-interval: 10s max-heartbeat-interval: 10s
# master commit task retry times # master commit task retry times
task-commit-retry-times: 5 task-commit-retry-times: 5
# master commit task interval # master commit task interval
task-commit-interval: 1s task-commit-interval: 1s
state-wheel-interval: 5s state-wheel-interval: 5s
# master max cpuload avg percentage, only higher than the system cpu load average, master server can schedule. default value 1: will use 100% cpu server-load-protection:
max-cpu-load-avg: 1 enabled: true
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.1, only the available memory is higher than 10%, master server can schedule. # Master max cpu usage, when the master's cpu usage is smaller then this value, master server can execute workflow.
reserved-memory: 0.1 max-cpu-usage-percentage-thresholds: 0.9
# Master max JVM memory usage , when the master's jvm memory usage is smaller then this value, master server can execute workflow.
max-jvm-memory-usage-percentage-thresholds: 0.9
# Master max System memory usage , when the master's system memory usage is smaller then this value, master server can execute workflow.
max-system-memory-usage-percentage-thresholds: 0.9
# Master max disk usage , when the master's disk usage is smaller then this value, master server can execute workflow.
max-disk-usage-percentage-thresholds: 0.9
# failover interval # failover interval
failover-interval: 10m failover-interval: 10m
# kill yarn/k8s application when failover taskInstance, default true # kill yarn/k8s application when failover taskInstance, default true
@ -204,13 +210,19 @@ worker:
# worker execute thread number to limit task instances in parallel # worker execute thread number to limit task instances in parallel
exec-threads: 10 exec-threads: 10
# worker heartbeat interval # worker heartbeat interval
heartbeat-interval: 10s max-heartbeat-interval: 10s
# worker host weight to dispatch tasks, default value 100 # worker host weight to dispatch tasks, default value 100
host-weight: 100 host-weight: 100
# worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value 1: will use 100% cpu. server-load-protection:
max-cpu-load-avg: 1 enabled: true
# worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.1, only the available memory is higher than 10%, worker server can receive task. # Worker max cpu usage, when the worker's cpu usage is smaller then this value, worker server can be dispatched tasks.
reserved-memory: 0.1 max-cpu-usage-percentage-thresholds: 0.9
# Worker max JVM memory usage , when the worker's jvm memory usage is smaller then this value, worker server can be dispatched tasks.
max-jvm-memory-usage-percentage-thresholds: 0.9
# Worker max System memory usage , when the worker's system memory usage is smaller then this value, worker server can be dispatched tasks.
max-system-memory-usage-percentage-thresholds: 0.9
# Worker max disk usage , when the worker's disk usage is smaller then this value, worker server can be dispatched tasks.
max-disk-usage-percentage-thresholds: 0.9
task-execute-threads-full-policy: REJECT task-execute-threads-full-policy: REJECT
tenant-config: tenant-config:
# tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. # tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true.
@ -225,7 +237,7 @@ alert:
# Mark each alert of alert server if late after x milliseconds as failed. # Mark each alert of alert server if late after x milliseconds as failed.
# Define value is (0 = infinite), and alert server would be waiting alert result. # Define value is (0 = infinite), and alert server would be waiting alert result.
wait-timeout: 0 wait-timeout: 0
heartbeat-interval: 60s max-heartbeat-interval: 60s
query_alert_threshold: 100 query_alert_threshold: 100
api: api:

19
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -21,11 +21,14 @@ import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer; import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor; import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
@ -63,6 +66,9 @@ public class WorkerServer implements IStoppable {
@Autowired @Autowired
private MessageRetryRunner messageRetryRunner; private MessageRetryRunner messageRetryRunner;
@Autowired
private MetricsProvider metricsProvider;
/** /**
* worker server startup, not use web service * worker server startup, not use web service
* *
@ -83,6 +89,19 @@ public class WorkerServer implements IStoppable {
this.messageRetryRunner.start(); this.messageRetryRunner.start();
WorkerServerMetrics.registerWorkerCpuUsageGauge(() -> {
SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
return systemMetrics.getTotalCpuUsedPercentage();
});
WorkerServerMetrics.registerWorkerMemoryAvailableGauge(() -> {
SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
return (systemMetrics.getSystemMemoryMax() - systemMetrics.getSystemMemoryUsed()) / 1024.0 / 1024 / 1024;
});
WorkerServerMetrics.registerWorkerMemoryUsageGauge(() -> {
SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
return systemMetrics.getJvmMemoryUsedPercentage();
});
/* /*
* registry hooks, which are called before the process exits * registry hooks, which are called before the process exits
*/ */

17
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -43,10 +43,9 @@ public class WorkerConfig implements Validator {
private int listenPort = 1234; private int listenPort = 1234;
private int execThreads = 10; private int execThreads = 10;
private Duration heartbeatInterval = Duration.ofSeconds(10); private Duration maxHeartbeatInterval = Duration.ofSeconds(10);
private int hostWeight = 100; private int hostWeight = 100;
private int maxCpuLoadAvg = -1; private WorkerServerLoadProtection serverLoadProtection = new WorkerServerLoadProtection();
private double reservedMemory = 0.1;
private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties(); private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties();
/** /**
@ -70,11 +69,8 @@ public class WorkerConfig implements Validator {
if (workerConfig.getExecThreads() <= 0) { if (workerConfig.getExecThreads() <= 0) {
errors.rejectValue("exec-threads", null, "should be a positive value"); errors.rejectValue("exec-threads", null, "should be a positive value");
} }
if (workerConfig.getHeartbeatInterval().getSeconds() <= 0) { if (workerConfig.getMaxHeartbeatInterval().getSeconds() <= 0) {
errors.rejectValue("heartbeat-interval", null, "shoule be a valid duration"); errors.rejectValue("max-heartbeat-interval", null, "shoule be a valid duration");
}
if (workerConfig.getMaxCpuLoadAvg() <= 0) {
workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
} }
if (StringUtils.isEmpty(workerConfig.getWorkerAddress())) { if (StringUtils.isEmpty(workerConfig.getWorkerAddress())) {
workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort())); workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort()));
@ -90,11 +86,10 @@ public class WorkerConfig implements Validator {
"\n****************************Worker Configuration**************************************" + "\n****************************Worker Configuration**************************************" +
"\n listen-port -> " + listenPort + "\n listen-port -> " + listenPort +
"\n exec-threads -> " + execThreads + "\n exec-threads -> " + execThreads +
"\n heartbeat-interval -> " + heartbeatInterval + "\n max-heartbeat-interval -> " + maxHeartbeatInterval +
"\n host-weight -> " + hostWeight + "\n host-weight -> " + hostWeight +
"\n tenantConfig -> " + tenantConfig + "\n tenantConfig -> " + tenantConfig +
"\n max-cpu-load-avg -> " + maxCpuLoadAvg + "\n server-load-protection -> " + serverLoadProtection +
"\n reserved-memory -> " + reservedMemory +
"\n registry-disconnect-strategy -> " + registryDisconnectStrategy + "\n registry-disconnect-strategy -> " + registryDisconnectStrategy +
"\n task-execute-threads-full-policy: " + taskExecuteThreadsFullPolicy + "\n task-execute-threads-full-policy: " + taskExecuteThreadsFullPolicy +
"\n address -> " + workerAddress + "\n address -> " + workerAddress +

73
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtection.java

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.config;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Data
@Slf4j
@NoArgsConstructor
@AllArgsConstructor
public class WorkerServerLoadProtection {

    // Master switch: when false, isOverload always reports "not overloaded".
    private boolean enabled = true;

    // Each threshold is a usage ratio in (0, 1]. The worker is treated as
    // overloaded as soon as ANY corresponding metric exceeds its threshold.
    // NOTE: field names are bound from kebab-case config keys by Spring's
    // relaxed binding (e.g. max-cpu-usage-percentage-thresholds), so they
    // must not be renamed.
    private double maxCpuUsagePercentageThresholds = 0.7;

    private double maxJVMMemoryUsagePercentageThresholds = 0.7;

    private double maxSystemMemoryUsagePercentageThresholds = 0.7;

    private double maxDiskUsagePercentageThresholds = 0.7;

    /**
     * Check whether the worker is currently overloaded.
     *
     * @param systemMetrics the latest sampled system metrics (CPU, JVM memory,
     *                      system memory and disk usage ratios)
     * @return {@code true} if protection is enabled and any metric exceeds its
     *         configured threshold; {@code false} otherwise
     */
    public boolean isOverload(SystemMetrics systemMetrics) {
        if (!enabled) {
            return false;
        }
        if (systemMetrics.getTotalCpuUsedPercentage() > maxCpuUsagePercentageThresholds) {
            log.info(
                    "Worker OverLoad: the TotalCpuUsedPercentage: {} is over the MaxCpuUsagePercentageThresholds {}",
                    systemMetrics.getTotalCpuUsedPercentage(), maxCpuUsagePercentageThresholds);
            return true;
        }
        if (systemMetrics.getJvmMemoryUsedPercentage() > maxJVMMemoryUsagePercentageThresholds) {
            // Fixed copy-paste bug: the original message referenced the CPU threshold here.
            log.info(
                    "Worker OverLoad: the JvmMemoryUsedPercentage: {} is over the MaxJVMMemoryUsagePercentageThresholds {}",
                    systemMetrics.getJvmMemoryUsedPercentage(), maxJVMMemoryUsagePercentageThresholds);
            return true;
        }
        if (systemMetrics.getDiskUsedPercentage() > maxDiskUsagePercentageThresholds) {
            // Fixed copy-paste bug: the original message referenced the CPU threshold here.
            log.info("Worker OverLoad: the DiskUsedPercentage: {} is over the MaxDiskUsagePercentageThresholds {}",
                    systemMetrics.getDiskUsedPercentage(), maxDiskUsagePercentageThresholds);
            return true;
        }
        if (systemMetrics.getSystemMemoryUsedPercentage() > maxSystemMemoryUsagePercentageThresholds) {
            log.info(
                    "Worker OverLoad: the SystemMemoryUsedPercentage: {} is over the MaxSystemMemoryUsagePercentageThresholds {}",
                    systemMetrics.getSystemMemoryUsedPercentage(), maxSystemMemoryUsagePercentageThresholds);
            return true;
        }
        return false;
    }
}

20
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java

@ -21,11 +21,13 @@ import static org.apache.dolphinscheduler.common.constants.Constants.SLEEP_TIME_
import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ServerStatus;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.extract.base.utils.Host; import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.RegistryException; import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
@ -55,7 +57,7 @@ public class WorkerRegistryClient implements AutoCloseable {
private WorkerConfig workerConfig; private WorkerConfig workerConfig;
@Autowired @Autowired
private WorkerTaskExecutorThreadPool workerManagerThread; private WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool;
@Autowired @Autowired
private RegistryClient registryClient; private RegistryClient registryClient;
@ -64,14 +66,18 @@ public class WorkerRegistryClient implements AutoCloseable {
@Lazy @Lazy
private WorkerConnectStrategy workerConnectStrategy; private WorkerConnectStrategy workerConnectStrategy;
@Autowired
private MetricsProvider metricsProvider;
private WorkerHeartBeatTask workerHeartBeatTask; private WorkerHeartBeatTask workerHeartBeatTask;
@PostConstruct @PostConstruct
public void initWorkRegistry() { public void initWorkRegistry() {
this.workerHeartBeatTask = new WorkerHeartBeatTask( this.workerHeartBeatTask = new WorkerHeartBeatTask(
workerConfig, workerConfig,
metricsProvider,
registryClient, registryClient,
() -> workerManagerThread.getWaitingTaskExecutorSize()); workerTaskExecutorThreadPool);
} }
public void start() { public void start() {
@ -84,11 +90,13 @@ public class WorkerRegistryClient implements AutoCloseable {
} }
} }
/** private void registry() throws InterruptedException {
* registry
*/
private void registry() {
WorkerHeartBeat workerHeartBeat = workerHeartBeatTask.getHeartBeat(); WorkerHeartBeat workerHeartBeat = workerHeartBeatTask.getHeartBeat();
while (ServerStatus.BUSY.equals(workerHeartBeat.getServerStatus())) {
log.warn("Worker node is BUSY: {}", workerHeartBeat);
workerHeartBeat = workerHeartBeatTask.getHeartBeat();
Thread.sleep(SLEEP_TIME_MILLIS);
}
String workerZKPath = workerConfig.getWorkerRegistryPath(); String workerZKPath = workerConfig.getWorkerRegistryPath();
// remove before persist // remove before persist
registryClient.remove(workerZKPath); registryClient.remove(workerZKPath);

9
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.worker.runner; package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy; import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics; import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
@ -42,12 +41,8 @@ public class WorkerTaskExecutorThreadPool {
ThreadUtils.newDaemonFixedThreadExecutor("WorkerTaskExecutorThreadPool", workerConfig.getExecThreads()); ThreadUtils.newDaemonFixedThreadExecutor("WorkerTaskExecutorThreadPool", workerConfig.getExecThreads());
this.workerConfig = workerConfig; this.workerConfig = workerConfig;
WorkerServerMetrics.registerWorkerCpuUsageGauge(OSUtils::cpuUsagePercentage); WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge(this::getWaitingTaskExecutorSize);
WorkerServerMetrics.registerWorkerMemoryAvailableGauge(OSUtils::availablePhysicalMemorySize); WorkerServerMetrics.registerWorkerActiveExecuteThreadGauge(this::getRunningTaskExecutorSize);
WorkerServerMetrics.registerWorkerMemoryUsageGauge(OSUtils::memoryUsagePercentage);
WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge(
() -> threadPoolExecutor.getQueue().size() - threadPoolExecutor.getActiveCount());
WorkerServerMetrics.registerWorkerActiveExecuteThreadGauge(threadPoolExecutor::getActiveCount);
} }
public boolean submitWorkerTaskExecutor(WorkerTaskExecutor workerTaskExecutor) { public boolean submitWorkerTaskExecutor(WorkerTaskExecutor workerTaskExecutor) {

61
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java

@ -24,10 +24,12 @@ import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.config.WorkerServerLoadProtection;
import java.util.function.Supplier; import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import lombok.NonNull; import lombok.NonNull;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -38,43 +40,38 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {
private final WorkerConfig workerConfig; private final WorkerConfig workerConfig;
private final RegistryClient registryClient; private final RegistryClient registryClient;
private final Supplier<Integer> workerWaitingTaskCount; private final MetricsProvider metricsProvider;
private final WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool;
private final int processId; private final int processId;
public WorkerHeartBeatTask(@NonNull WorkerConfig workerConfig, public WorkerHeartBeatTask(@NonNull WorkerConfig workerConfig,
@NonNull MetricsProvider metricsProvider,
@NonNull RegistryClient registryClient, @NonNull RegistryClient registryClient,
@NonNull Supplier<Integer> workerWaitingTaskCount) { @NonNull WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool) {
super("WorkerHeartBeatTask", workerConfig.getHeartbeatInterval().toMillis()); super("WorkerHeartBeatTask", workerConfig.getMaxHeartbeatInterval().toMillis());
this.metricsProvider = metricsProvider;
this.workerConfig = workerConfig; this.workerConfig = workerConfig;
this.registryClient = registryClient; this.registryClient = registryClient;
this.workerWaitingTaskCount = workerWaitingTaskCount; this.workerTaskExecutorThreadPool = workerTaskExecutorThreadPool;
this.processId = OSUtils.getProcessID(); this.processId = OSUtils.getProcessID();
} }
@Override @Override
public WorkerHeartBeat getHeartBeat() { public WorkerHeartBeat getHeartBeat() {
double cpuUsagePercentage = OSUtils.cpuUsagePercentage(); SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
int maxCpuUsePercentage = workerConfig.getMaxCpuLoadAvg(); ServerStatus serverStatus = getServerStatus(systemMetrics, workerConfig, workerTaskExecutorThreadPool);
double reservedMemory = workerConfig.getReservedMemory();
double memoryUsagePercentage = OSUtils.memoryUsagePercentage();
int execThreads = workerConfig.getExecThreads();
ServerStatus serverStatus =
getServerStatus(cpuUsagePercentage, maxCpuUsePercentage, memoryUsagePercentage, reservedMemory,
execThreads, this.workerWaitingTaskCount.get());
return WorkerHeartBeat.builder() return WorkerHeartBeat.builder()
.startupTime(ServerLifeCycleManager.getServerStartupTime()) .startupTime(ServerLifeCycleManager.getServerStartupTime())
.reportTime(System.currentTimeMillis()) .reportTime(System.currentTimeMillis())
.cpuUsage(cpuUsagePercentage) .cpuUsage(systemMetrics.getTotalCpuUsedPercentage())
.availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize()) .jvmMemoryUsage(systemMetrics.getJvmMemoryUsedPercentage())
.memoryUsage(OSUtils.memoryUsagePercentage()) .memoryUsage(systemMetrics.getSystemMemoryUsedPercentage())
.reservedMemory(reservedMemory)
.diskAvailable(OSUtils.diskAvailable())
.processId(processId) .processId(processId)
.workerHostWeight(workerConfig.getHostWeight()) .workerHostWeight(workerConfig.getHostWeight())
.workerWaitingTaskCount(this.workerWaitingTaskCount.get()) .threadPoolUsage(workerTaskExecutorThreadPool.getRunningTaskExecutorSize()
.workerExecThreadCount(workerConfig.getExecThreads()) + workerTaskExecutorThreadPool.getWaitingTaskExecutorSize())
.serverStatus(serverStatus) .serverStatus(serverStatus)
.host(NetUtils.getHost()) .host(NetUtils.getHost())
.port(workerConfig.getListenPort()) .port(workerConfig.getListenPort())
@ -91,23 +88,13 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {
workerRegistryPath, workerHeartBeatJson); workerRegistryPath, workerHeartBeatJson);
} }
private ServerStatus getServerStatus(double cpuUsagePercentage, private ServerStatus getServerStatus(SystemMetrics systemMetrics,
double maxCpuUsePercentage, WorkerConfig workerConfig,
double memoryUsagePercentage, WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool) {
double reservedMemory, if (workerTaskExecutorThreadPool.isOverload()) {
int workerExecThreadCount,
int workerWaitingTaskCount) {
if (cpuUsagePercentage > maxCpuUsePercentage || (1 - memoryUsagePercentage) < reservedMemory) {
log.warn(
"current cpu load average {} is higher than {} or available memory {} is lower than {}",
cpuUsagePercentage, maxCpuUsePercentage, 1 - memoryUsagePercentage, reservedMemory);
return ServerStatus.ABNORMAL;
} else if (workerWaitingTaskCount > workerExecThreadCount) {
log.warn("current waiting task count {} is large than worker thread count {}, worker is busy",
workerWaitingTaskCount, workerExecThreadCount);
return ServerStatus.BUSY; return ServerStatus.BUSY;
} else {
return ServerStatus.NORMAL;
} }
WorkerServerLoadProtection serverLoadProtection = workerConfig.getServerLoadProtection();
return serverLoadProtection.isOverload(systemMetrics) ? ServerStatus.BUSY : ServerStatus.NORMAL;
} }
} }

17
dolphinscheduler-worker/src/main/resources/application.yaml

@ -44,13 +44,20 @@ worker:
# worker execute thread number to limit task instances in parallel # worker execute thread number to limit task instances in parallel
exec-threads: 100 exec-threads: 100
# worker heartbeat interval # worker heartbeat interval
heartbeat-interval: 10s max-heartbeat-interval: 10s
# worker host weight to dispatch tasks, default value 100 # worker host weight to dispatch tasks, default value 100
host-weight: 100 host-weight: 100
# worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value 1: will use 100% cpu. server-load-protection:
max-cpu-load-avg: 1 # If set true, will open worker overload protection
# worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, only the available memory is higher than 30%, worker server can receive task. enabled: true
reserved-memory: 0.3 # Worker max cpu usage, when the worker's cpu usage is smaller then this value, worker server can be dispatched tasks.
max-cpu-usage-percentage-thresholds: 0.7
# Worker max JVM memory usage, when the worker's JVM memory usage is smaller than this value, worker server can be dispatched tasks.
max-jvm-memory-usage-percentage-thresholds: 0.7
# Worker max system memory usage, when the worker's system memory usage is smaller than this value, worker server can be dispatched tasks.
max-system-memory-usage-percentage-thresholds: 0.7
# Worker max disk usage, when the worker's disk usage is smaller than this value, worker server can be dispatched tasks.
max-disk-usage-percentage-thresholds: 0.7
registry-disconnect-strategy: registry-disconnect-strategy:
# The disconnect strategy: stop, waiting # The disconnect strategy: stop, waiting
strategy: waiting strategy: waiting

42
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtectionTest.java

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.config;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
class WorkerServerLoadProtectionTest {

    @Test
    void isOverload() {
        // Every metric sits just above the 0.7 default threshold, so the
        // protection must report an overload whenever it is enabled.
        final SystemMetrics overloadedMetrics = SystemMetrics.builder()
                .totalCpuUsedPercentage(0.71)
                .jvmMemoryUsedPercentage(0.71)
                .systemMemoryUsedPercentage(0.71)
                .diskUsedPercentage(0.71)
                .build();
        final WorkerServerLoadProtection loadProtection = new WorkerServerLoadProtection();

        // Disabled protection never reports an overload, regardless of metrics.
        loadProtection.setEnabled(false);
        Assertions.assertFalse(loadProtection.isOverload(overloadedMetrics));

        // Once enabled, the very same metrics must trip the overload check.
        loadProtection.setEnabled(true);
        Assertions.assertTrue(loadProtection.isOverload(overloadedMetrics));
    }
}

28
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java

@ -20,14 +20,15 @@ package org.apache.dolphinscheduler.server.worker.registry;
import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.given;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.config.WorkerServerLoadProtection;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool; import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import java.time.Duration; import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -36,10 +37,6 @@ import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
/** /**
* worker registry test * worker registry test
@ -47,10 +44,6 @@ import com.google.common.collect.Sets;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class WorkerRegistryClientTest { public class WorkerRegistryClientTest {
private static final Logger log = LoggerFactory.getLogger(WorkerRegistryClientTest.class);
private static final String TEST_WORKER_GROUP = "test";
@InjectMocks @InjectMocks
private WorkerRegistryClient workerRegistryClient; private WorkerRegistryClient workerRegistryClient;
@ -61,10 +54,7 @@ public class WorkerRegistryClientTest {
private WorkerConfig workerConfig; private WorkerConfig workerConfig;
@Mock @Mock
private Set<String> workerGroups = Sets.newHashSet("127.0.0.1"); private MetricsProvider metricsProvider;
@Mock
private ScheduledExecutorService heartBeatExecutor;
@Mock @Mock
private WorkerTaskExecutorThreadPool workerManagerThread; private WorkerTaskExecutorThreadPool workerManagerThread;
@ -72,17 +62,13 @@ public class WorkerRegistryClientTest {
@Mock @Mock
private WorkerConnectStrategy workerConnectStrategy; private WorkerConnectStrategy workerConnectStrategy;
// private static final Set<String> workerGroups;
static {
// workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP);
}
@Test @Test
public void testStart() { public void testStart() {
given(workerConfig.getWorkerAddress()).willReturn(NetUtils.getAddr(1234)); given(workerConfig.getWorkerAddress()).willReturn(NetUtils.getAddr(1234));
given(workerConfig.getHeartbeatInterval()).willReturn(Duration.ofSeconds(1)); given(workerConfig.getMaxHeartbeatInterval()).willReturn(Duration.ofSeconds(1));
given(workerConfig.getServerLoadProtection()).willReturn(new WorkerServerLoadProtection());
given(metricsProvider.getSystemMetrics()).willReturn(new SystemMetrics());
given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(RegistryNodeType.class))) given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(RegistryNodeType.class)))
.willReturn(true); .willReturn(true);

Loading…
Cancel
Save