Browse Source

[Improvement] Add skywalking agent plugin (#4852)

* [Improvement] Add skywalking agent plugin

* fix

* fix

* fix

* fix

* suport log

* fix

* Rename LICENSE-skywalking.txt to LICENSE-apm-toolkit-logback.txt

* Rename LICENSE-apm-toolkit-logback.txt to LICENSE-apm-toolkit-logback-1.x.txt

* Update known-dependencies.txt

* fix

* fix

* fix

* fix

* fix

* fix

* fix docker logback config

* add log reporter to alert module

* fix

* fix

* fix
hailin0 4 years ago committed by GitHub
parent
commit
a01f13f72d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      docker/build/conf/dolphinscheduler/logback/logback-alert.xml
  2. 5
      docker/build/conf/dolphinscheduler/logback/logback-api.xml
  3. 5
      docker/build/conf/dolphinscheduler/logback/logback-master.xml
  4. 5
      docker/build/conf/dolphinscheduler/logback/logback-worker.xml
  5. 5
      dolphinscheduler-alert/src/main/resources/logback-alert.xml
  6. 5
      dolphinscheduler-api/src/main/resources/logback-api.xml
  7. 4
      dolphinscheduler-common/pom.xml
  8. 6
      dolphinscheduler-dist/pom.xml
  9. 3
      dolphinscheduler-dist/release-docs/LICENSE
  10. 201
      dolphinscheduler-dist/release-docs/licenses/LICENSE-skywalking-agent.txt
  11. 8
      dolphinscheduler-dist/src/main/assembly/dolphinscheduler-binary.xml
  12. 5
      dolphinscheduler-server/src/main/resources/logback-master.xml
  13. 5
      dolphinscheduler-server/src/main/resources/logback-worker.xml
  14. 6
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
  15. 118
      ext/skywalking/config/agent.config
  16. 284
      ext/skywalking/dashboard/dolphinscheduler.yml
  17. 218
      ext/skywalking/pom.xml
  18. 35
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterBaseTaskExecThreadConstructorInterceptor.java
  19. 100
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterBaseTaskExecThreadMethodInterceptor.java
  20. 34
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadConstructorInterceptor.java
  21. 95
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadMethodInterceptor.java
  22. 50
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadStateCacheInterceptor.java
  23. 53
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterSchedulerServiceMethodInterceptor.java
  24. 32
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemoteChannelConstructorInterceptor.java
  25. 69
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemoteChannelMethodInterceptor.java
  26. 70
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemotingClientMethodInterceptor.java
  27. 72
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRequestProcessorMethodInterceptor.java
  28. 30
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskContext.java
  29. 35
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteConstructorInterceptor.java
  30. 88
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteMethodInterceptor.java
  31. 35
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteThreadConstructorInterceptor.java
  32. 83
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteThreadMethodInterceptor.java
  33. 71
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskPriorityQueueConsumerMethodInterceptor.java
  34. 73
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskPriorityQueueImplMethodInterceptor.java
  35. 70
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/Utils.java
  36. 92
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterBaseTaskExecThreadInterceptorInstrumentation.java
  37. 111
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterExecThreadInterceptorInstrumentation.java
  38. 72
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterSchedulerServiceInterceptorInstrumentation.java
  39. 90
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRemoteChannelInterceptorInstrumentation.java
  40. 77
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRemotingClientInterceptorInstrumentation.java
  41. 77
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRequestProcessorInterceptorInstrumentation.java
  42. 93
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskExecuteInterceptorInstrumentation.java
  43. 91
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskExecuteThreadInterceptorInstrumentation.java
  44. 78
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskPriorityQueueConsumerInterceptorInstrumentation.java
  45. 76
      ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskPriorityQueueImplInterceptorInstrumentation.java
  46. 26
      ext/skywalking/src/main/resources/skywalking-plugin.def
  47. 8
      pom.xml
  48. 11
      script/dolphinscheduler-daemon.sh
  49. 2
      script/scp-hosts.sh
  50. 12
      script/start-all.sh
  51. 4
      tools/dependencies/check-LICENSE.sh
  52. 12
      tools/dependencies/known-dependencies.txt

5
docker/build/conf/dolphinscheduler/logback/logback-alert.xml

@ -36,8 +36,13 @@
</encoder> </encoder>
</appender> </appender>
<!-- skywalking log reporter start-->
<appender name="SKYWALKING-LOG" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender"/>
<!-- skywalking log reporter end-->
<root level="INFO"> <root level="INFO">
<appender-ref ref="ALERTLOGFILE"/> <appender-ref ref="ALERTLOGFILE"/>
<appender-ref ref="SKYWALKING-LOG"/>
</root> </root>
</configuration> </configuration>

5
docker/build/conf/dolphinscheduler/logback/logback-api.xml

@ -41,6 +41,10 @@
</appender> </appender>
<!-- api server logback config end --> <!-- api server logback config end -->
<!-- skywalking log reporter start-->
<appender name="SKYWALKING-LOG" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender"/>
<!-- skywalking log reporter end-->
<logger name="org.apache.zookeeper" level="WARN"/> <logger name="org.apache.zookeeper" level="WARN"/>
<logger name="org.apache.hbase" level="WARN"/> <logger name="org.apache.hbase" level="WARN"/>
<logger name="org.apache.hadoop" level="WARN"/> <logger name="org.apache.hadoop" level="WARN"/>
@ -48,6 +52,7 @@
<root level="INFO"> <root level="INFO">
<appender-ref ref="APILOGFILE"/> <appender-ref ref="APILOGFILE"/>
<appender-ref ref="SKYWALKING-LOG"/>
</root> </root>
</configuration> </configuration>

5
docker/build/conf/dolphinscheduler/logback/logback-master.xml

@ -65,9 +65,14 @@
</appender> </appender>
<!-- master server logback config end --> <!-- master server logback config end -->
<!-- skywalking log reporter start-->
<appender name="SKYWALKING-LOG" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender"/>
<!-- skywalking log reporter end-->
<root level="INFO"> <root level="INFO">
<appender-ref ref="TASKLOGFILE"/> <appender-ref ref="TASKLOGFILE"/>
<appender-ref ref="MASTERLOGFILE"/> <appender-ref ref="MASTERLOGFILE"/>
<appender-ref ref="SKYWALKING-LOG"/>
</root> </root>
</configuration> </configuration>

5
docker/build/conf/dolphinscheduler/logback/logback-worker.xml

@ -66,9 +66,14 @@
</appender> </appender>
<!-- worker server logback config end --> <!-- worker server logback config end -->
<!-- skywalking log reporter start-->
<appender name="SKYWALKING-LOG" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender"/>
<!-- skywalking log reporter end-->
<root level="INFO"> <root level="INFO">
<appender-ref ref="TASKLOGFILE"/> <appender-ref ref="TASKLOGFILE"/>
<appender-ref ref="WORKERLOGFILE"/> <appender-ref ref="WORKERLOGFILE"/>
<appender-ref ref="SKYWALKING-LOG"/>
</root> </root>
</configuration> </configuration>

5
dolphinscheduler-alert/src/main/resources/logback-alert.xml

@ -44,9 +44,14 @@
</encoder> </encoder>
</appender> </appender>
<!-- skywalking log reporter start-->
<appender name="SKYWALKING-LOG" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender"/>
<!-- skywalking log reporter end-->
<root level="INFO"> <root level="INFO">
<appender-ref ref="STDOUT"/> <appender-ref ref="STDOUT"/>
<appender-ref ref="ALERTLOGFILE"/> <appender-ref ref="ALERTLOGFILE"/>
<appender-ref ref="SKYWALKING-LOG"/>
</root> </root>
</configuration> </configuration>

5
dolphinscheduler-api/src/main/resources/logback-api.xml

@ -49,6 +49,10 @@
</appender> </appender>
<!-- api server logback config end --> <!-- api server logback config end -->
<!-- skywalking log reporter start-->
<appender name="SKYWALKING-LOG" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender"/>
<!-- skywalking log reporter end-->
<logger name="org.apache.zookeeper" level="WARN"/> <logger name="org.apache.zookeeper" level="WARN"/>
<logger name="org.apache.hbase" level="WARN"/> <logger name="org.apache.hbase" level="WARN"/>
<logger name="org.apache.hadoop" level="WARN"/> <logger name="org.apache.hadoop" level="WARN"/>
@ -57,6 +61,7 @@
<root level="INFO"> <root level="INFO">
<!--<appender-ref ref="STDOUT"/>--> <!--<appender-ref ref="STDOUT"/>-->
<appender-ref ref="APILOGFILE"/> <appender-ref ref="APILOGFILE"/>
<appender-ref ref="SKYWALKING-LOG"/>
</root> </root>
</configuration> </configuration>

4
dolphinscheduler-common/pom.xml

@ -588,5 +588,9 @@
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-logback-1.x</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

6
dolphinscheduler-dist/pom.xml vendored

@ -41,6 +41,12 @@
<groupId>org.apache.dolphinscheduler</groupId> <groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-api</artifactId> <artifactId>dolphinscheduler-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-skywalking</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>

3
dolphinscheduler-dist/release-docs/LICENSE vendored

@ -254,6 +254,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
derby 10.14.2.0: https://github.com/apache/derby, Apache 2.0 derby 10.14.2.0: https://github.com/apache/derby, Apache 2.0
druid 1.1.14: https://mvnrepository.com/artifact/com.alibaba/druid/1.1.14, Apache 2.0 druid 1.1.14: https://mvnrepository.com/artifact/com.alibaba/druid/1.1.14, Apache 2.0
fastjson 1.2.61: https://mvnrepository.com/artifact/com.alibaba/fastjson/1.2.61, Apache 2.0 fastjson 1.2.61: https://mvnrepository.com/artifact/com.alibaba/fastjson/1.2.61, Apache 2.0
grpc-java 1.32.1: https://github.com/grpc/grpc-java, Apache 2.0
gson 2.8.5: https://github.com/google/gson, Apache 2.0 gson 2.8.5: https://github.com/google/gson, Apache 2.0
guava 20.0: https://mvnrepository.com/artifact/com.google.guava/guava/20.0, Apache 2.0 guava 20.0: https://mvnrepository.com/artifact/com.google.guava/guava/20.0, Apache 2.0
guice 3.0: https://mvnrepository.com/artifact/com.google.inject/guice/3.0, Apache 2.0 guice 3.0: https://mvnrepository.com/artifact/com.google.inject/guice/3.0, Apache 2.0
@ -342,9 +343,11 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
netty-all 4.1.33.Final: https://github.com/netty/netty/blob/netty-4.1.33.Final/LICENSE.txt, Apache 2.0 netty-all 4.1.33.Final: https://github.com/netty/netty/blob/netty-4.1.33.Final/LICENSE.txt, Apache 2.0
opencsv 2.3: https://mvnrepository.com/artifact/net.sf.opencsv/opencsv/2.3, Apache 2.0 opencsv 2.3: https://mvnrepository.com/artifact/net.sf.opencsv/opencsv/2.3, Apache 2.0
parquet-hadoop-bundle 1.8.1: https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop-bundle/1.8.1, Apache 2.0 parquet-hadoop-bundle 1.8.1: https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop-bundle/1.8.1, Apache 2.0
perfmark-api 0.19.0: https://mvnrepository.com/artifact/io.perfmark/perfmark-api/0.19.0, Apache 2.0
poi 3.17: https://mvnrepository.com/artifact/org.apache.poi/poi/3.17, Apache 2.0 poi 3.17: https://mvnrepository.com/artifact/org.apache.poi/poi/3.17, Apache 2.0
quartz 2.3.0: https://mvnrepository.com/artifact/org.quartz-scheduler/quartz/2.3.0, Apache 2.0 quartz 2.3.0: https://mvnrepository.com/artifact/org.quartz-scheduler/quartz/2.3.0, Apache 2.0
quartz-jobs 2.3.0: https://mvnrepository.com/artifact/org.quartz-scheduler/quartz-jobs/2.3.0, Apache 2.0 quartz-jobs 2.3.0: https://mvnrepository.com/artifact/org.quartz-scheduler/quartz-jobs/2.3.0, Apache 2.0
skywalking 8.4.0: https://github.com/apache/skywalking, Apache 2.0
snakeyaml 1.23: https://mvnrepository.com/artifact/org.yaml/snakeyaml/1.23, Apache 2.0 snakeyaml 1.23: https://mvnrepository.com/artifact/org.yaml/snakeyaml/1.23, Apache 2.0
snappy 0.2: https://mvnrepository.com/artifact/org.iq80.snappy/snappy/0.2, Apache 2.0 snappy 0.2: https://mvnrepository.com/artifact/org.iq80.snappy/snappy/0.2, Apache 2.0
snappy-java 1.0.4.1: https://github.com/xerial/snappy-java, Apache 2.0 snappy-java 1.0.4.1: https://github.com/xerial/snappy-java, Apache 2.0

201
dolphinscheduler-dist/release-docs/licenses/LICENSE-skywalking-agent.txt vendored

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

8
dolphinscheduler-dist/src/main/assembly/dolphinscheduler-binary.xml vendored

@ -151,6 +151,14 @@
<outputDirectory>.</outputDirectory> <outputDirectory>.</outputDirectory>
</fileSet> </fileSet>
<fileSet>
<directory>${basedir}/../ext/skywalking/target/skywalking-agent</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>./skywalking-agent</outputDirectory>
</fileSet>
<fileSet> <fileSet>
<directory>${basedir}/../dolphinscheduler-ui/dist</directory> <directory>${basedir}/../dolphinscheduler-ui/dist</directory>
<includes> <includes>

5
dolphinscheduler-server/src/main/resources/logback-master.xml

@ -73,9 +73,14 @@
</appender> </appender>
<!-- master server logback config end --> <!-- master server logback config end -->
<!-- skywalking log reporter start-->
<appender name="SKYWALKING-LOG" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender"/>
<!-- skywalking log reporter end-->
<root level="INFO"> <root level="INFO">
<appender-ref ref="TASKLOGFILE"/> <appender-ref ref="TASKLOGFILE"/>
<appender-ref ref="MASTERLOGFILE"/> <appender-ref ref="MASTERLOGFILE"/>
<appender-ref ref="SKYWALKING-LOG"/>
</root> </root>
</configuration> </configuration>

5
dolphinscheduler-server/src/main/resources/logback-worker.xml

@ -73,9 +73,14 @@
</appender> </appender>
<!-- worker server logback config end --> <!-- worker server logback config end -->
<!-- skywalking log reporter start-->
<appender name="SKYWALKING-LOG" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender"/>
<!-- skywalking log reporter end-->
<root level="INFO"> <root level="INFO">
<appender-ref ref="TASKLOGFILE"/> <appender-ref ref="TASKLOGFILE"/>
<appender-ref ref="WORKERLOGFILE"/> <appender-ref ref="WORKERLOGFILE"/>
<appender-ref ref="SKYWALKING-LOG"/>
</root> </root>
</configuration> </configuration>

6
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java

@ -53,7 +53,7 @@ public class TaskPriority implements Comparable<TaskPriority> {
/** /**
* context * context
*/ */
private Map<String, String> context; private Map<String, Object> context;
public TaskPriority(){} public TaskPriority(){}
@ -96,7 +96,7 @@ public class TaskPriority implements Comparable<TaskPriority> {
return taskId; return taskId;
} }
public Map<String, String> getContext() { public Map<String, Object> getContext() {
return context; return context;
} }
@ -112,7 +112,7 @@ public class TaskPriority implements Comparable<TaskPriority> {
this.groupName = groupName; this.groupName = groupName;
} }
public void setContext(Map<String, String> context) { public void setContext(Map<String, Object> context) {
this.context = context; this.context = context;
} }

118
ext/skywalking/config/agent.config

@ -0,0 +1,118 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# The agent namespace
# agent.namespace=${SW_AGENT_NAMESPACE:default-namespace}
# The service name in UI
agent.service_name=${SW_AGENT_NAME:Your_ApplicationName}
# The number of sampled traces per 3 seconds
# Negative or zero means off, by default
# agent.sample_n_per_3_secs=${SW_AGENT_SAMPLE:-1}
# Authentication active is based on backend setting, see application.yml for more details.
# agent.authentication = ${SW_AGENT_AUTHENTICATION:xxxx}
# The max amount of spans in a single segment.
# Through this config item, SkyWalking keep your application memory cost estimated.
# agent.span_limit_per_segment=${SW_AGENT_SPAN_LIMIT:150}
# If the operation name of the first span is included in this set, this segment should be ignored. Multiple values should be separated by `,`.
# agent.ignore_suffix=${SW_AGENT_IGNORE_SUFFIX:.jpg,.jpeg,.js,.css,.png,.bmp,.gif,.ico,.mp3,.mp4,.html,.svg}
# If true, SkyWalking agent will save all instrumented classes files in `/debugging` folder.
# SkyWalking team may ask for these files in order to resolve compatible problem.
# agent.is_open_debugging_class = ${SW_AGENT_OPEN_DEBUG:true}
# If true, SkyWalking agent will cache all instrumented classes files to memory or disk files (decided by class cache mode),
# allow other javaagent to enhance those classes that enhanced by SkyWalking agent.
# agent.is_cache_enhanced_class = ${SW_AGENT_CACHE_CLASS:false}
# The instrumented classes cache mode: MEMORY or FILE
# MEMORY: cache class bytes to memory, if instrumented classes is too many or too large, it may take up more memory
# FILE: cache class bytes in `/class-cache` folder, automatically clean up cached class files when the application exits
# agent.class_cache_mode = ${SW_AGENT_CLASS_CACHE_MODE:MEMORY}
# The operationName max length
# Notice, in the current practice, we don't recommend the length over 190.
# agent.operation_name_threshold=${SW_AGENT_OPERATION_NAME_THRESHOLD:150}
# The agent use gRPC plain text in default.
# If true, SkyWalking agent uses TLS even no CA file detected.
# agent.force_tls=${SW_AGENT_FORCE_TLS:false}
# If true, skywalking agent will enable profile when user create a new profile task. Otherwise disable profile.
# profile.active=${SW_AGENT_PROFILE_ACTIVE:true}
# Parallel monitor segment count
# profile.max_parallel=${SW_AGENT_PROFILE_MAX_PARALLEL:5}
# Max monitor segment time(minutes), if current segment monitor time out of limit, then stop it.
# profile.duration=${SW_AGENT_PROFILE_DURATION:10}
# Max dump thread stack depth
# profile.dump_max_stack_depth=${SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH:500}
# Snapshot transport to backend buffer size
# profile.snapshot_transport_buffer_size=${SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE:50}
# Backend service addresses.
collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800}
# Logging file_name
logging.file_name=${SW_LOGGING_FILE_NAME:skywalking-api.log}
# Logging level
logging.level=${SW_LOGGING_LEVEL:INFO}
# Logging dir
# logging.dir=${SW_LOGGING_DIR:""}
# Logging max_file_size, default: 300 * 1024 * 1024 = 314572800
# logging.max_file_size=${SW_LOGGING_MAX_FILE_SIZE:314572800}
# The max history log files. When rollover happened, if log files exceed this number,
# then the oldest file will be delete. Negative or zero means off, by default.
# logging.max_history_files=${SW_LOGGING_MAX_HISTORY_FILES:-1}
# Listed exceptions would not be treated as an error. Because in some codes, the exception is being used as a way of controlling business flow.
# Besides, the annotation named IgnoredException in the trace toolkit is another way to configure ignored exceptions.
# statuscheck.ignored_exceptions=${SW_STATUSCHECK_IGNORED_EXCEPTIONS:}
# The max recursive depth when checking the exception traced by the agent. Typically, we don't recommend setting this more than 10, which could cause a performance issue. Negative value and 0 would be ignored, which means all exceptions would make the span tagged in error status.
# statuscheck.max_recursive_depth=${SW_STATUSCHECK_MAX_RECURSIVE_DEPTH:1}
# Mount the specific folders of the plugins. Plugins in mounted folders would work.
plugin.mount=${SW_MOUNT_FOLDERS:plugins,activations}
# Exclude activated plugins
# plugin.exclude_plugins=${SW_EXCLUDE_PLUGINS:}
# mysql plugin configuration
# plugin.mysql.trace_sql_parameters=${SW_MYSQL_TRACE_SQL_PARAMETERS:false}
# Kafka producer configuration
# plugin.kafka.bootstrap_servers=${SW_KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
# Match spring bean with regex expression for classname
# plugin.springannotation.classname_match_regex=${SW_SPRINGANNOTATION_CLASSNAME_MATCH_REGEX:}
# log reporter configs
# Specify which grpc server's host for log data to report to.
plugin.toolkit.log.grpc.reporter.server_host=${SW_GRPC_LOG_SERVER_HOST:127.0.0.1}
# Specify which grpc server's port for log data to report to.
plugin.toolkit.log.grpc.reporter.server_port=${SW_GRPC_LOG_SERVER_PORT:11800}

284
ext/skywalking/dashboard/dolphinscheduler.yml

@ -0,0 +1,284 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# UI templates initialized file includes the default template when the SkyWalking OAP starts up at the first time.
#
# Also, SkyWalking would detect the existing templates in the database, once they are missing, all templates in this file
# could be added automatically.
templates:
- name: Dolphinscheduler
type: "DASHBOARD"
configuration: |-
[
{
"name":"Dolphinscheduler",
"type":"service",
"serviceGroup":"dolphinscheduler",
"children":[
{
"name":"Master Scheduler",
"children":[
{
"width":3,
"title":"QuartzScheduler Load",
"height":350,
"entityType":"Endpoint",
"independentSelector":true,
"metricType":"REGULAR_VALUE",
"metricName":"endpoint_cpm",
"queryMetricType":"readMetricsValues",
"chartType":"ChartLine",
"currentService":"dolphinscheduler::master-server",
"currentEndpoint":"quartz-scheduler/org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob"
},
{
"width":3,
"title":"QuartzScheduler Avg Time",
"height":350,
"entityType":"Endpoint",
"independentSelector":true,
"metricType":"REGULAR_VALUE",
"metricName":"endpoint_avg",
"queryMetricType":"readMetricsValues",
"chartType":"ChartLine",
"currentService":"dolphinscheduler::master-server",
"currentEndpoint":"quartz-scheduler/org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob",
"unit":"ms"
},
{
"width":3,
"title":"QuartzScheduler Time Percentile",
"height":350,
"entityType":"Endpoint",
"independentSelector":true,
"metricType":"LABELED_VALUE",
"metricName":"endpoint_percentile",
"queryMetricType":"readLabeledMetricsValues",
"chartType":"ChartLine",
"metricLabels":"P50, P75, P90, P95, P99",
"labelsIndex":"0, 1, 2, 3, 4",
"currentService":"dolphinscheduler::master-server",
"currentEndpoint":"quartz-scheduler/org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob",
"unit":"ms"
},
{
"width":3,
"title":"QuartzScheduler Successful Rate",
"height":350,
"entityType":"Endpoint",
"independentSelector":true,
"metricType":"REGULAR_VALUE",
"metricName":"endpoint_sla",
"queryMetricType":"readMetricsValues",
"chartType":"ChartLine",
"currentService":"dolphinscheduler::master-server",
"currentEndpoint":"quartz-scheduler/org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob",
"unit":"%",
"aggregation":"/",
"aggregationNum":"100"
},
{
"width":3,
"title":"MasterSchedulerService Load",
"height":350,
"entityType":"Endpoint",
"independentSelector":true,
"metricType":"REGULAR_VALUE",
"metricName":"endpoint_cpm",
"queryMetricType":"readMetricsValues",
"chartType":"ChartLine",
"currentService":"dolphinscheduler::master-server",
"currentEndpoint":"master/schedule/process"
},
{
"width":3,
"title":"MasterSchedulerService Avg Time",
"height":350,
"entityType":"Endpoint",
"independentSelector":true,
"metricType":"REGULAR_VALUE",
"metricName":"endpoint_avg",
"queryMetricType":"readMetricsValues",
"chartType":"ChartLine",
"currentService":"dolphinscheduler::master-server",
"currentEndpoint":"master/schedule/process",
"unit":"ms"
},
{
"width":3,
"title":"MasterSchedulerService Time Percentile",
"height":350,
"entityType":"Endpoint",
"independentSelector":true,
"metricType":"LABELED_VALUE",
"queryMetricType":"readLabeledMetricsValues",
"chartType":"ChartLine",
"metricName":"endpoint_percentile",
"metricLabels":"P50, P75, P90, P95, P99",
"labelsIndex":"0, 1, 2, 3, 4",
"currentService":"dolphinscheduler::master-server",
"currentEndpoint":"master/schedule/process",
"unit":"ms"
},
{
"width":3,
"title":"MasterSchedulerService Successful Rate",
"height":350,
"entityType":"Endpoint",
"independentSelector":true,
"metricType":"REGULAR_VALUE",
"queryMetricType":"readMetricsValues",
"chartType":"ChartLine",
"metricName":"endpoint_sla",
"currentService":"dolphinscheduler::master-server",
"currentEndpoint":"master/schedule/process",
"unit":"%",
"aggregation":"/",
"aggregationNum":"100"
}
]
},
{
"name":"Master Queue",
"children":[
{
"width":3,
"title":"Put Load",
"height":350,
"entityType":"Endpoint",
"independentSelector":true,
"metricType":"REGULAR_VALUE",
"metricName":"endpoint_cpm",
"queryMetricType":"readMetricsValues",
"chartType":"ChartLine",
"currentService":"dolphinscheduler::master-server",
"currentEndpoint":"masetr/queue/put"
},
{
"width":3,
"title":"Put Avg Time",
"height":350,
"entityType":"Endpoint",
"independentSelector":true,
"metricType":"REGULAR_VALUE",
"metricName":"endpoint_avg",
"queryMetricType":"readMetricsValues",
"chartType":"ChartLine",
"currentService":"dolphinscheduler::master-server",
"currentEndpoint":"masetr/queue/put",
"unit":"ms"
},
{
"width":3,
"title":"Put Time Percentile",
"height":350,
"entityType":"Endpoint",
"independentSelector":true,
"metricType":"LABELED_VALUE",
"metricName":"endpoint_percentile",
"queryMetricType":"readLabeledMetricsValues",
"chartType":"ChartLine",
"metricLabels":"P50, P75, P90, P95, P99",
"labelsIndex":"0, 1, 2, 3, 4",
"currentService":"dolphinscheduler::master-server",
"currentEndpoint":"masetr/queue/put",
"unit":"ms"
},
{
"width":3,
"title":"Put Successful Rate",
"height":350,
"entityType":"Endpoint",
"independentSelector":true,
"metricType":"REGULAR_VALUE",
"metricName":"endpoint_sla",
"queryMetricType":"readMetricsValues",
"chartType":"ChartLine",
"currentService":"dolphinscheduler::master-server",
"currentEndpoint":"masetr/queue/put",
"unit":"%",
"aggregation":"/",
"aggregationNum":"100"
},
{
"width":3,
"title":"Take Load",
"height":350,
"entityType":"Endpoint",
"independentSelector":true,
"metricType":"REGULAR_VALUE",
"metricName":"endpoint_cpm",
"queryMetricType":"readMetricsValues",
"chartType":"ChartLine",
"currentService":"dolphinscheduler::master-server",
"currentEndpoint":"masetr/queue/take"
},
{
"width":3,
"title":"Take Avg Time",
"height":350,
"entityType":"Endpoint",
"independentSelector":true,
"metricType":"REGULAR_VALUE",
"metricName":"endpoint_avg",
"queryMetricType":"readMetricsValues",
"chartType":"ChartLine",
"currentService":"dolphinscheduler::master-server",
"currentEndpoint":"masetr/queue/take",
"unit":"ms"
},
{
"width":3,
"title":"Take Time Percentile",
"height":350,
"entityType":"Endpoint",
"independentSelector":true,
"metricType":"LABELED_VALUE",
"metricName":"endpoint_percentile",
"queryMetricType":"readLabeledMetricsValues",
"chartType":"ChartLine",
"metricLabels":"P50, P75, P90, P95, P99",
"labelsIndex":"0, 1, 2, 3, 4",
"currentService":"dolphinscheduler::master-server",
"currentEndpoint":"masetr/queue/take",
"unit":"ms"
},
{
"width":3,
"title":"Take Successful Rate",
"height":350,
"entityType":"Endpoint",
"independentSelector":true,
"metricType":"REGULAR_VALUE",
"metricName":"endpoint_sla",
"queryMetricType":"readMetricsValues",
"chartType":"ChartLine",
"currentService":"dolphinscheduler::master-server",
"currentEndpoint":"masetr/queue/take",
"unit":"%",
"aggregation":"/",
"aggregationNum":"100"
}
]
}
]
}
]
# Activated as the DASHBOARD type, makes this templates added into the UI page automatically.
# False means providing a basic template, user needs to add it manually.
activated: true
# True means wouldn't show up on the dashboard. Only keeps the definition in the storage.
disabled: false

218
ext/skywalking/pom.xml

@ -0,0 +1,218 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>1.3.6-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-skywalking</artifactId>
<name>dolphinscheduler-skywalking</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<dolphinscheduler.version>${parent.version}</dolphinscheduler.version>
<agent.dir>${project.build.directory}/skywalking-agent/</agent.dir>
<agent-plugins.dir>${agent.dir}/plugins/</agent-plugins.dir>
<agent-activations.dir>${agent.dir}/activations/</agent-activations.dir>
<agent-config.dir>${agent.dir}/config</agent-config.dir>
<agent-dashboard.dir>${agent.dir}/dashboard</agent-dashboard.dir>
<agent-dependencies-shade.package>org.apache.skywalking.apm.dependencies</agent-dependencies-shade.package>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-service</artifactId>
<version>${dolphinscheduler.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-server</artifactId>
<version>${dolphinscheduler.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-agent-core</artifactId>
<version>${skywalking.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<includes>
<include>net.bytebuddy:*</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>net.bytebuddy</pattern>
<shadedPattern>${agent-dependencies-shade.package}.net.bytebuddy</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<!-- skywalking agent -->
<artifactItem>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-agent</artifactId>
<version>${skywalking.version}</version>
<destFileName>skywalking-agent.jar</destFileName>
<outputDirectory>${agent.dir}</outputDirectory>
</artifactItem>
<!-- skywalking plugins -->
<artifactItem>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-skywalking</artifactId>
<version>${project.version}</version>
<outputDirectory>${agent-plugins.dir}</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-zookeeper-3.4.x-plugin</artifactId>
<version>${skywalking.version}</version>
<outputDirectory>${agent-plugins.dir}</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-quartz-scheduler-2.x-plugin</artifactId>
<version>${skywalking.version}</version>
<outputDirectory>${agent-plugins.dir}</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-httpclient-commons</artifactId>
<version>${skywalking.version}</version>
<outputDirectory>${agent-plugins.dir}</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-httpclient-3.x-plugin</artifactId>
<version>${skywalking.version}</version>
<outputDirectory>${agent-plugins.dir}</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-httpClient-4.x-plugin</artifactId>
<version>${skywalking.version}</version>
<outputDirectory>${agent-plugins.dir}</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.skywalking</groupId>
<artifactId>spring-commons</artifactId>
<version>${skywalking.version}</version>
<outputDirectory>${agent-plugins.dir}</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-spring-core-patch</artifactId>
<version>${skywalking.version}</version>
<outputDirectory>${agent-plugins.dir}</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-springmvc-annotation-commons</artifactId>
<version>${skywalking.version}</version>
<outputDirectory>${agent-plugins.dir}</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-springmvc-annotation-5.x-plugin</artifactId>
<version>${skywalking.version}</version>
<outputDirectory>${agent-plugins.dir}</outputDirectory>
</artifactItem>
<!-- skywalking toolkit -->
<artifactItem>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-logback-1.x-activation</artifactId>
<version>${skywalking.version}</version>
<outputDirectory>${agent-activations.dir}</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<tasks>
<copy overwrite="true" file="${project.basedir}/config/agent.config" tofile="${agent-config.dir}/agent.config" />
<copy overwrite="true" todir="${agent-dashboard.dir}" >
<fileset dir="${project.basedir}/dashboard">
<include name="*" />
</fileset>
</copy>
</tasks>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

35
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterBaseTaskExecThreadConstructorInterceptor.java

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
public class MasterBaseTaskExecThreadConstructorInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {
TaskInstance taskInstance = (TaskInstance) allArguments[0];
TaskContext<TaskInstance> taskContext = new TaskContext<>(taskInstance, ContextManager.capture());
objInst.setSkyWalkingDynamicField(taskContext);
}
}

100
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterBaseTaskExecThreadMethodInterceptor.java

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.runner.MasterBaseTaskExecThread;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import java.lang.reflect.Method;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_STATE;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_TYPE;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_INSTANCE_ID;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_DEFINITION_ID;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_INSTANCE_ID;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_INSTANCE_NAME;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_WORKER_GROUP;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.getProcessDefinitionId;
public class MasterBaseTaskExecThreadMethodInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
TaskContext<TaskInstance> taskContext = (TaskContext) objInst.getSkyWalkingDynamicField();
TaskInstance taskInstance = taskContext.getCache();
String operationName = getOperationNamePrefix(taskInstance) + taskInstance.getName();
AbstractSpan span = ContextManager.createLocalSpan(operationName);
span.setComponent(Utils.DOLPHIN_SCHEDULER);
TAG_PROCESS_DEFINITION_ID.set(span, String.valueOf(taskInstance.getProcessDefinitionId()));
TAG_PROCESS_INSTANCE_ID.set(span, String.valueOf(taskInstance.getProcessInstanceId()));
TAG_TASK_TYPE.set(span, taskInstance.getTaskType());
TAG_TASK_INSTANCE_ID.set(span, String.valueOf(taskInstance.getId()));
TAG_TASK_INSTANCE_NAME.set(span, taskInstance.getName());
TAG_TASK_WORKER_GROUP.set(span, taskInstance.getWorkerGroup());
TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
ContextManager.continued(taskContext.getContextSnapshot());
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
AbstractSpan span = ContextManager.activeSpan();
MasterBaseTaskExecThread original = (MasterBaseTaskExecThread) objInst;
TaskInstance taskInstance = original.getTaskInstance();
ExecutionStatus executionStatus = taskInstance.getState();
TAG_TASK_STATE.set(span, taskInstance.getState().getDescp());
if (!ExecutionStatus.SUCCESS.equals(executionStatus)) {
span.errorOccurred();
}
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().log(t);
}
private static String getOperationNamePrefix(TaskInstance taskInstance) {
String prefix = "";
if (taskInstance.isSubProcess()) {
prefix = "master/subprocess_task/";
} else if (taskInstance.isDependTask()) {
prefix = "master/depend_task/";
} else if (taskInstance.isConditionsTask()) {
prefix = "master/conditions_task/";
} else {
prefix = "master/task/";
}
return prefix + getProcessDefinitionId(taskInstance.getProcessDefinitionId()) + "/";
}
}

34
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadConstructorInterceptor.java

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
public class MasterExecThreadConstructorInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {
ProcessInstance processInstance = (ProcessInstance) allArguments[0];
TaskContext<ProcessInstance> taskContext = new TaskContext<>(processInstance, ContextManager.capture());
objInst.setSkyWalkingDynamicField(taskContext);
}
}

95
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadMethodInterceptor.java

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.RuntimeContext;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import java.lang.reflect.Method;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROJECT_ID;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_STATE;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_INSTANCE_ID;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_INSTANCE_NAME;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_INSTANCE_HOST;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_DEFINITION_ID;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_COMMAND_TYPE;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_WORKER_GROUP;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_TIMEOUT;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.MASTER_PROCESS_EXECUTION_STATUS;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.getProjectId;
public class MasterExecThreadMethodInterceptor implements InstanceMethodsAroundInterceptor {
private static final String OPERATION_NAME_PREFIX = "master/process/";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
TaskContext<ProcessInstance> taskContext = (TaskContext<ProcessInstance>) objInst.getSkyWalkingDynamicField();
ProcessInstance processInstance = taskContext.getCache();
ProcessDefinition processDefinition = processInstance.getProcessDefinition();
String operationName = OPERATION_NAME_PREFIX + getProjectId(processDefinition.getProjectId()) + "/" + processDefinition.getName();
AbstractSpan span = ContextManager.createLocalSpan(operationName);
span.setComponent(Utils.DOLPHIN_SCHEDULER);
TAG_PROJECT_ID.set(span, String.valueOf(processDefinition.getProjectId()));
TAG_PROCESS_INSTANCE_ID.set(span, String.valueOf(processInstance.getId()));
TAG_PROCESS_INSTANCE_NAME.set(span, processInstance.getName());
TAG_PROCESS_INSTANCE_HOST.set(span, processInstance.getHost());
TAG_PROCESS_DEFINITION_ID.set(span, String.valueOf(processInstance.getProcessDefinitionId()));
TAG_PROCESS_COMMAND_TYPE.set(span, processInstance.getCommandType().name());
TAG_PROCESS_WORKER_GROUP.set(span, processInstance.getWorkerGroup());
TAG_PROCESS_TIMEOUT.set(span, String.valueOf(processInstance.getTimeout()));
TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
ContextManager.continued(taskContext.getContextSnapshot());
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
AbstractSpan span = ContextManager.activeSpan();
RuntimeContext runtimeContext = ContextManager.getRuntimeContext();
ExecutionStatus executionStatus = (ExecutionStatus) runtimeContext.get(MASTER_PROCESS_EXECUTION_STATUS);
if (executionStatus == null) {
ProcessInstance processInstance = (ProcessInstance) objInst.getSkyWalkingDynamicField();
executionStatus = processInstance.getState();
}
TAG_PROCESS_STATE.set(span, executionStatus.getDescp());
if (!ExecutionStatus.SUCCESS.equals(executionStatus)) {
span.errorOccurred();
}
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().log(t);
}
}

50
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterExecThreadStateCacheInterceptor.java

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.RuntimeContext;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import java.lang.reflect.Method;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.MASTER_PROCESS_EXECUTION_STATUS;
public class MasterExecThreadStateCacheInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
ExecutionStatus executionStatus = (ExecutionStatus) ret;
RuntimeContext runtimeContext = ContextManager.getRuntimeContext();
runtimeContext.put(MASTER_PROCESS_EXECUTION_STATUS, executionStatus);
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
}
}

53
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/MasterSchedulerServiceMethodInterceptor.java

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import java.lang.reflect.Method;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD;
public class MasterSchedulerServiceMethodInterceptor implements InstanceMethodsAroundInterceptor {
private static final String OPERATION_NAME = "master/schedule/process";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
AbstractSpan span = ContextManager.createLocalSpan(OPERATION_NAME);
span.setComponent(Utils.DOLPHIN_SCHEDULER);
Tags.LOGIC_ENDPOINT.set(span, Tags.VAL_LOCAL_SPAN_AS_LOGIC_ENDPOINT);
TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().log(t);
}
}

32
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemoteChannelConstructorInterceptor.java

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin;
import io.netty.channel.Channel;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
public class NettyRemoteChannelConstructorInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {
Channel channel = (Channel) allArguments[0];
objInst.setSkyWalkingDynamicField(channel.remoteAddress().toString());
}
}

69
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemoteChannelMethodInterceptor.java

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandContext;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import java.lang.reflect.Method;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD;
public class NettyRemoteChannelMethodInterceptor implements InstanceMethodsAroundInterceptor {
private static final String OPERATION_NAME_PREFIX = "rpc/command/send/";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
String remoteAddress = (String) objInst.getSkyWalkingDynamicField();
Command command = (Command) allArguments[0];
CommandContext commandContext = command.getContext();
String operationName = OPERATION_NAME_PREFIX + command.getType().name();
ContextCarrier contextCarrier = new ContextCarrier();
AbstractSpan span = ContextManager.createExitSpan(operationName, contextCarrier, remoteAddress);
span.setComponent(Utils.DOLPHIN_SCHEDULER);
SpanLayer.asRPCFramework(span);
TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
CarrierItem item = contextCarrier.items();
while (item.hasNext()) {
item = item.next();
commandContext.put(item.getHeadKey(), item.getHeadValue());
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().log(t);
}
}

70
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRemotingClientMethodInterceptor.java

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandContext;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import java.lang.reflect.Method;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD;
public class NettyRemotingClientMethodInterceptor implements InstanceMethodsAroundInterceptor {
private static final String OPERATION_NAME_PREFIX = "rpc/command/send/";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
Host host = (Host) allArguments[0];
Command command = (Command) allArguments[1];
CommandContext commandContext = command.getContext();
String operationName = OPERATION_NAME_PREFIX + command.getType().name();
ContextCarrier contextCarrier = new ContextCarrier();
AbstractSpan span = ContextManager.createExitSpan(operationName, contextCarrier, host.getAddress());
span.setComponent(Utils.DOLPHIN_SCHEDULER);
SpanLayer.asRPCFramework(span);
TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
CarrierItem item = contextCarrier.items();
while (item.hasNext()) {
item = item.next();
commandContext.put(item.getHeadKey(), item.getHeadValue());
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().log(t);
}
}

72
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/NettyRequestProcessorMethodInterceptor.java

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandContext;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import java.lang.reflect.Method;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_NETTY_REMOTE_ADDRESS;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD;
public class NettyRequestProcessorMethodInterceptor implements InstanceMethodsAroundInterceptor {
private static final String OPERATION_NAME_PREFIX = "rpc/command/process/";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
Channel channel = (Channel) allArguments[0];
Command command = (Command) allArguments[1];
CommandContext commandContext = command.getContext();
String operationName = OPERATION_NAME_PREFIX + command.getType().name();
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem item = contextCarrier.items();
while (item.hasNext()) {
item = item.next();
item.setHeadValue(commandContext.get(item.getHeadKey()));
}
AbstractSpan span = ContextManager.createEntrySpan(operationName, contextCarrier);
span.setComponent(Utils.DOLPHIN_SCHEDULER);
SpanLayer.asRPCFramework(span);
TAG_NETTY_REMOTE_ADDRESS.set(span, channel.remoteAddress().toString());
TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().log(t);
}
}

30
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskContext.java

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
@Getter
@AllArgsConstructor
public class TaskContext<T> {
private T cache;
private ContextSnapshot contextSnapshot;
}

35
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteConstructorInterceptor.java

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
public class TaskExecuteConstructorInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {
TaskExecutionContext taskExecutionContext = (TaskExecutionContext) allArguments[0];
TaskContext<TaskExecutionContext> taskContext = new TaskContext<>(taskExecutionContext, ContextManager.capture());
objInst.setSkyWalkingDynamicField(taskContext);
}
}

88
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteMethodInterceptor.java

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import java.lang.reflect.Method;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_PARAMS;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_STATE;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD;
public class TaskExecuteMethodInterceptor implements InstanceMethodsAroundInterceptor {
private static final int DEFAULT_TASK_STATUS_CODE = -1;
private static final int TASK_PARAMS_MAX_LENGTH = 2048;
private static final String OPERATION_NAME_PREFIX = "worker/";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
TaskContext<TaskExecutionContext> taskContext = (TaskContext) objInst.getSkyWalkingDynamicField();
String operationName = OPERATION_NAME_PREFIX + taskContext.getCache().getTaskType() + "/" + method.getName();
AbstractSpan span = ContextManager.createLocalSpan(operationName);
span.setComponent(Utils.DOLPHIN_SCHEDULER);
TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
AbstractSpan span = ContextManager.activeSpan();
AbstractTask task = (AbstractTask) objInst;
int statusCode = task.getExitStatusCode();
ExecutionStatus status = task.getExitStatus();
TAG_TASK_STATE.set(span, status.getDescp());
if (statusCode != DEFAULT_TASK_STATUS_CODE && ExecutionStatus.FAILURE.equals(status)) {
logTaskParams(objInst);
span.errorOccurred();
}
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
logTaskParams(objInst);
ContextManager.activeSpan().log(t);
}
private void logTaskParams(EnhancedInstance objInst) {
TaskContext<TaskExecutionContext> taskContext = (TaskContext) objInst.getSkyWalkingDynamicField();
TaskExecutionContext executionContext = taskContext.getCache();
String taskParams = executionContext.getTaskParams();
String limitTaskParams = taskParams;
if (taskParams.length() > TASK_PARAMS_MAX_LENGTH) {
limitTaskParams = taskParams.substring(TASK_PARAMS_MAX_LENGTH);
}
AbstractSpan span = ContextManager.activeSpan();
TAG_TASK_PARAMS.set(span, limitTaskParams);
}
}

35
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteThreadConstructorInterceptor.java

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
public class TaskExecuteThreadConstructorInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {
TaskExecutionContext taskExecutionContext = (TaskExecutionContext) allArguments[0];
TaskContext<TaskExecutionContext> taskContext = new TaskContext<>(taskExecutionContext, ContextManager.capture());
objInst.setSkyWalkingDynamicField(taskContext);
}
}

83
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskExecuteThreadMethodInterceptor.java

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.EnumUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import java.lang.reflect.Method;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_INSTANCE_ID;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_INSTANCE_HOST;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_EXECUTE_PATH;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_LOG_PATH;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROJECT_ID;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_DEFINITION_ID;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_INSTANCE_ID;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.getProjectId;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.getProcessDefinitionId;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD;
public class TaskExecuteThreadMethodInterceptor implements InstanceMethodsAroundInterceptor {
private static final String OPERATION_NAME_PREFIX = "worker/execute/";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
TaskContext<TaskExecutionContext> taskContext = (TaskContext) objInst.getSkyWalkingDynamicField();
TaskExecutionContext executionContext = taskContext.getCache();
TaskType type = EnumUtils.getEnum(TaskType.class, executionContext.getTaskType());
String operationName = OPERATION_NAME_PREFIX
+ type.getDescp()
+ "/" + getProjectId(executionContext.getProjectId())
+ "/" + getProcessDefinitionId(executionContext.getProcessDefineId())
+ "/" + executionContext.getTaskName();
AbstractSpan span = ContextManager.createLocalSpan(operationName);
span.setComponent(Utils.DOLPHIN_SCHEDULER);
TAG_PROJECT_ID.set(span, String.valueOf(executionContext.getProjectId()));
TAG_PROCESS_DEFINITION_ID.set(span, String.valueOf(executionContext.getProcessDefineId()));
TAG_PROCESS_INSTANCE_ID.set(span, String.valueOf(executionContext.getProcessInstanceId()));
TAG_TASK_INSTANCE_HOST.set(span, executionContext.getHost());
TAG_TASK_INSTANCE_ID.set(span, String.valueOf(executionContext.getTaskInstanceId()));
TAG_TASK_EXECUTE_PATH.set(span, executionContext.getExecutePath());
TAG_TASK_LOG_PATH.set(span, executionContext.getLogPath());
TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
ContextManager.continued(taskContext.getContextSnapshot());
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().log(t);
}
}

71
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskPriorityQueueConsumerMethodInterceptor.java

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import java.lang.reflect.Method;
import java.util.Map;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.*;
public class TaskPriorityQueueConsumerMethodInterceptor implements InstanceMethodsAroundInterceptor {
private static final String OPERATION_NAME = "master/queue/take";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
TaskPriority taskPriority = (TaskPriority) allArguments[0];
Map<String, Object> taskContext = taskPriority.getContext();
AbstractSpan span = ContextManager.createLocalSpan(OPERATION_NAME);
span.setComponent(Utils.DOLPHIN_SCHEDULER);
TAG_TASK_ID.set(span, String.valueOf(taskPriority.getTaskId()));
TAG_PROCESS_INSTANCE_ID.set(span, String.valueOf(taskPriority.getProcessInstanceId()));
Tags.LOGIC_ENDPOINT.set(span, Tags.VAL_LOCAL_SPAN_AS_LOGIC_ENDPOINT);
TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
ContextSnapshot contextSnapshot = (ContextSnapshot) taskContext.get(SKYWALKING_TRACING_CONTEXT);
ContextManager.continued(contextSnapshot);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
AbstractSpan span = ContextManager.activeSpan();
boolean handleResult = (boolean) ret;
if (!handleResult) {
span.errorOccurred();
}
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().log(t);
}
}

73
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/TaskPriorityQueueImplMethodInterceptor.java

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_TASK_ID;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_PROCESS_INSTANCE_ID;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.SKYWALKING_TRACING_CONTEXT;
import static org.apache.dolphinscheduler.skywalking.plugin.Utils.TAG_EXECUTE_METHOD;
public class TaskPriorityQueueImplMethodInterceptor implements InstanceMethodsAroundInterceptor {
private static final String OPERATION_NAME = "master/queue/put";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
boolean isActive = ContextManager.isActive();
TaskPriority taskPriority = (TaskPriority) allArguments[0];
AbstractSpan span = ContextManager.createLocalSpan(OPERATION_NAME);
span.setComponent(Utils.DOLPHIN_SCHEDULER);
TAG_TASK_ID.set(span, String.valueOf(taskPriority.getTaskId()));
TAG_PROCESS_INSTANCE_ID.set(span, String.valueOf(taskPriority.getProcessInstanceId()));
Tags.LOGIC_ENDPOINT.set(span, Tags.VAL_LOCAL_SPAN_AS_LOGIC_ENDPOINT);
TAG_EXECUTE_METHOD.set(span, Utils.getMethodName(method));
if (isActive) {
Map<String, Object> taskPriorityContext = taskPriority.getContext();
if (taskPriorityContext == null) {
taskPriorityContext = new HashMap<>();
taskPriority.setContext(taskPriorityContext);
}
taskPriorityContext.put(SKYWALKING_TRACING_CONTEXT, ContextManager.capture());
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().log(t);
}
}

70
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/Utils.java

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin;
import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.network.trace.component.OfficialComponent;
import java.lang.reflect.Method;
public class Utils {
public static final OfficialComponent DOLPHIN_SCHEDULER = ComponentsDefine.DOLPHIN_SCHEDULER;
public static final String SKYWALKING_TRACING_CONTEXT = "skywalking_tracing_context";
public static final String MASTER_PROCESS_EXECUTION_STATUS = "master_process_execution_status";
public static final StringTag TAG_PROJECT_ID = new StringTag("project.id");
public static final StringTag TAG_PROCESS_STATE = new StringTag("process.state");
public static final StringTag TAG_PROCESS_INSTANCE_ID = new StringTag("process.instance.id");
public static final StringTag TAG_PROCESS_INSTANCE_NAME = new StringTag("process.instance.name");
public static final StringTag TAG_PROCESS_INSTANCE_HOST = new StringTag("process.instance.host");
public static final StringTag TAG_PROCESS_DEFINITION_ID = new StringTag("process.definitionId");
public static final StringTag TAG_PROCESS_COMMAND_TYPE = new StringTag("process.commandType");
public static final StringTag TAG_PROCESS_WORKER_GROUP = new StringTag("process.workerGroup");
public static final StringTag TAG_PROCESS_TIMEOUT = new StringTag("process.timeout");
public static final StringTag TAG_TASK_STATE = new StringTag("task.state");
public static final StringTag TAG_TASK_TYPE = new StringTag("take.type");
public static final StringTag TAG_TASK_INSTANCE_ID = new StringTag("task.instance.id");
public static final StringTag TAG_TASK_INSTANCE_NAME = new StringTag("task.instanceName");
public static final StringTag TAG_TASK_INSTANCE_HOST = new StringTag("task.instance.host");
public static final StringTag TAG_TASK_WORKER_GROUP = new StringTag("task.workerGroup");
public static final StringTag TAG_TASK_ID = new StringTag("task.id");
public static final StringTag TAG_TASK_EXECUTE_PATH = new StringTag("task.executePath");
public static final StringTag TAG_TASK_LOG_PATH = new StringTag("task.logPath");
public static final StringTag TAG_TASK_PARAMS = new StringTag("task.params");
public static final StringTag TAG_NETTY_REMOTE_ADDRESS = new StringTag("netty.remoteAddress");
public static final StringTag TAG_EXECUTE_METHOD = new StringTag("execute.method");
public static String getProjectId(int id) {
return "project_id_" + id;
}
public static String getProcessDefinitionId(int id) {
return "process_definition_id_" + id;
}
public static String getMethodName(Method method) {
return method.getDeclaringClass().getTypeName() + "#" + method.getName();
}
}

92
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterBaseTaskExecThreadInterceptorInstrumentation.java

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* Enhance {@link org.apache.dolphinscheduler.server.master.runner.MasterBaseTaskExecThread} instance and intercept `call` methods,
* this class provides an isolated thread for running job.
*
* @see org.apache.dolphinscheduler.skywalking.plugin.MasterBaseTaskExecThreadConstructorInterceptor
* @see org.apache.dolphinscheduler.skywalking.plugin.MasterBaseTaskExecThreadMethodInterceptor
*/
public class MasterBaseTaskExecThreadInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.MasterBaseTaskExecThreadConstructorInterceptor";
private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.MasterBaseTaskExecThreadMethodInterceptor";
private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.master.runner.MasterBaseTaskExecThread";
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANC_CLASS);
}
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArguments(1)
.and(takesArgument(0, named("org.apache.dolphinscheduler.dao.entity.TaskInstance")));
}
@Override
public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("call")
.and(isPublic())
.and(takesArguments(0));
}
@Override
public String getMethodsInterceptor() {
return METHOD_INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}

111
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterExecThreadInterceptorInstrumentation.java

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* Enhance {@link org.apache.dolphinscheduler.server.master.runner.MasterExecThread} instance and intercept `run` and `getProcessInstanceState` methods,
* the `run` method is a unified entrance of scheduled job.
*
* @see org.apache.dolphinscheduler.skywalking.plugin.MasterExecThreadConstructorInterceptor
* @see org.apache.dolphinscheduler.skywalking.plugin.MasterExecThreadMethodInterceptor
* @see org.apache.dolphinscheduler.skywalking.plugin.MasterExecThreadStateCacheInterceptor
*/
public class MasterExecThreadInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.MasterExecThreadConstructorInterceptor";
private static final String EXEC_PROCESS_METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.MasterExecThreadMethodInterceptor";
private static final String CACHE_STATE_METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.MasterExecThreadStateCacheInterceptor";
private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.master.runner.MasterExecThread";
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANC_CLASS);
}
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgument(0, named("org.apache.dolphinscheduler.dao.entity.ProcessInstance"));
}
@Override
public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("run")
.and(isPublic())
.and(takesArguments(0));
}
@Override
public String getMethodsInterceptor() {
return EXEC_PROCESS_METHOD_INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("getProcessInstanceState")
.and(returns(named("org.apache.dolphinscheduler.common.enums.ExecutionStatus")));
}
@Override
public String getMethodsInterceptor() {
return CACHE_STATE_METHOD_INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}

72
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/MasterSchedulerServiceInterceptorInstrumentation.java

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* Enhance {@link org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService} instance and intercept `scheduleProcess` method,
* this method is a scheduler entrance of job.
*
* @see org.apache.dolphinscheduler.skywalking.plugin.MasterSchedulerServiceMethodInterceptor
*/
public class MasterSchedulerServiceInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.MasterSchedulerServiceMethodInterceptor";
private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService";
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANC_CLASS);
}
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("scheduleProcess");
}
@Override
public String getMethodsInterceptor() {
return METHOD_INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}

90
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRemoteChannelInterceptorInstrumentation.java

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* Enhance {@link org.apache.dolphinscheduler.server.worker.processor.NettyRemoteChannel} instance and intercept `writeAndFlush` method,
* this method send message to remote server.
*
* @see org.apache.dolphinscheduler.skywalking.plugin.NettyRemoteChannelConstructorInterceptor
* @see org.apache.dolphinscheduler.skywalking.plugin.NettyRemoteChannelMethodInterceptor
*/
public class NettyRemoteChannelInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.NettyRemoteChannelConstructorInterceptor";
private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.NettyRemoteChannelMethodInterceptor";
private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.worker.processor.NettyRemoteChannel";
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANC_CLASS);
}
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgument(0, named("io.netty.channel.Channel"));
}
@Override
public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("writeAndFlush")
.and(takesArguments(1))
.and(takesArgument(0, named("org.apache.dolphinscheduler.remote.command.Command")));
}
@Override
public String getMethodsInterceptor() {
return METHOD_INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}

77
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRemotingClientInterceptorInstrumentation.java

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* Enhance {@link org.apache.dolphinscheduler.remote.NettyRemotingClient} instance and intercept `send` method,
* this method send message to remote server.
*
* @see org.apache.dolphinscheduler.skywalking.plugin.NettyRemotingClientMethodInterceptor
*/
public class NettyRemotingClientInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.NettyRemotingClientMethodInterceptor";
private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.remote.NettyRemotingClient";
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANC_CLASS);
}
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("send")
.and(takesArguments(2))
.and(takesArgument(0, named("org.apache.dolphinscheduler.remote.utils.Host")))
.and(takesArgument(1, named("org.apache.dolphinscheduler.remote.command.Command")));
}
@Override
public String getMethodsInterceptor() {
return METHOD_INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}

77
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/NettyRequestProcessorInterceptorInstrumentation.java

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static net.bytebuddy.matcher.ElementMatchers.named;
/**
* Enhance {@link org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor} instance and intercept `process` method,
* this method receive and handle message from remote server.
*
* @see org.apache.dolphinscheduler.skywalking.plugin.NettyRequestProcessorMethodInterceptor
*/
public class NettyRequestProcessorInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.NettyRequestProcessorMethodInterceptor";
private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor";
@Override
protected ClassMatch enhanceClass() {
return HierarchyMatch.byHierarchyMatch(ENHANC_CLASS);
}
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("process")
.and(takesArguments(2))
.and(takesArgument(0, named("io.netty.channel.Channel")))
.and(takesArgument(1, named("org.apache.dolphinscheduler.remote.command.Command")));
}
@Override
public String getMethodsInterceptor() {
return METHOD_INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}

93
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskExecuteInterceptorInstrumentation.java

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
/**
* Enhance {@link org.apache.dolphinscheduler.server.worker.task.AbstractTask} instance and intercept `init``handle` and `after` methods,
* the implementation class execute task through these methods.
*
* @see org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteConstructorInterceptor
* @see org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteConstructorInterceptor
*/
public class TaskExecuteInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteConstructorInterceptor";
private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteMethodInterceptor";
private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.worker.task.AbstractTask";
@Override
protected ClassMatch enhanceClass() {
return HierarchyMatch.byHierarchyMatch(ENHANC_CLASS);
}
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgument(0, named("org.apache.dolphinscheduler.server.entity.TaskExecutionContext"));
}
@Override
public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("handle")
.or(named("init"))
.or(named("after"))
.and(isPublic())
.and(takesArguments(0));
}
@Override
public String getMethodsInterceptor() {
return METHOD_INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}

91
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskExecuteThreadInterceptorInstrumentation.java

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* Enhance {@link org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread} instance and intercept `run` method,
* this class provides an isolated thread for running task.
*
* @see org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteThreadConstructorInterceptor
* @see org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteThreadMethodInterceptor
*/
public class TaskExecuteThreadInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteThreadConstructorInterceptor";
private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.TaskExecuteThreadMethodInterceptor";
private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread";
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANC_CLASS);
}
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgument(0, named("org.apache.dolphinscheduler.server.entity.TaskExecutionContext"));
}
@Override
public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("run")
.and(isPublic())
.and(takesArguments(0));
}
@Override
public String getMethodsInterceptor() {
return METHOD_INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}

78
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskPriorityQueueConsumerInterceptorInstrumentation.java

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* Enhance {@link org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer} instance and intercept `dispatch` method,
* this method is a dispatch the task info of memory queue to worker nodes.
*
* @see org.apache.dolphinscheduler.skywalking.plugin.TaskPriorityQueueConsumerMethodInterceptor
*/
public class TaskPriorityQueueConsumerInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.TaskPriorityQueueConsumerMethodInterceptor";
private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer";
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANC_CLASS);
}
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("dispatch")
.and(takesArguments(1))
.and(takesArgument(0, named("org.apache.dolphinscheduler.service.queue.TaskPriority")))
.and(returns(boolean.class));
}
@Override
public String getMethodsInterceptor() {
return METHOD_INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}

76
ext/skywalking/src/main/java/org/apache/dolphinscheduler/skywalking/plugin/define/TaskPriorityQueueImplInterceptorInstrumentation.java

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dolphinscheduler.skywalking.plugin.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* Enhance {@link org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl} instance and intercept `put` method,
* this method is a publish task info to memory queue.
*
* @see org.apache.dolphinscheduler.skywalking.plugin.TaskPriorityQueueImplMethodInterceptor
*/
public class TaskPriorityQueueImplInterceptorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String METHOD_INTERCEPTOR_CLASS = "org.apache.dolphinscheduler.skywalking.plugin.TaskPriorityQueueImplMethodInterceptor";
private static final String ENHANC_CLASS = "org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl";
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANC_CLASS);
}
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("put")
.and(takesArguments(1))
.and(takesArgument(0, named("org.apache.dolphinscheduler.service.queue.TaskPriority")));
}
@Override
public String getMethodsInterceptor() {
return METHOD_INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}

26
ext/skywalking/src/main/resources/skywalking-plugin.def

@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.MasterSchedulerServiceInterceptorInstrumentation
dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.MasterExecThreadInterceptorInstrumentation
dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.MasterBaseTaskExecThreadInterceptorInstrumentation
dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.TaskPriorityQueueImplInterceptorInstrumentation
dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.TaskPriorityQueueConsumerInterceptorInstrumentation
dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.NettyRemotingClientInterceptorInstrumentation
dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.NettyRequestProcessorInterceptorInstrumentation
dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.NettyRemoteChannelInterceptorInstrumentation
dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.TaskExecuteThreadInterceptorInstrumentation
dolphinscheduler=org.apache.dolphinscheduler.skywalking.plugin.define.TaskExecuteInterceptorInstrumentation

8
pom.xml

@ -117,6 +117,7 @@
<servlet-api.version>2.5</servlet-api.version> <servlet-api.version>2.5</servlet-api.version>
<swagger.version>1.9.3</swagger.version> <swagger.version>1.9.3</swagger.version>
<springfox.version>2.9.2</springfox.version> <springfox.version>2.9.2</springfox.version>
<skywalking.version>8.4.0</skywalking.version>
</properties> </properties>
<dependencyManagement> <dependencyManagement>
@ -543,6 +544,12 @@
<artifactId>swagger-bootstrap-ui</artifactId> <artifactId>swagger-bootstrap-ui</artifactId>
<version>${swagger.version}</version> <version>${swagger.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-logback-1.x</artifactId>
<version>${skywalking.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
@ -1015,5 +1022,6 @@
<module>dolphinscheduler-remote</module> <module>dolphinscheduler-remote</module>
<module>dolphinscheduler-service</module> <module>dolphinscheduler-service</module>
<module>dolphinscheduler-plugin-api</module> <module>dolphinscheduler-plugin-api</module>
<module>ext/skywalking</module>
</modules> </modules>
</project> </project>

11
script/dolphinscheduler-daemon.sh

@ -97,7 +97,16 @@ else
exit 1 exit 1
fi fi
export DOLPHINSCHEDULER_OPTS="-server -Xms$HEAP_INITIAL_SIZE -Xmx$HEAP_MAX_SIZE -Xmn$HEAP_NEW_GENERATION__SIZE -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m -Xss512k -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof" if [ "$SKYWALKING_ENABLE" = "true" ]; then
SKYWALKING_OPTS="-javaagent:$DOLPHINSCHEDULER_HOME/skywalking-agent/skywalking-agent.jar -DSW_AGENT_NAME=dolphinscheduler::$command -DSW_LOGGING_FILE_NAME=skywalking-dolphinscheduler-$command.log"
export DOLPHINSCHEDULER_OPTS="$DOLPHINSCHEDULER_OPTS $SKYWALKING_OPTS"
echo "Info: Skywalking enabled opts: $SKYWALKING_OPTS"
else
echo "Info: Skywalking not enabled."
fi
export DOLPHINSCHEDULER_OPTS="-server -Xms$HEAP_INITIAL_SIZE -Xmx$HEAP_MAX_SIZE -Xmn$HEAP_NEW_GENERATION__SIZE -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m -Xss512k -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof $DOLPHINSCHEDULER_OPTS"
case $startStop in case $startStop in
(start) (start)

2
script/scp-hosts.sh

@ -54,7 +54,7 @@ do
echo "scp dirs to $host/$installPath starting" echo "scp dirs to $host/$installPath starting"
ssh -p $sshPort $host "cd $installPath/; rm -rf bin/ conf/ lib/ script/ sql/ ui/" ssh -p $sshPort $host "cd $installPath/; rm -rf bin/ conf/ lib/ script/ sql/ ui/"
for dsDir in bin conf lib script sql ui install.sh for dsDir in bin conf lib script sql ui skywalking-agent install.sh
do do
# if worker in workersGroupMap # if worker in workersGroupMap
if [[ "${workersGroupMap[${host}]}" ]] && [[ "${dsDir}" == "conf" ]]; then if [[ "${workersGroupMap[${host}]}" ]] && [[ "${dsDir}" == "conf" ]]; then

12
script/start-all.sh

@ -31,11 +31,13 @@ do
workersGroupMap+=([$worker]=$groupName) workersGroupMap+=([$worker]=$groupName)
done done
skywalkingEnv="SKYWALKING_ENABLE=$enableSkywalking SW_AGENT_COLLECTOR_BACKEND_SERVICES=$skywalkingServers SW_GRPC_LOG_SERVER_HOST=$skywalkingLogReporterHost SW_GRPC_LOG_SERVER_PORT=$skywalkingLogReporterPort"
mastersHost=(${masters//,/ }) mastersHost=(${masters//,/ })
for master in ${mastersHost[@]} for master in ${mastersHost[@]}
do do
echo "$master master server is starting" echo "$master master server is starting"
ssh -p $sshPort $master "cd $installPath/; sh bin/dolphinscheduler-daemon.sh start master-server;" ssh -p $sshPort $master "cd $installPath/; export $skywalkingEnv; sh bin/dolphinscheduler-daemon.sh start master-server;"
done done
@ -43,15 +45,15 @@ for worker in ${!workersGroupMap[*]}
do do
echo "$worker worker server is starting" echo "$worker worker server is starting"
ssh -p $sshPort $worker "cd $installPath/; sh bin/dolphinscheduler-daemon.sh start worker-server;" ssh -p $sshPort $worker "cd $installPath/; export $skywalkingEnv; sh bin/dolphinscheduler-daemon.sh start worker-server;"
ssh -p $sshPort $worker "cd $installPath/; sh bin/dolphinscheduler-daemon.sh start logger-server;" ssh -p $sshPort $worker "cd $installPath/; export $skywalkingEnv; sh bin/dolphinscheduler-daemon.sh start logger-server;"
done done
ssh -p $sshPort $alertServer "cd $installPath/; sh bin/dolphinscheduler-daemon.sh start alert-server;" ssh -p $sshPort $alertServer "cd $installPath/; export $skywalkingEnv; sh bin/dolphinscheduler-daemon.sh start alert-server;"
apiServersHost=(${apiServers//,/ }) apiServersHost=(${apiServers//,/ })
for apiServer in ${apiServersHost[@]} for apiServer in ${apiServersHost[@]}
do do
echo "$apiServer worker server is starting" echo "$apiServer worker server is starting"
ssh -p $sshPort $apiServer "cd $installPath/; sh bin/dolphinscheduler-daemon.sh start api-server;" ssh -p $sshPort $apiServer "cd $installPath/; export $skywalkingEnv; sh bin/dolphinscheduler-daemon.sh start api-server;"
done done

4
tools/dependencies/check-LICENSE.sh

@ -27,6 +27,10 @@ echo '=== Self modules: ' && ./mvnw --batch-mode --quiet -Dexec.executable='echo
echo '=== Distributed dependencies: ' && ls dist/lib | tee all-dependencies.txt echo '=== Distributed dependencies: ' && ls dist/lib | tee all-dependencies.txt
echo '=== Skywalking agent dependencies: ' && ls dist/skywalking-agent | grep .jar | tee -a all-dependencies.txt \
&& ls dist/skywalking-agent/plugins | tee -a all-dependencies.txt \
&& ls dist/skywalking-agent/activations | tee -a all-dependencies.txt
# Exclude all self modules(jars) to generate all third-party dependencies # Exclude all self modules(jars) to generate all third-party dependencies
echo '=== Third party dependencies: ' && grep -vf self-modules.txt all-dependencies.txt | tee third-party-dependencies.txt echo '=== Third party dependencies: ' && grep -vf self-modules.txt all-dependencies.txt | tee third-party-dependencies.txt

12
tools/dependencies/known-dependencies.txt

@ -12,6 +12,16 @@ aspectjweaver-1.9.2.jar
audience-annotations-0.5.0.jar audience-annotations-0.5.0.jar
avro-1.7.4.jar avro-1.7.4.jar
aws-java-sdk-1.7.4.jar aws-java-sdk-1.7.4.jar
apm-toolkit-logback-1.x-8.4.0.jar
apm-toolkit-logback-1.x-activation-8.4.0.jar
apm-httpclient-3.x-plugin-8.4.0.jar
apm-httpClient-4.x-plugin-8.4.0.jar
apm-httpclient-commons-8.4.0.jar
apm-quartz-scheduler-2.x-plugin-8.4.0.jar
apm-spring-core-patch-8.4.0.jar
apm-springmvc-annotation-5.x-plugin-8.4.0.jar
apm-springmvc-annotation-commons-8.4.0.jar
apm-zookeeper-3.4.x-plugin-8.4.0.jar
bonecp-0.8.0.RELEASE.jar bonecp-0.8.0.RELEASE.jar
byte-buddy-1.9.10.jar byte-buddy-1.9.10.jar
classmate-1.4.0.jar classmate-1.4.0.jar
@ -198,6 +208,8 @@ springfox-swagger2-2.9.2.jar
swagger-annotations-1.5.20.jar swagger-annotations-1.5.20.jar
swagger-bootstrap-ui-1.9.3.jar swagger-bootstrap-ui-1.9.3.jar
swagger-models-1.5.20.jar swagger-models-1.5.20.jar
skywalking-agent.jar
spring-commons-8.4.0.jar
tephra-api-0.6.0.jar tephra-api-0.6.0.jar
threetenbp-1.3.6.jar threetenbp-1.3.6.jar
transaction-api-1.1.jar transaction-api-1.1.jar

Loading…
Cancel
Save