From e86630bb7b40153aa9852bad7d315684990ec0ac Mon Sep 17 00:00:00 2001 From: Eric Gao Date: Thu, 1 Jun 2023 16:54:48 +0800 Subject: [PATCH] [Improvement][Metrics] Add more worker related metrics and fix some previous ones (#14254) * Add more worker related metrics and fix some previous ones * update metrics docs --- docs/docs/en/guide/metrics/metrics.md | 7 +++- docs/docs/zh/guide/metrics/metrics.md | 7 +++- .../grafana/DolphinSchedulerWorker.json | 4 +-- .../worker/metrics/WorkerServerMetrics.java | 36 +++++++++++++++++-- .../worker/runner/WorkerExecService.java | 6 +++- .../worker/runner/WorkerManagerThread.java | 7 ++++ 6 files changed, 59 insertions(+), 8 deletions(-) diff --git a/docs/docs/en/guide/metrics/metrics.md b/docs/docs/en/guide/metrics/metrics.md index e45c3c72d3..3470340465 100644 --- a/docs/docs/en/guide/metrics/metrics.md +++ b/docs/docs/en/guide/metrics/metrics.md @@ -73,7 +73,6 @@ For example, you can get the master metrics by `curl http://localhost:5679/actua - ds.task.dispatch.failure.count: (counter) the number of tasks failed to dispatch, retry failure included - ds.task.dispatch.error.count: (counter) the number of task dispatch errors - ds.task.execution.count.by.type: (counter) the number of task executions grouped by tag `task_type` -- ds.task.running: (gauge) the number of running tasks - ds.task.prepared: (gauge) the number of tasks prepared for task queue - ds.task.execution.count: (counter) the number of executed tasks - ds.task.execution.duration: (histogram) duration of task executions @@ -104,6 +103,12 @@ For example, you can get the master metrics by `curl http://localhost:5679/actua ### Worker Server Metrics - ds.worker.overload.count: (counter) the number of times the worker overloaded +- ds.worker.task: (gauge) the number of tasks on the worker, including pending and running ones +- ds.worker.execute.queue.size: (gauge) the number of pending tasks on the worker +- ds.worker.active.execute.thread: (gauge) the number of running tasks on the worker +- ds.worker.memory.available: (gauge) the available physical memory of the worker (GB) +- ds.worker.cpu.usage: (gauge) the cpu usage percentage of the worker +- ds.worker.memory.usage: (gauge) the memory usage percentage of the worker - ds.worker.full.submit.queue.count: (counter) the number of times the worker's submit queue being full - ds.worker.resource.download.count: (counter) the number of downloaded resource files on workers, sliced by tag `status` - ds.worker.resource.download.duration: (histogram) the time cost of resource download on workers diff --git a/docs/docs/zh/guide/metrics/metrics.md b/docs/docs/zh/guide/metrics/metrics.md index ce9430cb7c..aa620b873e 100644 --- a/docs/docs/zh/guide/metrics/metrics.md +++ b/docs/docs/zh/guide/metrics/metrics.md @@ -74,7 +74,6 @@ metrics exporter端口`server.port`是在application.yaml里定义的: master: ` - ds.task.dispatch.failure.count: (counter) 分发失败的任务数量,重试也包含在内 - ds.task.dispatch.error.count: (counter) 分发任务的错误数量 - ds.task.execution.count.by.type: (counter) 任务执行数量,按标签`task_type`聚类 -- ds.task.running: (gauge) 正在运行的任务数量 - ds.task.prepared: (gauge) 准备好且待提交的任务数量 - ds.task.execution.count: (counter) 已执行的任务数量 - ds.task.execution.duration: (histogram) 任务执行时长 @@ -104,6 +103,12 @@ metrics exporter端口`server.port`是在application.yaml里定义的: master: ` ### Worker Server指标 - ds.worker.overload.count: (counter) worker过载次数 +- ds.worker.task: (gauge) worker上任务总数,包含等待提交和正在执行的任务 +- ds.worker.execute.queue.size: (gauge) worker上等待提交的任务总数 +- ds.worker.active.execute.thread: (gauge) worker上正在执行的任务总数 +- ds.worker.memory.available: (gauge) worker机器可用物理内存 (GB) +- ds.worker.cpu.usage: (gauge) worker机器cpu使用百分比 +- ds.worker.memory.usage: (gauge) worker机器内存使用百分比 - ds.worker.full.submit.queue.count: (counter) worker提交队列全满次数 - ds.worker.resource.download.count: (counter) worker下载资源文件的次数,可由`status`标签切分 - ds.worker.resource.download.duration: (histogram) worker下载资源文件时花费的时间分布 diff --git a/dolphinscheduler-meter/src/main/resources/grafana/DolphinSchedulerWorker.json b/dolphinscheduler-meter/src/main/resources/grafana/DolphinSchedulerWorker.json index 548ef495f1..b2ab589018 100644 --- a/dolphinscheduler-meter/src/main/resources/grafana/DolphinSchedulerWorker.json +++ b/dolphinscheduler-meter/src/main/resources/grafana/DolphinSchedulerWorker.json @@ -437,11 +437,11 @@ "type": "prometheus", "uid": "PBFA97CFB590B2093" }, - "expr": "ds_task_running{}", + "expr": "ds_worker_task{}", "refId": "A" } ], - "title": "Worker Running Task", + "title": "Worker Tasks Total", "type": "timeseries" }, { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java index ad40bc3443..a73ce541a3 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java @@ -91,9 +91,39 @@ public class WorkerServerMetrics { workerResourceDownloadSizeDistribution.record(size); } - public void registerWorkerRunningTaskGauge(final Supplier supplier) { - Gauge.builder("ds.task.running", supplier) - .description("number of running tasks on workers") + public void registerWorkerTaskTotalGauge(final Supplier supplier) { + Gauge.builder("ds.worker.task", supplier) + .description("total number of tasks on worker") + .register(Metrics.globalRegistry); + } + + public void registerWorkerExecuteQueueSizeGauge(Supplier supplier) { + Gauge.builder("ds.worker.execute.queue.size", supplier) + .description("worker execute queue size") + .register(Metrics.globalRegistry); + } + + public void registerWorkerActiveExecuteThreadGauge(Supplier supplier) { + Gauge.builder("ds.worker.active.execute.thread", supplier) + .description("number of active task execute threads on worker") + .register(Metrics.globalRegistry); + } + + public void registerWorkerMemoryAvailableGauge(Supplier supplier) { + Gauge.builder("ds.worker.memory.available", supplier) + .description("worker memory available") + .register(Metrics.globalRegistry); + } + + public void registerWorkerCpuUsageGauge(Supplier supplier) { + Gauge.builder("ds.worker.cpu.usage", supplier) + .description("worker cpu usage") + .register(Metrics.globalRegistry); + } + + public void registerWorkerMemoryUsageGauge(Supplier supplier) { + Gauge.builder("ds.worker.memory.usage", supplier) + .description("worker memory usage") .register(Metrics.globalRegistry); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java index 3238122e88..f16e887bec 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java @@ -52,7 +52,7 @@ public class WorkerExecService { this.execService = execService; this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService); this.taskExecuteThreadMap = taskExecuteThreadMap; - WorkerServerMetrics.registerWorkerRunningTaskGauge(taskExecuteThreadMap::size); + WorkerServerMetrics.registerWorkerTaskTotalGauge(taskExecuteThreadMap::size); } public void submit(final WorkerTaskExecuteRunnable taskExecuteThread) { @@ -86,6 +86,10 @@ public class WorkerExecService { return ((ThreadPoolExecutor) this.execService).getQueue().size(); } + public int getActiveExecThreadCount() { + return ((ThreadPoolExecutor) this.execService).getActiveCount(); + } + public Map getTaskExecuteThreadMap() { return taskExecuteThreadMap; } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java index c6f2125c72..829c8d83c3 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.runner; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -122,6 +123,12 @@ public class WorkerManagerThread implements Runnable { @Override public void run() { + WorkerServerMetrics.registerWorkerCpuUsageGauge(OSUtils::cpuUsagePercentage); + WorkerServerMetrics.registerWorkerMemoryAvailableGauge(OSUtils::availablePhysicalMemorySize); + WorkerServerMetrics.registerWorkerMemoryUsageGauge(OSUtils::memoryUsagePercentage); + WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge(workerExecService::getThreadPoolQueueSize); + WorkerServerMetrics.registerWorkerActiveExecuteThreadGauge(workerExecService::getActiveExecThreadCount); + Thread.currentThread().setName("Worker-Execute-Manager-Thread"); while (!ServerLifeCycleManager.isStopped()) { try {