[Improvement][Metrics] Switch to use tags to indicate task / workflow execution status for metrics (#11128)

* [Improvement][Metrics] Switch to use tags to indicate task / workflow execution status for metrics (#10867)

* Update docs and grafana demo dashboards
k8s/config
Eric Gao 2 years ago committed by GitHub
parent
commit
0e15ce3389
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 30
      docs/docs/en/guide/metrics/metrics.md
  2. 32
      docs/docs/zh/guide/metrics/metrics.md
  3. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java
  4. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
  5. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
  6. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java
  7. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
  8. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java
  9. 85
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
  10. 93
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java
  11. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  12. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
  13. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
  14. 2694
      dolphinscheduler-meter/src/main/resources/grafana/DolphinSchedulerMaster.json

30
docs/docs/en/guide/metrics/metrics.md

@ -60,14 +60,15 @@ For example, you can get the master metrics by `curl http://localhost:5679/actua
### Task Related Metrics
- ds.task.timeout.count: (counter) the number of timeout tasks
- ds.task.finish.count: (counter) the number of finished tasks, both succeeded and failed included
- ds.task.success.count: (counter) the number of successful tasks
- ds.task.failure.count: (counter) the number of failed tasks
- ds.task.stop.count: (counter) the number of stopped tasks
- ds.task.retry.count: (counter) the number of retried tasks
- ds.task.submit.count: (counter) the number of submitted tasks
- ds.task.failover.count: (counter) the number of task fail-overs
- ds.task.instance.count: (counter) the number of task instances, sliced by the tag `state`:
  - timeout: the number of timeout tasks
  - finish: the number of finished tasks, both succeeded and failed included
  - success: the number of successful tasks
  - fail: the number of failed tasks
  - stop: the number of stopped tasks
  - retry: the number of retried tasks
  - submit: the number of submitted tasks
  - failover: the number of task fail-overs
- ds.task.dispatch.count: (counter) the number of tasks dispatched to worker
- ds.task.dispatch.failure.count: (counter) the number of tasks that failed to dispatch, retry failures included
- ds.task.dispatch.error.count: (counter) the number of task dispatch errors
@ -83,12 +84,13 @@ For example, you can get the master metrics by `curl http://localhost:5679/actua
- ds.workflow.create.command.count: (counter) the number of commands created and inserted by workflows
- ds.workflow.instance.submit.count: (counter) the number of submitted workflow instances
- ds.workflow.instance.running: (gauge) the number of running workflow instances
- ds.workflow.instance.timeout.count: (counter) the number of timeout workflow instances
- ds.workflow.instance.finish.count: (counter) indicates the number of finished workflow instances, both successes and failures included
- ds.workflow.instance.success.count: (counter) the number of successful workflow instances
- ds.workflow.instance.failure.count: (counter) the number of failed workflow instances
- ds.workflow.instance.stop.count: (counter) the number of stopped workflow instances
- ds.workflow.instance.failover.count: (counter) the number of workflow instance fail-overs
- ds.workflow.instance.count: (counter) the number of workflow instances, sliced by the tag `state`:
  - timeout: the number of timeout workflow instances
  - finish: the number of finished workflow instances, both successes and failures included
  - success: the number of successful workflow instances
  - fail: the number of failed workflow instances
  - stop: the number of stopped workflow instances
  - failover: the number of workflow instance fail-overs
### Master Server Metrics
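
Dashboards and alert rules that query the old per-state metric names need to move to the tagged metrics. The renaming in this diff can be captured as a lookup. This is a minimal sketch that assumes only the names shown in this commit. The class `MetricMigration` and its methods are hypothetical and not part of DolphinScheduler, and the `name{state="..."}` notation is just a readable rendering of a name plus tag. The workflow `submit` entry follows `PROCESS_INSTANCE_STATES` in `ProcessInstanceMetrics.java`. The old `failure` metrics map to the tag value `fail`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: maps a pre-change metric name to its tagged replacement. */
class MetricMigration {

    private static final Map<String, String> OLD_TO_NEW = new HashMap<>();

    private static void register(String oldPrefix, String newName, String... oldSuffixes) {
        for (String suffix : oldSuffixes) {
            // The old "failure" metrics use the shorter tag value "fail".
            String state = suffix.equals("failure") ? "fail" : suffix;
            OLD_TO_NEW.put(oldPrefix + suffix + ".count", newName + "{state=\"" + state + "\"}");
        }
    }

    static {
        register("ds.task.", "ds.task.instance.count",
                "timeout", "finish", "success", "failure", "stop", "retry", "submit", "failover");
        register("ds.workflow.instance.", "ds.workflow.instance.count",
                "submit", "timeout", "finish", "success", "failure", "stop", "failover");
    }

    /** Returns the tagged replacement, or the input unchanged if the metric was not renamed. */
    static String migrate(String oldName) {
        return OLD_TO_NEW.getOrDefault(oldName, oldName);
    }

    public static void main(String[] args) {
        System.out.println(migrate("ds.task.failure.count"));
        System.out.println(migrate("ds.task.dispatch.count"));
    }
}
```

Metrics that this commit leaves alone, such as `ds.task.dispatch.count` and `ds.workflow.instance.running`, pass through unchanged.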

32
docs/docs/zh/guide/metrics/metrics.md

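@ -61,14 +61,15 @@ The metrics exporter port `server.port` is defined in application.yaml: master: `
### Task Related Metrics
- ds.task.timeout.count: (counter) the number of timed-out tasks
- ds.task.finish.count: (counter) the number of finished tasks, both succeeded and failed included
- ds.task.success.count: (counter) the number of successfully completed tasks
- ds.task.failure.count: (counter) the number of failed tasks
- ds.task.stop.count: (counter) the number of stopped tasks
- ds.task.retry.count: (counter) the number of retried tasks
- ds.task.submit.count: (counter) the number of submitted tasks
- ds.task.failover.count: (counter) the number of task fail-overs
- ds.task.instance.count: (counter) the number of task instances, sliced by the tag `state`:
  - timeout: the number of timed-out tasks
  - finish: the number of finished tasks, both succeeded and failed included
  - success: the number of successfully completed tasks
  - fail: the number of failed tasks
  - stop: the number of stopped tasks
  - retry: the number of retried tasks
  - submit: the number of submitted tasks
  - failover: the number of task fail-overs
- ds.task.dispatch.count: (counter) the number of tasks dispatched to workers
- ds.task.dispatch.failure.count: (counter) the number of tasks that failed to dispatch, retries included
- ds.task.dispatch.error.count: (counter) the number of task dispatch errors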
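@ -82,14 +83,15 @@ The metrics exporter port `server.port` is defined in application.yaml: master: `
### Workflow Related Metrics
- ds.workflow.create.command.count: (counter) the number of commands created and inserted by workflows
- ds.workflow.instance.submit.count: (counter) the number of submitted workflow instances
- ds.workflow.instance.running: (gauge) the number of running workflow instances
- ds.workflow.instance.timeout.count: (counter) the number of timed-out workflow instances
- ds.workflow.instance.finish.count: (counter) the number of finished workflow instances, both successes and failures included
- ds.workflow.instance.success.count: (counter) the number of successful workflow instances
- ds.workflow.instance.failure.count: (counter) the number of failed workflow instances
- ds.workflow.instance.stop.count: (counter) the number of stopped workflow instances
- ds.workflow.instance.failover.count: (counter) the number of workflow instance fail-overs
- ds.workflow.instance.count: (counter) the number of workflow instances, sliced by the tag `state`:
  - submit: the number of submitted workflow instances
  - timeout: the number of timed-out workflow instances
  - finish: the number of finished workflow instances, both successes and failures included
  - success: the number of successful workflow instances
  - fail: the number of failed workflow instances
  - stop: the number of stopped workflow instances
  - failover: the number of workflow instance fail-overs
### Master Server Metrics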

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java

@ -31,7 +31,7 @@ public class TaskRetryStateEventHandler implements StateEventHandler {
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
throws StateEventHandleException {
TaskMetrics.incTaskRetry();
TaskMetrics.incTaskInstanceByState("retry");
Map<Long, TaskInstance> waitToRetryTaskInstanceMap = workflowExecuteRunnable.getWaitToRetryTaskInstanceMap();
TaskInstance taskInstance = waitToRetryTaskInstanceMap.get(stateEvent.getTaskCode());
workflowExecuteRunnable.addTaskToStandByList(taskInstance);

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java

@ -96,17 +96,17 @@ public class TaskStateEventHandler implements StateEventHandler {
return;
}
if (taskStateEvent.getExecutionStatus().typeIsFinished()) {
TaskMetrics.incTaskFinish();
TaskMetrics.incTaskInstanceByState("finish");
}
switch (taskStateEvent.getExecutionStatus()) {
case STOP:
TaskMetrics.incTaskStop();
TaskMetrics.incTaskInstanceByState("stop");
break;
case SUCCESS:
TaskMetrics.incTaskSuccess();
TaskMetrics.incTaskInstanceByState("success");
break;
case FAILURE:
TaskMetrics.incTaskFailure();
TaskMetrics.incTaskInstanceByState("fail");
break;
default:
break;
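
Because a finished event increments `finish` in addition to its specific state, the `state` values are not mutually exclusive, and summing `ds.task.instance.count` over every `state` overcounts. The following stand-alone model replays a few events through the same branching. `FinishOverlapDemo`, `Status` and `isFinished` are hypothetical stand-ins. Treating every status except RUNNING as finished is an assumption, since `typeIsFinished()` is not shown in this diff.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the branching in the handler; not DolphinScheduler code. */
class FinishOverlapDemo {

    enum Status {
        RUNNING, SUCCESS, FAILURE, STOP;

        // Assumption: stands in for ExecutionStatus.typeIsFinished().
        boolean isFinished() {
            return this != RUNNING;
        }
    }

    final Map<String, Long> counts = new HashMap<>();

    private void inc(String state) {
        counts.merge(state, 1L, Long::sum);
    }

    /** Mirrors the if + switch structure shown in the hunk above. */
    void onTaskState(Status status) {
        if (status.isFinished()) {
            inc("finish");
        }
        switch (status) {
            case STOP:
                inc("stop");
                break;
            case SUCCESS:
                inc("success");
                break;
            case FAILURE:
                inc("fail");
                break;
            default:
                break;
        }
    }

    long get(String state) {
        return counts.getOrDefault(state, 0L);
    }

    public static void main(String[] args) {
        FinishOverlapDemo demo = new FinishOverlapDemo();
        demo.onTaskState(Status.SUCCESS);
        demo.onTaskState(Status.SUCCESS);
        demo.onTaskState(Status.FAILURE);
        demo.onTaskState(Status.RUNNING);
        System.out.println(demo.counts);
    }
}
```

In this model, three finished events produce six increments in total, so queries over the tagged counter should select specific `state` values rather than aggregate across all of them.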

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java

@ -35,7 +35,7 @@ public class TaskTimeoutStateEventHandler implements StateEventHandler {
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
throws StateEventHandleError {
TaskMetrics.incTaskTimeout();
TaskMetrics.incTaskInstanceByState("timeout");
workflowExecuteRunnable.checkTaskInstanceByStateEvent(stateEvent);
TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId()).get();

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java

@ -59,8 +59,7 @@ public class WorkflowStartEventHandler implements WorkflowEventHandler {
"The workflow start event is invalid, cannot find the workflow instance from cache");
}
ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
ProcessInstanceMetrics.incProcessInstanceSubmit();
ProcessInstanceMetrics.incProcessInstanceByState("submit");
CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture =
CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool);
workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java

@ -75,17 +75,17 @@ public class WorkflowStateEventHandler implements StateEventHandler {
private void measureProcessState(StateEvent processStateEvent) {
if (processStateEvent.getExecutionStatus().typeIsFinished()) {
ProcessInstanceMetrics.incProcessInstanceFinish();
ProcessInstanceMetrics.incProcessInstanceByState("finish");
}
switch (processStateEvent.getExecutionStatus()) {
case STOP:
ProcessInstanceMetrics.incProcessInstanceStop();
ProcessInstanceMetrics.incProcessInstanceByState("stop");
break;
case SUCCESS:
ProcessInstanceMetrics.incProcessInstanceSuccess();
ProcessInstanceMetrics.incProcessInstanceByState("success");
break;
case FAILURE:
ProcessInstanceMetrics.incProcessInstanceFailure();
ProcessInstanceMetrics.incProcessInstanceByState("fail");
break;
default:
break;

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java

@ -27,7 +27,7 @@ import com.google.auto.service.AutoService;
public class WorkflowTimeoutStateEventHandler implements StateEventHandler {
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) {
ProcessInstanceMetrics.incProcessInstanceTimeout();
ProcessInstanceMetrics.incProcessInstanceByState("timeout");
workflowExecuteRunnable.processTimeout();
return true;
}

85
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java

@ -17,9 +17,14 @@
package org.apache.dolphinscheduler.server.master.metrics;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import com.google.common.collect.ImmutableSet;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
@ -31,6 +36,24 @@ public final class ProcessInstanceMetrics {
throw new UnsupportedOperationException("Utility class");
}
private static Map<String, Counter> PROCESS_INSTANCE_COUNTERS = new HashMap<>();
private static final Set<String> PROCESS_INSTANCE_STATES = ImmutableSet.of(
"submit", "timeout", "finish", "failover", "success", "fail", "stop");
static {
for (final String state : PROCESS_INSTANCE_STATES) {
PROCESS_INSTANCE_COUNTERS.put(
state,
Counter.builder("ds.workflow.instance.count")
.tag("state", state)
.description(String.format("Process instance %s total count", state))
.register(Metrics.globalRegistry)
);
}
}
private static final Timer COMMAND_QUERY_TIMETER =
Timer.builder("ds.workflow.command.query.duration")
.description("Command query duration")
@ -41,41 +64,6 @@ public final class ProcessInstanceMetrics {
.description("Process instance generated duration")
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_SUBMIT_COUNTER =
Counter.builder("ds.workflow.instance.submit.count")
.description("Process instance submit total count")
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_TIMEOUT_COUNTER =
Counter.builder("ds.workflow.instance.timeout.count")
.description("Process instance timeout total count")
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_FINISH_COUNTER =
Counter.builder("ds.workflow.instance.finish.count")
.description("Process instance finish total count")
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_SUCCESS_COUNTER =
Counter.builder("ds.workflow.instance.success.count")
.description("Process instance success total count")
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_FAILURE_COUNTER =
Counter.builder("ds.workflow.instance.failure.count")
.description("Process instance failure total count")
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_STOP_COUNTER =
Counter.builder("ds.workflow.instance.stop.count")
.description("Process instance stop total count")
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_FAILOVER_COUNTER =
Counter.builder("ds.workflow.instance.failover.count")
.description("Process instance failover total count")
.register(Metrics.globalRegistry);
public static void recordCommandQueryTime(long milliseconds) {
COMMAND_QUERY_TIMETER.record(milliseconds, TimeUnit.MILLISECONDS);
}
@ -96,31 +84,8 @@ public final class ProcessInstanceMetrics {
.register(Metrics.globalRegistry);
}
public static void incProcessInstanceSubmit() {
PROCESS_INSTANCE_SUBMIT_COUNTER.increment();
}
public static void incProcessInstanceTimeout() {
PROCESS_INSTANCE_TIMEOUT_COUNTER.increment();
}
public static void incProcessInstanceFinish() {
PROCESS_INSTANCE_FINISH_COUNTER.increment();
public static void incProcessInstanceByState(final String state) {
PROCESS_INSTANCE_COUNTERS.get(state).increment();
}
public static void incProcessInstanceSuccess() {
PROCESS_INSTANCE_SUCCESS_COUNTER.increment();
}
public static void incProcessInstanceFailure() {
PROCESS_INSTANCE_FAILURE_COUNTER.increment();
}
public static void incProcessInstanceStop() {
PROCESS_INSTANCE_STOP_COUNTER.increment();
}
public static void incProcessInstanceFailover() {
PROCESS_INSTANCE_FAILOVER_COUNTER.increment();
}
}
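
`incProcessInstanceByState` looks the counter up with `Map.get`, so a state string outside `PROCESS_INSTANCE_STATES` (for example "failure" instead of "fail") throws a `NullPointerException` at the call site. One possible hardening is to key the counters by an enum rather than raw strings, sketched below. To run without dependencies, the sketch uses `java.util.concurrent.atomic.LongAdder` as a stand-in for Micrometer's `Counter`. `StateCounters` and `State` are hypothetical names, not part of this commit.

```java
import java.util.EnumMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical enum-keyed variant of the per-state counter map. */
class StateCounters {

    /** Mirrors the values in PROCESS_INSTANCE_STATES from the diff. */
    enum State {
        SUBMIT, TIMEOUT, FINISH, FAILOVER, SUCCESS, FAIL, STOP;

        /** The value that would be used for the `state` tag. */
        String tagValue() {
            return name().toLowerCase(Locale.ROOT);
        }
    }

    // Populated once in the constructor and only read afterwards.
    private final Map<State, LongAdder> counters = new EnumMap<>(State.class);

    StateCounters() {
        for (State state : State.values()) {
            // With Micrometer, this is where a Counter tagged with tagValue() would be registered.
            counters.put(state, new LongAdder());
        }
    }

    void increment(State state) {
        counters.get(Objects.requireNonNull(state, "state")).increment();
    }

    long count(State state) {
        return counters.get(Objects.requireNonNull(state, "state")).sum();
    }

    public static void main(String[] args) {
        StateCounters counters = new StateCounters();
        counters.increment(State.FAIL);
        System.out.println(State.FAIL.tagValue() + " = " + counters.count(State.FAIL));
    }
}
```

With an enum, a misspelled state becomes a compile error instead of a runtime failure in an event handler. The trade-off is that every call site must reference the enum rather than a string literal.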

93
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java

@ -17,8 +17,12 @@
package org.apache.dolphinscheduler.server.master.metrics;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import com.google.common.collect.ImmutableSet;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
@ -29,45 +33,24 @@ public final class TaskMetrics {
throw new UnsupportedOperationException("Utility class");
}
private static final Counter TASK_SUBMIT_COUNTER =
Counter.builder("ds.task.submit.count")
.description("Task submit total count")
.register(Metrics.globalRegistry);
private static final Counter TASK_FINISH_COUNTER =
Counter.builder("ds.task.finish.count")
.description("Task finish total count")
.register(Metrics.globalRegistry);
private static final Counter TASK_SUCCESS_COUNTER =
Counter.builder("ds.task.success.count")
.description("Task success total count")
.register(Metrics.globalRegistry);
private static final Counter TASK_FAILURE_COUNTER =
Counter.builder("ds.task.failure.count")
.description("Task failure total count")
.register(Metrics.globalRegistry);
private static Map<String, Counter> TASK_INSTANCE_COUNTERS = new HashMap<>();
private static final Counter TASK_TIMEOUT_COUNTER =
Counter.builder("ds.task.timeout.count")
.description("Task timeout total count")
.register(Metrics.globalRegistry);
private static final Set<String> TASK_INSTANCE_STATES = ImmutableSet.of(
"submit", "timeout", "finish", "failover", "retry", "dispatch", "success", "fail", "stop");
private static final Counter TASK_RETRY_COUNTER =
Counter.builder("ds.task.retry.count")
.description("Task retry total count")
.register(Metrics.globalRegistry);
static {
for (final String state : TASK_INSTANCE_STATES) {
TASK_INSTANCE_COUNTERS.put(
state,
Counter.builder("ds.task.instance.count")
.tags("state", state)
.description(String.format("Task instance %s total count", state))
.register(Metrics.globalRegistry)
);
}
private static final Counter TASK_STOP_COUNTER =
Counter.builder("ds.task.stop.count")
.description("Task stop total count")
.register(Metrics.globalRegistry);
private static final Counter TASK_FAILOVER_COUNTER =
Counter.builder("ds.task.failover.count")
.description("Task failover total count")
.register(Metrics.globalRegistry);
}
private static final Counter TASK_DISPATCH_COUNTER =
Counter.builder("ds.task.dispatch.count")
@ -76,52 +59,20 @@ public final class TaskMetrics {
private static final Counter TASK_DISPATCHER_FAILED =
Counter.builder("ds.task.dispatch.failure.count")
.description("Task dispatch failed count")
.description("Task dispatch failures count, retried ones included")
.register(Metrics.globalRegistry);
private static final Counter TASK_DISPATCH_ERROR =
Counter.builder("ds.task.dispatch.error.count")
.description("Task dispatch error")
.description("Number of errors during task dispatch")
.register(Metrics.globalRegistry);
public static void incTaskSubmit() {
TASK_SUBMIT_COUNTER.increment();
}
public synchronized static void registerTaskPrepared(Supplier<Number> consumer) {
Gauge.builder("ds.task.prepared", consumer)
.description("Task prepared count")
.register(Metrics.globalRegistry);
}
public static void incTaskFinish() {
TASK_FINISH_COUNTER.increment();
}
public static void incTaskSuccess() {
TASK_SUCCESS_COUNTER.increment();
}
public static void incTaskFailure() {
TASK_FAILURE_COUNTER.increment();
}
public static void incTaskTimeout() {
TASK_TIMEOUT_COUNTER.increment();
}
public static void incTaskRetry() {
TASK_RETRY_COUNTER.increment();
}
public static void incTaskStop() {
TASK_STOP_COUNTER.increment();
}
public static void incTaskFailover() {
TASK_FAILOVER_COUNTER.increment();
}
public static void incTaskDispatchFailed(int failedCount) {
TASK_DISPATCHER_FAILED.increment(failedCount);
}
@ -134,4 +85,8 @@ public final class TaskMetrics {
TASK_DISPATCH_COUNTER.increment();
}
public static void incTaskInstanceByState(final String state) {
TASK_INSTANCE_COUNTERS.get(state).increment();
}
}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -1673,7 +1673,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
taskInstance.getName(),
taskInstance.getId(),
taskInstance.getTaskCode());
TaskMetrics.incTaskSubmit();
TaskMetrics.incTaskInstanceByState("submit");
readyToSubmitTaskQueue.put(taskInstance);
}

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java

@ -169,7 +169,7 @@ public class MasterFailoverService {
}
}
ProcessInstanceMetrics.incProcessInstanceFailover();
ProcessInstanceMetrics.incProcessInstanceByState("failover");
// set the processInstance host to null to mark that this processInstance has been failed over
// and insert a failover command
processInstance.setHost(Constants.NULL);
@ -211,7 +211,7 @@ public class MasterFailoverService {
* @param taskInstance
*/
private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
TaskMetrics.incTaskFailover();
TaskMetrics.incTaskInstanceByState("failover");
boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
taskInstance.setProcessInstance(processInstance);

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java

@ -156,8 +156,7 @@ public class WorkerFailoverService {
* @param taskInstance
*/
private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
TaskMetrics.incTaskFailover();
TaskMetrics.incTaskInstanceByState("failover");
boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
taskInstance.setProcessInstance(processInstance);

2694
dolphinscheduler-meter/src/main/resources/grafana/DolphinSchedulerMaster.json

File diff suppressed because it is too large.