diff --git a/docker/build/conf/dolphinscheduler/logback/logback-alert.xml b/docker/build/conf/dolphinscheduler/logback/logback-alert.xml
index eec78385db..e9919aced3 100644
--- a/docker/build/conf/dolphinscheduler/logback/logback-alert.xml
+++ b/docker/build/conf/dolphinscheduler/logback/logback-alert.xml
@@ -36,8 +36,13 @@
+
+
+
+
+
\ No newline at end of file
diff --git a/docker/build/conf/dolphinscheduler/logback/logback-api.xml b/docker/build/conf/dolphinscheduler/logback/logback-api.xml
index 6d29f8af5f..8b55b69bfa 100644
--- a/docker/build/conf/dolphinscheduler/logback/logback-api.xml
+++ b/docker/build/conf/dolphinscheduler/logback/logback-api.xml
@@ -41,6 +41,10 @@
+
+
+
+
@@ -48,6 +52,7 @@
+
\ No newline at end of file
diff --git a/docker/build/conf/dolphinscheduler/logback/logback-master.xml b/docker/build/conf/dolphinscheduler/logback/logback-master.xml
index d1bfb67aa1..289c861a64 100644
--- a/docker/build/conf/dolphinscheduler/logback/logback-master.xml
+++ b/docker/build/conf/dolphinscheduler/logback/logback-master.xml
@@ -65,9 +65,14 @@
+
+
+
+
+
\ No newline at end of file
diff --git a/docker/build/conf/dolphinscheduler/logback/logback-worker.xml b/docker/build/conf/dolphinscheduler/logback/logback-worker.xml
index b7e08dd846..bed3c0864f 100644
--- a/docker/build/conf/dolphinscheduler/logback/logback-worker.xml
+++ b/docker/build/conf/dolphinscheduler/logback/logback-worker.xml
@@ -66,9 +66,14 @@
+
+
+
+
+
\ No newline at end of file
diff --git a/dolphinscheduler-alert/src/main/resources/logback-alert.xml b/dolphinscheduler-alert/src/main/resources/logback-alert.xml
index 1718947dd1..b46c0a0159 100644
--- a/dolphinscheduler-alert/src/main/resources/logback-alert.xml
+++ b/dolphinscheduler-alert/src/main/resources/logback-alert.xml
@@ -44,9 +44,14 @@
+
+
+
+
+
\ No newline at end of file
diff --git a/dolphinscheduler-api/src/main/resources/logback-api.xml b/dolphinscheduler-api/src/main/resources/logback-api.xml
index c1142ed8ed..44f47024c7 100644
--- a/dolphinscheduler-api/src/main/resources/logback-api.xml
+++ b/dolphinscheduler-api/src/main/resources/logback-api.xml
@@ -49,6 +49,10 @@
+
+
+
+
@@ -57,6 +61,7 @@
+
\ No newline at end of file
diff --git a/dolphinscheduler-common/pom.xml b/dolphinscheduler-common/pom.xml
index cf76df3a36..356240cc6f 100644
--- a/dolphinscheduler-common/pom.xml
+++ b/dolphinscheduler-common/pom.xml
@@ -588,5 +588,9 @@
compile
+
+ org.apache.skywalking
+ apm-toolkit-logback-1.x
+
diff --git a/dolphinscheduler-dist/pom.xml b/dolphinscheduler-dist/pom.xml
index 80c47a72c9..9d525d30ac 100644
--- a/dolphinscheduler-dist/pom.xml
+++ b/dolphinscheduler-dist/pom.xml
@@ -41,6 +41,12 @@
org.apache.dolphinscheduler
dolphinscheduler-api
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-skywalking
+ ${project.version}
+
diff --git a/dolphinscheduler-dist/release-docs/LICENSE b/dolphinscheduler-dist/release-docs/LICENSE
index 71d3f9a548..a2a5090dc1 100644
--- a/dolphinscheduler-dist/release-docs/LICENSE
+++ b/dolphinscheduler-dist/release-docs/LICENSE
@@ -254,6 +254,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
derby 10.14.2.0: https://github.com/apache/derby, Apache 2.0
druid 1.1.14: https://mvnrepository.com/artifact/com.alibaba/druid/1.1.14, Apache 2.0
fastjson 1.2.61: https://mvnrepository.com/artifact/com.alibaba/fastjson/1.2.61, Apache 2.0
+ grpc-java 1.32.1: https://github.com/grpc/grpc-java, Apache 2.0
gson 2.8.5: https://github.com/google/gson, Apache 2.0
guava 20.0: https://mvnrepository.com/artifact/com.google.guava/guava/20.0, Apache 2.0
guice 3.0: https://mvnrepository.com/artifact/com.google.inject/guice/3.0, Apache 2.0
@@ -342,9 +343,11 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
netty-all 4.1.33.Final: https://github.com/netty/netty/blob/netty-4.1.33.Final/LICENSE.txt, Apache 2.0
opencsv 2.3: https://mvnrepository.com/artifact/net.sf.opencsv/opencsv/2.3, Apache 2.0
parquet-hadoop-bundle 1.8.1: https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop-bundle/1.8.1, Apache 2.0
+ perfmark-api 0.19.0: https://mvnrepository.com/artifact/io.perfmark/perfmark-api/0.19.0, Apache 2.0
poi 3.17: https://mvnrepository.com/artifact/org.apache.poi/poi/3.17, Apache 2.0
quartz 2.3.0: https://mvnrepository.com/artifact/org.quartz-scheduler/quartz/2.3.0, Apache 2.0
quartz-jobs 2.3.0: https://mvnrepository.com/artifact/org.quartz-scheduler/quartz-jobs/2.3.0, Apache 2.0
+ skywalking 8.4.0: https://github.com/apache/skywalking, Apache 2.0
snakeyaml 1.23: https://mvnrepository.com/artifact/org.yaml/snakeyaml/1.23, Apache 2.0
snappy 0.2: https://mvnrepository.com/artifact/org.iq80.snappy/snappy/0.2, Apache 2.0
snappy-java 1.0.4.1: https://github.com/xerial/snappy-java, Apache 2.0
diff --git a/dolphinscheduler-dist/release-docs/licenses/LICENSE-skywalking-agent.txt b/dolphinscheduler-dist/release-docs/licenses/LICENSE-skywalking-agent.txt
new file mode 100644
index 0000000000..261eeb9e9f
--- /dev/null
+++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-skywalking-agent.txt
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-binary.xml b/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-binary.xml
index 28bbb361cd..3e282bddd7 100644
--- a/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-binary.xml
+++ b/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-binary.xml
@@ -151,6 +151,14 @@
.
+
+ ${basedir}/../ext/skywalking/target/skywalking-agent
+
+ **/*.*
+
+ ./skywalking-agent
+
+
${basedir}/../dolphinscheduler-ui/dist
diff --git a/dolphinscheduler-server/src/main/resources/logback-master.xml b/dolphinscheduler-server/src/main/resources/logback-master.xml
index a61d891b10..6606ddc300 100644
--- a/dolphinscheduler-server/src/main/resources/logback-master.xml
+++ b/dolphinscheduler-server/src/main/resources/logback-master.xml
@@ -73,9 +73,14 @@
+
+
+
+
+
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/resources/logback-worker.xml b/dolphinscheduler-server/src/main/resources/logback-worker.xml
index bea68ff37d..35b8242e3a 100644
--- a/dolphinscheduler-server/src/main/resources/logback-worker.xml
+++ b/dolphinscheduler-server/src/main/resources/logback-worker.xml
@@ -73,9 +73,14 @@
+
+
+
+
+
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
index a872f6db9f..0e6ebaa7f5 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
@@ -53,7 +53,7 @@ public class TaskPriority implements Comparable {
/**
* context
*/
- private Map context;
+ private Map context;
public TaskPriority(){}
@@ -96,7 +96,7 @@ public class TaskPriority implements Comparable {
return taskId;
}
- public Map getContext() {
+ public Map getContext() {
return context;
}
@@ -112,7 +112,7 @@ public class TaskPriority implements Comparable {
this.groupName = groupName;
}
- public void setContext(Map context) {
+ public void setContext(Map context) {
this.context = context;
}
diff --git a/ext/skywalking/config/agent.config b/ext/skywalking/config/agent.config
new file mode 100644
index 0000000000..cca5bae971
--- /dev/null
+++ b/ext/skywalking/config/agent.config
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# The agent namespace
+# agent.namespace=${SW_AGENT_NAMESPACE:default-namespace}
+
+# The service name in UI
+agent.service_name=${SW_AGENT_NAME:Your_ApplicationName}
+
+# The number of sampled traces per 3 seconds
+# Negative or zero means off, by default
+# agent.sample_n_per_3_secs=${SW_AGENT_SAMPLE:-1}
+
+# Authentication active is based on backend setting, see application.yml for more details.
+# agent.authentication = ${SW_AGENT_AUTHENTICATION:xxxx}
+
+# The max amount of spans in a single segment.
+# Through this config item, SkyWalking keep your application memory cost estimated.
+# agent.span_limit_per_segment=${SW_AGENT_SPAN_LIMIT:150}
+
+# If the operation name of the first span is included in this set, this segment should be ignored. Multiple values should be separated by `,`.
+# agent.ignore_suffix=${SW_AGENT_IGNORE_SUFFIX:.jpg,.jpeg,.js,.css,.png,.bmp,.gif,.ico,.mp3,.mp4,.html,.svg}
+
+# If true, SkyWalking agent will save all instrumented classes files in `/debugging` folder.
+# SkyWalking team may ask for these files in order to resolve compatible problem.
+# agent.is_open_debugging_class = ${SW_AGENT_OPEN_DEBUG:true}
+
+# If true, SkyWalking agent will cache all instrumented classes files to memory or disk files (decided by class cache mode),
+# allow other javaagent to enhance those classes that enhanced by SkyWalking agent.
+# agent.is_cache_enhanced_class = ${SW_AGENT_CACHE_CLASS:false}
+
+# The instrumented classes cache mode: MEMORY or FILE
+# MEMORY: cache class bytes to memory, if instrumented classes is too many or too large, it may take up more memory
+# FILE: cache class bytes in `/class-cache` folder, automatically clean up cached class files when the application exits
+# agent.class_cache_mode = ${SW_AGENT_CLASS_CACHE_MODE:MEMORY}
+
+# The operationName max length
+# Notice, in the current practice, we don't recommend the length over 190.
+# agent.operation_name_threshold=${SW_AGENT_OPERATION_NAME_THRESHOLD:150}
+
+# The agent use gRPC plain text in default.
+# If true, SkyWalking agent uses TLS even no CA file detected.
+# agent.force_tls=${SW_AGENT_FORCE_TLS:false}
+
+# If true, skywalking agent will enable profile when user create a new profile task. Otherwise disable profile.
+# profile.active=${SW_AGENT_PROFILE_ACTIVE:true}
+
+# Parallel monitor segment count
+# profile.max_parallel=${SW_AGENT_PROFILE_MAX_PARALLEL:5}
+
+# Max monitor segment time(minutes), if current segment monitor time out of limit, then stop it.
+# profile.duration=${SW_AGENT_PROFILE_DURATION:10}
+
+# Max dump thread stack depth
+# profile.dump_max_stack_depth=${SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH:500}
+
+# Snapshot transport to backend buffer size
+# profile.snapshot_transport_buffer_size=${SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE:50}
+
+# Backend service addresses.
+collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800}
+
+# Logging file_name
+logging.file_name=${SW_LOGGING_FILE_NAME:skywalking-api.log}
+
+# Logging level
+logging.level=${SW_LOGGING_LEVEL:INFO}
+
+# Logging dir
+# logging.dir=${SW_LOGGING_DIR:""}
+
+# Logging max_file_size, default: 300 * 1024 * 1024 = 314572800
+# logging.max_file_size=${SW_LOGGING_MAX_FILE_SIZE:314572800}
+
+# The max history log files. When rollover happened, if log files exceed this number,
+# then the oldest file will be delete. Negative or zero means off, by default.
+# logging.max_history_files=${SW_LOGGING_MAX_HISTORY_FILES:-1}
+
+# Listed exceptions would not be treated as an error. Because in some codes, the exception is being used as a way of controlling business flow.
+# Besides, the annotation named IgnoredException in the trace toolkit is another way to configure ignored exceptions.
+# statuscheck.ignored_exceptions=${SW_STATUSCHECK_IGNORED_EXCEPTIONS:}
+
+# The max recursive depth when checking the exception traced by the agent. Typically, we don't recommend setting this more than 10, which could cause a performance issue. Negative value and 0 would be ignored, which means all exceptions would make the span tagged in error status.
+# statuscheck.max_recursive_depth=${SW_STATUSCHECK_MAX_RECURSIVE_DEPTH:1}
+
+# Mount the specific folders of the plugins. Plugins in mounted folders would work.
+plugin.mount=${SW_MOUNT_FOLDERS:plugins,activations}
+
+# Exclude activated plugins
+# plugin.exclude_plugins=${SW_EXCLUDE_PLUGINS:}
+
+# mysql plugin configuration
+# plugin.mysql.trace_sql_parameters=${SW_MYSQL_TRACE_SQL_PARAMETERS:false}
+
+# Kafka producer configuration
+# plugin.kafka.bootstrap_servers=${SW_KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
+
+# Match spring bean with regex expression for classname
+# plugin.springannotation.classname_match_regex=${SW_SPRINGANNOTATION_CLASSNAME_MATCH_REGEX:}
+
+# log reporter configs
+# Specify which grpc server's host for log data to report to.
+plugin.toolkit.log.grpc.reporter.server_host=${SW_GRPC_LOG_SERVER_HOST:127.0.0.1}
+# Specify which grpc server's port for log data to report to.
+plugin.toolkit.log.grpc.reporter.server_port=${SW_GRPC_LOG_SERVER_PORT:11800}
\ No newline at end of file
diff --git a/ext/skywalking/dashboard/dolphinscheduler.yml b/ext/skywalking/dashboard/dolphinscheduler.yml
new file mode 100644
index 0000000000..31b97fe52e
--- /dev/null
+++ b/ext/skywalking/dashboard/dolphinscheduler.yml
@@ -0,0 +1,284 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# UI templates initialized file includes the default template when the SkyWalking OAP starts up at the first time.
+#
+# Also, SkyWalking would detect the existing templates in the database, once they are missing, all templates in this file
+# could be added automatically.
+
+templates:
+ - name: Dolphinscheduler
+ type: "DASHBOARD"
+ configuration: |-
+ [
+ {
+ "name":"Dolphinscheduler",
+ "type":"service",
+ "serviceGroup":"dolphinscheduler",
+ "children":[
+ {
+ "name":"Master Scheduler",
+ "children":[
+ {
+ "width":3,
+ "title":"QuartzScheduler Load",
+ "height":350,
+ "entityType":"Endpoint",
+ "independentSelector":true,
+ "metricType":"REGULAR_VALUE",
+ "metricName":"endpoint_cpm",
+ "queryMetricType":"readMetricsValues",
+ "chartType":"ChartLine",
+ "currentService":"dolphinscheduler::master-server",
+ "currentEndpoint":"quartz-scheduler/org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob"
+ },
+ {
+ "width":3,
+ "title":"QuartzScheduler Avg Time",
+ "height":350,
+ "entityType":"Endpoint",
+ "independentSelector":true,
+ "metricType":"REGULAR_VALUE",
+ "metricName":"endpoint_avg",
+ "queryMetricType":"readMetricsValues",
+ "chartType":"ChartLine",
+ "currentService":"dolphinscheduler::master-server",
+ "currentEndpoint":"quartz-scheduler/org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob",
+ "unit":"ms"
+ },
+ {
+ "width":3,
+ "title":"QuartzScheduler Time Percentile",
+ "height":350,
+ "entityType":"Endpoint",
+ "independentSelector":true,
+ "metricType":"LABELED_VALUE",
+ "metricName":"endpoint_percentile",
+ "queryMetricType":"readLabeledMetricsValues",
+ "chartType":"ChartLine",
+ "metricLabels":"P50, P75, P90, P95, P99",
+ "labelsIndex":"0, 1, 2, 3, 4",
+ "currentService":"dolphinscheduler::master-server",
+ "currentEndpoint":"quartz-scheduler/org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob",
+ "unit":"ms"
+ },
+ {
+ "width":3,
+ "title":"QuartzScheduler Successful Rate",
+ "height":350,
+ "entityType":"Endpoint",
+ "independentSelector":true,
+ "metricType":"REGULAR_VALUE",
+ "metricName":"endpoint_sla",
+ "queryMetricType":"readMetricsValues",
+ "chartType":"ChartLine",
+ "currentService":"dolphinscheduler::master-server",
+ "currentEndpoint":"quartz-scheduler/org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob",
+ "unit":"%",
+ "aggregation":"/",
+ "aggregationNum":"100"
+ },
+ {
+ "width":3,
+ "title":"MasterSchedulerService Load",
+ "height":350,
+ "entityType":"Endpoint",
+ "independentSelector":true,
+ "metricType":"REGULAR_VALUE",
+ "metricName":"endpoint_cpm",
+ "queryMetricType":"readMetricsValues",
+ "chartType":"ChartLine",
+ "currentService":"dolphinscheduler::master-server",
+ "currentEndpoint":"master/schedule/process"
+ },
+ {
+ "width":3,
+ "title":"MasterSchedulerService Avg Time",
+ "height":350,
+ "entityType":"Endpoint",
+ "independentSelector":true,
+ "metricType":"REGULAR_VALUE",
+ "metricName":"endpoint_avg",
+ "queryMetricType":"readMetricsValues",
+ "chartType":"ChartLine",
+ "currentService":"dolphinscheduler::master-server",
+ "currentEndpoint":"master/schedule/process",
+ "unit":"ms"
+ },
+ {
+ "width":3,
+ "title":"MasterSchedulerService Time Percentile",
+ "height":350,
+ "entityType":"Endpoint",
+ "independentSelector":true,
+ "metricType":"LABELED_VALUE",
+ "queryMetricType":"readLabeledMetricsValues",
+ "chartType":"ChartLine",
+ "metricName":"endpoint_percentile",
+ "metricLabels":"P50, P75, P90, P95, P99",
+ "labelsIndex":"0, 1, 2, 3, 4",
+ "currentService":"dolphinscheduler::master-server",
+ "currentEndpoint":"master/schedule/process",
+ "unit":"ms"
+ },
+ {
+ "width":3,
+ "title":"MasterSchedulerService Successful Rate",
+ "height":350,
+ "entityType":"Endpoint",
+ "independentSelector":true,
+ "metricType":"REGULAR_VALUE",
+ "queryMetricType":"readMetricsValues",
+ "chartType":"ChartLine",
+ "metricName":"endpoint_sla",
+ "currentService":"dolphinscheduler::master-server",
+ "currentEndpoint":"master/schedule/process",
+ "unit":"%",
+ "aggregation":"/",
+ "aggregationNum":"100"
+ }
+ ]
+ },
+ {
+ "name":"Master Queue",
+ "children":[
+ {
+ "width":3,
+ "title":"Put Load",
+ "height":350,
+ "entityType":"Endpoint",
+ "independentSelector":true,
+ "metricType":"REGULAR_VALUE",
+ "metricName":"endpoint_cpm",
+ "queryMetricType":"readMetricsValues",
+ "chartType":"ChartLine",
+ "currentService":"dolphinscheduler::master-server",
+ "currentEndpoint":"masetr/queue/put"
+ },
+ {
+ "width":3,
+ "title":"Put Avg Time",
+ "height":350,
+ "entityType":"Endpoint",
+ "independentSelector":true,
+ "metricType":"REGULAR_VALUE",
+ "metricName":"endpoint_avg",
+ "queryMetricType":"readMetricsValues",
+ "chartType":"ChartLine",
+ "currentService":"dolphinscheduler::master-server",
+ "currentEndpoint":"masetr/queue/put",
+ "unit":"ms"
+ },
+ {
+ "width":3,
+ "title":"Put Time Percentile",
+ "height":350,
+ "entityType":"Endpoint",
+ "independentSelector":true,
+ "metricType":"LABELED_VALUE",
+ "metricName":"endpoint_percentile",
+ "queryMetricType":"readLabeledMetricsValues",
+ "chartType":"ChartLine",
+ "metricLabels":"P50, P75, P90, P95, P99",
+ "labelsIndex":"0, 1, 2, 3, 4",
+ "currentService":"dolphinscheduler::master-server",
+ "currentEndpoint":"masetr/queue/put",
+ "unit":"ms"
+ },
+ {
+ "width":3,
+ "title":"Put Successful Rate",
+ "height":350,
+ "entityType":"Endpoint",
+ "independentSelector":true,
+ "metricType":"REGULAR_VALUE",
+ "metricName":"endpoint_sla",
+ "queryMetricType":"readMetricsValues",
+ "chartType":"ChartLine",
+ "currentService":"dolphinscheduler::master-server",
+ "currentEndpoint":"masetr/queue/put",
+ "unit":"%",
+ "aggregation":"/",
+ "aggregationNum":"100"
+ },
+ {
+ "width":3,
+ "title":"Take Load",
+ "height":350,
+ "entityType":"Endpoint",
+ "independentSelector":true,
+ "metricType":"REGULAR_VALUE",
+ "metricName":"endpoint_cpm",
+ "queryMetricType":"readMetricsValues",
+ "chartType":"ChartLine",
+ "currentService":"dolphinscheduler::master-server",
+ "currentEndpoint":"masetr/queue/take"
+ },
+ {
+ "width":3,
+ "title":"Take Avg Time",
+ "height":350,
+ "entityType":"Endpoint",
+ "independentSelector":true,
+ "metricType":"REGULAR_VALUE",
+ "metricName":"endpoint_avg",
+ "queryMetricType":"readMetricsValues",
+ "chartType":"ChartLine",
+ "currentService":"dolphinscheduler::master-server",
+ "currentEndpoint":"masetr/queue/take",
+ "unit":"ms"
+ },
+ {
+ "width":3,
+ "title":"Take Time Percentile",
+ "height":350,
+ "entityType":"Endpoint",
+ "independentSelector":true,
+ "metricType":"LABELED_VALUE",
+ "metricName":"endpoint_percentile",
+ "queryMetricType":"readLabeledMetricsValues",
+ "chartType":"ChartLine",
+ "metricLabels":"P50, P75, P90, P95, P99",
+ "labelsIndex":"0, 1, 2, 3, 4",
+ "currentService":"dolphinscheduler::master-server",
+ "currentEndpoint":"masetr/queue/take",
+ "unit":"ms"
+ },
+ {
+ "width":3,
+ "title":"Take Successful Rate",
+ "height":350,
+ "entityType":"Endpoint",
+ "independentSelector":true,
+ "metricType":"REGULAR_VALUE",
+ "metricName":"endpoint_sla",
+ "queryMetricType":"readMetricsValues",
+ "chartType":"ChartLine",
+ "currentService":"dolphinscheduler::master-server",
+ "currentEndpoint":"masetr/queue/take",
+ "unit":"%",
+ "aggregation":"/",
+ "aggregationNum":"100"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ # Activated as the DASHBOARD type, makes this templates added into the UI page automatically.
+ # False means providing a basic template, user needs to add it manually.
+ activated: true
+ # True means wouldn't show up on the dashboard. Only keeps the definition in the storage.
+ disabled: false
\ No newline at end of file
diff --git a/ext/skywalking/pom.xml b/ext/skywalking/pom.xml
new file mode 100644
index 0000000000..cb7121e056
--- /dev/null
+++ b/ext/skywalking/pom.xml
@@ -0,0 +1,218 @@
+
+
+
+
+ dolphinscheduler
+ org.apache.dolphinscheduler
+ 1.3.6-SNAPSHOT
+ ../../pom.xml
+
+ 4.0.0
+
+ dolphinscheduler-skywalking
+ dolphinscheduler-skywalking
+ jar
+
+
+ UTF-8
+ ${parent.version}
+
+ ${project.build.directory}/skywalking-agent/
+ ${agent.dir}/plugins/
+ ${agent.dir}/activations/
+ ${agent.dir}/config
+ ${agent.dir}/dashboard
+ org.apache.skywalking.apm.dependencies
+
+
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-service
+ ${dolphinscheduler.version}
+ provided
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-server
+ ${dolphinscheduler.version}
+ provided
+
+
+
+ org.apache.skywalking
+ apm-agent-core
+ ${skywalking.version}
+ provided
+
+
+ org.projectlombok
+ lombok
+ 1.18.16
+ provided
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.1.1
+
+
+ package
+
+ shade
+
+
+
+
+ net.bytebuddy:*
+
+
+
+
+ net.bytebuddy
+ ${agent-dependencies-shade.package}.net.bytebuddy
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ copy
+ package
+
+ copy
+
+
+
+
+
+ org.apache.skywalking
+ apm-agent
+ ${skywalking.version}
+ skywalking-agent.jar
+ ${agent.dir}
+
+
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-skywalking
+ ${project.version}
+ ${agent-plugins.dir}
+
+
+ org.apache.skywalking
+ apm-zookeeper-3.4.x-plugin
+ ${skywalking.version}
+ ${agent-plugins.dir}
+
+
+ org.apache.skywalking
+ apm-quartz-scheduler-2.x-plugin
+ ${skywalking.version}
+ ${agent-plugins.dir}
+
+
+ org.apache.skywalking
+ apm-httpclient-commons
+ ${skywalking.version}
+ ${agent-plugins.dir}
+
+
+ org.apache.skywalking
+ apm-httpclient-3.x-plugin
+ ${skywalking.version}
+ ${agent-plugins.dir}
+
+
+ org.apache.skywalking
+ apm-httpClient-4.x-plugin
+ ${skywalking.version}
+ ${agent-plugins.dir}
+
+
+ org.apache.skywalking
+ spring-commons
+ ${skywalking.version}
+ ${agent-plugins.dir}
+
+
+ org.apache.skywalking
+ apm-spring-core-patch
+ ${skywalking.version}
+ ${agent-plugins.dir}
+
+
+ org.apache.skywalking
+ apm-springmvc-annotation-commons
+ ${skywalking.version}
+ ${agent-plugins.dir}
+
+
+ org.apache.skywalking
+ apm-springmvc-annotation-5.x-plugin
+ ${skywalking.version}
+ ${agent-plugins.dir}
+
+
+
+
+ org.apache.skywalking
+ apm-toolkit-logback-1.x-activation
+ ${skywalking.version}
+ ${agent-activations.dir}
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+
+
+ package
+
+ run
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterBaseTaskExecThreadConstructorInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterBaseTaskExecThreadConstructorInterceptor.java
new file mode 100644
index 0000000000..889c562b2b
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterBaseTaskExecThreadConstructorInterceptor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+
+public class MasterBaseTaskExecThreadConstructorInterceptor implements InstanceConstructorInterceptor {
+ @Override
+ public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {
+ TaskInstance taskInstance = (TaskInstance) allArguments[0];
+ TaskContext taskContext = new TaskContext<>(taskInstance, ContextManager.capture());
+
+ objInst.setSkyWalkingDynamicField(taskContext);
+ }
+}
+
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterBaseTaskExecThreadMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterBaseTaskExecThreadMethodInterceptor.java
new file mode 100644
index 0000000000..5e8e3d64c3
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterBaseTaskExecThreadMethodInterceptor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.server.master.runner.MasterBaseTaskExecThread;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Method;
+
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_STATE;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_TYPE;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_INSTANCE_ID;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_DEFINITION_ID;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_INSTANCE_ID;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_INSTANCE_NAME;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_WORKER_GROUP;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.getProcessDefinitionId;
+
+public class MasterBaseTaskExecThreadMethodInterceptor implements InstanceMethodsAroundInterceptor {
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+ TaskContext taskContext = (TaskContext) objInst.getSkyWalkingDynamicField();
+ TaskInstance taskInstance = taskContext.getCache();
+ String operationName = getOperationNamePrefix(taskInstance) + taskInstance.getName();
+
+ AbstractSpan span = ContextManager.createLocalSpan(operationName);
+ span.setComponent(Utils.DOLPHIN_SCHEDULER);
+ TAG_PROCESS_DEFINITION_ID.set(span, String.valueOf(taskInstance.getProcessDefinitionId()));
+ TAG_PROCESS_INSTANCE_ID.set(span, String.valueOf(taskInstance.getProcessInstanceId()));
+ TAG_TASK_TYPE.set(span, taskInstance.getTaskType());
+ TAG_TASK_INSTANCE_ID.set(span, String.valueOf(taskInstance.getId()));
+ TAG_TASK_INSTANCE_NAME.set(span, taskInstance.getName());
+ TAG_TASK_WORKER_GROUP.set(span, taskInstance.getWorkerGroup());
+ TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
+
+ ContextManager.continued(taskContext.getContextSnapshot());
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Object ret) throws Throwable {
+ AbstractSpan span = ContextManager.activeSpan();
+
+ MasterBaseTaskExecThread original = (MasterBaseTaskExecThread) objInst;
+ TaskInstance taskInstance = original.getTaskInstance();
+ ExecutionStatus executionStatus = taskInstance.getState();
+
+ TAG_TASK_STATE.set(span, taskInstance.getState().getDescp());
+ if (!ExecutionStatus.SUCCESS.equals(executionStatus)) {
+ span.errorOccurred();
+ }
+
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().log(t);
+ }
+
+ private static String getOperationNamePrefix(TaskInstance taskInstance) {
+ String prefix = "";
+ if (taskInstance.isSubProcess()) {
+ prefix = "master/subprocess_task/";
+ } else if (taskInstance.isDependTask()) {
+ prefix = "master/depend_task/";
+ } else if (taskInstance.isConditionsTask()) {
+ prefix = "master/conditions_task/";
+ } else {
+ prefix = "master/task/";
+ }
+
+ return prefix + getProcessDefinitionId(taskInstance.getProcessDefinitionId()) + "/";
+ }
+}
+
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadConstructorInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadConstructorInterceptor.java
new file mode 100644
index 0000000000..9847cc29a7
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadConstructorInterceptor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+
+public class MasterExecThreadConstructorInterceptor implements InstanceConstructorInterceptor {
+ @Override
+ public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {
+ ProcessInstance processInstance = (ProcessInstance) allArguments[0];
+ TaskContext taskContext = new TaskContext<>(processInstance, ContextManager.capture());
+
+ objInst.setSkyWalkingDynamicField(taskContext);
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadMethodInterceptor.java
new file mode 100644
index 0000000000..d76cb13410
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadMethodInterceptor.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.RuntimeContext;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Method;
+
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROJECT_ID;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_STATE;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_INSTANCE_ID;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_INSTANCE_NAME;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_INSTANCE_HOST;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_DEFINITION_ID;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_COMMAND_TYPE;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_WORKER_GROUP;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_TIMEOUT;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.MASTER_PROCESS_EXECUTION_STATUS;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.getProjectId;
+
+public class MasterExecThreadMethodInterceptor implements InstanceMethodsAroundInterceptor {
+ private static final String OPERATION_NAME_PREFIX = "master/process/";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+ TaskContext taskContext = (TaskContext) objInst.getSkyWalkingDynamicField();
+ ProcessInstance processInstance = taskContext.getCache();
+ ProcessDefinition processDefinition = processInstance.getProcessDefinition();
+ String operationName = OPERATION_NAME_PREFIX + getProjectId(processDefinition.getProjectId()) + "/" + processDefinition.getName();
+
+ AbstractSpan span = ContextManager.createLocalSpan(operationName);
+ span.setComponent(Utils.DOLPHIN_SCHEDULER);
+ TAG_PROJECT_ID.set(span, String.valueOf(processDefinition.getProjectId()));
+ TAG_PROCESS_INSTANCE_ID.set(span, String.valueOf(processInstance.getId()));
+ TAG_PROCESS_INSTANCE_NAME.set(span, processInstance.getName());
+ TAG_PROCESS_INSTANCE_HOST.set(span, processInstance.getHost());
+ TAG_PROCESS_DEFINITION_ID.set(span, String.valueOf(processInstance.getProcessDefinitionId()));
+ TAG_PROCESS_COMMAND_TYPE.set(span, processInstance.getCommandType().name());
+ TAG_PROCESS_WORKER_GROUP.set(span, processInstance.getWorkerGroup());
+ TAG_PROCESS_TIMEOUT.set(span, String.valueOf(processInstance.getTimeout()));
+ TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
+
+ ContextManager.continued(taskContext.getContextSnapshot());
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Object ret) throws Throwable {
+ AbstractSpan span = ContextManager.activeSpan();
+
+ RuntimeContext runtimeContext = ContextManager.getRuntimeContext();
+ ExecutionStatus executionStatus = (ExecutionStatus) runtimeContext.get(MASTER_PROCESS_EXECUTION_STATUS);
+ if (executionStatus == null) {
+ ProcessInstance processInstance = (ProcessInstance) objInst.getSkyWalkingDynamicField();
+ executionStatus = processInstance.getState();
+ }
+
+ TAG_PROCESS_STATE.set(span, executionStatus.getDescp());
+ if (!ExecutionStatus.SUCCESS.equals(executionStatus)) {
+ span.errorOccurred();
+ }
+
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().log(t);
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadStateCacheInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadStateCacheInterceptor.java
new file mode 100644
index 0000000000..e24ff0f0ed
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadStateCacheInterceptor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.RuntimeContext;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Method;
+
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.MASTER_PROCESS_EXECUTION_STATUS;
+
+public class MasterExecThreadStateCacheInterceptor implements InstanceMethodsAroundInterceptor {
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Object ret) throws Throwable {
+ ExecutionStatus executionStatus = (ExecutionStatus) ret;
+ RuntimeContext runtimeContext = ContextManager.getRuntimeContext();
+ runtimeContext.put(MASTER_PROCESS_EXECUTION_STATUS, executionStatus);
+
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Throwable t) {
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterSchedulerServiceMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterSchedulerServiceMethodInterceptor.java
new file mode 100644
index 0000000000..9eeebf3199
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterSchedulerServiceMethodInterceptor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin;
+
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Method;
+
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD;
+
+public class MasterSchedulerServiceMethodInterceptor implements InstanceMethodsAroundInterceptor {
+ private static final String OPERATION_NAME = "master/schedule/process";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+ AbstractSpan span = ContextManager.createLocalSpan(OPERATION_NAME);
+ span.setComponent(Utils.DOLPHIN_SCHEDULER);
+ Tags.LOGIC_ENDPOINT.set(span, Tags.VAL_LOCAL_SPAN_AS_LOGIC_ENDPOINT);
+ TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Object ret) throws Throwable {
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().log(t);
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemoteChannelConstructorInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemoteChannelConstructorInterceptor.java
new file mode 100644
index 0000000000..33a4e1afab
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemoteChannelConstructorInterceptor.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin;
+
+import io.netty.channel.Channel;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+
+public class NettyRemoteChannelConstructorInterceptor implements InstanceConstructorInterceptor {
+ @Override
+ public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {
+ Channel channel = (Channel) allArguments[0];
+ objInst.setSkyWalkingDynamicField(channel.remoteAddress().toString());
+ }
+}
+
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemoteChannelMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemoteChannelMethodInterceptor.java
new file mode 100644
index 0000000000..710895fc4b
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemoteChannelMethodInterceptor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandContext;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Method;
+
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD;
+
+public class NettyRemoteChannelMethodInterceptor implements InstanceMethodsAroundInterceptor {
+ private static final String OPERATION_NAME_PREFIX = "rpc/command/send/";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+ String remoteAddress = (String) objInst.getSkyWalkingDynamicField();
+ Command command = (Command) allArguments[0];
+ CommandContext commandContext = command.getContext();
+ String operationName = OPERATION_NAME_PREFIX + command.getType().name();
+
+ ContextCarrier contextCarrier = new ContextCarrier();
+ AbstractSpan span = ContextManager.createExitSpan(operationName, contextCarrier, remoteAddress);
+ span.setComponent(Utils.DOLPHIN_SCHEDULER);
+ SpanLayer.asRPCFramework(span);
+ TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
+
+ CarrierItem item = contextCarrier.items();
+ while (item.hasNext()) {
+ item = item.next();
+ commandContext.put(item.getHeadKey(), item.getHeadValue());
+ }
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Object ret) throws Throwable {
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().log(t);
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemotingClientMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemotingClientMethodInterceptor.java
new file mode 100644
index 0000000000..325d5e1097
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemotingClientMethodInterceptor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandContext;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Method;
+
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD;
+
+public class NettyRemotingClientMethodInterceptor implements InstanceMethodsAroundInterceptor {
+ private static final String OPERATION_NAME_PREFIX = "rpc/command/send/";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+ Host host = (Host) allArguments[0];
+ Command command = (Command) allArguments[1];
+ CommandContext commandContext = command.getContext();
+ String operationName = OPERATION_NAME_PREFIX + command.getType().name();
+
+ ContextCarrier contextCarrier = new ContextCarrier();
+ AbstractSpan span = ContextManager.createExitSpan(operationName, contextCarrier, host.getAddress());
+ span.setComponent(Utils.DOLPHIN_SCHEDULER);
+ SpanLayer.asRPCFramework(span);
+ TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
+
+ CarrierItem item = contextCarrier.items();
+ while (item.hasNext()) {
+ item = item.next();
+ commandContext.put(item.getHeadKey(), item.getHeadValue());
+ }
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Object ret) throws Throwable {
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().log(t);
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRequestProcessorMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRequestProcessorMethodInterceptor.java
new file mode 100644
index 0000000000..764b49905d
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRequestProcessorMethodInterceptor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandContext;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Method;
+
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_NETTY_REMOTE_ADDRESS;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD;
+
+public class NettyRequestProcessorMethodInterceptor implements InstanceMethodsAroundInterceptor {
+ private static final String OPERATION_NAME_PREFIX = "rpc/command/process/";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+ Channel channel = (Channel) allArguments[0];
+ Command command = (Command) allArguments[1];
+ CommandContext commandContext = command.getContext();
+ String operationName = OPERATION_NAME_PREFIX + command.getType().name();
+
+ ContextCarrier contextCarrier = new ContextCarrier();
+ CarrierItem item = contextCarrier.items();
+ while (item.hasNext()) {
+ item = item.next();
+ item.setHeadValue(commandContext.get(item.getHeadKey()));
+ }
+
+ AbstractSpan span = ContextManager.createEntrySpan(operationName, contextCarrier);
+ span.setComponent(Utils.DOLPHIN_SCHEDULER);
+ SpanLayer.asRPCFramework(span);
+ TAG_NETTY_REMOTE_ADDRESS.set(span, channel.remoteAddress().toString());
+ TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Object ret) throws Throwable {
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().log(t);
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskContext.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskContext.java
new file mode 100644
index 0000000000..2aa929e933
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskContext.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+
+@Getter
+@AllArgsConstructor
+public class TaskContext {
+ private T cache;
+ private ContextSnapshot contextSnapshot;
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteConstructorInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteConstructorInterceptor.java
new file mode 100644
index 0000000000..16ddc85a0f
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteConstructorInterceptor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin;
+
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+
+public class TaskExecuteConstructorInterceptor implements InstanceConstructorInterceptor {
+ @Override
+ public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {
+ TaskExecutionContext taskExecutionContext = (TaskExecutionContext) allArguments[0];
+ TaskContext taskContext = new TaskContext<>(taskExecutionContext, ContextManager.capture());
+
+ objInst.setSkyWalkingDynamicField(taskContext);
+ }
+}
+
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteMethodInterceptor.java
new file mode 100644
index 0000000000..99a8ce22d1
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteMethodInterceptor.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Method;
+
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_PARAMS;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_STATE;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD;
+
+public class TaskExecuteMethodInterceptor implements InstanceMethodsAroundInterceptor {
+ private static final int DEFAULT_TASK_STATUS_CODE = -1;
+ private static final int TASK_PARAMS_MAX_LENGTH = 2048;
+ private static final String OPERATION_NAME_PREFIX = "worker/";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+ TaskContext taskContext = (TaskContext) objInst.getSkyWalkingDynamicField();
+ String operationName = OPERATION_NAME_PREFIX + taskContext.getCache().getTaskType() + "/" + method.getName();
+
+ AbstractSpan span = ContextManager.createLocalSpan(operationName);
+ span.setComponent(Utils.DOLPHIN_SCHEDULER);
+ TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Object ret) throws Throwable {
+ AbstractSpan span = ContextManager.activeSpan();
+
+ AbstractTask task = (AbstractTask) objInst;
+ int statusCode = task.getExitStatusCode();
+ ExecutionStatus status = task.getExitStatus();
+ TAG_TASK_STATE.set(span, status.getDescp());
+ if (statusCode != DEFAULT_TASK_STATUS_CODE && ExecutionStatus.FAILURE.equals(status)) {
+ logTaskParams(objInst);
+ span.errorOccurred();
+ }
+
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Throwable t) {
+ logTaskParams(objInst);
+
+ ContextManager.activeSpan().log(t);
+ }
+
+ private void logTaskParams(EnhancedInstance objInst) {
+ TaskContext taskContext = (TaskContext) objInst.getSkyWalkingDynamicField();
+ TaskExecutionContext executionContext = taskContext.getCache();
+ String taskParams = executionContext.getTaskParams();
+
+ String limitTaskParams = taskParams;
+ if (taskParams.length() > TASK_PARAMS_MAX_LENGTH) {
+ limitTaskParams = taskParams.substring(TASK_PARAMS_MAX_LENGTH);
+ }
+
+ AbstractSpan span = ContextManager.activeSpan();
+ TAG_TASK_PARAMS.set(span, limitTaskParams);
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteThreadConstructorInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteThreadConstructorInterceptor.java
new file mode 100644
index 0000000000..d6781f8185
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteThreadConstructorInterceptor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin;
+
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+
+public class TaskExecuteThreadConstructorInterceptor implements InstanceConstructorInterceptor {
+ @Override
+ public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {
+ TaskExecutionContext taskExecutionContext = (TaskExecutionContext) allArguments[0];
+ TaskContext taskContext = new TaskContext<>(taskExecutionContext, ContextManager.capture());
+
+ objInst.setSkyWalkingDynamicField(taskContext);
+ }
+}
+
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteThreadMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteThreadMethodInterceptor.java
new file mode 100644
index 0000000000..e263ed4e63
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteThreadMethodInterceptor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin;
+
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.utils.EnumUtils;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Method;
+
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_INSTANCE_ID;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_INSTANCE_HOST;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_EXECUTE_PATH;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_LOG_PATH;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROJECT_ID;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_DEFINITION_ID;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_INSTANCE_ID;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.getProjectId;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.getProcessDefinitionId;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD;
+
+public class TaskExecuteThreadMethodInterceptor implements InstanceMethodsAroundInterceptor {
+ private static final String OPERATION_NAME_PREFIX = "worker/execute/";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+ TaskContext taskContext = (TaskContext) objInst.getSkyWalkingDynamicField();
+ TaskExecutionContext executionContext = taskContext.getCache();
+ TaskType type = EnumUtils.getEnum(TaskType.class, executionContext.getTaskType());
+ String operationName = OPERATION_NAME_PREFIX
+ + type.getDescp()
+ + "/" + getProjectId(executionContext.getProjectId())
+ + "/" + getProcessDefinitionId(executionContext.getProcessDefineId())
+ + "/" + executionContext.getTaskName();
+
+ AbstractSpan span = ContextManager.createLocalSpan(operationName);
+ span.setComponent(Utils.DOLPHIN_SCHEDULER);
+
+ TAG_PROJECT_ID.set(span, String.valueOf(executionContext.getProjectId()));
+ TAG_PROCESS_DEFINITION_ID.set(span, String.valueOf(executionContext.getProcessDefineId()));
+ TAG_PROCESS_INSTANCE_ID.set(span, String.valueOf(executionContext.getProcessInstanceId()));
+ TAG_TASK_INSTANCE_HOST.set(span, executionContext.getHost());
+ TAG_TASK_INSTANCE_ID.set(span, String.valueOf(executionContext.getTaskInstanceId()));
+ TAG_TASK_EXECUTE_PATH.set(span, executionContext.getExecutePath());
+ TAG_TASK_LOG_PATH.set(span, executionContext.getLogPath());
+ TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
+
+ ContextManager.continued(taskContext.getContextSnapshot());
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Object ret) throws Throwable {
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().log(t);
+ }
+}
+
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskPriorityQueueConsumerMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskPriorityQueueConsumerMethodInterceptor.java
new file mode 100644
index 0000000000..20748745c2
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskPriorityQueueConsumerMethodInterceptor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin;
+
+import org.apache.dolphinscheduler.service.queue.TaskPriority;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.*;
+
+public class TaskPriorityQueueConsumerMethodInterceptor implements InstanceMethodsAroundInterceptor {
+ private static final String OPERATION_NAME = "master/queue/take";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+ TaskPriority taskPriority = (TaskPriority) allArguments[0];
+ Map taskContext = taskPriority.getContext();
+
+ AbstractSpan span = ContextManager.createLocalSpan(OPERATION_NAME);
+ span.setComponent(Utils.DOLPHIN_SCHEDULER);
+ TAG_TASK_ID.set(span, String.valueOf(taskPriority.getTaskId()));
+ TAG_PROCESS_INSTANCE_ID.set(span, String.valueOf(taskPriority.getProcessInstanceId()));
+ Tags.LOGIC_ENDPOINT.set(span, Tags.VAL_LOCAL_SPAN_AS_LOGIC_ENDPOINT);
+ TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
+
+ ContextSnapshot contextSnapshot = (ContextSnapshot) taskContext.get(SKYWALKING_TRACING_CONTEXT);
+ ContextManager.continued(contextSnapshot);
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Object ret) throws Throwable {
+ AbstractSpan span = ContextManager.activeSpan();
+
+ boolean handleResult = (boolean) ret;
+ if (!handleResult) {
+ span.errorOccurred();
+ }
+
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().log(t);
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskPriorityQueueImplMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskPriorityQueueImplMethodInterceptor.java
new file mode 100644
index 0000000000..10ab60f4f5
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskPriorityQueueImplMethodInterceptor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin;
+
+import org.apache.dolphinscheduler.service.queue.TaskPriority;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_ID;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_INSTANCE_ID;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.SKYWALKING_TRACING_CONTEXT;
+import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD;
+
+public class TaskPriorityQueueImplMethodInterceptor implements InstanceMethodsAroundInterceptor {
+ private static final String OPERATION_NAME = "master/queue/put";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+ boolean isActive = ContextManager.isActive();
+ TaskPriority taskPriority = (TaskPriority) allArguments[0];
+
+ AbstractSpan span = ContextManager.createLocalSpan(OPERATION_NAME);
+ span.setComponent(Utils.DOLPHIN_SCHEDULER);
+ TAG_TASK_ID.set(span, String.valueOf(taskPriority.getTaskId()));
+ TAG_PROCESS_INSTANCE_ID.set(span, String.valueOf(taskPriority.getProcessInstanceId()));
+ Tags.LOGIC_ENDPOINT.set(span, Tags.VAL_LOCAL_SPAN_AS_LOGIC_ENDPOINT);
+ TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
+
+ if (isActive) {
+ Map taskPriorityContext = taskPriority.getContext();
+ if (taskPriorityContext == null) {
+ taskPriorityContext = new HashMap<>();
+ taskPriority.setContext(taskPriorityContext);
+ }
+ taskPriorityContext.put(SKYWALKING_TRACING_CONTEXT, ContextManager.capture());
+ }
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Object ret) throws Throwable {
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().log(t);
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/Utils.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/Utils.java
new file mode 100644
index 0000000000..2dc6bb84bb
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/Utils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin;
+
+import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.apache.skywalking.apm.network.trace.component.OfficialComponent;
+
+import java.lang.reflect.Method;
+
+public class Utils {
+ public static final OfficialComponent DOLPHIN_SCHEDULER = ComponentsDefine.DOLPHIN_SCHEDULER;
+
+ public static final String SKYWALKING_TRACING_CONTEXT = "skywalking_tracing_context";
+ public static final String MASTER_PROCESS_EXECUTION_STATUS = "master_process_execution_status";
+
+ public static final StringTag TAG_PROJECT_ID = new StringTag("project.id");
+
+ public static final StringTag TAG_PROCESS_STATE = new StringTag("process.state");
+ public static final StringTag TAG_PROCESS_INSTANCE_ID = new StringTag("process.instance.id");
+ public static final StringTag TAG_PROCESS_INSTANCE_NAME = new StringTag("process.instance.name");
+ public static final StringTag TAG_PROCESS_INSTANCE_HOST = new StringTag("process.instance.host");
+ public static final StringTag TAG_PROCESS_DEFINITION_ID = new StringTag("process.definitionId");
+ public static final StringTag TAG_PROCESS_COMMAND_TYPE = new StringTag("process.commandType");
+ public static final StringTag TAG_PROCESS_WORKER_GROUP = new StringTag("process.workerGroup");
+ public static final StringTag TAG_PROCESS_TIMEOUT = new StringTag("process.timeout");
+
+ public static final StringTag TAG_TASK_STATE = new StringTag("task.state");
+ public static final StringTag TAG_TASK_TYPE = new StringTag("take.type");
+ public static final StringTag TAG_TASK_INSTANCE_ID = new StringTag("task.instance.id");
+ public static final StringTag TAG_TASK_INSTANCE_NAME = new StringTag("task.instanceName");
+ public static final StringTag TAG_TASK_INSTANCE_HOST = new StringTag("task.instance.host");
+ public static final StringTag TAG_TASK_WORKER_GROUP = new StringTag("task.workerGroup");
+ public static final StringTag TAG_TASK_ID = new StringTag("task.id");
+ public static final StringTag TAG_TASK_EXECUTE_PATH = new StringTag("task.executePath");
+ public static final StringTag TAG_TASK_LOG_PATH = new StringTag("task.logPath");
+ public static final StringTag TAG_TASK_PARAMS = new StringTag("task.params");
+
+ public static final StringTag TAG_NETTY_REMOTE_ADDRESS = new StringTag("netty.remoteAddress");
+
+ public static final StringTag TAG_EXECUTE_METHOD = new StringTag("execute.method");
+
+ public static String getProjectId(int id) {
+ return "project_id_" + id;
+ }
+
+ public static String getProcessDefinitionId(int id) {
+ return "process_definition_id_" + id;
+ }
+
+ public static String getMethodName(Method method) {
+ return method.getDeclaringClass().getTypeName() + "#" + method.getName();
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterBaseTaskExecThreadInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterBaseTaskExecThreadInterceptorInstrumentation.java
new file mode 100644
index 0000000000..c9323817d5
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterBaseTaskExecThreadInterceptorInstrumentation.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static net.bytebuddy.matcher.ElementMatchers.isPublic;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * Enhance {@link org.apache.dolphinscheduler.server.master.runner.MasterBaseTaskExecThread} instance and intercept `call` methods,
+ * this class provides an isolated thread for running job.
+ *
+ * @see org.apache.dolphinscheduler.skywalking.plugin.MasterBaseTaskExecThreadConstructorInterceptor
+ * @see org.apache.dolphinscheduler.skywalking.plugin.MasterBaseTaskExecThreadMethodInterceptor
+ */
+public class MasterBaseTaskExecThreadInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+ private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.MasterBaseTaskExecThreadConstructorInterceptor";
+ private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.MasterBaseTaskExecThreadMethodInterceptor";
+ private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.master.runner.MasterBaseTaskExecThread";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANC_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[] {
+ new ConstructorInterceptPoint() {
+ @Override
+ public ElementMatcher getConstructorMatcher() {
+ return takesArguments(1)
+ .and(takesArgument(0, named("org.apache.dolphinscheduler.dao.entity.TaskInstance")));
+ }
+
+ @Override
+ public String getConstructorInterceptor() {
+ return CONSTRUCTOR_INTERCEPTOR_CLASS;
+ }
+ }
+ };
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher getMethodsMatcher() {
+ return named("call")
+ .and(isPublic())
+ .and(takesArguments(0));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return METHOD_INTERCEPTOR_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterExecThreadInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterExecThreadInterceptorInstrumentation.java
new file mode 100644
index 0000000000..e786b052ec
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterExecThreadInterceptorInstrumentation.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static net.bytebuddy.matcher.ElementMatchers.isPublic;
+import static net.bytebuddy.matcher.ElementMatchers.returns;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * Enhance {@link org.apache.dolphinscheduler.server.master.runner.MasterExecThread} instance and intercept `run` and `getProcessInstanceState` methods,
+ * the `run` method is a unified entrance of scheduled job.
+ *
+ * @see org.apache.dolphinscheduler.skywalking.plugin.MasterExecThreadConstructorInterceptor
+ * @see org.apache.dolphinscheduler.skywalking.plugin.MasterExecThreadMethodInterceptor
+ * @see org.apache.dolphinscheduler.skywalking.plugin.MasterExecThreadStateCacheInterceptor
+ */
+public class MasterExecThreadInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+ private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.MasterExecThreadConstructorInterceptor";
+ private static final String EXEC_PROCESS_METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.MasterExecThreadMethodInterceptor";
+ private static final String CACHE_STATE_METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.MasterExecThreadStateCacheInterceptor";
+ private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.master.runner.MasterExecThread";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANC_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[] {
+ new ConstructorInterceptPoint() {
+ @Override
+ public ElementMatcher getConstructorMatcher() {
+ return takesArgument(0, named("org.apache.dolphinscheduler.dao.entity.ProcessInstance"));
+ }
+
+ @Override
+ public String getConstructorInterceptor() {
+ return CONSTRUCTOR_INTERCEPTOR_CLASS;
+ }
+ }
+ };
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher getMethodsMatcher() {
+ return named("run")
+ .and(isPublic())
+ .and(takesArguments(0));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return EXEC_PROCESS_METHOD_INTERCEPTOR_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ },
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher getMethodsMatcher() {
+ return named("getProcessInstanceState")
+ .and(returns(named("org.apache.dolphinscheduler.common.enums.ExecutionStatus")));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return CACHE_STATE_METHOD_INTERCEPTOR_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterSchedulerServiceInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterSchedulerServiceInterceptorInstrumentation.java
new file mode 100644
index 0000000000..db1e498809
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterSchedulerServiceInterceptorInstrumentation.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * Enhance {@link org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService} instance and intercept `scheduleProcess` method,
+ * this method is a scheduler entrance of job.
+ *
+ * @see org.apache.dolphinscheduler.skywalking.plugin.MasterSchedulerServiceMethodInterceptor
+ */
+public class MasterSchedulerServiceInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+ private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.MasterSchedulerServiceMethodInterceptor";
+ private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANC_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher getMethodsMatcher() {
+ return named("scheduleProcess");
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return METHOD_INTERCEPTOR_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRemoteChannelInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRemoteChannelInterceptorInstrumentation.java
new file mode 100644
index 0000000000..38e7943a69
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRemoteChannelInterceptorInstrumentation.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * Enhance {@link org.apache.dolphinscheduler.server.worker.processor.NettyRemoteChannel} instance and intercept `writeAndFlush` method,
+ * this method send message to remote server.
+ *
+ * @see org.apache.dolphinscheduler.skywalking.plugin.NettyRemoteChannelConstructorInterceptor
+ * @see org.apache.dolphinscheduler.skywalking.plugin.NettyRemoteChannelMethodInterceptor
+ */
+public class NettyRemoteChannelInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+ private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.NettyRemoteChannelConstructorInterceptor";
+ private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.NettyRemoteChannelMethodInterceptor";
+ private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.worker.processor.NettyRemoteChannel";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANC_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[] {
+ new ConstructorInterceptPoint() {
+ @Override
+ public ElementMatcher getConstructorMatcher() {
+ return takesArgument(0, named("io.netty.channel.Channel"));
+ }
+
+ @Override
+ public String getConstructorInterceptor() {
+ return CONSTRUCTOR_INTERCEPTOR_CLASS;
+ }
+ }
+ };
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher getMethodsMatcher() {
+ return named("writeAndFlush")
+ .and(takesArguments(1))
+ .and(takesArgument(0, named("org.apache.dolphinscheduler.remote.command.Command")));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return METHOD_INTERCEPTOR_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRemotingClientInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRemotingClientInterceptorInstrumentation.java
new file mode 100644
index 0000000000..4ddba81d0a
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRemotingClientInterceptorInstrumentation.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * Enhance {@link org.apache.dolphinscheduler.remote.NettyRemotingClient} instance and intercept `send` method,
+ * this method send message to remote server.
+ *
+ * @see org.apache.dolphinscheduler.skywalking.plugin.NettyRemotingClientMethodInterceptor
+ */
+public class NettyRemotingClientInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+ private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.NettyRemotingClientMethodInterceptor";
+ private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.remote.NettyRemotingClient";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANC_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher getMethodsMatcher() {
+ return named("send")
+ .and(takesArguments(2))
+ .and(takesArgument(0, named("org.apache.dolphinscheduler.remote.utils.Host")))
+ .and(takesArgument(1, named("org.apache.dolphinscheduler.remote.command.Command")));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return METHOD_INTERCEPTOR_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRequestProcessorInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRequestProcessorInterceptorInstrumentation.java
new file mode 100644
index 0000000000..bf3d5f7e6a
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRequestProcessorInterceptorInstrumentation.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+
+/**
+ * Enhance {@link org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor} instance and intercept `process` method,
+ * this method receive and handle message from remote server.
+ *
+ * @see org.apache.dolphinscheduler.skywalking.plugin.NettyRequestProcessorMethodInterceptor
+ */
+public class NettyRequestProcessorInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+ private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.NettyRequestProcessorMethodInterceptor";
+ private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return HierarchyMatch.byHierarchyMatch(ENHANC_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher getMethodsMatcher() {
+ return named("process")
+ .and(takesArguments(2))
+ .and(takesArgument(0, named("io.netty.channel.Channel")))
+ .and(takesArgument(1, named("org.apache.dolphinscheduler.remote.command.Command")));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return METHOD_INTERCEPTOR_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskExecuteInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskExecuteInterceptorInstrumentation.java
new file mode 100644
index 0000000000..92218fe7f5
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskExecuteInterceptorInstrumentation.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.isPublic;
+
+/**
+ * Enhance {@link org.apache.dolphinscheduler.server.worker.task.AbstractTask} instance and intercept `init`、`handle` and `after` methods,
+ * the implementation class execute task through these methods.
+ *
+ * @see org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteConstructorInterceptor
+ * @see org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteConstructorInterceptor
+ */
+public class TaskExecuteInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+ private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteConstructorInterceptor";
+ private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteMethodInterceptor";
+ private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.worker.task.AbstractTask";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return HierarchyMatch.byHierarchyMatch(ENHANC_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[] {
+ new ConstructorInterceptPoint() {
+ @Override
+ public ElementMatcher getConstructorMatcher() {
+ return takesArgument(0, named("org.apache.dolphinscheduler.server.entity.TaskExecutionContext"));
+ }
+
+ @Override
+ public String getConstructorInterceptor() {
+ return CONSTRUCTOR_INTERCEPTOR_CLASS;
+ }
+ }
+ };
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher getMethodsMatcher() {
+ return named("handle")
+ .or(named("init"))
+ .or(named("after"))
+ .and(isPublic())
+ .and(takesArguments(0));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return METHOD_INTERCEPTOR_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskExecuteThreadInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskExecuteThreadInterceptorInstrumentation.java
new file mode 100644
index 0000000000..4ad4c5ccae
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskExecuteThreadInterceptorInstrumentation.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.isPublic;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * Enhance {@link org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread} instance and intercept `run` method,
+ * this class provides an isolated thread for running task.
+ *
+ * @see org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteThreadConstructorInterceptor
+ * @see org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteThreadMethodInterceptor
+ */
+public class TaskExecuteThreadInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+ private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteThreadConstructorInterceptor";
+ private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteThreadMethodInterceptor";
+ private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANC_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[] {
+ new ConstructorInterceptPoint() {
+ @Override
+ public ElementMatcher getConstructorMatcher() {
+ return takesArgument(0, named("org.apache.dolphinscheduler.server.entity.TaskExecutionContext"));
+ }
+
+ @Override
+ public String getConstructorInterceptor() {
+ return CONSTRUCTOR_INTERCEPTOR_CLASS;
+ }
+ }
+ };
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher getMethodsMatcher() {
+ return named("run")
+ .and(isPublic())
+ .and(takesArguments(0));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return METHOD_INTERCEPTOR_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskPriorityQueueConsumerInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskPriorityQueueConsumerInterceptorInstrumentation.java
new file mode 100644
index 0000000000..881ef115c3
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskPriorityQueueConsumerInterceptorInstrumentation.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.returns;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * Enhance {@link org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer} instance and intercept `dispatch` method,
+ * this method is a dispatch the task info of memory queue to worker nodes.
+ *
+ * @see org.apache.dolphinscheduler.skywalking.plugin.TaskPriorityQueueConsumerMethodInterceptor
+ */
+public class TaskPriorityQueueConsumerInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+ private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.TaskPriorityQueueConsumerMethodInterceptor";
+ private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANC_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher getMethodsMatcher() {
+ return named("dispatch")
+ .and(takesArguments(1))
+ .and(takesArgument(0, named("org.apache.dolphinscheduler.service.queue.TaskPriority")))
+ .and(returns(boolean.class));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return METHOD_INTERCEPTOR_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskPriorityQueueImplInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskPriorityQueueImplInterceptorInstrumentation.java
new file mode 100644
index 0000000000..be06260959
--- /dev/null
+++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskPriorityQueueImplInterceptorInstrumentation.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dolphinscheduler.skywalking.plugin.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * Enhance {@link org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl} instance and intercept `put` method,
+ * this method is a publish task info to memory queue.
+ *
+ * @see org.apache.dolphinscheduler.skywalking.plugin.TaskPriorityQueueImplMethodInterceptor
+ */
+public class TaskPriorityQueueImplInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+ private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.TaskPriorityQueueImplMethodInterceptor";
+ private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANC_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher getMethodsMatcher() {
+ return named("put")
+ .and(takesArguments(1))
+ .and(takesArgument(0, named("org.apache.dolphinscheduler.service.queue.TaskPriority")));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return METHOD_INTERCEPTOR_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git a/ext/skywalking/src/main/resources/skywalking-plugin.def b/ext/skywalking/src/main/resources/skywalking-plugin.def
new file mode 100644
index 0000000000..25e18ac2e0
--- /dev/null
+++ b/ext/skywalking/src/main/resources/skywalking-plugin.def
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.MasterSchedulerServiceInterceptorInstrumentation
+dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.MasterExecThreadInterceptorInstrumentation
+dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.MasterBaseTaskExecThreadInterceptorInstrumentation
+dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.TaskPriorityQueueImplInterceptorInstrumentation
+dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.TaskPriorityQueueConsumerInterceptorInstrumentation
+dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.NettyRemotingClientInterceptorInstrumentation
+dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.NettyRequestProcessorInterceptorInstrumentation
+dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.NettyRemoteChannelInterceptorInstrumentation
+dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.TaskExecuteThreadInterceptorInstrumentation
+dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.TaskExecuteInterceptorInstrumentation
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 443ae36de4..1e70282ec4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,6 +117,7 @@
2.5
1.9.3
2.9.2
+ 8.4.0
@@ -543,6 +544,12 @@
swagger-bootstrap-ui
${swagger.version}
+
+
+ org.apache.skywalking
+ apm-toolkit-logback-1.x
+ ${skywalking.version}
+
@@ -1015,5 +1022,6 @@
dolphinscheduler-remote
dolphinscheduler-service
dolphinscheduler-plugin-api
+ ext/skywalking
diff --git a/script/dolphinscheduler-daemon.sh b/script/dolphinscheduler-daemon.sh
index fc10d9f266..b6a6dbadea 100755
--- a/script/dolphinscheduler-daemon.sh
+++ b/script/dolphinscheduler-daemon.sh
@@ -97,7 +97,16 @@ else
exit 1
fi
-export DOLPHINSCHEDULER_OPTS="-server -Xms$HEAP_INITIAL_SIZE -Xmx$HEAP_MAX_SIZE -Xmn$HEAP_NEW_GENERATION__SIZE -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m -Xss512k -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"
+if [ "$SKYWALKING_ENABLE" = "true" ]; then
+ SKYWALKING_OPTS="-javaagent:$DOLPHINSCHEDULER_HOME/skywalking-agent/skywalking-agent.jar -DSW_AGENT_NAME=dolphinscheduler::$command -DSW_LOGGING_FILE_NAME=skywalking-dolphinscheduler-$command.log"
+
+ export DOLPHINSCHEDULER_OPTS="$DOLPHINSCHEDULER_OPTS $SKYWALKING_OPTS"
+ echo "Info: Skywalking enabled opts: $SKYWALKING_OPTS"
+else
+ echo "Info: Skywalking not enabled."
+fi
+
+export DOLPHINSCHEDULER_OPTS="-server -Xms$HEAP_INITIAL_SIZE -Xmx$HEAP_MAX_SIZE -Xmn$HEAP_NEW_GENERATION__SIZE -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m -Xss512k -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof $DOLPHINSCHEDULER_OPTS"
case $startStop in
(start)
diff --git a/script/scp-hosts.sh b/script/scp-hosts.sh
index 27ee932a40..50a94f9b0f 100755
--- a/script/scp-hosts.sh
+++ b/script/scp-hosts.sh
@@ -54,7 +54,7 @@ do
echo "scp dirs to $host/$installPath starting"
ssh -p $sshPort $host "cd $installPath/; rm -rf bin/ conf/ lib/ script/ sql/ ui/"
- for dsDir in bin conf lib script sql ui install.sh
+ for dsDir in bin conf lib script sql ui skywalking-agent install.sh
do
# if worker in workersGroupMap
if [[ "${workersGroupMap[${host}]}" ]] && [[ "${dsDir}" == "conf" ]]; then
diff --git a/script/start-all.sh b/script/start-all.sh
index 61b916483e..2902e0515f 100755
--- a/script/start-all.sh
+++ b/script/start-all.sh
@@ -31,11 +31,13 @@ do
workersGroupMap+=([$worker]=$groupName)
done
+skywalkingEnv="SKYWALKING_ENABLE=$enableSkywalking SW_AGENT_COLLECTOR_BACKEND_SERVICES=$skywalkingServers SW_GRPC_LOG_SERVER_HOST=$skywalkingLogReporterHost SW_GRPC_LOG_SERVER_PORT=$skywalkingLogReporterPort"
+
mastersHost=(${masters//,/ })
for master in ${mastersHost[@]}
do
echo "$master master server is starting"
- ssh -p $sshPort $master "cd $installPath/; sh bin/dolphinscheduler-daemon.sh start master-server;"
+ ssh -p $sshPort $master "cd $installPath/; export $skywalkingEnv; sh bin/dolphinscheduler-daemon.sh start master-server;"
done
@@ -43,15 +45,15 @@ for worker in ${!workersGroupMap[*]}
do
echo "$worker worker server is starting"
- ssh -p $sshPort $worker "cd $installPath/; sh bin/dolphinscheduler-daemon.sh start worker-server;"
- ssh -p $sshPort $worker "cd $installPath/; sh bin/dolphinscheduler-daemon.sh start logger-server;"
+ ssh -p $sshPort $worker "cd $installPath/; export $skywalkingEnv; sh bin/dolphinscheduler-daemon.sh start worker-server;"
+ ssh -p $sshPort $worker "cd $installPath/; export $skywalkingEnv; sh bin/dolphinscheduler-daemon.sh start logger-server;"
done
-ssh -p $sshPort $alertServer "cd $installPath/; sh bin/dolphinscheduler-daemon.sh start alert-server;"
+ssh -p $sshPort $alertServer "cd $installPath/; export $skywalkingEnv; sh bin/dolphinscheduler-daemon.sh start alert-server;"
apiServersHost=(${apiServers//,/ })
for apiServer in ${apiServersHost[@]}
do
echo "$apiServer worker server is starting"
- ssh -p $sshPort $apiServer "cd $installPath/; sh bin/dolphinscheduler-daemon.sh start api-server;"
+ ssh -p $sshPort $apiServer "cd $installPath/; export $skywalkingEnv; sh bin/dolphinscheduler-daemon.sh start api-server;"
done
\ No newline at end of file
diff --git a/tools/dependencies/check-LICENSE.sh b/tools/dependencies/check-LICENSE.sh
index d414bd40c6..123727a024 100755
--- a/tools/dependencies/check-LICENSE.sh
+++ b/tools/dependencies/check-LICENSE.sh
@@ -27,6 +27,10 @@ echo '=== Self modules: ' && ./mvnw --batch-mode --quiet -Dexec.executable='echo
echo '=== Distributed dependencies: ' && ls dist/lib | tee all-dependencies.txt
+echo '=== Skywalking agent dependencies: ' && ls dist/skywalking-agent | grep .jar | tee -a all-dependencies.txt \
+ && ls dist/skywalking-agent/plugins | tee -a all-dependencies.txt \
+ && ls dist/skywalking-agent/activations | tee -a all-dependencies.txt
+
# Exclude all self modules(jars) to generate all third-party dependencies
echo '=== Third party dependencies: ' && grep -vf self-modules.txt all-dependencies.txt | tee third-party-dependencies.txt
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index e684b7a05a..81f1710dfe 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -12,6 +12,16 @@ aspectjweaver-1.9.2.jar
audience-annotations-0.5.0.jar
avro-1.7.4.jar
aws-java-sdk-1.7.4.jar
+apm-toolkit-logback-1.x-8.4.0.jar
+apm-toolkit-logback-1.x-activation-8.4.0.jar
+apm-httpclient-3.x-plugin-8.4.0.jar
+apm-httpClient-4.x-plugin-8.4.0.jar
+apm-httpclient-commons-8.4.0.jar
+apm-quartz-scheduler-2.x-plugin-8.4.0.jar
+apm-spring-core-patch-8.4.0.jar
+apm-springmvc-annotation-5.x-plugin-8.4.0.jar
+apm-springmvc-annotation-commons-8.4.0.jar
+apm-zookeeper-3.4.x-plugin-8.4.0.jar
bonecp-0.8.0.RELEASE.jar
byte-buddy-1.9.10.jar
classmate-1.4.0.jar
@@ -198,6 +208,8 @@ springfox-swagger2-2.9.2.jar
swagger-annotations-1.5.20.jar
swagger-bootstrap-ui-1.9.3.jar
swagger-models-1.5.20.jar
+skywalking-agent.jar
+spring-commons-8.4.0.jar
tephra-api-0.6.0.jar
threetenbp-1.3.6.jar
transaction-api-1.1.jar