From a01f13f72d9095bae0226e073b1357448f13c6f8 Mon Sep 17 00:00:00 2001 From: hailin0 Date: Thu, 4 Mar 2021 11:04:19 +0800 Subject: [PATCH] [Improvement] Add skywalking agent plugin (#4852) * [Improvement] Add skywalking agent plugin * fix * fix * fix * fix * suport log * fix * Rename LICENSE-skywalking.txt to LICENSE-apm-toolkit-logback.txt * Rename LICENSE-apm-toolkit-logback.txt to LICENSE-apm-toolkit-logback-1.x.txt * Update known-dependencies.txt * fix * fix * fix * fix * fix * fix * fix docker logback config * add log reporter to alert module * fix * fix * fix --- .../logback/logback-alert.xml | 5 + .../dolphinscheduler/logback/logback-api.xml | 5 + .../logback/logback-master.xml | 5 + .../logback/logback-worker.xml | 5 + .../src/main/resources/logback-alert.xml | 5 + .../src/main/resources/logback-api.xml | 5 + dolphinscheduler-common/pom.xml | 4 + dolphinscheduler-dist/pom.xml | 6 + dolphinscheduler-dist/release-docs/LICENSE | 3 + .../licenses/LICENSE-skywalking-agent.txt | 201 +++++++++++++ .../main/assembly/dolphinscheduler-binary.xml | 8 + .../src/main/resources/logback-master.xml | 5 + .../src/main/resources/logback-worker.xml | 5 + .../service/queue/TaskPriority.java | 6 +- ext/skywalking/config/agent.config | 118 ++++++++ ext/skywalking/dashboard/dolphinscheduler.yml | 284 ++++++++++++++++++ ext/skywalking/pom.xml | 218 ++++++++++++++ ...eTaskExecThreadConstructorInterceptor.java | 35 +++ ...erBaseTaskExecThreadMethodInterceptor.java | 100 ++++++ ...asterExecThreadConstructorInterceptor.java | 34 +++ .../MasterExecThreadMethodInterceptor.java | 95 ++++++ ...MasterExecThreadStateCacheInterceptor.java | 50 +++ ...sterSchedulerServiceMethodInterceptor.java | 53 ++++ ...tyRemoteChannelConstructorInterceptor.java | 32 ++ .../NettyRemoteChannelMethodInterceptor.java | 69 +++++ .../NettyRemotingClientMethodInterceptor.java | 70 +++++ ...ettyRequestProcessorMethodInterceptor.java | 72 +++++ .../skywalking/plugin/TaskContext.java | 30 ++ .../TaskExecuteConstructorInterceptor.java | 35 +++ .../plugin/TaskExecuteMethodInterceptor.java | 88 ++++++ ...skExecuteThreadConstructorInterceptor.java | 35 +++ .../TaskExecuteThreadMethodInterceptor.java | 83 +++++ ...riorityQueueConsumerMethodInterceptor.java | 71 +++++ ...askPriorityQueueImplMethodInterceptor.java | 73 +++++ .../skywalking/plugin/Utils.java | 70 +++++ ...kExecThreadInterceptorInstrumentation.java | 92 ++++++ ...rExecThreadInterceptorInstrumentation.java | 111 +++++++ ...ulerServiceInterceptorInstrumentation.java | 72 +++++ ...moteChannelInterceptorInstrumentation.java | 90 ++++++ ...otingClientInterceptorInstrumentation.java | 77 +++++ ...stProcessorInterceptorInstrumentation.java | 77 +++++ ...TaskExecuteInterceptorInstrumentation.java | 93 ++++++ ...ecuteThreadInterceptorInstrumentation.java | 91 ++++++ ...eueConsumerInterceptorInstrumentation.java | 78 +++++ ...tyQueueImplInterceptorInstrumentation.java | 76 +++++ .../src/main/resources/skywalking-plugin.def | 26 ++ pom.xml | 8 + script/dolphinscheduler-daemon.sh | 11 +- script/scp-hosts.sh | 2 +- script/start-all.sh | 12 +- tools/dependencies/check-LICENSE.sh | 4 + tools/dependencies/known-dependencies.txt | 12 + 52 files changed, 2905 insertions(+), 10 deletions(-) create mode 100644 dolphinscheduler-dist/release-docs/licenses/LICENSE-skywalking-agent.txt create mode 100644 ext/skywalking/config/agent.config create mode 100644 ext/skywalking/dashboard/dolphinscheduler.yml create mode 100644 ext/skywalking/pom.xml create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterBaseTaskExecThreadConstructorInterceptor.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterBaseTaskExecThreadMethodInterceptor.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadConstructorInterceptor.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadMethodInterceptor.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadStateCacheInterceptor.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterSchedulerServiceMethodInterceptor.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemoteChannelConstructorInterceptor.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemoteChannelMethodInterceptor.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemotingClientMethodInterceptor.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRequestProcessorMethodInterceptor.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskContext.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteConstructorInterceptor.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteMethodInterceptor.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteThreadConstructorInterceptor.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteThreadMethodInterceptor.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskPriorityQueueConsumerMethodInterceptor.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskPriorityQueueImplMethodInterceptor.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/Utils.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterBaseTaskExecThreadInterceptorInstrumentation.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterExecThreadInterceptorInstrumentation.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterSchedulerServiceInterceptorInstrumentation.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRemoteChannelInterceptorInstrumentation.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRemotingClientInterceptorInstrumentation.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRequestProcessorInterceptorInstrumentation.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskExecuteInterceptorInstrumentation.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskExecuteThreadInterceptorInstrumentation.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskPriorityQueueConsumerInterceptorInstrumentation.java create mode 100644 ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskPriorityQueueImplInterceptorInstrumentation.java create mode 100644 ext/skywalking/src/main/resources/skywalking-plugin.def diff --git a/docker/build/conf/dolphinscheduler/logback/logback-alert.xml b/docker/build/conf/dolphinscheduler/logback/logback-alert.xml index eec78385db..e9919aced3 100644 --- a/docker/build/conf/dolphinscheduler/logback/logback-alert.xml +++ b/docker/build/conf/dolphinscheduler/logback/logback-alert.xml @@ -36,8 +36,13 @@ + + + + + \ No newline at end of file diff --git a/docker/build/conf/dolphinscheduler/logback/logback-api.xml b/docker/build/conf/dolphinscheduler/logback/logback-api.xml index 6d29f8af5f..8b55b69bfa 100644 --- a/docker/build/conf/dolphinscheduler/logback/logback-api.xml +++ b/docker/build/conf/dolphinscheduler/logback/logback-api.xml @@ -41,6 +41,10 @@ + + + + @@ -48,6 +52,7 @@ + \ No newline at end of file diff --git a/docker/build/conf/dolphinscheduler/logback/logback-master.xml b/docker/build/conf/dolphinscheduler/logback/logback-master.xml index d1bfb67aa1..289c861a64 100644 --- a/docker/build/conf/dolphinscheduler/logback/logback-master.xml +++ b/docker/build/conf/dolphinscheduler/logback/logback-master.xml @@ -65,9 +65,14 @@ + + + + + \ No newline at end of file diff --git a/docker/build/conf/dolphinscheduler/logback/logback-worker.xml b/docker/build/conf/dolphinscheduler/logback/logback-worker.xml index b7e08dd846..bed3c0864f 100644 --- a/docker/build/conf/dolphinscheduler/logback/logback-worker.xml +++ b/docker/build/conf/dolphinscheduler/logback/logback-worker.xml @@ -66,9 +66,14 @@ + + + + + \ No newline at end of file diff --git a/dolphinscheduler-alert/src/main/resources/logback-alert.xml b/dolphinscheduler-alert/src/main/resources/logback-alert.xml index 1718947dd1..b46c0a0159 100644 --- a/dolphinscheduler-alert/src/main/resources/logback-alert.xml +++ b/dolphinscheduler-alert/src/main/resources/logback-alert.xml @@ -44,9 +44,14 @@ + + + + + \ No newline at end of file diff --git a/dolphinscheduler-api/src/main/resources/logback-api.xml b/dolphinscheduler-api/src/main/resources/logback-api.xml index c1142ed8ed..44f47024c7 100644 --- a/dolphinscheduler-api/src/main/resources/logback-api.xml +++ b/dolphinscheduler-api/src/main/resources/logback-api.xml @@ -49,6 +49,10 @@ + + + + @@ -57,6 +61,7 @@ + \ No newline at end of file diff --git a/dolphinscheduler-common/pom.xml b/dolphinscheduler-common/pom.xml index cf76df3a36..356240cc6f 100644 --- a/dolphinscheduler-common/pom.xml +++ b/dolphinscheduler-common/pom.xml @@ -588,5 +588,9 @@ compile + + org.apache.skywalking + apm-toolkit-logback-1.x + diff --git a/dolphinscheduler-dist/pom.xml b/dolphinscheduler-dist/pom.xml index 80c47a72c9..9d525d30ac 100644 --- a/dolphinscheduler-dist/pom.xml +++ b/dolphinscheduler-dist/pom.xml @@ -41,6 +41,12 @@ org.apache.dolphinscheduler dolphinscheduler-api + + + org.apache.dolphinscheduler + dolphinscheduler-skywalking + ${project.version} + diff --git a/dolphinscheduler-dist/release-docs/LICENSE b/dolphinscheduler-dist/release-docs/LICENSE index 71d3f9a548..a2a5090dc1 100644 --- a/dolphinscheduler-dist/release-docs/LICENSE +++ b/dolphinscheduler-dist/release-docs/LICENSE @@ -254,6 +254,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt. derby 10.14.2.0: https://github.com/apache/derby, Apache 2.0 druid 1.1.14: https://mvnrepository.com/artifact/com.alibaba/druid/1.1.14, Apache 2.0 fastjson 1.2.61: https://mvnrepository.com/artifact/com.alibaba/fastjson/1.2.61, Apache 2.0 + grpc-java 1.32.1: https://github.com/grpc/grpc-java, Apache 2.0 gson 2.8.5: https://github.com/google/gson, Apache 2.0 guava 20.0: https://mvnrepository.com/artifact/com.google.guava/guava/20.0, Apache 2.0 guice 3.0: https://mvnrepository.com/artifact/com.google.inject/guice/3.0, Apache 2.0 @@ -342,9 +343,11 @@ The text of each license is also included at licenses/LICENSE-[project].txt. netty-all 4.1.33.Final: https://github.com/netty/netty/blob/netty-4.1.33.Final/LICENSE.txt, Apache 2.0 opencsv 2.3: https://mvnrepository.com/artifact/net.sf.opencsv/opencsv/2.3, Apache 2.0 parquet-hadoop-bundle 1.8.1: https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop-bundle/1.8.1, Apache 2.0 + perfmark-api 0.19.0: https://mvnrepository.com/artifact/io.perfmark/perfmark-api/0.19.0, Apache 2.0 poi 3.17: https://mvnrepository.com/artifact/org.apache.poi/poi/3.17, Apache 2.0 quartz 2.3.0: https://mvnrepository.com/artifact/org.quartz-scheduler/quartz/2.3.0, Apache 2.0 quartz-jobs 2.3.0: https://mvnrepository.com/artifact/org.quartz-scheduler/quartz-jobs/2.3.0, Apache 2.0 + skywalking 8.4.0: https://github.com/apache/skywalking, Apache 2.0 snakeyaml 1.23: https://mvnrepository.com/artifact/org.yaml/snakeyaml/1.23, Apache 2.0 snappy 0.2: https://mvnrepository.com/artifact/org.iq80.snappy/snappy/0.2, Apache 2.0 snappy-java 1.0.4.1: https://github.com/xerial/snappy-java, Apache 2.0 diff --git a/dolphinscheduler-dist/release-docs/licenses/LICENSE-skywalking-agent.txt b/dolphinscheduler-dist/release-docs/licenses/LICENSE-skywalking-agent.txt new file mode 100644 index 0000000000..261eeb9e9f --- /dev/null +++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-skywalking-agent.txt @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-binary.xml b/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-binary.xml index 28bbb361cd..3e282bddd7 100644 --- a/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-binary.xml +++ b/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-binary.xml @@ -151,6 +151,14 @@ . + + ${basedir}/../ext/skywalking/target/skywalking-agent + + **/*.* + + ./skywalking-agent + + ${basedir}/../dolphinscheduler-ui/dist diff --git a/dolphinscheduler-server/src/main/resources/logback-master.xml b/dolphinscheduler-server/src/main/resources/logback-master.xml index a61d891b10..6606ddc300 100644 --- a/dolphinscheduler-server/src/main/resources/logback-master.xml +++ b/dolphinscheduler-server/src/main/resources/logback-master.xml @@ -73,9 +73,14 @@ + + + + + \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/resources/logback-worker.xml b/dolphinscheduler-server/src/main/resources/logback-worker.xml index bea68ff37d..35b8242e3a 100644 --- a/dolphinscheduler-server/src/main/resources/logback-worker.xml +++ b/dolphinscheduler-server/src/main/resources/logback-worker.xml @@ -73,9 +73,14 @@ + + + + + \ No newline at end of file diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java index a872f6db9f..0e6ebaa7f5 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java @@ -53,7 +53,7 @@ public class TaskPriority implements Comparable { /** * context */ - private Map context; + private Map context; public TaskPriority(){} @@ -96,7 +96,7 @@ public class TaskPriority implements Comparable { return taskId; } - public Map getContext() { + public Map getContext() { return context; } @@ -112,7 +112,7 @@ public class TaskPriority implements Comparable { this.groupName = groupName; } - public void setContext(Map context) { + public void setContext(Map context) { this.context = context; } diff --git a/ext/skywalking/config/agent.config b/ext/skywalking/config/agent.config new file mode 100644 index 0000000000..cca5bae971 --- /dev/null +++ b/ext/skywalking/config/agent.config @@ -0,0 +1,118 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The agent namespace +# agent.namespace=${SW_AGENT_NAMESPACE:default-namespace} + +# The service name in UI +agent.service_name=${SW_AGENT_NAME:Your_ApplicationName} + +# The number of sampled traces per 3 seconds +# Negative or zero means off, by default +# agent.sample_n_per_3_secs=${SW_AGENT_SAMPLE:-1} + +# Authentication active is based on backend setting, see application.yml for more details. +# agent.authentication = ${SW_AGENT_AUTHENTICATION:xxxx} + +# The max amount of spans in a single segment. +# Through this config item, SkyWalking keep your application memory cost estimated. +# agent.span_limit_per_segment=${SW_AGENT_SPAN_LIMIT:150} + +# If the operation name of the first span is included in this set, this segment should be ignored. Multiple values should be separated by `,`. +# agent.ignore_suffix=${SW_AGENT_IGNORE_SUFFIX:.jpg,.jpeg,.js,.css,.png,.bmp,.gif,.ico,.mp3,.mp4,.html,.svg} + +# If true, SkyWalking agent will save all instrumented classes files in `/debugging` folder. +# SkyWalking team may ask for these files in order to resolve compatible problem. +# agent.is_open_debugging_class = ${SW_AGENT_OPEN_DEBUG:true} + +# If true, SkyWalking agent will cache all instrumented classes files to memory or disk files (decided by class cache mode), +# allow other javaagent to enhance those classes that enhanced by SkyWalking agent. +# agent.is_cache_enhanced_class = ${SW_AGENT_CACHE_CLASS:false} + +# The instrumented classes cache mode: MEMORY or FILE +# MEMORY: cache class bytes to memory, if instrumented classes is too many or too large, it may take up more memory +# FILE: cache class bytes in `/class-cache` folder, automatically clean up cached class files when the application exits +# agent.class_cache_mode = ${SW_AGENT_CLASS_CACHE_MODE:MEMORY} + +# The operationName max length +# Notice, in the current practice, we don't recommend the length over 190. +# agent.operation_name_threshold=${SW_AGENT_OPERATION_NAME_THRESHOLD:150} + +# The agent use gRPC plain text in default. +# If true, SkyWalking agent uses TLS even no CA file detected. +# agent.force_tls=${SW_AGENT_FORCE_TLS:false} + +# If true, skywalking agent will enable profile when user create a new profile task. Otherwise disable profile. +# profile.active=${SW_AGENT_PROFILE_ACTIVE:true} + +# Parallel monitor segment count +# profile.max_parallel=${SW_AGENT_PROFILE_MAX_PARALLEL:5} + +# Max monitor segment time(minutes), if current segment monitor time out of limit, then stop it. +# profile.duration=${SW_AGENT_PROFILE_DURATION:10} + +# Max dump thread stack depth +# profile.dump_max_stack_depth=${SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH:500} + +# Snapshot transport to backend buffer size +# profile.snapshot_transport_buffer_size=${SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE:50} + +# Backend service addresses. +collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800} + +# Logging file_name +logging.file_name=${SW_LOGGING_FILE_NAME:skywalking-api.log} + +# Logging level +logging.level=${SW_LOGGING_LEVEL:INFO} + +# Logging dir +# logging.dir=${SW_LOGGING_DIR:""} + +# Logging max_file_size, default: 300 * 1024 * 1024 = 314572800 +# logging.max_file_size=${SW_LOGGING_MAX_FILE_SIZE:314572800} + +# The max history log files. When rollover happened, if log files exceed this number, +# then the oldest file will be delete. Negative or zero means off, by default. +# logging.max_history_files=${SW_LOGGING_MAX_HISTORY_FILES:-1} + +# Listed exceptions would not be treated as an error. Because in some codes, the exception is being used as a way of controlling business flow. +# Besides, the annotation named IgnoredException in the trace toolkit is another way to configure ignored exceptions. +# statuscheck.ignored_exceptions=${SW_STATUSCHECK_IGNORED_EXCEPTIONS:} + +# The max recursive depth when checking the exception traced by the agent. Typically, we don't recommend setting this more than 10, which could cause a performance issue. Negative value and 0 would be ignored, which means all exceptions would make the span tagged in error status. +# statuscheck.max_recursive_depth=${SW_STATUSCHECK_MAX_RECURSIVE_DEPTH:1} + +# Mount the specific folders of the plugins. Plugins in mounted folders would work. +plugin.mount=${SW_MOUNT_FOLDERS:plugins,activations} + +# Exclude activated plugins +# plugin.exclude_plugins=${SW_EXCLUDE_PLUGINS:} + +# mysql plugin configuration +# plugin.mysql.trace_sql_parameters=${SW_MYSQL_TRACE_SQL_PARAMETERS:false} + +# Kafka producer configuration +# plugin.kafka.bootstrap_servers=${SW_KAFKA_BOOTSTRAP_SERVERS:localhost:9092} + +# Match spring bean with regex expression for classname +# plugin.springannotation.classname_match_regex=${SW_SPRINGANNOTATION_CLASSNAME_MATCH_REGEX:} + +# log reporter configs +# Specify which grpc server's host for log data to report to. +plugin.toolkit.log.grpc.reporter.server_host=${SW_GRPC_LOG_SERVER_HOST:127.0.0.1} +# Specify which grpc server's port for log data to report to. +plugin.toolkit.log.grpc.reporter.server_port=${SW_GRPC_LOG_SERVER_PORT:11800} \ No newline at end of file diff --git a/ext/skywalking/dashboard/dolphinscheduler.yml b/ext/skywalking/dashboard/dolphinscheduler.yml new file mode 100644 index 0000000000..31b97fe52e --- /dev/null +++ b/ext/skywalking/dashboard/dolphinscheduler.yml @@ -0,0 +1,284 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# UI templates initialized file includes the default template when the SkyWalking OAP starts up at the first time. +# +# Also, SkyWalking would detect the existing templates in the database, once they are missing, all templates in this file +# could be added automatically. + +templates: + - name: Dolphinscheduler + type: "DASHBOARD" + configuration: |- + [ + { + "name":"Dolphinscheduler", + "type":"service", + "serviceGroup":"dolphinscheduler", + "children":[ + { + "name":"Master Scheduler", + "children":[ + { + "width":3, + "title":"QuartzScheduler Load", + "height":350, + "entityType":"Endpoint", + "independentSelector":true, + "metricType":"REGULAR_VALUE", + "metricName":"endpoint_cpm", + "queryMetricType":"readMetricsValues", + "chartType":"ChartLine", + "currentService":"dolphinscheduler::master-server", + "currentEndpoint":"quartz-scheduler/org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob" + }, + { + "width":3, + "title":"QuartzScheduler Avg Time", + "height":350, + "entityType":"Endpoint", + "independentSelector":true, + "metricType":"REGULAR_VALUE", + "metricName":"endpoint_avg", + "queryMetricType":"readMetricsValues", + "chartType":"ChartLine", + "currentService":"dolphinscheduler::master-server", + "currentEndpoint":"quartz-scheduler/org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob", + "unit":"ms" + }, + { + "width":3, + "title":"QuartzScheduler Time Percentile", + "height":350, + "entityType":"Endpoint", + "independentSelector":true, + "metricType":"LABELED_VALUE", + "metricName":"endpoint_percentile", + "queryMetricType":"readLabeledMetricsValues", + "chartType":"ChartLine", + "metricLabels":"P50, P75, P90, P95, P99", + "labelsIndex":"0, 1, 2, 3, 4", + "currentService":"dolphinscheduler::master-server", + "currentEndpoint":"quartz-scheduler/org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob", + "unit":"ms" + }, + { + "width":3, + "title":"QuartzScheduler Successful Rate", + "height":350, + "entityType":"Endpoint", + "independentSelector":true, + "metricType":"REGULAR_VALUE", + "metricName":"endpoint_sla", + "queryMetricType":"readMetricsValues", + "chartType":"ChartLine", + "currentService":"dolphinscheduler::master-server", + "currentEndpoint":"quartz-scheduler/org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob", + "unit":"%", + "aggregation":"/", + "aggregationNum":"100" + }, + { + "width":3, + "title":"MasterSchedulerService Load", + "height":350, + "entityType":"Endpoint", + "independentSelector":true, + "metricType":"REGULAR_VALUE", + "metricName":"endpoint_cpm", + "queryMetricType":"readMetricsValues", + "chartType":"ChartLine", + "currentService":"dolphinscheduler::master-server", + "currentEndpoint":"master/schedule/process" + }, + { + "width":3, + "title":"MasterSchedulerService Avg Time", + "height":350, + "entityType":"Endpoint", + "independentSelector":true, + "metricType":"REGULAR_VALUE", + "metricName":"endpoint_avg", + "queryMetricType":"readMetricsValues", + "chartType":"ChartLine", + "currentService":"dolphinscheduler::master-server", + "currentEndpoint":"master/schedule/process", + "unit":"ms" + }, + { + "width":3, + "title":"MasterSchedulerService Time Percentile", + "height":350, + "entityType":"Endpoint", + "independentSelector":true, + "metricType":"LABELED_VALUE", + "queryMetricType":"readLabeledMetricsValues", + "chartType":"ChartLine", + "metricName":"endpoint_percentile", + "metricLabels":"P50, P75, P90, P95, P99", + "labelsIndex":"0, 1, 2, 3, 4", + "currentService":"dolphinscheduler::master-server", + "currentEndpoint":"master/schedule/process", + "unit":"ms" + }, + { + "width":3, + "title":"MasterSchedulerService Successful Rate", + "height":350, + "entityType":"Endpoint", + "independentSelector":true, + "metricType":"REGULAR_VALUE", + "queryMetricType":"readMetricsValues", + "chartType":"ChartLine", + "metricName":"endpoint_sla", + "currentService":"dolphinscheduler::master-server", + "currentEndpoint":"master/schedule/process", + "unit":"%", + "aggregation":"/", + "aggregationNum":"100" + } + ] + }, + { + "name":"Master Queue", + "children":[ + { + "width":3, + "title":"Put Load", + "height":350, + "entityType":"Endpoint", + "independentSelector":true, + "metricType":"REGULAR_VALUE", + "metricName":"endpoint_cpm", + "queryMetricType":"readMetricsValues", + "chartType":"ChartLine", + "currentService":"dolphinscheduler::master-server", + "currentEndpoint":"masetr/queue/put" + }, + { + "width":3, + "title":"Put Avg Time", + "height":350, + "entityType":"Endpoint", + "independentSelector":true, + "metricType":"REGULAR_VALUE", + "metricName":"endpoint_avg", + "queryMetricType":"readMetricsValues", + "chartType":"ChartLine", + "currentService":"dolphinscheduler::master-server", + "currentEndpoint":"masetr/queue/put", + "unit":"ms" + }, + { + "width":3, + "title":"Put Time Percentile", + "height":350, + "entityType":"Endpoint", + "independentSelector":true, + "metricType":"LABELED_VALUE", + "metricName":"endpoint_percentile", + "queryMetricType":"readLabeledMetricsValues", + "chartType":"ChartLine", + "metricLabels":"P50, P75, P90, P95, P99", + "labelsIndex":"0, 1, 2, 3, 4", + "currentService":"dolphinscheduler::master-server", + "currentEndpoint":"masetr/queue/put", + "unit":"ms" + }, + { + "width":3, + "title":"Put Successful Rate", + "height":350, + "entityType":"Endpoint", + "independentSelector":true, + "metricType":"REGULAR_VALUE", + "metricName":"endpoint_sla", + "queryMetricType":"readMetricsValues", + "chartType":"ChartLine", + "currentService":"dolphinscheduler::master-server", + "currentEndpoint":"masetr/queue/put", + "unit":"%", + "aggregation":"/", + "aggregationNum":"100" + }, + { + "width":3, + "title":"Take Load", + "height":350, + "entityType":"Endpoint", + "independentSelector":true, + "metricType":"REGULAR_VALUE", + "metricName":"endpoint_cpm", + "queryMetricType":"readMetricsValues", + "chartType":"ChartLine", + "currentService":"dolphinscheduler::master-server", + "currentEndpoint":"masetr/queue/take" + }, + { + "width":3, + "title":"Take Avg Time", + "height":350, + "entityType":"Endpoint", + "independentSelector":true, + "metricType":"REGULAR_VALUE", + "metricName":"endpoint_avg", + "queryMetricType":"readMetricsValues", + "chartType":"ChartLine", + "currentService":"dolphinscheduler::master-server", + "currentEndpoint":"masetr/queue/take", + "unit":"ms" + }, + { + "width":3, + "title":"Take Time Percentile", + "height":350, + "entityType":"Endpoint", + "independentSelector":true, + "metricType":"LABELED_VALUE", + "metricName":"endpoint_percentile", + "queryMetricType":"readLabeledMetricsValues", + "chartType":"ChartLine", + "metricLabels":"P50, P75, P90, P95, P99", + "labelsIndex":"0, 1, 2, 3, 4", + "currentService":"dolphinscheduler::master-server", + "currentEndpoint":"masetr/queue/take", + "unit":"ms" + }, + { + "width":3, + "title":"Take Successful Rate", + "height":350, + "entityType":"Endpoint", + "independentSelector":true, + "metricType":"REGULAR_VALUE", + "metricName":"endpoint_sla", + "queryMetricType":"readMetricsValues", + "chartType":"ChartLine", + "currentService":"dolphinscheduler::master-server", + "currentEndpoint":"masetr/queue/take", + "unit":"%", + "aggregation":"/", + "aggregationNum":"100" + } + ] + } + ] + } + ] + # Activated as the DASHBOARD type, makes this templates added into the UI page automatically. + # False means providing a basic template, user needs to add it manually. + activated: true + # True means wouldn't show up on the dashboard. Only keeps the definition in the storage. + disabled: false \ No newline at end of file diff --git a/ext/skywalking/pom.xml b/ext/skywalking/pom.xml new file mode 100644 index 0000000000..cb7121e056 --- /dev/null +++ b/ext/skywalking/pom.xml @@ -0,0 +1,218 @@ + + + + + dolphinscheduler + org.apache.dolphinscheduler + 1.3.6-SNAPSHOT + ../../pom.xml + + 4.0.0 + + dolphinscheduler-skywalking + dolphinscheduler-skywalking + jar + + + UTF-8 + ${parent.version} + + ${project.build.directory}/skywalking-agent/ + ${agent.dir}/plugins/ + ${agent.dir}/activations/ + ${agent.dir}/config + ${agent.dir}/dashboard + org.apache.skywalking.apm.dependencies + + + + + org.apache.dolphinscheduler + dolphinscheduler-service + ${dolphinscheduler.version} + provided + + + org.apache.dolphinscheduler + dolphinscheduler-server + ${dolphinscheduler.version} + provided + + + + org.apache.skywalking + apm-agent-core + ${skywalking.version} + provided + + + org.projectlombok + lombok + 1.18.16 + provided + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + package + + shade + + + + + net.bytebuddy:* + + + + + net.bytebuddy + ${agent-dependencies-shade.package}.net.bytebuddy + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy + package + + copy + + + + + + org.apache.skywalking + apm-agent + ${skywalking.version} + skywalking-agent.jar + ${agent.dir} + + + + + org.apache.dolphinscheduler + dolphinscheduler-skywalking + ${project.version} + ${agent-plugins.dir} + + + org.apache.skywalking + apm-zookeeper-3.4.x-plugin + ${skywalking.version} + ${agent-plugins.dir} + + + org.apache.skywalking + apm-quartz-scheduler-2.x-plugin + ${skywalking.version} + ${agent-plugins.dir} + + + org.apache.skywalking + apm-httpclient-commons + ${skywalking.version} + ${agent-plugins.dir} + + + org.apache.skywalking + apm-httpclient-3.x-plugin + ${skywalking.version} + ${agent-plugins.dir} + + + org.apache.skywalking + apm-httpClient-4.x-plugin + ${skywalking.version} + ${agent-plugins.dir} + + + org.apache.skywalking + spring-commons + ${skywalking.version} + ${agent-plugins.dir} + + + org.apache.skywalking + apm-spring-core-patch + ${skywalking.version} + ${agent-plugins.dir} + + + org.apache.skywalking + apm-springmvc-annotation-commons + ${skywalking.version} + ${agent-plugins.dir} + + + org.apache.skywalking + apm-springmvc-annotation-5.x-plugin + ${skywalking.version} + ${agent-plugins.dir} + + + + + org.apache.skywalking + apm-toolkit-logback-1.x-activation + ${skywalking.version} + ${agent-activations.dir} + + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + package + + run + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterBaseTaskExecThreadConstructorInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterBaseTaskExecThreadConstructorInterceptor.java new file mode 100644 index 0000000000..889c562b2b --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterBaseTaskExecThreadConstructorInterceptor.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin; + +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; + +public class MasterBaseTaskExecThreadConstructorInterceptor implements InstanceConstructorInterceptor { + @Override + public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable { + TaskInstance taskInstance = (TaskInstance) allArguments[0]; + TaskContext taskContext = new TaskContext<>(taskInstance, ContextManager.capture()); + + objInst.setSkyWalkingDynamicField(taskContext); + } +} + diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterBaseTaskExecThreadMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterBaseTaskExecThreadMethodInterceptor.java new file mode 100644 index 0000000000..5e8e3d64c3 --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterBaseTaskExecThreadMethodInterceptor.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin; + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.master.runner.MasterBaseTaskExecThread; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Method; + +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_STATE; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_TYPE; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_INSTANCE_ID; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_DEFINITION_ID; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_INSTANCE_ID; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_INSTANCE_NAME; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_WORKER_GROUP; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.getProcessDefinitionId; + +public class MasterBaseTaskExecThreadMethodInterceptor implements InstanceMethodsAroundInterceptor { + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + TaskContext taskContext = (TaskContext) objInst.getSkyWalkingDynamicField(); + TaskInstance taskInstance = taskContext.getCache(); + String operationName = getOperationNamePrefix(taskInstance) + taskInstance.getName(); + + AbstractSpan span = ContextManager.createLocalSpan(operationName); + span.setComponent(Utils.DOLPHIN_SCHEDULER); + TAG_PROCESS_DEFINITION_ID.set(span, String.valueOf(taskInstance.getProcessDefinitionId())); + TAG_PROCESS_INSTANCE_ID.set(span, String.valueOf(taskInstance.getProcessInstanceId())); + TAG_TASK_TYPE.set(span, taskInstance.getTaskType()); + TAG_TASK_INSTANCE_ID.set(span, String.valueOf(taskInstance.getId())); + TAG_TASK_INSTANCE_NAME.set(span, taskInstance.getName()); + TAG_TASK_WORKER_GROUP.set(span, taskInstance.getWorkerGroup()); + TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method)); + + ContextManager.continued(taskContext.getContextSnapshot()); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + AbstractSpan span = ContextManager.activeSpan(); + + MasterBaseTaskExecThread original = (MasterBaseTaskExecThread) objInst; + TaskInstance taskInstance = original.getTaskInstance(); + ExecutionStatus executionStatus = taskInstance.getState(); + + TAG_TASK_STATE.set(span, taskInstance.getState().getDescp()); + if (!ExecutionStatus.SUCCESS.equals(executionStatus)) { + span.errorOccurred(); + } + + ContextManager.stopSpan(); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().log(t); + } + + private static String getOperationNamePrefix(TaskInstance taskInstance) { + String prefix = ""; + if (taskInstance.isSubProcess()) { + prefix = "master/subprocess_task/"; + } else if (taskInstance.isDependTask()) { + prefix = "master/depend_task/"; + } else if (taskInstance.isConditionsTask()) { + prefix = "master/conditions_task/"; + } else { + prefix = "master/task/"; + } + + return prefix + getProcessDefinitionId(taskInstance.getProcessDefinitionId()) + "/"; + } +} + diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadConstructorInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadConstructorInterceptor.java new file mode 100644 index 0000000000..9847cc29a7 --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadConstructorInterceptor.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; + +public class MasterExecThreadConstructorInterceptor implements InstanceConstructorInterceptor { + @Override + public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable { + ProcessInstance processInstance = (ProcessInstance) allArguments[0]; + TaskContext taskContext = new TaskContext<>(processInstance, ContextManager.capture()); + + objInst.setSkyWalkingDynamicField(taskContext); + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadMethodInterceptor.java new file mode 100644 index 0000000000..d76cb13410 --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadMethodInterceptor.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin; + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.RuntimeContext; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Method; + +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROJECT_ID; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_STATE; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_INSTANCE_ID; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_INSTANCE_NAME; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_INSTANCE_HOST; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_DEFINITION_ID; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_COMMAND_TYPE; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_WORKER_GROUP; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_TIMEOUT; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.MASTER_PROCESS_EXECUTION_STATUS; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.getProjectId; + +public class MasterExecThreadMethodInterceptor implements InstanceMethodsAroundInterceptor { + private static final String OPERATION_NAME_PREFIX = "master/process/"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + TaskContext taskContext = (TaskContext) objInst.getSkyWalkingDynamicField(); + ProcessInstance processInstance = taskContext.getCache(); + ProcessDefinition processDefinition = processInstance.getProcessDefinition(); + String operationName = OPERATION_NAME_PREFIX + getProjectId(processDefinition.getProjectId()) + "/" + processDefinition.getName(); + + AbstractSpan span = ContextManager.createLocalSpan(operationName); + span.setComponent(Utils.DOLPHIN_SCHEDULER); + TAG_PROJECT_ID.set(span, String.valueOf(processDefinition.getProjectId())); + TAG_PROCESS_INSTANCE_ID.set(span, String.valueOf(processInstance.getId())); + TAG_PROCESS_INSTANCE_NAME.set(span, processInstance.getName()); + TAG_PROCESS_INSTANCE_HOST.set(span, processInstance.getHost()); + TAG_PROCESS_DEFINITION_ID.set(span, String.valueOf(processInstance.getProcessDefinitionId())); + TAG_PROCESS_COMMAND_TYPE.set(span, processInstance.getCommandType().name()); + TAG_PROCESS_WORKER_GROUP.set(span, processInstance.getWorkerGroup()); + TAG_PROCESS_TIMEOUT.set(span, String.valueOf(processInstance.getTimeout())); + TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method)); + + ContextManager.continued(taskContext.getContextSnapshot()); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + AbstractSpan span = ContextManager.activeSpan(); + + RuntimeContext runtimeContext = ContextManager.getRuntimeContext(); + ExecutionStatus executionStatus = (ExecutionStatus) runtimeContext.get(MASTER_PROCESS_EXECUTION_STATUS); + if (executionStatus == null) { + ProcessInstance processInstance = (ProcessInstance) objInst.getSkyWalkingDynamicField(); + executionStatus = processInstance.getState(); + } + + TAG_PROCESS_STATE.set(span, executionStatus.getDescp()); + if (!ExecutionStatus.SUCCESS.equals(executionStatus)) { + span.errorOccurred(); + } + + ContextManager.stopSpan(); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().log(t); + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadStateCacheInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadStateCacheInterceptor.java new file mode 100644 index 0000000000..e24ff0f0ed --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadStateCacheInterceptor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin; + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.RuntimeContext; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Method; + +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.MASTER_PROCESS_EXECUTION_STATUS; + +public class MasterExecThreadStateCacheInterceptor implements InstanceMethodsAroundInterceptor { + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + ExecutionStatus executionStatus = (ExecutionStatus) ret; + RuntimeContext runtimeContext = ContextManager.getRuntimeContext(); + runtimeContext.put(MASTER_PROCESS_EXECUTION_STATUS, executionStatus); + + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterSchedulerServiceMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterSchedulerServiceMethodInterceptor.java new file mode 100644 index 0000000000..9eeebf3199 --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterSchedulerServiceMethodInterceptor.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin; + +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Method; + +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD; + +public class MasterSchedulerServiceMethodInterceptor implements InstanceMethodsAroundInterceptor { + private static final String OPERATION_NAME = "master/schedule/process"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + AbstractSpan span = ContextManager.createLocalSpan(OPERATION_NAME); + span.setComponent(Utils.DOLPHIN_SCHEDULER); + Tags.LOGIC_ENDPOINT.set(span, Tags.VAL_LOCAL_SPAN_AS_LOGIC_ENDPOINT); + TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method)); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + ContextManager.stopSpan(); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().log(t); + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemoteChannelConstructorInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemoteChannelConstructorInterceptor.java new file mode 100644 index 0000000000..33a4e1afab --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemoteChannelConstructorInterceptor.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin; + +import io.netty.channel.Channel; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; + +public class NettyRemoteChannelConstructorInterceptor implements InstanceConstructorInterceptor { + @Override + public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable { + Channel channel = (Channel) allArguments[0]; + objInst.setSkyWalkingDynamicField(channel.remoteAddress().toString()); + } +} + diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemoteChannelMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemoteChannelMethodInterceptor.java new file mode 100644 index 0000000000..710895fc4b --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemoteChannelMethodInterceptor.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin; + +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandContext; +import org.apache.skywalking.apm.agent.core.context.CarrierItem; +import org.apache.skywalking.apm.agent.core.context.ContextCarrier; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Method; + +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD; + +public class NettyRemoteChannelMethodInterceptor implements InstanceMethodsAroundInterceptor { + private static final String OPERATION_NAME_PREFIX = "rpc/command/send/"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + String remoteAddress = (String) objInst.getSkyWalkingDynamicField(); + Command command = (Command) allArguments[0]; + CommandContext commandContext = command.getContext(); + String operationName = OPERATION_NAME_PREFIX + command.getType().name(); + + ContextCarrier contextCarrier = new ContextCarrier(); + AbstractSpan span = ContextManager.createExitSpan(operationName, contextCarrier, remoteAddress); + span.setComponent(Utils.DOLPHIN_SCHEDULER); + SpanLayer.asRPCFramework(span); + TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method)); + + CarrierItem item = contextCarrier.items(); + while (item.hasNext()) { + item = item.next(); + commandContext.put(item.getHeadKey(), item.getHeadValue()); + } + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + ContextManager.stopSpan(); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().log(t); + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemotingClientMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemotingClientMethodInterceptor.java new file mode 100644 index 0000000000..325d5e1097 --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemotingClientMethodInterceptor.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin; + +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandContext; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.skywalking.apm.agent.core.context.CarrierItem; +import org.apache.skywalking.apm.agent.core.context.ContextCarrier; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Method; + +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD; + +public class NettyRemotingClientMethodInterceptor implements InstanceMethodsAroundInterceptor { + private static final String OPERATION_NAME_PREFIX = "rpc/command/send/"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + Host host = (Host) allArguments[0]; + Command command = (Command) allArguments[1]; + CommandContext commandContext = command.getContext(); + String operationName = OPERATION_NAME_PREFIX + command.getType().name(); + + ContextCarrier contextCarrier = new ContextCarrier(); + AbstractSpan span = ContextManager.createExitSpan(operationName, contextCarrier, host.getAddress()); + span.setComponent(Utils.DOLPHIN_SCHEDULER); + SpanLayer.asRPCFramework(span); + TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method)); + + CarrierItem item = contextCarrier.items(); + while (item.hasNext()) { + item = item.next(); + commandContext.put(item.getHeadKey(), item.getHeadValue()); + } + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + ContextManager.stopSpan(); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().log(t); + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRequestProcessorMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRequestProcessorMethodInterceptor.java new file mode 100644 index 0000000000..764b49905d --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRequestProcessorMethodInterceptor.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin; + +import io.netty.channel.Channel; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandContext; +import org.apache.skywalking.apm.agent.core.context.CarrierItem; +import org.apache.skywalking.apm.agent.core.context.ContextCarrier; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Method; + +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_NETTY_REMOTE_ADDRESS; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD; + +public class NettyRequestProcessorMethodInterceptor implements InstanceMethodsAroundInterceptor { + private static final String OPERATION_NAME_PREFIX = "rpc/command/process/"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + Channel channel = (Channel) allArguments[0]; + Command command = (Command) allArguments[1]; + CommandContext commandContext = command.getContext(); + String operationName = OPERATION_NAME_PREFIX + command.getType().name(); + + ContextCarrier contextCarrier = new ContextCarrier(); + CarrierItem item = contextCarrier.items(); + while (item.hasNext()) { + item = item.next(); + item.setHeadValue(commandContext.get(item.getHeadKey())); + } + + AbstractSpan span = ContextManager.createEntrySpan(operationName, contextCarrier); + span.setComponent(Utils.DOLPHIN_SCHEDULER); + SpanLayer.asRPCFramework(span); + TAG_NETTY_REMOTE_ADDRESS.set(span, channel.remoteAddress().toString()); + TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method)); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + ContextManager.stopSpan(); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().log(t); + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskContext.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskContext.java new file mode 100644 index 0000000000..2aa929e933 --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskContext.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; + +@Getter +@AllArgsConstructor +public class TaskContext { + private T cache; + private ContextSnapshot contextSnapshot; +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteConstructorInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteConstructorInterceptor.java new file mode 100644 index 0000000000..16ddc85a0f --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteConstructorInterceptor.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin; + +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; + +public class TaskExecuteConstructorInterceptor implements InstanceConstructorInterceptor { + @Override + public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable { + TaskExecutionContext taskExecutionContext = (TaskExecutionContext) allArguments[0]; + TaskContext taskContext = new TaskContext<>(taskExecutionContext, ContextManager.capture()); + + objInst.setSkyWalkingDynamicField(taskContext); + } +} + diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteMethodInterceptor.java new file mode 100644 index 0000000000..99a8ce22d1 --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteMethodInterceptor.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin; + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.AbstractTask; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Method; + +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_PARAMS; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_STATE; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD; + +public class TaskExecuteMethodInterceptor implements InstanceMethodsAroundInterceptor { + private static final int DEFAULT_TASK_STATUS_CODE = -1; + private static final int TASK_PARAMS_MAX_LENGTH = 2048; + private static final String OPERATION_NAME_PREFIX = "worker/"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + TaskContext taskContext = (TaskContext) objInst.getSkyWalkingDynamicField(); + String operationName = OPERATION_NAME_PREFIX + taskContext.getCache().getTaskType() + "/" + method.getName(); + + AbstractSpan span = ContextManager.createLocalSpan(operationName); + span.setComponent(Utils.DOLPHIN_SCHEDULER); + TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method)); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + AbstractSpan span = ContextManager.activeSpan(); + + AbstractTask task = (AbstractTask) objInst; + int statusCode = task.getExitStatusCode(); + ExecutionStatus status = task.getExitStatus(); + TAG_TASK_STATE.set(span, status.getDescp()); + if (statusCode != DEFAULT_TASK_STATUS_CODE && ExecutionStatus.FAILURE.equals(status)) { + logTaskParams(objInst); + span.errorOccurred(); + } + + ContextManager.stopSpan(); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + logTaskParams(objInst); + + ContextManager.activeSpan().log(t); + } + + private void logTaskParams(EnhancedInstance objInst) { + TaskContext taskContext = (TaskContext) objInst.getSkyWalkingDynamicField(); + TaskExecutionContext executionContext = taskContext.getCache(); + String taskParams = executionContext.getTaskParams(); + + String limitTaskParams = taskParams; + if (taskParams.length() > TASK_PARAMS_MAX_LENGTH) { + limitTaskParams = taskParams.substring(TASK_PARAMS_MAX_LENGTH); + } + + AbstractSpan span = ContextManager.activeSpan(); + TAG_TASK_PARAMS.set(span, limitTaskParams); + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteThreadConstructorInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteThreadConstructorInterceptor.java new file mode 100644 index 0000000000..d6781f8185 --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteThreadConstructorInterceptor.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin; + +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; + +public class TaskExecuteThreadConstructorInterceptor implements InstanceConstructorInterceptor { + @Override + public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable { + TaskExecutionContext taskExecutionContext = (TaskExecutionContext) allArguments[0]; + TaskContext taskContext = new TaskContext<>(taskExecutionContext, ContextManager.capture()); + + objInst.setSkyWalkingDynamicField(taskContext); + } +} + diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteThreadMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteThreadMethodInterceptor.java new file mode 100644 index 0000000000..e263ed4e63 --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteThreadMethodInterceptor.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin; + +import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.utils.EnumUtils; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Method; + +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_INSTANCE_ID; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_INSTANCE_HOST; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_EXECUTE_PATH; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_LOG_PATH; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROJECT_ID; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_DEFINITION_ID; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_INSTANCE_ID; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.getProjectId; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.getProcessDefinitionId; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD; + +public class TaskExecuteThreadMethodInterceptor implements InstanceMethodsAroundInterceptor { + private static final String OPERATION_NAME_PREFIX = "worker/execute/"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + TaskContext taskContext = (TaskContext) objInst.getSkyWalkingDynamicField(); + TaskExecutionContext executionContext = taskContext.getCache(); + TaskType type = EnumUtils.getEnum(TaskType.class, executionContext.getTaskType()); + String operationName = OPERATION_NAME_PREFIX + + type.getDescp() + + "/" + getProjectId(executionContext.getProjectId()) + + "/" + getProcessDefinitionId(executionContext.getProcessDefineId()) + + "/" + executionContext.getTaskName(); + + AbstractSpan span = ContextManager.createLocalSpan(operationName); + span.setComponent(Utils.DOLPHIN_SCHEDULER); + + TAG_PROJECT_ID.set(span, String.valueOf(executionContext.getProjectId())); + TAG_PROCESS_DEFINITION_ID.set(span, String.valueOf(executionContext.getProcessDefineId())); + TAG_PROCESS_INSTANCE_ID.set(span, String.valueOf(executionContext.getProcessInstanceId())); + TAG_TASK_INSTANCE_HOST.set(span, executionContext.getHost()); + TAG_TASK_INSTANCE_ID.set(span, String.valueOf(executionContext.getTaskInstanceId())); + TAG_TASK_EXECUTE_PATH.set(span, executionContext.getExecutePath()); + TAG_TASK_LOG_PATH.set(span, executionContext.getLogPath()); + TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method)); + + ContextManager.continued(taskContext.getContextSnapshot()); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + ContextManager.stopSpan(); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().log(t); + } +} + diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskPriorityQueueConsumerMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskPriorityQueueConsumerMethodInterceptor.java new file mode 100644 index 0000000000..20748745c2 --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskPriorityQueueConsumerMethodInterceptor.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin; + +import org.apache.dolphinscheduler.service.queue.TaskPriority; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Method; +import java.util.Map; + +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.*; + +public class TaskPriorityQueueConsumerMethodInterceptor implements InstanceMethodsAroundInterceptor { + private static final String OPERATION_NAME = "master/queue/take"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + TaskPriority taskPriority = (TaskPriority) allArguments[0]; + Map taskContext = taskPriority.getContext(); + + AbstractSpan span = ContextManager.createLocalSpan(OPERATION_NAME); + span.setComponent(Utils.DOLPHIN_SCHEDULER); + TAG_TASK_ID.set(span, String.valueOf(taskPriority.getTaskId())); + TAG_PROCESS_INSTANCE_ID.set(span, String.valueOf(taskPriority.getProcessInstanceId())); + Tags.LOGIC_ENDPOINT.set(span, Tags.VAL_LOCAL_SPAN_AS_LOGIC_ENDPOINT); + TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method)); + + ContextSnapshot contextSnapshot = (ContextSnapshot) taskContext.get(SKYWALKING_TRACING_CONTEXT); + ContextManager.continued(contextSnapshot); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + AbstractSpan span = ContextManager.activeSpan(); + + boolean handleResult = (boolean) ret; + if (!handleResult) { + span.errorOccurred(); + } + + ContextManager.stopSpan(); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().log(t); + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskPriorityQueueImplMethodInterceptor.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskPriorityQueueImplMethodInterceptor.java new file mode 100644 index 0000000000..10ab60f4f5 --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskPriorityQueueImplMethodInterceptor.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin; + +import org.apache.dolphinscheduler.service.queue.TaskPriority; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_ID; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_INSTANCE_ID; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.SKYWALKING_TRACING_CONTEXT; +import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD; + +public class TaskPriorityQueueImplMethodInterceptor implements InstanceMethodsAroundInterceptor { + private static final String OPERATION_NAME = "master/queue/put"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + boolean isActive = ContextManager.isActive(); + TaskPriority taskPriority = (TaskPriority) allArguments[0]; + + AbstractSpan span = ContextManager.createLocalSpan(OPERATION_NAME); + span.setComponent(Utils.DOLPHIN_SCHEDULER); + TAG_TASK_ID.set(span, String.valueOf(taskPriority.getTaskId())); + TAG_PROCESS_INSTANCE_ID.set(span, String.valueOf(taskPriority.getProcessInstanceId())); + Tags.LOGIC_ENDPOINT.set(span, Tags.VAL_LOCAL_SPAN_AS_LOGIC_ENDPOINT); + TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method)); + + if (isActive) { + Map taskPriorityContext = taskPriority.getContext(); + if (taskPriorityContext == null) { + taskPriorityContext = new HashMap<>(); + taskPriority.setContext(taskPriorityContext); + } + taskPriorityContext.put(SKYWALKING_TRACING_CONTEXT, ContextManager.capture()); + } + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + ContextManager.stopSpan(); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().log(t); + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/Utils.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/Utils.java new file mode 100644 index 0000000000..2dc6bb84bb --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/Utils.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin; + +import org.apache.skywalking.apm.agent.core.context.tag.StringTag; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; +import org.apache.skywalking.apm.network.trace.component.OfficialComponent; + +import java.lang.reflect.Method; + +public class Utils { + public static final OfficialComponent DOLPHIN_SCHEDULER = ComponentsDefine.DOLPHIN_SCHEDULER; + + public static final String SKYWALKING_TRACING_CONTEXT = "skywalking_tracing_context"; + public static final String MASTER_PROCESS_EXECUTION_STATUS = "master_process_execution_status"; + + public static final StringTag TAG_PROJECT_ID = new StringTag("project.id"); + + public static final StringTag TAG_PROCESS_STATE = new StringTag("process.state"); + public static final StringTag TAG_PROCESS_INSTANCE_ID = new StringTag("process.instance.id"); + public static final StringTag TAG_PROCESS_INSTANCE_NAME = new StringTag("process.instance.name"); + public static final StringTag TAG_PROCESS_INSTANCE_HOST = new StringTag("process.instance.host"); + public static final StringTag TAG_PROCESS_DEFINITION_ID = new StringTag("process.definitionId"); + public static final StringTag TAG_PROCESS_COMMAND_TYPE = new StringTag("process.commandType"); + public static final StringTag TAG_PROCESS_WORKER_GROUP = new StringTag("process.workerGroup"); + public static final StringTag TAG_PROCESS_TIMEOUT = new StringTag("process.timeout"); + + public static final StringTag TAG_TASK_STATE = new StringTag("task.state"); + public static final StringTag TAG_TASK_TYPE = new StringTag("take.type"); + public static final StringTag TAG_TASK_INSTANCE_ID = new StringTag("task.instance.id"); + public static final StringTag TAG_TASK_INSTANCE_NAME = new StringTag("task.instanceName"); + public static final StringTag TAG_TASK_INSTANCE_HOST = new StringTag("task.instance.host"); + public static final StringTag TAG_TASK_WORKER_GROUP = new StringTag("task.workerGroup"); + public static final StringTag TAG_TASK_ID = new StringTag("task.id"); + public static final StringTag TAG_TASK_EXECUTE_PATH = new StringTag("task.executePath"); + public static final StringTag TAG_TASK_LOG_PATH = new StringTag("task.logPath"); + public static final StringTag TAG_TASK_PARAMS = new StringTag("task.params"); + + public static final StringTag TAG_NETTY_REMOTE_ADDRESS = new StringTag("netty.remoteAddress"); + + public static final StringTag TAG_EXECUTE_METHOD = new StringTag("execute.method"); + + public static String getProjectId(int id) { + return "project_id_" + id; + } + + public static String getProcessDefinitionId(int id) { + return "process_definition_id_" + id; + } + + public static String getMethodName(Method method) { + return method.getDeclaringClass().getTypeName() + "#" + method.getName(); + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterBaseTaskExecThreadInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterBaseTaskExecThreadInterceptorInstrumentation.java new file mode 100644 index 0000000000..c9323817d5 --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterBaseTaskExecThreadInterceptorInstrumentation.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/** + * Enhance {@link org.apache.dolphinscheduler.server.master.runner.MasterBaseTaskExecThread} instance and intercept `call` methods, + * this class provides an isolated thread for running job. + * + * @see org.apache.dolphinscheduler.skywalking.plugin.MasterBaseTaskExecThreadConstructorInterceptor + * @see org.apache.dolphinscheduler.skywalking.plugin.MasterBaseTaskExecThreadMethodInterceptor + */ +public class MasterBaseTaskExecThreadInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.MasterBaseTaskExecThreadConstructorInterceptor"; + private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.MasterBaseTaskExecThreadMethodInterceptor"; + private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.master.runner.MasterBaseTaskExecThread"; + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANC_CLASS); + } + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[] { + new ConstructorInterceptPoint() { + @Override + public ElementMatcher getConstructorMatcher() { + return takesArguments(1) + .and(takesArgument(0, named("org.apache.dolphinscheduler.dao.entity.TaskInstance"))); + } + + @Override + public String getConstructorInterceptor() { + return CONSTRUCTOR_INTERCEPTOR_CLASS; + } + } + }; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("call") + .and(isPublic()) + .and(takesArguments(0)); + } + + @Override + public String getMethodsInterceptor() { + return METHOD_INTERCEPTOR_CLASS; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterExecThreadInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterExecThreadInterceptorInstrumentation.java new file mode 100644 index 0000000000..e786b052ec --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterExecThreadInterceptorInstrumentation.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/** + * Enhance {@link org.apache.dolphinscheduler.server.master.runner.MasterExecThread} instance and intercept `run` and `getProcessInstanceState` methods, + * the `run` method is a unified entrance of scheduled job. + * + * @see org.apache.dolphinscheduler.skywalking.plugin.MasterExecThreadConstructorInterceptor + * @see org.apache.dolphinscheduler.skywalking.plugin.MasterExecThreadMethodInterceptor + * @see org.apache.dolphinscheduler.skywalking.plugin.MasterExecThreadStateCacheInterceptor + */ +public class MasterExecThreadInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.MasterExecThreadConstructorInterceptor"; + private static final String EXEC_PROCESS_METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.MasterExecThreadMethodInterceptor"; + private static final String CACHE_STATE_METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.MasterExecThreadStateCacheInterceptor"; + private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.master.runner.MasterExecThread"; + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANC_CLASS); + } + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[] { + new ConstructorInterceptPoint() { + @Override + public ElementMatcher getConstructorMatcher() { + return takesArgument(0, named("org.apache.dolphinscheduler.dao.entity.ProcessInstance")); + } + + @Override + public String getConstructorInterceptor() { + return CONSTRUCTOR_INTERCEPTOR_CLASS; + } + } + }; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("run") + .and(isPublic()) + .and(takesArguments(0)); + } + + @Override + public String getMethodsInterceptor() { + return EXEC_PROCESS_METHOD_INTERCEPTOR_CLASS; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + }, + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("getProcessInstanceState") + .and(returns(named("org.apache.dolphinscheduler.common.enums.ExecutionStatus"))); + } + + @Override + public String getMethodsInterceptor() { + return CACHE_STATE_METHOD_INTERCEPTOR_CLASS; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterSchedulerServiceInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterSchedulerServiceInterceptorInstrumentation.java new file mode 100644 index 0000000000..db1e498809 --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterSchedulerServiceInterceptorInstrumentation.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/** + * Enhance {@link org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService} instance and intercept `scheduleProcess` method, + * this method is a scheduler entrance of job. + * + * @see org.apache.dolphinscheduler.skywalking.plugin.MasterSchedulerServiceMethodInterceptor + */ +public class MasterSchedulerServiceInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.MasterSchedulerServiceMethodInterceptor"; + private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService"; + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANC_CLASS); + } + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("scheduleProcess"); + } + + @Override + public String getMethodsInterceptor() { + return METHOD_INTERCEPTOR_CLASS; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRemoteChannelInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRemoteChannelInterceptorInstrumentation.java new file mode 100644 index 0000000000..38e7943a69 --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRemoteChannelInterceptorInstrumentation.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/** + * Enhance {@link org.apache.dolphinscheduler.server.worker.processor.NettyRemoteChannel} instance and intercept `writeAndFlush` method, + * this method send message to remote server. + * + * @see org.apache.dolphinscheduler.skywalking.plugin.NettyRemoteChannelConstructorInterceptor + * @see org.apache.dolphinscheduler.skywalking.plugin.NettyRemoteChannelMethodInterceptor + */ +public class NettyRemoteChannelInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.NettyRemoteChannelConstructorInterceptor"; + private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.NettyRemoteChannelMethodInterceptor"; + private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.worker.processor.NettyRemoteChannel"; + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANC_CLASS); + } + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[] { + new ConstructorInterceptPoint() { + @Override + public ElementMatcher getConstructorMatcher() { + return takesArgument(0, named("io.netty.channel.Channel")); + } + + @Override + public String getConstructorInterceptor() { + return CONSTRUCTOR_INTERCEPTOR_CLASS; + } + } + }; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("writeAndFlush") + .and(takesArguments(1)) + .and(takesArgument(0, named("org.apache.dolphinscheduler.remote.command.Command"))); + } + + @Override + public String getMethodsInterceptor() { + return METHOD_INTERCEPTOR_CLASS; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRemotingClientInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRemotingClientInterceptorInstrumentation.java new file mode 100644 index 0000000000..4ddba81d0a --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRemotingClientInterceptorInstrumentation.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/** + * Enhance {@link org.apache.dolphinscheduler.remote.NettyRemotingClient} instance and intercept `send` method, + * this method send message to remote server. + * + * @see org.apache.dolphinscheduler.skywalking.plugin.NettyRemotingClientMethodInterceptor + */ +public class NettyRemotingClientInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.NettyRemotingClientMethodInterceptor"; + private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.remote.NettyRemotingClient"; + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANC_CLASS); + } + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("send") + .and(takesArguments(2)) + .and(takesArgument(0, named("org.apache.dolphinscheduler.remote.utils.Host"))) + .and(takesArgument(1, named("org.apache.dolphinscheduler.remote.command.Command"))); + } + + @Override + public String getMethodsInterceptor() { + return METHOD_INTERCEPTOR_CLASS; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRequestProcessorInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRequestProcessorInterceptorInstrumentation.java new file mode 100644 index 0000000000..bf3d5f7e6a --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRequestProcessorInterceptorInstrumentation.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch; + +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static net.bytebuddy.matcher.ElementMatchers.named; + +/** + * Enhance {@link org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor} instance and intercept `process` method, + * this method receive and handle message from remote server. + * + * @see org.apache.dolphinscheduler.skywalking.plugin.NettyRequestProcessorMethodInterceptor + */ +public class NettyRequestProcessorInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.NettyRequestProcessorMethodInterceptor"; + private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor"; + + @Override + protected ClassMatch enhanceClass() { + return HierarchyMatch.byHierarchyMatch(ENHANC_CLASS); + } + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("process") + .and(takesArguments(2)) + .and(takesArgument(0, named("io.netty.channel.Channel"))) + .and(takesArgument(1, named("org.apache.dolphinscheduler.remote.command.Command"))); + } + + @Override + public String getMethodsInterceptor() { + return METHOD_INTERCEPTOR_CLASS; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskExecuteInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskExecuteInterceptorInstrumentation.java new file mode 100644 index 0000000000..92218fe7f5 --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskExecuteInterceptorInstrumentation.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch; + +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; + +/** + * Enhance {@link org.apache.dolphinscheduler.server.worker.task.AbstractTask} instance and intercept `init`、`handle` and `after` methods, + * the implementation class execute task through these methods. + * + * @see org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteConstructorInterceptor + * @see org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteConstructorInterceptor + */ +public class TaskExecuteInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteConstructorInterceptor"; + private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteMethodInterceptor"; + private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.worker.task.AbstractTask"; + + @Override + protected ClassMatch enhanceClass() { + return HierarchyMatch.byHierarchyMatch(ENHANC_CLASS); + } + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[] { + new ConstructorInterceptPoint() { + @Override + public ElementMatcher getConstructorMatcher() { + return takesArgument(0, named("org.apache.dolphinscheduler.server.entity.TaskExecutionContext")); + } + + @Override + public String getConstructorInterceptor() { + return CONSTRUCTOR_INTERCEPTOR_CLASS; + } + } + }; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("handle") + .or(named("init")) + .or(named("after")) + .and(isPublic()) + .and(takesArguments(0)); + } + + @Override + public String getMethodsInterceptor() { + return METHOD_INTERCEPTOR_CLASS; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskExecuteThreadInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskExecuteThreadInterceptorInstrumentation.java new file mode 100644 index 0000000000..4ad4c5ccae --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskExecuteThreadInterceptorInstrumentation.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/** + * Enhance {@link org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread} instance and intercept `run` method, + * this class provides an isolated thread for running task. + * + * @see org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteThreadConstructorInterceptor + * @see org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteThreadMethodInterceptor + */ +public class TaskExecuteThreadInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteThreadConstructorInterceptor"; + private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteThreadMethodInterceptor"; + private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread"; + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANC_CLASS); + } + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[] { + new ConstructorInterceptPoint() { + @Override + public ElementMatcher getConstructorMatcher() { + return takesArgument(0, named("org.apache.dolphinscheduler.server.entity.TaskExecutionContext")); + } + + @Override + public String getConstructorInterceptor() { + return CONSTRUCTOR_INTERCEPTOR_CLASS; + } + } + }; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("run") + .and(isPublic()) + .and(takesArguments(0)); + } + + @Override + public String getMethodsInterceptor() { + return METHOD_INTERCEPTOR_CLASS; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskPriorityQueueConsumerInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskPriorityQueueConsumerInterceptorInstrumentation.java new file mode 100644 index 0000000000..881ef115c3 --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskPriorityQueueConsumerInterceptorInstrumentation.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/** + * Enhance {@link org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer} instance and intercept `dispatch` method, + * this method is a dispatch the task info of memory queue to worker nodes. + * + * @see org.apache.dolphinscheduler.skywalking.plugin.TaskPriorityQueueConsumerMethodInterceptor + */ +public class TaskPriorityQueueConsumerInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.TaskPriorityQueueConsumerMethodInterceptor"; + private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer"; + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANC_CLASS); + } + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("dispatch") + .and(takesArguments(1)) + .and(takesArgument(0, named("org.apache.dolphinscheduler.service.queue.TaskPriority"))) + .and(returns(boolean.class)); + } + + @Override + public String getMethodsInterceptor() { + return METHOD_INTERCEPTOR_CLASS; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } +} diff --git a/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskPriorityQueueImplInterceptorInstrumentation.java b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskPriorityQueueImplInterceptorInstrumentation.java new file mode 100644 index 0000000000..be06260959 --- /dev/null +++ b/ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskPriorityQueueImplInterceptorInstrumentation.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.dolphinscheduler.skywalking.plugin.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/** + * Enhance {@link org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl} instance and intercept `put` method, + * this method is a publish task info to memory queue. + * + * @see org.apache.dolphinscheduler.skywalking.plugin.TaskPriorityQueueImplMethodInterceptor + */ +public class TaskPriorityQueueImplInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.TaskPriorityQueueImplMethodInterceptor"; + private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl"; + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANC_CLASS); + } + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("put") + .and(takesArguments(1)) + .and(takesArgument(0, named("org.apache.dolphinscheduler.service.queue.TaskPriority"))); + } + + @Override + public String getMethodsInterceptor() { + return METHOD_INTERCEPTOR_CLASS; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } +} diff --git a/ext/skywalking/src/main/resources/skywalking-plugin.def b/ext/skywalking/src/main/resources/skywalking-plugin.def new file mode 100644 index 0000000000..25e18ac2e0 --- /dev/null +++ b/ext/skywalking/src/main/resources/skywalking-plugin.def @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.MasterSchedulerServiceInterceptorInstrumentation +dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.MasterExecThreadInterceptorInstrumentation +dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.MasterBaseTaskExecThreadInterceptorInstrumentation +dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.TaskPriorityQueueImplInterceptorInstrumentation +dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.TaskPriorityQueueConsumerInterceptorInstrumentation +dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.NettyRemotingClientInterceptorInstrumentation +dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.NettyRequestProcessorInterceptorInstrumentation +dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.NettyRemoteChannelInterceptorInstrumentation +dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.TaskExecuteThreadInterceptorInstrumentation +dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.TaskExecuteInterceptorInstrumentation \ No newline at end of file diff --git a/pom.xml b/pom.xml index 443ae36de4..1e70282ec4 100644 --- a/pom.xml +++ b/pom.xml @@ -117,6 +117,7 @@ 2.5 1.9.3 2.9.2 + 8.4.0 @@ -543,6 +544,12 @@ swagger-bootstrap-ui ${swagger.version} + + + org.apache.skywalking + apm-toolkit-logback-1.x + ${skywalking.version} + @@ -1015,5 +1022,6 @@ dolphinscheduler-remote dolphinscheduler-service dolphinscheduler-plugin-api + ext/skywalking diff --git a/script/dolphinscheduler-daemon.sh b/script/dolphinscheduler-daemon.sh index fc10d9f266..b6a6dbadea 100755 --- a/script/dolphinscheduler-daemon.sh +++ b/script/dolphinscheduler-daemon.sh @@ -97,7 +97,16 @@ else exit 1 fi -export DOLPHINSCHEDULER_OPTS="-server -Xms$HEAP_INITIAL_SIZE -Xmx$HEAP_MAX_SIZE -Xmn$HEAP_NEW_GENERATION__SIZE -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m -Xss512k -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof" +if [ "$SKYWALKING_ENABLE" = "true" ]; then + SKYWALKING_OPTS="-javaagent:$DOLPHINSCHEDULER_HOME/skywalking-agent/skywalking-agent.jar -DSW_AGENT_NAME=dolphinscheduler::$command -DSW_LOGGING_FILE_NAME=skywalking-dolphinscheduler-$command.log" + + export DOLPHINSCHEDULER_OPTS="$DOLPHINSCHEDULER_OPTS $SKYWALKING_OPTS" + echo "Info: Skywalking enabled opts: $SKYWALKING_OPTS" +else + echo "Info: Skywalking not enabled." +fi + +export DOLPHINSCHEDULER_OPTS="-server -Xms$HEAP_INITIAL_SIZE -Xmx$HEAP_MAX_SIZE -Xmn$HEAP_NEW_GENERATION__SIZE -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m -Xss512k -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof $DOLPHINSCHEDULER_OPTS" case $startStop in (start) diff --git a/script/scp-hosts.sh b/script/scp-hosts.sh index 27ee932a40..50a94f9b0f 100755 --- a/script/scp-hosts.sh +++ b/script/scp-hosts.sh @@ -54,7 +54,7 @@ do echo "scp dirs to $host/$installPath starting" ssh -p $sshPort $host "cd $installPath/; rm -rf bin/ conf/ lib/ script/ sql/ ui/" - for dsDir in bin conf lib script sql ui install.sh + for dsDir in bin conf lib script sql ui skywalking-agent install.sh do # if worker in workersGroupMap if [[ "${workersGroupMap[${host}]}" ]] && [[ "${dsDir}" == "conf" ]]; then diff --git a/script/start-all.sh b/script/start-all.sh index 61b916483e..2902e0515f 100755 --- a/script/start-all.sh +++ b/script/start-all.sh @@ -31,11 +31,13 @@ do workersGroupMap+=([$worker]=$groupName) done +skywalkingEnv="SKYWALKING_ENABLE=$enableSkywalking SW_AGENT_COLLECTOR_BACKEND_SERVICES=$skywalkingServers SW_GRPC_LOG_SERVER_HOST=$skywalkingLogReporterHost SW_GRPC_LOG_SERVER_PORT=$skywalkingLogReporterPort" + mastersHost=(${masters//,/ }) for master in ${mastersHost[@]} do echo "$master master server is starting" - ssh -p $sshPort $master "cd $installPath/; sh bin/dolphinscheduler-daemon.sh start master-server;" + ssh -p $sshPort $master "cd $installPath/; export $skywalkingEnv; sh bin/dolphinscheduler-daemon.sh start master-server;" done @@ -43,15 +45,15 @@ for worker in ${!workersGroupMap[*]} do echo "$worker worker server is starting" - ssh -p $sshPort $worker "cd $installPath/; sh bin/dolphinscheduler-daemon.sh start worker-server;" - ssh -p $sshPort $worker "cd $installPath/; sh bin/dolphinscheduler-daemon.sh start logger-server;" + ssh -p $sshPort $worker "cd $installPath/; export $skywalkingEnv; sh bin/dolphinscheduler-daemon.sh start worker-server;" + ssh -p $sshPort $worker "cd $installPath/; export $skywalkingEnv; sh bin/dolphinscheduler-daemon.sh start logger-server;" done -ssh -p $sshPort $alertServer "cd $installPath/; sh bin/dolphinscheduler-daemon.sh start alert-server;" +ssh -p $sshPort $alertServer "cd $installPath/; export $skywalkingEnv; sh bin/dolphinscheduler-daemon.sh start alert-server;" apiServersHost=(${apiServers//,/ }) for apiServer in ${apiServersHost[@]} do echo "$apiServer worker server is starting" - ssh -p $sshPort $apiServer "cd $installPath/; sh bin/dolphinscheduler-daemon.sh start api-server;" + ssh -p $sshPort $apiServer "cd $installPath/; export $skywalkingEnv; sh bin/dolphinscheduler-daemon.sh start api-server;" done \ No newline at end of file diff --git a/tools/dependencies/check-LICENSE.sh b/tools/dependencies/check-LICENSE.sh index d414bd40c6..123727a024 100755 --- a/tools/dependencies/check-LICENSE.sh +++ b/tools/dependencies/check-LICENSE.sh @@ -27,6 +27,10 @@ echo '=== Self modules: ' && ./mvnw --batch-mode --quiet -Dexec.executable='echo echo '=== Distributed dependencies: ' && ls dist/lib | tee all-dependencies.txt +echo '=== Skywalking agent dependencies: ' && ls dist/skywalking-agent | grep .jar | tee -a all-dependencies.txt \ + && ls dist/skywalking-agent/plugins | tee -a all-dependencies.txt \ + && ls dist/skywalking-agent/activations | tee -a all-dependencies.txt + # Exclude all self modules(jars) to generate all third-party dependencies echo '=== Third party dependencies: ' && grep -vf self-modules.txt all-dependencies.txt | tee third-party-dependencies.txt diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index e684b7a05a..81f1710dfe 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -12,6 +12,16 @@ aspectjweaver-1.9.2.jar audience-annotations-0.5.0.jar avro-1.7.4.jar aws-java-sdk-1.7.4.jar +apm-toolkit-logback-1.x-8.4.0.jar +apm-toolkit-logback-1.x-activation-8.4.0.jar +apm-httpclient-3.x-plugin-8.4.0.jar +apm-httpClient-4.x-plugin-8.4.0.jar +apm-httpclient-commons-8.4.0.jar +apm-quartz-scheduler-2.x-plugin-8.4.0.jar +apm-spring-core-patch-8.4.0.jar +apm-springmvc-annotation-5.x-plugin-8.4.0.jar +apm-springmvc-annotation-commons-8.4.0.jar +apm-zookeeper-3.4.x-plugin-8.4.0.jar bonecp-0.8.0.RELEASE.jar byte-buddy-1.9.10.jar classmate-1.4.0.jar @@ -198,6 +208,8 @@ springfox-swagger2-2.9.2.jar swagger-annotations-1.5.20.jar swagger-bootstrap-ui-1.9.3.jar swagger-models-1.5.20.jar +skywalking-agent.jar +spring-commons-8.4.0.jar tephra-api-0.6.0.jar threetenbp-1.3.6.jar transaction-api-1.1.jar