Browse Source

[Feature][metrics] Add master, worker metrics (#10326)

* Add mater metrics

* fix UT

* Add url to mysql profile

* Add worker metrics

* Update grafana config

* Add system metrics doc

* Add process failover counter

* Add metrics image

* Change jpg to png

* Add command insert metrics

* Fix UT

* Revert UT

(cherry picked from commit e21d7b1551)
3.0.0/version-upgrade
Wenjun Ruan 2 years ago
parent
commit
318a8e3ae0
  1. 154
      docs/docs/en/guide/metrics/metrics.md
  2. BIN
      docs/img/metrics/metrics-datasource.png
  3. BIN
      docs/img/metrics/metrics-master.png
  4. BIN
      docs/img/metrics/metrics-worker.png
  5. 2
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/banner.txt
  6. 3
      dolphinscheduler-api/src/main/resources/application.yaml
  7. 2
      dolphinscheduler-api/src/main/resources/banner.txt
  8. 8
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  9. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
  10. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
  11. 13
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  12. 53
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java
  13. 101
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
  14. 137
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java
  15. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
  16. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  17. 82
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  18. 16
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
  19. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
  20. 7
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
  21. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
  22. 3
      dolphinscheduler-master/src/main/resources/application.yaml
  23. 2
      dolphinscheduler-master/src/main/resources/banner.txt
  24. 4
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
  25. 60
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManagerTest.java
  26. 16
      dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java
  27. 5
      dolphinscheduler-meter/src/main/resources/grafana-demo/docker-compose.yaml
  28. 2607
      dolphinscheduler-meter/src/main/resources/grafana/DolphinSchedulerMaster.json
  29. 1598
      dolphinscheduler-meter/src/main/resources/grafana/DolphinSchedulerWorker.json
  30. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
  31. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  32. 2
      dolphinscheduler-standalone-server/src/main/resources/banner.txt
  33. 58
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/TaskMetrics.java
  34. 56
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
  35. 2
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  36. 8
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  37. 3
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
  38. 1
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
  39. 3
      dolphinscheduler-worker/src/main/resources/application.yaml
  40. 2
      dolphinscheduler-worker/src/main/resources/banner.txt

154
docs/docs/en/guide/metrics/metrics.md

@ -0,0 +1,154 @@
# Introduction
Apache DolphinScheduler has export some metrics to monitor the system. We use micrometer for the exporter facade, and
the default exporter is prometheus, more exporter is coming soon.
## Quick Start
You can add the following config in master/worker/alert/api's yaml file to open the metrics exporter.
```yaml
metrics:
enabled: true
```
Once you open the metrics exporter, you can access the metrics by the url: `http://ip:port/actuator/prometheus`
The exporter port is the `server.port` defined in application.yaml, e.g: master: `server.port: 5679`, worker: `server.port: 1235`, alert: `server.port: 50053`, api: `server.port: 12345`.
For example, you can get the master metrics by `curl http://localhost:5679/actuator/prometheus`
We have prepared the out-of-the-box Grafana configuration for you, you can find the Grafana dashboard
at `dolphinscheduler-meter/resources/grafana`, you can directly import these dashboards to grafana.
If you want to try at docker, you can use the following command to start the prometheus with grafana:
```shell
cd dolphinscheduler-meter/src/main/resources/grafana-demo
docker compose up
```
Then you can access the grafana by the url: `http://localhost/3001`
![image.png](../../../../img/metrics/metrics-master.png)
![image.png](../../../../img/metrics/metrics-worker.png)
![image.png](../../../../img/metrics/metrics-datasource.png)
## Master Metrics
Master metrics are exported by the DolphinScheduler master server.
### System Metrics
* dolphinscheduler_master_overload_count: Indicates the number of times the master has been overloaded.
* dolphinscheduler_master_consume_command_count: Indicates the number of commands has consumed.
### Process Metrics
* dolphinscheduler_create_command_count: Indicates the number of command has been inserted.
* dolphinscheduler_process_instance_submit_count: Indicates the number of process has been submitted.
* dolphinscheduler_process_instance_running_gauge: Indicates the number of process are running now.
* dolphinscheduler_process_instance_timeout_count: Indicates the number of process has been timeout.
* dolphinscheduler_process_instance_finish_count: Indicates the number of process has been finished, include success or
failure.
* dolphinscheduler_process_instance_success_count: Indicates the number of process has been successful.
* dolphinscheduler_process_instance_stop_count: Indicates the number of process has been stopped.
* dolphinscheduler_process_instance_failover_count: Indicates the number of process has been failed over.
### Task Metrics
* dolphinscheduler_task_timeout_count: Indicates the number of tasks has been timeout.
* dolphinscheduler_task_finish_count: Indicates the number of tasks has been finished, include success or failure.
* dolphinscheduler_task_success_count: Indicates the number of tasks has been successful.
* dolphinscheduler_task_timeout_count: Indicates the number of tasks has been timeout.
* dolphinscheduler_task_retry_count: Indicates the number of tasks has been retry.
* dolphinscheduler_task_failover_count: Indicates the number of tasks has been failover.
* dolphinscheduler_task_dispatch_count: Indicates the number of tasks has been dispatched to worker.
* dolphinscheduler_task_dispatch_failed_count: Indicates the number of tasks dispatched failed, if dispatched failed
will retry.
* dolphinscheduler_task_dispatch_error_count: Indicates the number of tasks dispatched error, if dispatched error, means
there are exception occur.
## Worker Metrics
Worker metrics are exported by the DolphinScheduler worker server.
### System Metrics
* dolphinscheduler_worker_overload_count: Indicates the number of times the worker has been overloaded.
* dolphinscheduler_worker_submit_queue_is_full_count: Indicates the number of times the worker's submit queue has been
full.
### Task Metrics
* dolphinscheduler_task_execute_count: Indicates the number of times a task has been executed, it contains a tag -
`task_type`.
* dolphinscheduler_task_execution_count: Indicates the total number of task has been executed.
* dolphinscheduler_task_execution_timer: Indicates the time spent executing tasks.
## Default System Metrics
In each server, there are some default metrics related to the system instance.
### Database Metrics
* hikaricp_connections_creation_seconds_max: Connection creation time max.
* hikaricp_connections_creation_seconds_count: Connection creation time count.
* hikaricp_connections_creation_seconds_sum: Connection creation time sum.
* hikaricp_connections_acquire_seconds_max: Connection acquire time max.
* hikaricp_connections_acquire_seconds_count: Connection acquire time count.
* hikaricp_connections_acquire_seconds_sum: Connection acquire time sum.
* hikaricp_connections_usage_seconds_max: Connection usage max.
* hikaricp_connections_usage_seconds_count: Connection usage time count.
* hikaricp_connections_usage_seconds_sum: Connection usage time sum.
* hikaricp_connections_max: Max connections.
* hikaricp_connections_min Min connections
* hikaricp_connections_active: Active connections.
* hikaricp_connections_idle: Idle connections.
* hikaricp_connections_pending: Pending connections.
* hikaricp_connections_timeout_total: Timeout connections.
* hikaricp_connections: Total connections
* jdbc_connections_max: Maximum number of active connections that can be allocated at the same time.
* jdbc_connections_min: Minimum number of idle connections in the pool.
* jdbc_connections_idle: Number of established but idle connections.
* jdbc_connections_active: Current number of active connections that have been allocated from the data source.
### JVM Metrics
* jvm_buffer_total_capacity_bytes: An estimate of the total capacity of the buffers in this pool.
* jvm_buffer_count_buffers: An estimate of the number of buffers in the pool.
* jvm_buffer_memory_used_bytes: An estimate of the memory that the Java virtual machine is using for this buffer pool.
* jvm_memory_committed_bytes: The amount of memory in bytes that is committed for the Java virtual machine to use.
* jvm_memory_max_bytes: The maximum amount of memory in bytes that can be used for memory management.
* jvm_memory_used_bytes: The amount of used memory.
* jvm_threads_peak_threads: The peak live thread count since the Java virtual machine started or peak was reset.
* jvm_threads_states_threads: The current number of threads having NEW state.
* jvm_gc_memory_allocated_bytes_total: Incremented for an increase in the size of the (young) heap memory pool after one GC to before the next.
* jvm_gc_max_data_size_bytes: Max size of long-lived heap memory pool.
* jvm_gc_pause_seconds_count: Time spent count in GC pause.
* jvm_gc_pause_seconds_sum: Time spent sum in GC pause.
* jvm_gc_pause_seconds_max: Time spent max in GC pause.
* jvm_gc_live_data_size_bytes: Size of long-lived heap memory pool after reclamation.
* jvm_gc_memory_promoted_bytes_total: Count of positive increases in the size of the old generation memory pool before GC to after GC.
* jvm_classes_loaded_classes: The number of classes that are currently loaded in the Java virtual machine.
* jvm_threads_live_threads: The current number of live threads including both daemon and non-daemon threads.
* jvm_threads_daemon_threads: The current number of live daemon threads.
* jvm_classes_unloaded_classes_total: The total number of classes unloaded since the Java virtual machine has started execution.
* process_cpu_usage: The "recent cpu usage" for the Java Virtual Machine process.
* process_start_time_seconds: Start time of the process since unix epoch.
* process_uptime_seconds: The uptime of the Java virtual machine.
## Other Metrics
* jetty_threads_config_max: The maximum number of threads in the pool.
* jetty_threads_config_min: The minimum number of threads in the pool.
* jetty_threads_current: The total number of threads in the pool.
* jetty_threads_idle: The number of idle threads in the pool.
* jetty_threads_busy: The number of busy threads in the pool.
* jetty_threads_jobs: Number of jobs queued waiting for a thread.
* process_files_max_files: The maximum file descriptor count.
* process_files_open_files: The open file descriptor count.
* system_cpu_usage: The "recent cpu usage" for the whole system.
* system_cpu_count: The number of processors available to the Java virtual machine.
* system_load_average_1m: The sum of the number of runnable entities queued to available processors and the number of runnable entities running on the available processors averaged over a period of time.
* logback_events_total: Number of level events that made it to the logs

BIN
docs/img/metrics/metrics-datasource.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 329 KiB

BIN
docs/img/metrics/metrics-master.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 457 KiB

BIN
docs/img/metrics/metrics-worker.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 394 KiB

2
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/banner.txt

@ -8,5 +8,5 @@ ${AnsiColor.BLUE}${AnsiStyle.BOLD}
|_| |_|
================================================================================ ================================================================================
${AnsiColor.BLUE}${AnsiStyle.BOLD} ${AnsiColor.BLUE}${AnsiStyle.BOLD}
:: Dolphinscheduler alert server :: ${application.formatted-version} :: DolphinScheduler alert server :: ${application.formatted-version}
${AnsiStyle.NORMAL} ${AnsiStyle.NORMAL}

3
dolphinscheduler-api/src/main/resources/application.yaml

@ -153,6 +153,9 @@ spring:
on-profile: mysql on-profile: mysql
datasource: datasource:
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler
username: root
password: root
quartz: quartz:
properties: properties:
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate

2
dolphinscheduler-api/src/main/resources/banner.txt

@ -8,5 +8,5 @@ ${AnsiColor.BLUE}${AnsiStyle.BOLD}
|_| |_|
================================================================================ ================================================================================
${AnsiColor.BLUE}${AnsiStyle.BOLD} ${AnsiColor.BLUE}${AnsiStyle.BOLD}
:: Dolphinscheduler api server :: ${application.formatted-version} :: DolphinScheduler api server :: ${application.formatted-version}
${AnsiStyle.NORMAL} ${AnsiStyle.NORMAL}

8
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -369,22 +369,22 @@ public final class Constants {
/** /**
* sleep 1000ms * sleep 1000ms
*/ */
public static final int SLEEP_TIME_MILLIS = 1000; public static final long SLEEP_TIME_MILLIS = 1_000L;
/** /**
* short sleep 100ms * short sleep 100ms
*/ */
public static final int SLEEP_TIME_MILLIS_SHORT = 100; public static final long SLEEP_TIME_MILLIS_SHORT = 100L;
/** /**
* one second mils * one second mils
*/ */
public static final int SECOND_TIME_MILLIS = 1000; public static final long SECOND_TIME_MILLIS = 1_000L;
/** /**
* master task instance cache-database refresh interval * master task instance cache-database refresh interval
*/ */
public static final int CACHE_REFRESH_TIME_MILLIS = 20 * 1000; public static final long CACHE_REFRESH_TIME_MILLIS = 20 * 1_000L;
/** /**
* heartbeat for zk info length * heartbeat for zk info length

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java

@ -262,7 +262,7 @@ public class TaskNode {
this.runFlag = runFlag; this.runFlag = runFlag;
} }
public Boolean isForbidden() { public boolean isForbidden() {
return (!StringUtils.isEmpty(this.runFlag) return (!StringUtils.isEmpty(this.runFlag)
&& this.runFlag.equals(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN)); && this.runFlag.equals(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN));
} }

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java vendored

@ -18,11 +18,14 @@
package org.apache.dolphinscheduler.server.master.cache.impl; package org.apache.dolphinscheduler.server.master.cache.impl;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
@ -35,6 +38,11 @@ public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecC
private final ConcurrentHashMap<Integer, WorkflowExecuteRunnable> processInstanceExecMaps = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Integer, WorkflowExecuteRunnable> processInstanceExecMaps = new ConcurrentHashMap<>();
@PostConstruct
public void registerMetrics() {
ProcessInstanceMetrics.registerProcessInstanceRunningGauge(processInstanceExecMaps::size);
}
@Override @Override
public WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId) { public WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId) {
return processInstanceExecMaps.get(processInstanceId); return processInstanceExecMaps.get(processInstanceId);

13
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
@ -53,6 +54,11 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import org.apache.commons.lang3.time.StopWatch;
/** /**
* TaskUpdateQueue consumer * TaskUpdateQueue consumer
*/ */
@ -119,6 +125,7 @@ public class TaskPriorityQueueConsumer extends Thread {
List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum); List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);
if (!failedDispatchTasks.isEmpty()) { if (!failedDispatchTasks.isEmpty()) {
TaskMetrics.incTaskDispatchFailed(failedDispatchTasks.size());
for (TaskPriority dispatchFailedTask : failedDispatchTasks) { for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
taskPriorityQueue.put(dispatchFailedTask); taskPriorityQueue.put(dispatchFailedTask);
} }
@ -129,6 +136,7 @@ public class TaskPriorityQueueConsumer extends Thread {
} }
} }
} catch (Exception e) { } catch (Exception e) {
TaskMetrics.incTaskDispatchError();
logger.error("dispatcher task error", e); logger.error("dispatcher task error", e);
} }
} }
@ -137,7 +145,7 @@ public class TaskPriorityQueueConsumer extends Thread {
/** /**
* batch dispatch with thread pool * batch dispatch with thread pool
*/ */
private List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException { public List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException {
List<TaskPriority> failedDispatchTasks = Collections.synchronizedList(new ArrayList<>()); List<TaskPriority> failedDispatchTasks = Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(fetchTaskNum); CountDownLatch latch = new CountDownLatch(fetchTaskNum);
@ -169,6 +177,7 @@ public class TaskPriorityQueueConsumer extends Thread {
* @return result * @return result
*/ */
protected boolean dispatchTask(TaskPriority taskPriority) { protected boolean dispatchTask(TaskPriority taskPriority) {
TaskMetrics.incTaskDispatch();
boolean result = false; boolean result = false;
try { try {
TaskExecutionContext context = taskPriority.getTaskExecutionContext(); TaskExecutionContext context = taskPriority.getTaskExecutionContext();
@ -217,7 +226,7 @@ public class TaskPriorityQueueConsumer extends Thread {
* @param taskInstanceId taskInstanceId * @param taskInstanceId taskInstanceId
* @return taskInstance is final state * @return taskInstance is final state
*/ */
public Boolean taskInstanceIsFinalState(int taskInstanceId) { public boolean taskInstanceIsFinalState(int taskInstanceId) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId); TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
return taskInstance.getState().typeIsFinished(); return taskInstance.getState().typeIsFinished();
} }

53
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.metrics;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
public final class MasterServerMetrics {
private MasterServerMetrics() {
throw new UnsupportedOperationException("Utility class");
}
/**
* Used to measure the master server is overload.
*/
private static final Counter MASTER_OVERLOAD_COUNTER =
Counter.builder("dolphinscheduler_master_overload_count")
.description("Master server overload count")
.register(Metrics.globalRegistry);
/**
* Used to measure the number of process command consumed by master.
*/
private static final Counter MASTER_CONSUME_COMMAND_COUNTER =
Counter.builder("dolphinscheduler_master_consume_command_count")
.description("Master server consume command count")
.register(Metrics.globalRegistry);
public static void incMasterOverload() {
MASTER_OVERLOAD_COUNTER.increment();
}
public static void incMasterConsumeCommand(int commandCount) {
MASTER_CONSUME_COMMAND_COUNTER.increment(commandCount);
}
}

101
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.metrics;
import java.util.function.Supplier;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
public final class ProcessInstanceMetrics {
private ProcessInstanceMetrics() {
throw new UnsupportedOperationException("Utility class");
}
private static final Counter PROCESS_INSTANCE_SUBMIT_COUNTER =
Counter.builder("dolphinscheduler_process_instance_submit_count")
.description("Process instance submit total count")
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_TIMEOUT_COUNTER =
Counter.builder("dolphinscheduler_process_instance_timeout_count")
.description("Process instance timeout total count")
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_FINISH_COUNTER =
Counter.builder("dolphinscheduler_process_instance_finish_count")
.description("Process instance finish total count")
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_SUCCESS_COUNTER =
Counter.builder("dolphinscheduler_process_instance_success_count")
.description("Process instance success total count")
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_FAILURE_COUNTER =
Counter.builder("dolphinscheduler_process_instance_failure_count")
.description("Process instance failure total count")
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_STOP_COUNTER =
Counter.builder("dolphinscheduler_process_instance_stop_count")
.description("Process instance stop total count")
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_FAILOVER_COUNTER =
Counter.builder("dolphinscheduler_process_instance_failover_count")
.description("Process instance failover total count")
.register(Metrics.globalRegistry);
public static synchronized void registerProcessInstanceRunningGauge(Supplier<Number> function) {
Gauge.builder("dolphinscheduler_process_instance_running_gauge", function)
.description("The current running process instance count")
.register(Metrics.globalRegistry);
}
public static void incProcessInstanceSubmit() {
PROCESS_INSTANCE_SUBMIT_COUNTER.increment();
}
public static void incProcessInstanceTimeout() {
PROCESS_INSTANCE_TIMEOUT_COUNTER.increment();
}
public static void incProcessInstanceFinish() {
PROCESS_INSTANCE_FINISH_COUNTER.increment();
}
public static void incProcessInstanceSuccess() {
PROCESS_INSTANCE_SUCCESS_COUNTER.increment();
}
public static void incProcessInstanceFailure() {
PROCESS_INSTANCE_FAILURE_COUNTER.increment();
}
public static void incProcessInstanceStop() {
PROCESS_INSTANCE_STOP_COUNTER.increment();
}
public static void incProcessInstanceFailover() {
PROCESS_INSTANCE_FAILOVER_COUNTER.increment();
}
}

137
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.metrics;
import java.util.function.Supplier;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
public final class TaskMetrics {
private TaskMetrics() {
throw new UnsupportedOperationException("Utility class");
}
private static final Counter TASK_SUBMIT_COUNTER =
Counter.builder("dolphinscheduler_task_submit_count")
.description("Task submit total count")
.register(Metrics.globalRegistry);
private static final Counter TASK_FINISH_COUNTER =
Counter.builder("dolphinscheduler_task_finish_count")
.description("Task finish total count")
.register(Metrics.globalRegistry);
private static final Counter TASK_SUCCESS_COUNTER =
Counter.builder("dolphinscheduler_task_success_count")
.description("Task success total count")
.register(Metrics.globalRegistry);
private static final Counter TASK_FAILURE_COUNTER =
Counter.builder("dolphinscheduler_task_failure_count")
.description("Task failure total count")
.register(Metrics.globalRegistry);
private static final Counter TASK_TIMEOUT_COUNTER =
Counter.builder("dolphinscheduler_task_timeout_count")
.description("Task timeout total count")
.register(Metrics.globalRegistry);
private static final Counter TASK_RETRY_COUNTER =
Counter.builder("dolphinscheduler_task_retry_count")
.description("Task retry total count")
.register(Metrics.globalRegistry);
private static final Counter TASK_STOP_COUNTER =
Counter.builder("dolphinscheduler_task_stop_count")
.description("Task stop total count")
.register(Metrics.globalRegistry);
private static final Counter TASK_FAILOVER_COUNTER =
Counter.builder("dolphinscheduler_task_failover_count")
.description("Task failover total count")
.register(Metrics.globalRegistry);
private static final Counter TASK_DISPATCH_COUNTER =
Counter.builder("dolphinscheduler_task_dispatch_count")
.description("Task dispatch count")
.register(Metrics.globalRegistry);
private static final Counter TASK_DISPATCHER_FAILED =
Counter.builder("dolphinscheduler_task_dispatch_failed_count")
.description("Task dispatch failed count")
.register(Metrics.globalRegistry);
private static final Counter TASK_DISPATCH_ERROR =
Counter.builder("dolphinscheduler_task_dispatch_error_count")
.description("Task dispatch error")
.register(Metrics.globalRegistry);
public static void incTaskSubmit() {
TASK_SUBMIT_COUNTER.increment();
}
public synchronized static void registerTaskRunning(Supplier<Number> consumer) {
Gauge.builder("dolphinscheduler_task_running_gauge", consumer)
.description("Task running count")
.register(Metrics.globalRegistry);
}
public static void incTaskFinish() {
TASK_FINISH_COUNTER.increment();
}
public static void incTaskSuccess() {
TASK_SUCCESS_COUNTER.increment();
}
public static void incTaskFailure() {
TASK_FAILURE_COUNTER.increment();
}
public static void incTaskTimeout() {
TASK_TIMEOUT_COUNTER.increment();
}
public static void incTaskRetry() {
TASK_RETRY_COUNTER.increment();
}
public static void incTaskStop() {
TASK_STOP_COUNTER.increment();
}
public static void incTaskFailover() {
TASK_FAILOVER_COUNTER.increment();
}
public static void incTaskDispatchFailed(int failedCount) {
TASK_DISPATCHER_FAILED.increment(failedCount);
}
public static void incTaskDispatchError() {
TASK_DISPATCH_ERROR.increment();
}
public static void incTaskDispatch() {
TASK_DISPATCH_COUNTER.increment();
}
}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java

@ -62,7 +62,7 @@ public class FailoverExecuteThread extends Thread {
} catch (Exception e) { } catch (Exception e) {
logger.error("failover execute error", e); logger.error("failover execute error", e);
} finally { } finally {
ThreadUtils.sleep((long) Constants.SLEEP_TIME_MILLIS * masterConfig.getFailoverInterval() * 60); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * masterConfig.getFailoverInterval() * 60);
} }
} }
} }

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.SlotCheckState; import org.apache.dolphinscheduler.common.enums.SlotCheckState;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Command;
@ -31,6 +30,7 @@ import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -133,6 +133,7 @@ public class MasterSchedulerService extends Thread {
try { try {
boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory()); boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
if (!runCheckFlag) { if (!runCheckFlag) {
MasterServerMetrics.incMasterOverload();
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue; continue;
} }
@ -159,6 +160,7 @@ public class MasterSchedulerService extends Thread {
if (CollectionUtils.isEmpty(processInstances)) { if (CollectionUtils.isEmpty(processInstances)) {
return; return;
} }
MasterServerMetrics.incMasterConsumeCommand(commands.size());
for (ProcessInstance processInstance : processInstances) { for (ProcessInstance processInstance : processInstances) {
if (processInstance == null) { if (processInstance == null) {

82
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -66,6 +66,8 @@ import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor; import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
@ -83,6 +85,7 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -92,7 +95,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -249,6 +251,7 @@ public class WorkflowExecuteRunnable implements Runnable {
this.nettyExecutorManager = nettyExecutorManager; this.nettyExecutorManager = nettyExecutorManager;
this.processAlertManager = processAlertManager; this.processAlertManager = processAlertManager;
this.stateWheelExecuteThread = stateWheelExecuteThread; this.stateWheelExecuteThread = stateWheelExecuteThread;
TaskMetrics.registerTaskRunning(readyToSubmitTaskQueue::size);
} }
/** /**
@ -291,7 +294,7 @@ public class WorkflowExecuteRunnable implements Runnable {
public boolean addStateEvent(StateEvent stateEvent) { public boolean addStateEvent(StateEvent stateEvent) {
if (processInstance.getId() != stateEvent.getProcessInstanceId()) { if (processInstance.getId() != stateEvent.getProcessInstanceId()) {
logger.info("state event would be abounded :{}", stateEvent.toString()); logger.info("state event would be abounded :{}", stateEvent);
return false; return false;
} }
this.stateEvents.add(stateEvent); this.stateEvents.add(stateEvent);
@ -307,7 +310,7 @@ public class WorkflowExecuteRunnable implements Runnable {
} }
private boolean stateEventHandler(StateEvent stateEvent) { private boolean stateEventHandler(StateEvent stateEvent) {
logger.info("process event: {}", stateEvent.toString()); logger.info("process event: {}", stateEvent);
if (!checkProcessInstance(stateEvent)) { if (!checkProcessInstance(stateEvent)) {
return false; return false;
@ -316,21 +319,26 @@ public class WorkflowExecuteRunnable implements Runnable {
boolean result = false; boolean result = false;
switch (stateEvent.getType()) { switch (stateEvent.getType()) {
case PROCESS_STATE_CHANGE: case PROCESS_STATE_CHANGE:
measureProcessState(stateEvent);
result = processStateChangeHandler(stateEvent); result = processStateChangeHandler(stateEvent);
break; break;
case TASK_STATE_CHANGE: case TASK_STATE_CHANGE:
measureTaskState(stateEvent);
result = taskStateChangeHandler(stateEvent); result = taskStateChangeHandler(stateEvent);
break; break;
case PROCESS_TIMEOUT: case PROCESS_TIMEOUT:
ProcessInstanceMetrics.incProcessInstanceTimeout();
result = processTimeout(); result = processTimeout();
break; break;
case TASK_TIMEOUT: case TASK_TIMEOUT:
TaskMetrics.incTaskTimeout();
result = taskTimeout(stateEvent); result = taskTimeout(stateEvent);
break; break;
case WAIT_TASK_GROUP: case WAIT_TASK_GROUP:
result = checkForceStartAndWakeUp(stateEvent); result = checkForceStartAndWakeUp(stateEvent);
break; break;
case TASK_RETRY: case TASK_RETRY:
TaskMetrics.incTaskRetry();
result = taskRetryEventHandler(stateEvent); result = taskRetryEventHandler(stateEvent);
break; break;
case PROCESS_BLOCKED: case PROCESS_BLOCKED:
@ -437,10 +445,10 @@ public class WorkflowExecuteRunnable implements Runnable {
private void taskFinished(TaskInstance taskInstance) { private void taskFinished(TaskInstance taskInstance) {
logger.info("work flow {} task id:{} code:{} state:{} ", logger.info("work flow {} task id:{} code:{} state:{} ",
processInstance.getId(), processInstance.getId(),
taskInstance.getId(), taskInstance.getId(),
taskInstance.getTaskCode(), taskInstance.getTaskCode(),
taskInstance.getState()); taskInstance.getState());
activeTaskProcessorMaps.remove(taskInstance.getTaskCode()); activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance); stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
@ -734,7 +742,7 @@ public class WorkflowExecuteRunnable implements Runnable {
scheduleDate = complementListDate.get(0); scheduleDate = complementListDate.get(0);
} else if (processInstance.getState().typeIsFinished()) { } else if (processInstance.getState().typeIsFinished()) {
endProcess(); endProcess();
if (complementListDate.size() <= 0) { if (complementListDate.isEmpty()) {
logger.info("process complement end. process id:{}", processInstance.getId()); logger.info("process complement end. process id:{}", processInstance.getId());
return true; return true;
} }
@ -745,9 +753,9 @@ public class WorkflowExecuteRunnable implements Runnable {
return true; return true;
} }
logger.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}", logger.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}",
processInstance.getId(), processInstance.getId(),
processInstance.getScheduleTime(), processInstance.getScheduleTime(),
complementListDate.toString()); complementListDate.toString());
scheduleDate = complementListDate.get(index + 1); scheduleDate = complementListDate.get(index + 1);
} }
//the next process complement //the next process complement
@ -948,7 +956,7 @@ public class WorkflowExecuteRunnable implements Runnable {
} }
} }
if (processInstance.isComplementData() && complementListDate.size() == 0) { if (processInstance.isComplementData() && complementListDate.isEmpty()) {
Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
if (cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) { if (cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
// reset global params while there are start parameters // reset global params while there are start parameters
@ -957,19 +965,17 @@ public class WorkflowExecuteRunnable implements Runnable {
Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode()); List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
if (complementListDate.size() == 0 && needComplementProcess()) { if (complementListDate.isEmpty() && needComplementProcess()) {
complementListDate = CronUtils.getSelfFireDateList(start, end, schedules); complementListDate = CronUtils.getSelfFireDateList(start, end, schedules);
logger.info(" process definition code:{} complement data: {}", logger.info(" process definition code:{} complement data: {}",
processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionCode(), complementListDate.toString());
complementListDate.toString());
if (complementListDate.size() > 0 && Flag.NO == processInstance.getIsSubProcess()) { if (!complementListDate.isEmpty() && Flag.NO == processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(complementListDate.get(0)); processInstance.setScheduleTime(complementListDate.get(0));
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(processDefinition.getGlobalParamMap(), processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamList(), processDefinition.getGlobalParamMap(),
CommandType.COMPLEMENT_DATA, processDefinition.getGlobalParamList(),
processInstance.getScheduleTime(), CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE)));
cmdParam.get(Constants.SCHEDULE_TIMEZONE)));
processService.updateProcessInstance(processInstance); processService.updateProcessInstance(processInstance);
} }
} }
@ -1322,12 +1328,10 @@ public class WorkflowExecuteRunnable implements Runnable {
TaskInstance endTaskInstance = taskInstanceMap.get(completeTaskMap.get(NumberUtils.toLong(parentNodeCode))); TaskInstance endTaskInstance = taskInstanceMap.get(completeTaskMap.get(NumberUtils.toLong(parentNodeCode)));
String taskInstanceVarPool = endTaskInstance.getVarPool(); String taskInstanceVarPool = endTaskInstance.getVarPool();
if (StringUtils.isNotEmpty(taskInstanceVarPool)) { if (StringUtils.isNotEmpty(taskInstanceVarPool)) {
Set<Property> taskProperties = JSONUtils.toList(taskInstanceVarPool, Property.class) Set<Property> taskProperties = new HashSet<>(JSONUtils.toList(taskInstanceVarPool, Property.class));
.stream().collect(Collectors.toSet());
String processInstanceVarPool = processInstance.getVarPool(); String processInstanceVarPool = processInstance.getVarPool();
if (StringUtils.isNotEmpty(processInstanceVarPool)) { if (StringUtils.isNotEmpty(processInstanceVarPool)) {
Set<Property> properties = JSONUtils.toList(processInstanceVarPool, Property.class) Set<Property> properties = new HashSet<>(JSONUtils.toList(processInstanceVarPool, Property.class));
.stream().collect(Collectors.toSet());
properties.addAll(taskProperties); properties.addAll(taskProperties);
processInstance.setVarPool(JSONUtils.toJsonString(properties)); processInstance.setVarPool(JSONUtils.toJsonString(properties));
} else { } else {
@ -1733,9 +1737,7 @@ public class WorkflowExecuteRunnable implements Runnable {
return; return;
} }
logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}", logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}",
taskInstance.getName(), taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode());
taskInstance.getId(),
taskInstance.getTaskCode());
readyToSubmitTaskQueue.put(taskInstance); readyToSubmitTaskQueue.put(taskInstance);
} catch (Exception e) { } catch (Exception e) {
logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e); logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e);
@ -2028,12 +2030,26 @@ public class WorkflowExecuteRunnable implements Runnable {
} }
} }
private void measureTaskState(StateEvent taskStateEvent) { private void measureProcessState(StateEvent processStateEvent) {
if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) { if (processStateEvent.getExecutionStatus().typeIsFinished()) {
// the event is broken ProcessInstanceMetrics.incProcessInstanceFinish();
logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent); }
return; switch (processStateEvent.getExecutionStatus()) {
case STOP:
ProcessInstanceMetrics.incProcessInstanceStop();
break;
case SUCCESS:
ProcessInstanceMetrics.incProcessInstanceSuccess();
break;
case FAILURE:
ProcessInstanceMetrics.incProcessInstanceFailure();
break;
default:
break;
} }
}
private void measureTaskState(StateEvent taskStateEvent) {
if (taskStateEvent.getExecutionStatus().typeIsFinished()) { if (taskStateEvent.getExecutionStatus().typeIsFinished()) {
TaskMetrics.incTaskFinish(); TaskMetrics.incTaskFinish();
} }

16
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
@ -45,6 +46,8 @@ import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.util.concurrent.ListenableFutureCallback;
import com.google.common.base.Strings;
@Component @Component
public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@ -94,6 +97,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
* start workflow * start workflow
*/ */
public void startWorkflow(WorkflowExecuteRunnable workflowExecuteThread) { public void startWorkflow(WorkflowExecuteRunnable workflowExecuteThread) {
ProcessInstanceMetrics.incProcessInstanceSubmit();
submit(workflowExecuteThread); submit(workflowExecuteThread);
} }
@ -109,7 +113,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
} }
multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread); multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
int processInstanceId = workflowExecuteThread.getProcessInstance().getId(); int processInstanceId = workflowExecuteThread.getProcessInstance().getId();
ListenableFuture future = this.submitListenable(workflowExecuteThread::handleEvents); ListenableFuture<?> future = this.submitListenable(workflowExecuteThread::handleEvents);
future.addCallback(new ListenableFutureCallback() { future.addCallback(new ListenableFutureCallback() {
@Override @Override
public void onFailure(Throwable ex) { public void onFailure(Throwable ex) {
@ -120,7 +124,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@Override @Override
public void onSuccess(Object result) { public void onSuccess(Object result) {
// if an exception occurs, first, the error message cannot be printed in the log; // if an exception occurs, first, the error message cannot be printed in the log;
// secondly, the `multiThreadFilterMap` cannot be remove the `workflowExecuteThread`, resulting in the state of process instance cannot be changed and memory leak // secondly, the `multiThreadFilterMap` cannot remove the `workflowExecuteThread`, resulting in the state of process instance cannot be changed and memory leak
try { try {
if (workflowExecuteThread.workFlowFinish()) { if (workflowExecuteThread.workFlowFinish()) {
stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance()); stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance());
@ -144,12 +148,14 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
return; return;
} }
Map<ProcessInstance, TaskInstance> fatherMaps = processService.notifyProcessList(finishProcessInstance.getId()); Map<ProcessInstance, TaskInstance> fatherMaps = processService.notifyProcessList(finishProcessInstance.getId());
for (ProcessInstance processInstance : fatherMaps.keySet()) { for (Map.Entry<ProcessInstance, TaskInstance> entry : fatherMaps.entrySet()) {
ProcessInstance processInstance = entry.getKey();
TaskInstance taskInstance = entry.getValue();
String address = NetUtils.getAddr(masterConfig.getListenPort()); String address = NetUtils.getAddr(masterConfig.getListenPort());
if (processInstance.getHost().equalsIgnoreCase(address)) { if (processInstance.getHost().equalsIgnoreCase(address)) {
this.notifyMyself(processInstance, fatherMaps.get(processInstance)); this.notifyMyself(processInstance, taskInstance);
} else { } else {
this.notifyProcess(finishProcessInstance, processInstance, fatherMaps.get(processInstance)); this.notifyProcess(finishProcessInstance, processInstance, taskInstance);
} }
} }
} }

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java

@ -43,7 +43,7 @@ import com.google.auto.service.AutoService;
@AutoService(ITaskProcessor.class) @AutoService(ITaskProcessor.class)
public class CommonTaskProcessor extends BaseTaskProcessor { public class CommonTaskProcessor extends BaseTaskProcessor {
private TaskPriorityQueue taskUpdateQueue; private TaskPriorityQueue<TaskPriority> taskUpdateQueue;
private NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class); private NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
@ -101,7 +101,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
this.initQueue(); this.initQueue();
} }
if (taskInstance.getState().typeIsFinished()) { if (taskInstance.getState().typeIsFinished()) {
logger.info(String.format("submit task , but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString())); logger.info("submit task , but task [{}] state [{}] is already finished. ", taskInstance.getName(), taskInstance.getState());
return true; return true;
} }
// task cannot be submitted because its execution state is RUNNING or DELAY. // task cannot be submitted because its execution state is RUNNING or DELAY.
@ -125,7 +125,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
taskPriority.setTaskExecutionContext(taskExecutionContext); taskPriority.setTaskExecutionContext(taskExecutionContext);
taskUpdateQueue.put(taskPriority); taskUpdateQueue.put(taskPriority);
logger.info("Master submit task to priority queue success, taskInstanceId : {}", taskInstance.getId()); logger.info("master submit success, task : {}", taskInstance.getName());
return true; return true;
} catch (Exception e) { } catch (Exception e) {
logger.error("submit task error", e); logger.error("submit task error", e);

7
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java

@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
/** /**
* the factory to create task processor * the factory to create task processor
*/ */
public class TaskProcessorFactory { public final class TaskProcessorFactory {
private static final Logger logger = LoggerFactory.getLogger(TaskProcessorFactory.class); private static final Logger logger = LoggerFactory.getLogger(TaskProcessorFactory.class);
@ -66,10 +66,15 @@ public class TaskProcessorFactory {
/** /**
* if match master processor, then this task type is processed on the master * if match master processor, then this task type is processed on the master
*
* @param type * @param type
* @return * @return
*/ */
public static boolean isMasterTask(String type) { public static boolean isMasterTask(String type) {
return PROCESS_MAP.containsKey(type); return PROCESS_MAP.containsKey(type);
} }
private TaskProcessorFactory() {
throw new UnsupportedOperationException("TaskProcessorFactory cannot be instantiated");
}
} }

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java

@ -32,6 +32,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils;
@ -157,6 +159,7 @@ public class FailoverService {
} }
LOGGER.info("failover process instance id: {}", processInstance.getId()); LOGGER.info("failover process instance id: {}", processInstance.getId());
ProcessInstanceMetrics.incProcessInstanceFailover();
//updateProcessInstance host is null and insert into command //updateProcessInstance host is null and insert into command
processInstance.setHost(Constants.NULL); processInstance.setHost(Constants.NULL);
processService.processNeedFailoverProcessInstances(processInstance); processService.processNeedFailoverProcessInstances(processInstance);
@ -227,7 +230,7 @@ public class FailoverService {
if (!checkTaskInstanceNeedFailover(servers, taskInstance)) { if (!checkTaskInstanceNeedFailover(servers, taskInstance)) {
return; return;
} }
TaskMetrics.incTaskFailover();
boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType()); boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
taskInstance.setProcessInstance(processInstance); taskInstance.setProcessInstance(processInstance);

3
dolphinscheduler-master/src/main/resources/application.yaml

@ -136,6 +136,9 @@ spring:
on-profile: mysql on-profile: mysql
datasource: datasource:
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler
username: root
password: root
quartz: quartz:
properties: properties:
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate

2
dolphinscheduler-master/src/main/resources/banner.txt

@ -8,5 +8,5 @@ ${AnsiColor.BLUE}${AnsiStyle.BOLD}
|_| |_|
================================================================================ ================================================================================
${AnsiColor.BLUE}${AnsiStyle.BOLD} ${AnsiColor.BLUE}${AnsiStyle.BOLD}
:: Dolphinscheduler master server :: ${application.formatted-version} :: DolphinScheduler master server :: ${application.formatted-version}
${AnsiStyle.NORMAL} ${AnsiStyle.NORMAL}

4
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java

@ -84,8 +84,6 @@ public class WorkflowExecuteTaskTest {
private ApplicationContext applicationContext; private ApplicationContext applicationContext;
private TaskProcessorFactory taskProcessorFactory;
private StateWheelExecuteThread stateWheelExecuteThread; private StateWheelExecuteThread stateWheelExecuteThread;
@Before @Before
@ -100,8 +98,6 @@ public class WorkflowExecuteTaskTest {
processService = mock(ProcessService.class); processService = mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
taskProcessorFactory = mock(TaskProcessorFactory.class);
processInstance = mock(ProcessInstance.class); processInstance = mock(ProcessInstance.class);
Mockito.when(processInstance.getState()).thenReturn(ExecutionStatus.SUCCESS); Mockito.when(processInstance.getState()).thenReturn(ExecutionStatus.SUCCESS);
Mockito.when(processInstance.getHistoryCmd()).thenReturn(CommandType.COMPLEMENT_DATA.toString()); Mockito.when(processInstance.getHistoryCmd()).thenReturn(CommandType.COMPLEMENT_DATA.toString());

60
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManagerTest.java

@ -1,60 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
/**
* server node manager test
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ RegistryClient.class })
@PowerMockIgnore({"javax.management.*"})
public class ServerNodeManagerTest {
private ServerNodeManager serverNodeManager;
@Mock
private WorkerGroupMapper workerGroupMapper;
@Mock
private AlertDao alertDao;
@Before
public void before() {
PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
serverNodeManager = PowerMockito.mock(ServerNodeManager.class);
}
@Test
public void test(){
//serverNodeManager.getWorkerGroupNodes()
}
}

16
dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java

@ -20,6 +20,10 @@
package org.apache.dolphinscheduler.meter; package org.apache.dolphinscheduler.meter;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
@ -30,6 +34,18 @@ import io.micrometer.core.aop.CountedAspect;
import io.micrometer.core.aop.TimedAspect; import io.micrometer.core.aop.TimedAspect;
import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.MeterRegistry;
/**
* This configuration class is used to config the metrics. We use <a href="https://micrometer.io/docs/concepts">micrometer</a> as the metrics fade.
*
* <p>To open the metrics, you need to set the property "metrics.enabled" to true. Right now, we only support expose the metrics to Prometheus,
* after you open metrics expose, you can get the metrics data at: http://host:port/actuator/prometheus.
* <p>You can use the below method to get a meter:
* <pre>
* {@code
* Counter counter = Metrics.counter("name", "tag1", "tag2");
* }
* </pre>
*/
@Configuration @Configuration
@EnableAspectJAutoProxy @EnableAspectJAutoProxy
@EnableAutoConfiguration @EnableAutoConfiguration

5
dolphinscheduler-meter/src/main/resources/grafana-demo/docker-compose.yaml

@ -32,9 +32,10 @@ services:
image: grafana/grafana image: grafana/grafana
networks: [ test ] networks: [ test ]
ports: ports:
- "3000:3000" # due to the DolphinScheduler frontend port is 3000, so we change the grafana default port to 3001.
- "3001:3000"
environment: environment:
GF_AUTH_ANONYMOUS_ENABLED: true GF_AUTH_ANONYMOUS_ENABLED: "true"
volumes: volumes:
- ../grafana:/dashboards:ro - ../grafana:/dashboards:ro
- ./datasources:/etc/grafana/provisioning/datasources:ro - ./datasources:/etc/grafana/provisioning/datasources:ro

2607
dolphinscheduler-meter/src/main/resources/grafana/DolphinSchedulerMaster.json

File diff suppressed because it is too large Load Diff

1598
dolphinscheduler-meter/src/main/resources/grafana/DolphinSchedulerWorker.json

File diff suppressed because it is too large Load Diff

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java

@ -86,7 +86,7 @@ public class StateEventCallbackService {
return null; return null;
} }
public int pause(int ntries) { public long pause(int ntries) {
return SLEEP_TIME_MILLIS * RETRY_BACKOFF[ntries % RETRY_BACKOFF.length]; return SLEEP_TIME_MILLIS * RETRY_BACKOFF[ntries % RETRY_BACKOFF.length];
} }

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.service.process; package org.apache.dolphinscheduler.service.process;
import io.micrometer.core.annotation.Counted;
import static java.util.stream.Collectors.toSet; import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
@ -391,6 +392,7 @@ public class ProcessServiceImpl implements ProcessService {
* @return create result * @return create result
*/ */
@Override @Override
@Counted("dolphinscheduler_create_command_count")
public int createCommand(Command command) { public int createCommand(Command command) {
int result = 0; int result = 0;
if (command != null) { if (command != null) {

2
dolphinscheduler-standalone-server/src/main/resources/banner.txt

@ -8,5 +8,5 @@ ${AnsiColor.BLUE}${AnsiStyle.BOLD}
|_| |_|
================================================================================ ================================================================================
${AnsiColor.BLUE}${AnsiStyle.BOLD} ${AnsiColor.BLUE}${AnsiStyle.BOLD}
:: Dolphinscheduler standalone server :: ${application.formatted-version} :: DolphinScheduler standalone server :: ${application.formatted-version}
${AnsiStyle.NORMAL} ${AnsiStyle.NORMAL}

58
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/TaskMetrics.java

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.metrics;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
public final class TaskMetrics {
private TaskMetrics() {
throw new UnsupportedOperationException("Utility class");
}
private static Map<String, Counter> TASK_TYPE_EXECUTE_COUNTER = new HashMap<>();
private static final Counter UNKNOWN_TASK_EXECUTE_COUNTER =
Counter.builder("dolphinscheduler_task_execute_count")
.tag("task_type", "unknown")
.description("task execute counter")
.register(Metrics.globalRegistry);
static {
for (TaskChannelFactory taskChannelFactory : ServiceLoader.load(TaskChannelFactory.class)) {
TASK_TYPE_EXECUTE_COUNTER.put(
taskChannelFactory.getName(),
Counter.builder("dolphinscheduler_task_execute_count")
.tag("task_type", taskChannelFactory.getName())
.description("task execute counter")
.register(Metrics.globalRegistry)
);
}
}
public static void incrTaskTypeExecuteCount(String taskType) {
TASK_TYPE_EXECUTE_COUNTER.getOrDefault(taskType, UNKNOWN_TASK_EXECUTE_COUNTER).increment();
}
}

56
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.metrics;
import java.util.function.Supplier;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
public final class WorkerServerMetrics {
public WorkerServerMetrics() {
throw new UnsupportedOperationException("Utility class");
}
private static final Counter WORKER_OVERLOAD_COUNTER =
Counter.builder("dolphinscheduler_worker_overload_count")
.description("worker load count")
.register(Metrics.globalRegistry);
private static final Counter WORKER_SUBMIT_QUEUE_IS_FULL_COUNTER =
Counter.builder("dolphinscheduler_worker_submit_queue_is_full_count")
.description("worker task submit queue is full count")
.register(Metrics.globalRegistry);
public static void incWorkerOverloadCount() {
WORKER_OVERLOAD_COUNTER.increment();
}
public static void incWorkerSubmitQueueIsFullCount() {
WORKER_SUBMIT_QUEUE_IS_FULL_COUNTER.increment();
}
public static void registerWorkerRunningTaskGauge(Supplier<Number> supplier) {
Gauge.builder("dolphinscheduler_worker_running_task_gauge", supplier)
.description("worker running task gauge")
.register(Metrics.globalRegistry);
}
}

2
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -118,7 +118,7 @@ public class TaskCallbackService {
return null; return null;
} }
public int pause(int ntries) { public long pause(int ntries) {
return SLEEP_TIME_MILLIS * RETRY_BACKOFF[ntries % RETRY_BACKOFF.length]; return SLEEP_TIME_MILLIS * RETRY_BACKOFF[ntries % RETRY_BACKOFF.length];
} }

8
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -34,11 +34,14 @@ import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.lang.SystemUtils;
import java.util.Date; import java.util.Date;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -48,6 +51,8 @@ import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import io.netty.channel.Channel; import io.netty.channel.Channel;
/** /**
@ -85,6 +90,8 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
@Autowired @Autowired
private WorkerManagerThread workerManager; private WorkerManagerThread workerManager;
@Counted(value = "dolphinscheduler_task_execution_count", description = "task execute total count")
@Timed(value = "dolphinscheduler_task_execution_timer", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override @Override
public void process(Channel channel, Command command) { public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(), Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
@ -106,6 +113,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
logger.error("task execution context is null"); logger.error("task execution context is null");
return; return;
} }
TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
// set cache, it will be used when kill task // set cache, it will be used when kill task
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);

3
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.worker.runner; package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -52,6 +54,7 @@ public class WorkerExecService {
this.execService = execService; this.execService = execService;
this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService); this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
this.taskExecuteThreadMap = taskExecuteThreadMap; this.taskExecuteThreadMap = taskExecuteThreadMap;
WorkerServerMetrics.registerWorkerRunningTaskGauge(taskExecuteThreadMap::size);
} }
public void submit(TaskExecuteThread taskExecuteThread) { public void submit(TaskExecuteThread taskExecuteThread) {

1
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java

@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;

3
dolphinscheduler-worker/src/main/resources/application.yaml

@ -99,3 +99,6 @@ spring:
on-profile: mysql on-profile: mysql
datasource: datasource:
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler
username: root
password: root

2
dolphinscheduler-worker/src/main/resources/banner.txt

@ -8,5 +8,5 @@ ${AnsiColor.BLUE}${AnsiStyle.BOLD}
|_| |_|
================================================================================ ================================================================================
${AnsiColor.BLUE}${AnsiStyle.BOLD} ${AnsiColor.BLUE}${AnsiStyle.BOLD}
:: Dolphinscheduler work server :: ${application.formatted-version} :: DolphinScheduler work server :: ${application.formatted-version}
${AnsiStyle.NORMAL} ${AnsiStyle.NORMAL}

Loading…
Cancel
Save