Browse Source

Merge branch 'dev' of https://github.com/apache/incubator-dolphinscheduler into dev_wuxiaofei_server

pull/3/MERGE
wuxiaofei 4 years ago
parent
commit
911cf061e0
  1. 12
      README.md
  2. 6
      README_zh_CN.md
  3. 1
      docker/README.md
  4. 2
      docker/build/Dockerfile
  5. 46
      docker/build/conf/dolphinscheduler/alert.properties.tpl
  6. 25
      docker/build/conf/dolphinscheduler/application-api.properties.tpl
  7. 66
      docker/build/conf/dolphinscheduler/common.properties.tpl
  8. 15
      docker/build/conf/dolphinscheduler/datasource.properties.tpl
  9. 4
      docker/build/conf/dolphinscheduler/logback/logback-api.xml
  10. 4
      docker/build/conf/dolphinscheduler/logback/logback-master.xml
  11. 9
      docker/build/conf/dolphinscheduler/logback/logback-worker.xml
  12. 5
      docker/build/conf/dolphinscheduler/master.properties.tpl
  13. 14
      docker/build/conf/dolphinscheduler/worker.properties.tpl
  14. 1
      docker/build/conf/dolphinscheduler/zookeeper.properties.tpl
  15. 27
      docker/build/startup-init-conf.sh
  16. 38
      docker/build/startup.sh
  17. 28
      docker/docker-swarm/docker-compose.yml
  18. 28
      docker/docker-swarm/docker-stack.yml
  19. 2
      dolphinscheduler-alert/src/main/resources/alert.properties
  20. 6
      dolphinscheduler-api/src/main/resources/application-api.properties
  21. 4
      dolphinscheduler-api/src/main/resources/logback-api.xml
  22. 9
      dolphinscheduler-common/src/main/resources/common.properties
  23. 3
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
  24. 6
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
  25. 12
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
  26. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
  27. 16
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
  28. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
  29. 55
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  30. 7
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  31. 44
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  32. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
  33. 47
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  34. 143
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
  35. 20
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
  36. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
  37. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
  38. 21
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
  39. 32
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  40. 2
      dolphinscheduler-server/src/main/resources/master.properties
  41. 3
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
  42. 9
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
  43. 195
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
  44. 187
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java
  45. 53
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java
  46. 127
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java
  47. 75
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
  48. 19
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java
  49. 12
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java
  50. 74
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  51. 2
      dolphinscheduler-service/src/main/resources/logback-zookeeper.xml
  52. 22
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  53. BIN
      dolphinscheduler-ui.zip
  54. 19
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue
  55. 4
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/localParams.vue
  56. 2
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue
  57. 50
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/udp/udp.vue
  58. 11
      dolphinscheduler-ui/src/js/conf/home/store/dag/mutations.js
  59. 4
      pom.xml

12
README.md

@ -37,7 +37,7 @@ Its main objectives are as follows:
### What's in DolphinScheduler
Stability | Easy to use | Features | Scalability |
Stability | Accessibility | Features | Scalability |
-- | -- | -- | --
Decentralized multi-master and multi-worker | Visualization of workflow key information, such as task status, task type, retry times, task operation machine information, visual variables, and so on at a glance.  |  Support pause, recover operation | Support customized task types
support HA | Visualization of all workflow operations, dragging tasks to draw DAGs, configuring data sources and resources. At the same time, for third-party systems, provide API mode operations. | Users on DolphinScheduler can achieve many-to-one or one-to-one mapping relationship through tenants and Hadoop users, which is very important for scheduling large data jobs. | The scheduler supports distributed scheduling, and the overall scheduling capability will increase linearly with the scale of the cluster. Master and Worker support dynamic adjustment.
@ -73,13 +73,19 @@ dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-${latest.release
```
### Thanks
DolphinScheduler is based on a lot of excellent open-source projects, such as google guava, guice, grpc, netty, ali bonecp, quartz, and many open-source projects of Apache and so on.
We would like to express our deep gratitude to all the open-source projects used in Dolphin Scheduler. We hope that we are not only the beneficiaries of open-source, but also give back to the community. Besides, we hope everyone who have the same enthusiasm and passion for open source could join in and contribute to the open-source community!
### Get Help
1. Submit an [[issue](https://github.com/apache/incubator-dolphinscheduler/issues/new/choose)]
1. Subscribe to the mail list: https://dolphinscheduler.apache.org/en-us/docs/development/subscribe.html, then email dev@dolphinscheduler.apache.org
1. Subscribe to this mail list: https://dolphinscheduler.apache.org/en-us/community/development/subscribe.html, then email dev@dolphinscheduler.apache.org
### Community
You are so much welcomed to communicate with the developers and users of Dolphin Scheduler freely. There are two ways to find them:
1. Join the slack channel by [this invitation link](https://join.slack.com/t/asf-dolphinscheduler/shared_invite/zt-l8k90ceu-wwUfobaDkJxjzMfZp4y1Ag).
2. Follow the [twitter account of Dolphin Scheduler](https://twitter.com/dolphinschedule) and get the latest news just on time.
### How to Contribute
The community welcomes everyone to participate in contributing, please refer to this website to find out more: [[How to contribute](https://dolphinscheduler.apache.org/en-us/community/development/contribute.html)]

6
README_zh_CN.md

@ -81,7 +81,11 @@ Dolphin Scheduler使用了很多优秀的开源项目,比如google的guava、g
### 获得帮助
1. 提交issue
1. 先订阅邮件开发列表:[订阅邮件列表](https://dolphinscheduler.apache.org/zh-cn/docs/development/subscribe.html), 订阅成功后发送邮件到dev@dolphinscheduler.apache.org.
2. 先订阅邮件开发列表:[订阅邮件列表](https://dolphinscheduler.apache.org/zh-cn/community/development/subscribe.html), 订阅成功后发送邮件到dev@dolphinscheduler.apache.org.
### 社区
1. 通过[该申请链接](https://join.slack.com/t/asf-dolphinscheduler/shared_invite/zt-l8k90ceu-wwUfobaDkJxjzMfZp4y1Ag)加入slack channel
2. 关注[Apache Dolphin Scheduler的Twitter账号](https://twitter.com/dolphinschedule)获取实时动态
### 版权
请参考 [LICENSE](https://github.com/apache/incubator-dolphinscheduler/blob/dev/LICENSE) 文件.

1
docker/README.md

@ -0,0 +1 @@
# Dolphin Scheduler for Docker

2
docker/build/Dockerfile

@ -57,6 +57,6 @@ RUN dos2unix /root/checkpoint.sh && \
echo "Set disable_coredump false" >> /etc/sudo.conf
# 4. expose port
EXPOSE 5678 1234 12345 50051
EXPOSE 5678 1234 12345 50051 50052
ENTRYPOINT ["/sbin/tini", "--", "/root/startup.sh"]

46
docker/build/conf/dolphinscheduler/alert.properties.tpl

@ -14,43 +14,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
#alert type is EMAIL/SMS
alert.type=EMAIL
# alter msg template, default is html template
#alert.template=html
# mail server configuration
mail.protocol=SMTP
mail.server.host=${MAIL_SERVER_HOST}
mail.server.port=${MAIL_SERVER_PORT}
mail.sender=${MAIL_SENDER}
mail.user=${MAIL_USER}
mail.passwd=${MAIL_PASSWD}
# TLS
mail.smtp.starttls.enable=${MAIL_SMTP_STARTTLS_ENABLE}
# SSL
mail.smtp.ssl.enable=${MAIL_SMTP_SSL_ENABLE}
mail.smtp.ssl.trust=${MAIL_SMTP_SSL_TRUST}
#xls file path,need create if not exist
xls.file.path=${XLS_FILE_PATH}
# plugins dir
plugin.dir=${ALERT_PLUGIN_DIR}
# Enterprise WeChat configuration
enterprise.wechat.enable=${ENTERPRISE_WECHAT_ENABLE}
enterprise.wechat.corp.id=${ENTERPRISE_WECHAT_CORP_ID}
enterprise.wechat.secret=${ENTERPRISE_WECHAT_SECRET}
enterprise.wechat.agent.id=${ENTERPRISE_WECHAT_AGENT_ID}
enterprise.wechat.users=${ENTERPRISE_WECHAT_USERS}
enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpId}&corpsecret={secret}
enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}
enterprise.wechat.team.send.msg={\"toparty\":\"{toParty}\",\"agentid\":\"{agentId}\",\"msgtype\":\"text\",\"text\":{\"content\":\"{msg}\"},\"safe\":\"0\"}
enterprise.wechat.user.send.msg={\"touser\":\"{toUser}\",\"agentid\":\"{agentId}\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"{msg}\"}}
#This configuration file configures the configuration parameters related to the AlertServer.
#These parameters are only related to the AlertServer, and it has nothing to do with the specific Alert Plugin.
#eg : max retry num.
#eg : Alert Server Listener port
#alert.plugin.dir config the Alert Plugin dir . AlertServer while find and load the Alert Plugin Jar from this dir when deploy and start AlertServer on the server .
alert.plugin.dir=${ALERT_PLUGIN_DIR}
#maven.local.repository=/Users/gaojun/Documents/jianguoyun/localRepository
#alert.plugin.binding config the Alert Plugin need be load when development and run in IDE
#alert.plugin.binding=\
# ./dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml

25
docker/build/conf/dolphinscheduler/application-api.properties.tpl

@ -24,12 +24,19 @@ server.servlet.session.timeout=7200
# servlet config
server.servlet.context-path=/dolphinscheduler/
# time zone
spring.jackson.time-zone=GMT+8
# file size limit for upload
spring.servlet.multipart.max-file-size=1024MB
spring.servlet.multipart.max-request-size=1024MB
# enable response compression
server.compression.enabled=true
server.compression.mime-types=text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml
# post content
server.jetty.max-http-post-size=5000000
server.jetty.max-http-form-post-size=5000000
# i18n
spring.messages.encoding=UTF-8
@ -40,6 +47,16 @@ spring.messages.basename=i18n/messages
# Authentication types (supported types: PASSWORD)
security.authentication.type=PASSWORD
#============================================================================
# LDAP Config
# mock ldap server from https://www.forumsys.com/tutorials/integration-how-to/ldap/online-ldap-test-server/
#============================================================================
# admin userId
#security.authentication.ldap.user.admin=read-only-admin
# ldap server config
#ldap.urls=ldap://ldap.forumsys.com:389/
#ldap.base.dn=dc=example,dc=com
#ldap.username=cn=read-only-admin,dc=example,dc=com
#ldap.password=password
#ldap.user.identity.attribute=uid
#ldap.user.email.attribute=mail

66
docker/build/conf/dolphinscheduler/common.properties.tpl

@ -15,64 +15,64 @@
# limitations under the License.
#
#============================================================================
# System
#============================================================================
# system env path. self configuration, please make sure the directory and file exists and have read write execute permissions
dolphinscheduler.env.path=${DOLPHINSCHEDULER_ENV_PATH}
# user data directory path, self configuration, please make sure the directory exists and have read write permissions
data.basedir.path=${DOLPHINSCHEDULER_DATA_BASEDIR_PATH}
# resource upload startup type : HDFS,S3,NONE
# resource storage type : HDFS, S3, NONE
resource.storage.type=${RESOURCE_STORAGE_TYPE}
#============================================================================
# HDFS
#============================================================================
# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。"/dolphinscheduler" is recommended
resource.upload.path=${RESOURCE_UPLOAD_PATH}
# user data local directory path, please make sure the directory exists and have read write permissions
data.basedir.path=${DOLPHINSCHEDULER_DATA_BASEDIR_PATH}
# whether kerberos starts
#hadoop.security.authentication.startup.state=false
hadoop.security.authentication.startup.state=false
# java.security.krb5.conf path
#java.security.krb5.conf.path=/opt/krb5.conf
java.security.krb5.conf.path=/opt/krb5.conf
# loginUserFromKeytab user
#login.user.keytab.username=hdfs-mycluster@ESZ.COM
# login user from keytab username
login.user.keytab.username=hdfs-mycluster@ESZ.COM
# loginUserFromKeytab path
#login.user.keytab.path=/opt/hdfs.headless.keytab
# login user from keytab path
login.user.keytab.path=/opt/hdfs.headless.keytab
#resource.view.suffixs
#resource.view.suffixs=txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
# if resource.storage.type=HDFS, the user need to have permission to create directories under the HDFS root path
hdfs.root.user=hdfs
# kerberos expire time
kerberos.expire.time=7
#============================================================================
# S3
#============================================================================
# if resource.storage.type=S3,the value like: s3a://dolphinscheduler ; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
fs.defaultFS=${FS_DEFAULT_FS}
# if resource.storage.type=S3s3 endpoint
# if resource.storage.type=S3, s3 endpoint
fs.s3a.endpoint=${FS_S3A_ENDPOINT}
# if resource.storage.type=S3s3 access key
# if resource.storage.type=S3, s3 access key
fs.s3a.access.key=${FS_S3A_ACCESS_KEY}
# if resource.storage.type=S3s3 secret key
# if resource.storage.type=S3, s3 secret key
fs.s3a.secret.key=${FS_S3A_SECRET_KEY}
# if not use hadoop resourcemanager, please keep default value; if resourcemanager HA enable, please type the HA ips ; if resourcemanager is single, make this value empty TODO
# if resourcemanager HA enable, please type the HA ips ; if resourcemanager is single, make this value empty
yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
# If resourcemanager HA enable or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ark1 to actual resourcemanager hostname.
yarn.application.status.address=http://ark1:8088/ws/v1/cluster/apps/%s
# if resourcemanager HA enable or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ds1 to actual resourcemanager hostname.
yarn.application.status.address=http://ds1:8088/ws/v1/cluster/apps/%s
# job history status url when application number threshold is reached(default 10000,maybe it was set to 1000)
yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s
# system env path, If you want to set your own path, you need to set this env file to an absolute path
dolphinscheduler.env.path=${DOLPHINSCHEDULER_ENV_PATH}
development.state=false
# kerberos tgt expire time, unit is hours
kerberos.expire.time=2
# datasource encryption salt
datasource.encryption.enable=false
datasource.encryption.salt=!@#$%^&*
# Network IP gets priority, default inner outer
#dolphin.scheduler.network.priority.strategy=default

15
docker/build/conf/dolphinscheduler/datasource.properties.tpl

@ -17,12 +17,21 @@
# db
spring.datasource.driver-class-name=${DATABASE_DRIVER}
spring.datasource.url=jdbc:${DATABASE_TYPE}://${DATABASE_HOST}:${DATABASE_PORT}/${DATABASE_DATABASE}?${DATABASE_PARAMS}
spring.datasource.url=jdbc:${DATABASE_TYPE}://${DATABASE_HOST}:${DATABASE_PORT}/${DATABASE_DATABASE}${DATABASE_PARAMS:+?${DATABASE_PARAMS}}
spring.datasource.username=${DATABASE_USERNAME}
spring.datasource.password=${DATABASE_PASSWORD}
## base spring data source configuration todo need to remove
#spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
# postgresql
#spring.datasource.driver-class-name=org.postgresql.Driver
#spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
#spring.datasource.username=test
#spring.datasource.password=test
# mysql
#spring.datasource.driver-class-name=com.mysql.jdbc.Driver
#spring.datasource.url=jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
#spring.datasource.username=xxxx
#spring.datasource.password=xxxx
# connection configuration
#spring.datasource.initialSize=5

4
docker/build/conf/dolphinscheduler/logback/logback-api.xml

@ -23,12 +23,12 @@
<!-- api server logback config start -->
<appender name="APILOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.base}/dolphinscheduler-api-server.log</file>
<file>${log.base}/dolphinscheduler-api.log</file>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.base}/dolphinscheduler-api-server.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<fileNamePattern>${log.base}/dolphinscheduler-api.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxHistory>168</maxHistory>
<maxFileSize>64MB</maxFileSize>
</rollingPolicy>

4
docker/build/conf/dolphinscheduler/logback/logback-master.xml

@ -24,9 +24,9 @@
<conversionRule conversionWord="messsage"
converterClass="org.apache.dolphinscheduler.server.log.SensitiveDataConverter"/>
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
</filter> -->
<filter class="org.apache.dolphinscheduler.server.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.server.log.TaskLogDiscriminator">
<key>taskAppId</key>

9
docker/build/conf/dolphinscheduler/logback/logback-worker.xml

@ -25,9 +25,9 @@
<conversionRule conversionWord="messsage"
converterClass="org.apache.dolphinscheduler.server.log.SensitiveDataConverter"/>
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
</filter> -->
<filter class="org.apache.dolphinscheduler.server.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.server.log.TaskLogDiscriminator">
<key>taskAppId</key>
@ -48,10 +48,9 @@
</appender>
<appender name="WORKERLOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.base}/dolphinscheduler-worker.log</file>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<filter class="org.apache.dolphinscheduler.server.log.WorkerLogFilter"/>
</filter> -->
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.base}/dolphinscheduler-worker.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxHistory>168</maxHistory>

5
docker/build/conf/dolphinscheduler/master.properties.tpl

@ -21,6 +21,9 @@ master.exec.threads=${MASTER_EXEC_THREADS}
# master execute task number in parallel
master.exec.task.num=${MASTER_EXEC_TASK_NUM}
# master dispatch task number
#master.dispatch.task.num=3
# master heartbeat interval
master.heartbeat.interval=${MASTER_HEARTBEAT_INTERVAL}
@ -37,4 +40,4 @@ master.max.cpuload.avg=${MASTER_MAX_CPULOAD_AVG}
master.reserved.memory=${MASTER_RESERVED_MEMORY}
# master listen port
#master.listen.port=${MASTER_LISTEN_PORT}
master.listen.port=${MASTER_LISTEN_PORT}

14
docker/build/conf/dolphinscheduler/worker.properties.tpl

@ -21,20 +21,20 @@ worker.exec.threads=${WORKER_EXEC_THREADS}
# worker heartbeat interval
worker.heartbeat.interval=${WORKER_HEARTBEAT_INTERVAL}
# submit the number of tasks at a time
worker.fetch.task.num=${WORKER_FETCH_TASK_NUM}
# only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2
# only less than cpu avg load, worker server can work. default value -1: the number of cpu cores * 2
worker.max.cpuload.avg=${WORKER_MAX_CPULOAD_AVG}
# only larger than reserved memory, worker server can work. default value : physical memory * 1/6, unit is G.
worker.reserved.memory=${WORKER_RESERVED_MEMORY}
# worker listener port
#worker.listen.port=${WORKER_LISTEN_PORT}
worker.listen.port=${WORKER_LISTEN_PORT}
# default worker group
#worker.groups=${WORKER_GROUP}
worker.groups=${WORKER_GROUP}
# default worker weight
#worker.weight=${WORKER_WEIGHT}
worker.weight=${WORKER_WEIGHT}
# alert server listener host
alert.listen.host=${ALERT_LISTEN_HOST}

1
docker/build/conf/dolphinscheduler/zookeeper.properties.tpl

@ -27,3 +27,4 @@ zookeeper.dolphinscheduler.root=${ZOOKEEPER_ROOT}
#zookeeper.retry.base.sleep=100
#zookeeper.retry.max.sleep=30000
#zookeeper.retry.maxtime=10
#zookeeper.max.wait.time=10000

27
docker/build/startup-init-conf.sh

@ -20,7 +20,8 @@ set -e
echo "init env variables"
# Define parameters default value.
# Define parameters default value
#============================================================================
# Database Source
#============================================================================
@ -34,7 +35,7 @@ export DATABASE_DRIVER=${DATABASE_DRIVER:-"org.postgresql.Driver"}
export DATABASE_PARAMS=${DATABASE_PARAMS:-"characterEncoding=utf8"}
#============================================================================
# System
# Common
#============================================================================
export DOLPHINSCHEDULER_ENV_PATH=${DOLPHINSCHEDULER_ENV_PATH:-"/opt/dolphinscheduler/conf/env/dolphinscheduler_env.sh"}
export DOLPHINSCHEDULER_DATA_BASEDIR_PATH=${DOLPHINSCHEDULER_DATA_BASEDIR_PATH:-"/tmp/dolphinscheduler"}
@ -69,35 +70,17 @@ export MASTER_LISTEN_PORT=${MASTER_LISTEN_PORT:-"5678"}
#============================================================================
export WORKER_EXEC_THREADS=${WORKER_EXEC_THREADS:-"100"}
export WORKER_HEARTBEAT_INTERVAL=${WORKER_HEARTBEAT_INTERVAL:-"10"}
export WORKER_FETCH_TASK_NUM=${WORKER_FETCH_TASK_NUM:-"3"}
export WORKER_MAX_CPULOAD_AVG=${WORKER_MAX_CPULOAD_AVG:-"100"}
export WORKER_RESERVED_MEMORY=${WORKER_RESERVED_MEMORY:-"0.1"}
export WORKER_LISTEN_PORT=${WORKER_LISTEN_PORT:-"1234"}
export WORKER_GROUP=${WORKER_GROUP:-"default"}
export WORKER_WEIGHT=${WORKER_WEIGHT:-"100"}
export ALERT_LISTEN_HOST=${ALERT_LISTEN_HOST:-"127.0.0.1"}
#============================================================================
# Alert Server
#============================================================================
# alert plugin dir
export ALERT_PLUGIN_DIR=${ALERT_PLUGIN_DIR:-"/opt/dolphinscheduler"}
# xls file
export XLS_FILE_PATH=${XLS_FILE_PATH:-"/tmp/xls"}
# mail
export MAIL_SERVER_HOST=${MAIL_SERVER_HOST:-""}
export MAIL_SERVER_PORT=${MAIL_SERVER_PORT:-""}
export MAIL_SENDER=${MAIL_SENDER:-""}
export MAIL_USER=${MAIL_USER:-""}
export MAIL_PASSWD=${MAIL_PASSWD:-""}
export MAIL_SMTP_STARTTLS_ENABLE=${MAIL_SMTP_STARTTLS_ENABLE:-"true"}
export MAIL_SMTP_SSL_ENABLE=${MAIL_SMTP_SSL_ENABLE:-"false"}
export MAIL_SMTP_SSL_TRUST=${MAIL_SMTP_SSL_TRUST:-""}
# wechat
export ENTERPRISE_WECHAT_ENABLE=${ENTERPRISE_WECHAT_ENABLE:-"false"}
export ENTERPRISE_WECHAT_CORP_ID=${ENTERPRISE_WECHAT_CORP_ID:-""}
export ENTERPRISE_WECHAT_SECRET=${ENTERPRISE_WECHAT_SECRET:-""}
export ENTERPRISE_WECHAT_AGENT_ID=${ENTERPRISE_WECHAT_AGENT_ID:-""}
export ENTERPRISE_WECHAT_USERS=${ENTERPRISE_WECHAT_USERS:-""}
export ALERT_PLUGIN_DIR=${ALERT_PLUGIN_DIR:-"lib/plugin/alert"}
echo "generate app config"
ls ${DOLPHINSCHEDULER_HOME}/conf/ | grep ".tpl" | while read line; do

38
docker/build/startup.sh

@ -26,7 +26,7 @@ DOLPHINSCHEDULER_LOGS=${DOLPHINSCHEDULER_HOME}/logs
waitDatabase() {
echo "test ${DATABASE_TYPE} service"
while ! nc -z ${DATABASE_HOST} ${DATABASE_PORT}; do
counter=$((counter+1))
local counter=$((counter+1))
if [ $counter == 30 ]; then
echo "Error: Couldn't connect to ${DATABASE_TYPE}."
exit 1
@ -57,12 +57,41 @@ initDatabase() {
${DOLPHINSCHEDULER_SCRIPT}/create-dolphinscheduler.sh
}
# check ds version
checkDSVersion() {
if [ ${DATABASE_TYPE} = "mysql" ]; then
v=$(mysql -h${DATABASE_HOST} -P${DATABASE_PORT} -u${DATABASE_USERNAME} --password=${DATABASE_PASSWORD} -D ${DATABASE_DATABASE} -e "SELECT * FROM public.t_ds_version" 2>/dev/null)
else
v=$(PGPASSWORD=${DATABASE_PASSWORD} psql -h ${DATABASE_HOST} -p ${DATABASE_PORT} -U ${DATABASE_USERNAME} -d ${DATABASE_DATABASE} -tAc "SELECT * FROM public.t_ds_version" 2>/dev/null)
fi
if [ -n "$v" ]; then
echo "ds version: $v"
return 0
else
return 1
fi
}
# check init database
checkInitDatabase() {
echo "check init database"
while ! checkDSVersion; do
local counter=$((counter+1))
if [ $counter == 30 ]; then
echo "Error: Couldn't check init database."
exit 1
fi
echo "Trying to check init database. Attempt $counter."
sleep 5
done
}
# wait zk
waitZK() {
echo "connect remote zookeeper"
echo "${ZOOKEEPER_QUORUM}" | awk -F ',' 'BEGIN{ i=1 }{ while( i <= NF ){ print $i; i++ } }' | while read line; do
while ! nc -z ${line%:*} ${line#*:}; do
counter=$((counter+1))
local counter=$((counter+1))
if [ $counter == 30 ]; then
echo "Error: Couldn't connect to zookeeper."
exit 1
@ -133,7 +162,7 @@ case "$1" in
initApiServer
initAlertServer
initLoggerServer
LOGFILE=${DOLPHINSCHEDULER_LOGS}/dolphinscheduler-api-server.log
LOGFILE=${DOLPHINSCHEDULER_LOGS}/dolphinscheduler-api.log
;;
(master-server)
waitZK
@ -153,10 +182,11 @@ case "$1" in
waitDatabase
initDatabase
initApiServer
LOGFILE=${DOLPHINSCHEDULER_LOGS}/dolphinscheduler-api-server.log
LOGFILE=${DOLPHINSCHEDULER_LOGS}/dolphinscheduler-api.log
;;
(alert-server)
waitDatabase
checkInitDatabase
initAlertServer
LOGFILE=${DOLPHINSCHEDULER_LOGS}/dolphinscheduler-alert.log
;;

28
docker/docker-swarm/docker-compose.yml

@ -87,22 +87,11 @@ services:
image: apache/dolphinscheduler:latest
container_name: dolphinscheduler-alert
command: alert-server
ports:
- 50052:50052
environment:
TZ: Asia/Shanghai
XLS_FILE_PATH: "/tmp/xls"
MAIL_SERVER_HOST: ""
MAIL_SERVER_PORT: ""
MAIL_SENDER: ""
MAIL_USER: ""
MAIL_PASSWD: ""
MAIL_SMTP_STARTTLS_ENABLE: "false"
MAIL_SMTP_SSL_ENABLE: "false"
MAIL_SMTP_SSL_TRUST: ""
ENTERPRISE_WECHAT_ENABLE: "false"
ENTERPRISE_WECHAT_CORP_ID: ""
ENTERPRISE_WECHAT_SECRET: ""
ENTERPRISE_WECHAT_AGENT_ID: ""
ENTERPRISE_WECHAT_USERS: ""
ALERT_PLUGIN_DIR: lib/plugin/alert
DATABASE_HOST: dolphinscheduler-postgresql
DATABASE_PORT: 5432
DATABASE_USERNAME: root
@ -169,21 +158,12 @@ services:
TZ: Asia/Shanghai
WORKER_EXEC_THREADS: "100"
WORKER_HEARTBEAT_INTERVAL: "10"
WORKER_FETCH_TASK_NUM: "3"
WORKER_MAX_CPULOAD_AVG: "100"
WORKER_RESERVED_MEMORY: "0.1"
WORKER_GROUP: "default"
WORKER_WEIGHT: "100"
DOLPHINSCHEDULER_DATA_BASEDIR_PATH: /tmp/dolphinscheduler
XLS_FILE_PATH: "/tmp/xls"
MAIL_SERVER_HOST: ""
MAIL_SERVER_PORT: ""
MAIL_SENDER: ""
MAIL_USER: ""
MAIL_PASSWD: ""
MAIL_SMTP_STARTTLS_ENABLE: "false"
MAIL_SMTP_SSL_ENABLE: "false"
MAIL_SMTP_SSL_TRUST: ""
ALERT_LISTEN_HOST: dolphinscheduler-alert
DATABASE_HOST: dolphinscheduler-postgresql
DATABASE_PORT: 5432
DATABASE_USERNAME: root

28
docker/docker-swarm/docker-stack.yml

@ -84,22 +84,11 @@ services:
dolphinscheduler-alert:
image: apache/dolphinscheduler:latest
command: alert-server
ports:
- 50052:50052
environment:
TZ: Asia/Shanghai
XLS_FILE_PATH: "/tmp/xls"
MAIL_SERVER_HOST: ""
MAIL_SERVER_PORT: ""
MAIL_SENDER: ""
MAIL_USER: ""
MAIL_PASSWD: ""
MAIL_SMTP_STARTTLS_ENABLE: "false"
MAIL_SMTP_SSL_ENABLE: "false"
MAIL_SMTP_SSL_TRUST: ""
ENTERPRISE_WECHAT_ENABLE: "false"
ENTERPRISE_WECHAT_CORP_ID: ""
ENTERPRISE_WECHAT_SECRET: ""
ENTERPRISE_WECHAT_AGENT_ID: ""
ENTERPRISE_WECHAT_USERS: ""
ALERT_PLUGIN_DIR: lib/plugin/alert
DATABASE_HOST: dolphinscheduler-postgresql
DATABASE_PORT: 5432
DATABASE_USERNAME: root
@ -163,21 +152,12 @@ services:
TZ: Asia/Shanghai
WORKER_EXEC_THREADS: "100"
WORKER_HEARTBEAT_INTERVAL: "10"
WORKER_FETCH_TASK_NUM: "3"
WORKER_MAX_CPULOAD_AVG: "100"
WORKER_RESERVED_MEMORY: "0.1"
WORKER_GROUP: "default"
WORKER_WEIGHT: "100"
DOLPHINSCHEDULER_DATA_BASEDIR_PATH: /tmp/dolphinscheduler
XLS_FILE_PATH: "/tmp/xls"
MAIL_SERVER_HOST: ""
MAIL_SERVER_PORT: ""
MAIL_SENDER: ""
MAIL_USER: ""
MAIL_PASSWD: ""
MAIL_SMTP_STARTTLS_ENABLE: "false"
MAIL_SMTP_SSL_ENABLE: "false"
MAIL_SMTP_SSL_TRUST: ""
ALERT_LISTEN_HOST: dolphinscheduler-alert
DATABASE_HOST: dolphinscheduler-postgresql
DATABASE_PORT: 5432
DATABASE_USERNAME: root

2
dolphinscheduler-alert/src/main/resources/alert.properties

@ -28,5 +28,3 @@ alert.plugin.dir=./lib/plugin/alert
#alert.plugin.binding config the Alert Plugin need be load when development and run in IDE
#alert.plugin.binding=\
# ./dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml

6
dolphinscheduler-api/src/main/resources/application-api.properties

@ -21,9 +21,10 @@ server.port=12345
# session config
server.servlet.session.timeout=7200
# servlet config
server.servlet.context-path=/dolphinscheduler/
# Set time zone
# time zone
spring.jackson.time-zone=GMT+8
# file size limit for upload
@ -37,6 +38,7 @@ server.compression.mime-types=text/html,text/xml,text/plain,text/css,text/javasc
# post content
server.jetty.max-http-form-post-size=5000000
# i18n
spring.messages.encoding=UTF-8
# i18n classpath folder , file prefix messages, if have many files, use "," seperator
@ -58,5 +60,3 @@ security.authentication.type=PASSWORD
#ldap.password=password
#ldap.user.identity.attribute=uid
#ldap.user.email.attribute=mail

4
dolphinscheduler-api/src/main/resources/logback-api.xml

@ -31,12 +31,12 @@
<!-- api server logback config start -->
<appender name="APILOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.base}/dolphinscheduler-api-server.log</file>
<file>${log.base}/dolphinscheduler-api.log</file>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.base}/dolphinscheduler-api-server.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<fileNamePattern>${log.base}/dolphinscheduler-api.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxHistory>168</maxHistory>
<maxFileSize>64MB</maxFileSize>
</rollingPolicy>

9
dolphinscheduler-common/src/main/resources/common.properties

@ -33,7 +33,7 @@ java.security.krb5.conf.path=/opt/krb5.conf
# login user from keytab username
login.user.keytab.username=hdfs-mycluster@ESZ.COM
# loginUserFromKeytab path
# login user from keytab path
login.user.keytab.path=/opt/hdfs.headless.keytab
#resource.view.suffixs
@ -45,13 +45,13 @@ hdfs.root.user=hdfs
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
fs.defaultFS=hdfs://mycluster:8020
# if resource.storage.type=S3s3 endpoint
# if resource.storage.type=S3, s3 endpoint
fs.s3a.endpoint=http://192.168.xx.xx:9010
# if resource.storage.type=S3s3 access key
# if resource.storage.type=S3, s3 access key
fs.s3a.access.key=A3DXS30FO22544RE
# if resource.storage.type=S3s3 secret key
# if resource.storage.type=S3, s3 secret key
fs.s3a.secret.key=OloCLq3n+8+sdPHUhJ21XrSxTC+JK
# if resourcemanager HA enable, please type the HA ips ; if resourcemanager is single, make this value empty
@ -59,6 +59,7 @@ yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
# if resourcemanager HA enable or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ds1 to actual resourcemanager hostname.
yarn.application.status.address=http://ds1:8088/ws/v1/cluster/apps/%s
# job history status url when application number threshold is reached(default 10000,maybe it was set to 1000)
yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s

3
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

@ -221,4 +221,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
@Param("processDefinitionId") int processDefinitionId,
@Param("states") int[] states);
int updateGlobalParamsById(
@Param("globalParams") String globalParams,
@Param("id") int id);
}

6
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -219,5 +219,9 @@
</foreach>
order by id asc
</select>
<update id="updateGlobalParamsById">
update t_ds_process_instance
set global_params = #{globalParams}
where id = #{id}
</update>
</mapper>

12
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java

@ -68,6 +68,10 @@ public class TaskExecuteResponseCommand implements Serializable {
* varPool string
*/
private String varPool;
/**
* task return result
*/
private String result;
public void setVarPool(String varPool) {
this.varPool = varPool;
@ -139,4 +143,12 @@ public class TaskExecuteResponseCommand implements Serializable {
+ ", appIds='" + appIds + '\''
+ '}';
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
}

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

@ -80,7 +80,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
responseCommand.getAppIds(),
responseCommand.getTaskInstanceId(),
responseCommand.getVarPool(),
channel);
channel,
responseCommand.getResult()
);
taskResponseService.addResponse(taskResponseEvent);
}

16
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java

@ -92,6 +92,10 @@ public class TaskResponseEvent {
* channel
*/
private Channel channel;
/**
* task return result
*/
private String result;
public static TaskResponseEvent newAck(ExecutionStatus state,
Date startTime,
@ -118,7 +122,8 @@ public class TaskResponseEvent {
String appIds,
int taskInstanceId,
String varPool,
Channel channel) {
Channel channel,
String result) {
TaskResponseEvent event = new TaskResponseEvent();
event.setState(state);
event.setEndTime(endTime);
@ -128,6 +133,7 @@ public class TaskResponseEvent {
event.setEvent(Event.RESULT);
event.setVarPool(varPool);
event.setChannel(channel);
event.setResult(result);
return event;
}
@ -226,4 +232,12 @@ public class TaskResponseEvent {
public void setChannel(Channel channel) {
this.channel = channel;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
}

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java

@ -165,7 +165,8 @@ public class TaskResponseService {
taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(),
taskResponseEvent.getTaskInstanceId(),
taskResponseEvent.getVarPool()
taskResponseEvent.getVarPool(),
taskResponseEvent.getResult()
);
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success

55
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -22,11 +22,13 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
@ -36,6 +38,7 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
@ -67,6 +70,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -74,6 +78,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -491,7 +496,8 @@ public class MasterExecThread implements Runnable {
*/
private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName,
TaskNode taskNode) {
//update processInstance for update the globalParams
this.processInstance = this.processService.findProcessInstanceById(this.processInstance.getId());
TaskInstance taskInstance = findTaskIfExists(nodeName);
if (taskInstance == null) {
taskInstance = new TaskInstance();
@ -540,13 +546,57 @@ public class MasterExecThread implements Runnable {
} else {
taskInstance.setWorkerGroup(taskWorkerGroup);
}
//get process global
setProcessGlobal(taskNode, taskInstance);
// delay execution time
taskInstance.setDelayTime(taskNode.getDelayTime());
}
return taskInstance;
}
private void setProcessGlobal(TaskNode taskNode, TaskInstance taskInstance) {
String globalParams = this.processInstance.getGlobalParams();
if (StringUtils.isNotEmpty(globalParams)) {
Map<String, String> globalMap = getGlobalParamMap(globalParams);
if (globalMap != null && globalMap.size() != 0) {
setGlobalMapToTask(taskNode, taskInstance, globalMap);
}
}
}
private void setGlobalMapToTask(TaskNode taskNode, TaskInstance taskInstance, Map<String, String> globalMap) {
// the param save in localParams
Map<String, Object> result = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
Object localParams = result.get(LOCAL_PARAMS);
if (localParams != null) {
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
for (Property info : allParam) {
if (info.getDirect().equals(Direct.IN)) {
String paramName = info.getProp();
String value = globalMap.get(paramName);
if (StringUtils.isNotEmpty(value)) {
info.setValue(value);
}
}
}
result.put(LOCAL_PARAMS, allParam);
taskNode.setParams(JSONUtils.toJsonString(result));
// task instance node json
taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
}
}
public Map<String, String> getGlobalParamMap(String globalParams) {
List<Property> propList;
Map<String,String> globalParamMap = new HashMap<>();
if (StringUtils.isNotEmpty(globalParams)) {
propList = JSONUtils.toList(globalParams, Property.class);
globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
}
return globalParamMap;
}
private void submitPostNode(String parentNodeName) {
Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);
List<TaskInstance> taskInstances = new ArrayList<>();
@ -951,6 +1001,7 @@ public class MasterExecThread implements Runnable {
task.getName(), task.getId(), task.getState());
// node success , post node submit
if (task.getState() == ExecutionStatus.SUCCESS) {
processInstance = processService.findProcessInstanceById(processInstance.getId());
processInstance.setVarPool(task.getVarPool());
processService.updateProcessInstance(processInstance);
completeTaskList.put(task.getName(), task);

7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -84,6 +85,9 @@ public class WorkerServer {
@Autowired
private RetryReportTaskStatusThread retryReportTaskStatusThread;
@Autowired
private WorkerManagerThread workerManagerThread;
/**
* worker server startup
*
@ -119,6 +123,9 @@ public class WorkerServer {
// worker registry
this.workerRegistry.registry();
// task execute manager
this.workerManagerThread.start();
// retry report task status
this.retryReportTaskStatusThread.start();

44
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@ -38,12 +38,12 @@ import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCache
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -57,11 +57,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
/**
* thread executor service
*/
private final ExecutorService workerExecService;
/**
* worker config
*/
@ -80,13 +75,18 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
/**
* taskExecutionContextCacheManager
*/
private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
private final TaskExecutionContextCacheManager taskExecutionContextCacheManager;
/*
* task execute manager
*/
private final WorkerManagerThread workerManager;
public TaskExecuteProcessor() {
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
this.workerManager = SpringApplicationContext.getBean(WorkerManagerThread.class);
}
/**
@ -101,11 +101,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
}
public TaskExecuteProcessor(AlertClientService alertClientService) {
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
this();
this.alertClientService = alertClientService;
}
@ -140,9 +136,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskExecutionContext.getTaskInstanceId()));
taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
@ -163,10 +157,23 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
// delay task process
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
if (remainTime > 0) {
logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
taskExecutionContext.setStartTime(null);
} else {
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
taskExecutionContext.setStartTime(new Date());
}
this.doAck(taskExecutionContext);
// submit task
workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService));
// submit task to manager
if (!workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService))) {
logger.info("submit task to manager error, queue is full, queue size is {}", workerManager.getQueueSize());
}
}
private void doAck(TaskExecutionContext taskExecutionContext) {
@ -178,6 +185,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
/**
* build ack command
*
* @param taskExecutionContext taskExecutionContext
* @return TaskExecuteAckCommand
*/

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.log.LogClientService;
@ -69,10 +70,16 @@ public class TaskKillProcessor implements NettyRequestProcessor {
*/
private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
/*
* task execute manager
*/
private final WorkerManagerThread workerManager;
public TaskKillProcessor() {
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
this.workerManager = SpringApplicationContext.getBean(WorkerManagerThread.class);
}
/**
@ -110,6 +117,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
Integer processId = taskExecutionContext.getProcessId();
if (processId.equals(0)) {
workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
return Pair.of(true, appIds);

47
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
@ -51,7 +50,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@ -62,7 +63,7 @@ import com.github.rholder.retry.RetryException;
/**
* task scheduler thread
*/
public class TaskExecuteThread implements Runnable {
public class TaskExecuteThread implements Runnable, Delayed {
/**
* logger
@ -132,7 +133,6 @@ public class TaskExecuteThread implements Runnable {
// task node
TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
delayExecutionIfNeeded();
if (taskExecutionContext.getStartTime() == null) {
taskExecutionContext.setStartTime(new Date());
}
@ -174,6 +174,7 @@ public class TaskExecuteThread implements Runnable {
responseCommand.setProcessId(task.getProcessId());
responseCommand.setAppIds(task.getAppIds());
responseCommand.setVarPool(task.getVarPool());
responseCommand.setResult(task.getResultString());
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
} catch (Exception e) {
logger.error("task scheduler failure", e);
@ -289,24 +290,6 @@ public class TaskExecuteThread implements Runnable {
}
}
/**
* delay execution if needed.
*/
private void delayExecutionIfNeeded() {
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getDelayTime() * 60L);
logger.info("delay execution time: {} s", remainTime < 0 ? 0 : remainTime);
if (remainTime > 0) {
try {
Thread.sleep(remainTime * Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) {
logger.error("delay task execution failure, the task will be executed directly. process instance id:{}, task instance id:{}",
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
}
}
}
/**
* send an ack to change the status of the task.
*/
@ -343,4 +326,26 @@ public class TaskExecuteThread implements Runnable {
}
return ackCommand;
}
/**
* get current TaskExecutionContext
* @return TaskExecutionContext
*/
public TaskExecutionContext getTaskExecutionContext() {
return this.taskExecutionContext;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getDelayTime() * 60L), TimeUnit.SECONDS);
}
@Override
public int compareTo(Delayed o) {
if (o == null) {
return 1;
}
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
}

143
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* Manage tasks
*/
@Component
public class WorkerManagerThread implements Runnable {
private final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class);
/**
* task queue
*/
private final DelayQueue<TaskExecuteThread> workerExecuteQueue = new DelayQueue<>();
/**
* worker config
*/
private final WorkerConfig workerConfig;
/**
* thread executor service
*/
private final ExecutorService workerExecService;
/**
* taskExecutionContextCacheManager
*/
private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
/**
* task callback service
*/
private final TaskCallbackService taskCallbackService;
public WorkerManagerThread() {
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads());
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
}
/**
* get queue size
*
* @return queue size
*/
public int getQueueSize() {
return workerExecuteQueue.size();
}
/**
* Kill tasks that have not been executed, like delay task
* then send Response to Master, update the execution status of task instance
*/
public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
workerExecuteQueue.stream()
.filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId)
.forEach(workerExecuteQueue::remove);
sendTaskKillResponse(taskInstanceId);
}
/**
* kill task before execute , like delay task
*/
private void sendTaskKillResponse(Integer taskInstanceId) {
TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
if (taskExecutionContext == null) {
return;
}
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId());
responseCommand.setStatus(ExecutionStatus.KILL.getCode());
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
}
/**
* submit task
*
* @param taskExecuteThread taskExecuteThread
* @return submit result
*/
public boolean offer(TaskExecuteThread taskExecuteThread) {
return workerExecuteQueue.offer(taskExecuteThread);
}
public void start() {
Thread thread = new Thread(this, this.getClass().getName());
thread.start();
}
@Override
public void run() {
Thread.currentThread().setName("Worker-Execute-Manager-Thread");
TaskExecuteThread taskExecuteThread;
while (Stopper.isRunning()) {
try {
taskExecuteThread = workerExecuteQueue.take();
workerExecService.submit(taskExecuteThread);
} catch (Exception e) {
logger.error("An unexpected interrupt is happened, "
+ "the exception will be ignored and this thread will continue to run", e);
}
}
}
}

20
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java

@ -54,6 +54,7 @@ import java.util.regex.Pattern;
import org.slf4j.Logger;
/**
* abstract command executor
*/
@ -84,6 +85,11 @@ public abstract class AbstractCommandExecutor {
*/
protected final List<String> logBuffer;
/**
* SHELL result string
*/
protected String taskResultString;
/**
* taskExecutionContext
*/
@ -104,6 +110,10 @@ public abstract class AbstractCommandExecutor {
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
}
protected AbstractCommandExecutor(List<String> logBuffer) {
this.logBuffer = logBuffer;
}
/**
* build process
*
@ -223,6 +233,7 @@ public abstract class AbstractCommandExecutor {
return varPool.toString();
}
/**
* cancel application
*
@ -355,6 +366,7 @@ public abstract class AbstractCommandExecutor {
varPool.append("$VarPool$");
} else {
logBuffer.add(line);
taskResultString = line;
lastFlushTime = flush(lastFlushTime);
}
}
@ -561,4 +573,12 @@ public abstract class AbstractCommandExecutor {
protected abstract String commandInterpreter();
protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
public String getTaskResultString() {
return taskResultString;
}
public void setTaskResultString(String taskResultString) {
this.taskResultString = taskResultString;
}
}

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java

@ -63,6 +63,11 @@ public abstract class AbstractTask {
*/
protected int processId;
/**
* SHELL result string
*/
protected String resultString;
/**
* other resource manager appId , for example : YARN etc
*/
@ -167,6 +172,14 @@ public abstract class AbstractTask {
this.processId = processId;
}
public String getResultString() {
return resultString;
}
public void setResultString(String resultString) {
this.resultString = resultString;
}
/**
* get task parameters
*

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java

@ -56,6 +56,9 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
super(logHandler,taskExecutionContext,logger);
}
public ShellCommandExecutor(List<String> logBuffer) {
super(logBuffer);
}
@Override
protected String buildCommandFilePath() {

21
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java

@ -21,6 +21,7 @@ import static java.util.Calendar.DAY_OF_MONTH;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
@ -34,6 +35,8 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.slf4j.Logger;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
@ -41,13 +44,13 @@ import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
/**
* shell task
*/
@ -102,6 +105,7 @@ public class ShellTask extends AbstractTask {
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
setResult(shellCommandExecutor.getTaskResultString());
} catch (Exception e) {
logger.error("shell task error", e);
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
@ -183,4 +187,17 @@ public class ShellTask extends AbstractTask {
}
return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
}
public void setResult(String result) {
Map<String, Property> localParams = shellParameters.getLocalParametersMap();
List<Map<String, String>> outProperties = new ArrayList<>();
Map<String, String> p = new HashMap<>();
localParams.forEach((k,v) -> {
if (v.getDirect() == Direct.OUT) {
p.put(k, result);
}
});
outProperties.add(p);
resultString = JSONUtils.toJsonString(outProperties);
}
}

32
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.task.sql;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
@ -148,7 +149,7 @@ public class SqlTask extends AbstractTask {
logger);
// execute sql task
executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs, sqlParameters.getLocalParams());
setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
@ -237,7 +238,8 @@ public class SqlTask extends AbstractTask {
public void executeFuncAndSql(SqlBinds mainSqlBinds,
List<SqlBinds> preStatementsBinds,
List<SqlBinds> postStatementsBinds,
List<String> createFuncs) {
List<String> createFuncs,
List<Property> properties) {
Connection connection = null;
PreparedStatement stmt = null;
ResultSet resultSet = null;
@ -253,18 +255,21 @@ public class SqlTask extends AbstractTask {
preSql(connection, preStatementsBinds);
stmt = prepareStatementAndBind(connection, mainSqlBinds);
String result = null;
// decide whether to executeQuery or executeUpdate based on sqlType
if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
// query statements need to be convert to JsonArray and inserted into Alert to send
resultSet = stmt.executeQuery();
resultProcess(resultSet);
result = resultProcess(resultSet);
} else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
// non query statement
stmt.executeUpdate();
String updateResult = String.valueOf(stmt.executeUpdate());
result = setNonQuerySqlReturn(updateResult, properties);
}
postSql(connection, postStatementsBinds);
this.setResultString(result);
} catch (Exception e) {
logger.error("execute sql error", e);
@ -274,13 +279,28 @@ public class SqlTask extends AbstractTask {
}
}
public String setNonQuerySqlReturn(String updateResult, List<Property> properties) {
String result = null;
for (Property info :properties) {
if (Direct.OUT == info.getDirect()) {
List<Map<String,String>> updateRL = new ArrayList<>();
Map<String,String> updateRM = new HashMap<>();
updateRM.put(info.getProp(),updateResult);
updateRL.add(updateRM);
result = JSONUtils.toJsonString(updateRL);
break;
}
}
return result;
}
/**
* result process
*
* @param resultSet resultSet
* @throws Exception Exception
*/
private void resultProcess(ResultSet resultSet) throws Exception {
private String resultProcess(ResultSet resultSet) throws Exception {
ArrayNode resultJSONArray = JSONUtils.createArrayNode();
ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount();
@ -297,13 +317,13 @@ public class SqlTask extends AbstractTask {
}
String result = JSONUtils.toJsonString(resultJSONArray);
logger.debug("execute sql : {}", result);
try {
sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle() : taskExecutionContext.getTaskName() + " query result sets",
JSONUtils.toJsonString(resultJSONArray));
} catch (Exception e) {
logger.warn("sql task sendAttachment error! msg : {} ", e.getMessage());
}
return result;
}
/**

2
dolphinscheduler-server/src/main/resources/master.properties

@ -21,7 +21,6 @@
# master execute task number in parallel
#master.exec.task.num=20
# master dispatch task number
#master.dispatch.task.num=3
@ -34,7 +33,6 @@
# master commit task interval
#master.task.commit.interval=1000
# only less than cpu avg load, master server can work. default value -1 : the number of cpu cores * 2
#master.max.cpuload.avg=-1

3
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java

@ -70,7 +70,8 @@ public class TaskResponseServiceTest {
"ids",
22,
"varPol",
channel);
channel,
"[{\"id\":70000,\"database_name\":\"yuul\",\"status\":-1,\"create_time\":1601202829000,\"update_time\":1601202829000,\"table_name3\":\"\",\"table_name4\":\"\"}]");
taskInstance = new TaskInstance();
taskInstance.setId(22);

9
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java

@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
@ -79,7 +80,8 @@ import io.netty.channel.Channel;
TaskResponseProcessor.class,
TaskExecuteProcessor.class,
CuratorZookeeperClient.class,
TaskExecutionContextCacheManagerImpl.class})
TaskExecutionContextCacheManagerImpl.class,
WorkerManagerThread.class})
public class TaskCallbackServiceTest {
@Autowired
@ -119,6 +121,11 @@ public class TaskCallbackServiceTest {
ackCommand.setStartTime(new Date());
taskCallbackService.sendAck(1, ackCommand.convert2Command());
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand();
String result = responseCommand.getResult();
responseCommand.setResult("return string");
taskCallbackService.sendResult(1, responseCommand.convert2Command());
Stopper.stop();
nettyRemotingServer.close();

195
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* test task execute processor
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class,
JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
public class TaskExecuteProcessorTest {
private TaskExecutionContext taskExecutionContext;
private TaskCallbackService taskCallbackService;
private ExecutorService workerExecService;
private WorkerConfig workerConfig;
private Command command;
private Command ackCommand;
private TaskExecuteRequestCommand taskRequestCommand;
private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
private AlertClientService alertClientService;
private WorkerManagerThread workerManager;
@Before
public void before() throws Exception {
// init task execution context
taskExecutionContext = getTaskExecutionContext();
workerConfig = new WorkerConfig();
workerConfig.setWorkerExecThreads(1);
workerConfig.setListenPort(1234);
command = new Command();
command.setType(CommandType.TASK_EXECUTE_REQUEST);
ackCommand = new TaskExecuteAckCommand().convert2Command();
taskRequestCommand = new TaskExecuteRequestCommand();
alertClientService = PowerMockito.mock(AlertClientService.class);
workerExecService = PowerMockito.mock(ExecutorService.class);
PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class)))
.thenReturn(null);
PowerMockito.mockStatic(ChannelUtils.class);
PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null);
taskExecutionContextCacheManager = PowerMockito.mock(TaskExecutionContextCacheManagerImpl.class);
taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class))
.thenReturn(taskCallbackService);
PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class))
.thenReturn(workerConfig);
PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
.thenReturn(null);
PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
.thenReturn(taskExecutionContextCacheManager);
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
workerManager = PowerMockito.mock(WorkerManagerThread.class);
PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService))).thenReturn(Boolean.TRUE);
PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class))
.thenReturn(workerManager);
PowerMockito.mockStatic(ThreadUtils.class);
PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()))
.thenReturn(workerExecService);
PowerMockito.mockStatic(JsonSerializer.class);
PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskExecuteRequestCommand.class))
.thenReturn(taskRequestCommand);
PowerMockito.mockStatic(JSONUtils.class);
PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class))
.thenReturn(taskRequestCommand);
PowerMockito.when(JSONUtils.parseObject(taskRequestCommand.getTaskExecutionContext(), TaskExecutionContext.class))
.thenReturn(taskExecutionContext);
PowerMockito.mockStatic(FileUtils.class);
PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(),
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()))
.thenReturn(taskExecutionContext.getExecutePath());
PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());
SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(null, null, null, alertClientService);
PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments()
.thenReturn(simpleTaskExecuteThread);
}
@Test
public void testNormalExecution() {
TaskExecuteProcessor processor = new TaskExecuteProcessor();
processor.process(null, command);
Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
}
@Test
public void testDelayExecution() {
taskExecutionContext.setDelayTime(1);
TaskExecuteProcessor processor = new TaskExecuteProcessor();
processor.process(null, command);
Assert.assertEquals(ExecutionStatus.DELAY_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
}
public TaskExecutionContext getTaskExecutionContext() {
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setProcessId(12345);
taskExecutionContext.setProcessDefineId(1);
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskType("sql");
taskExecutionContext.setFirstSubmitTime(new Date());
taskExecutionContext.setDelayTime(0);
taskExecutionContext.setLogPath("/tmp/test.log");
taskExecutionContext.setHost("localhost");
taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4");
return taskExecutionContext;
}
private static class SimpleTaskExecuteThread extends TaskExecuteThread {
public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService, Logger taskLogger, AlertClientService alertClientService) {
super(taskExecutionContext, taskCallbackService, taskLogger, alertClientService);
}
@Override
public void run() {
//
}
}
}

187
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java

@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* test worker manager thread.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({
Stopper.class,
TaskManager.class,
JSONUtils.class,
CommonUtils.class,
SpringApplicationContext.class,
OSUtils.class})
public class WorkerManagerThreadTest {
private TaskCallbackService taskCallbackService;
private WorkerManagerThread workerManager;
private TaskExecutionContext taskExecutionContext;
private AlertClientService alertClientService;
private Logger taskLogger;
@Before
public void before() {
// init task execution context, logger
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setProcessId(12345);
taskExecutionContext.setProcessDefineId(1);
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTenantCode("test");
taskExecutionContext.setTaskType("");
taskExecutionContext.setFirstSubmitTime(new Date());
taskExecutionContext.setDelayTime(0);
taskExecutionContext.setLogPath("/tmp/test.log");
taskExecutionContext.setHost("localhost");
taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4");
Command ackCommand = new TaskExecuteAckCommand().convert2Command();
Command responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId()).convert2Command();
taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(
LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()
));
TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager = new TaskExecutionContextCacheManagerImpl();
taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
alertClientService = PowerMockito.mock(AlertClientService.class);
WorkerConfig workerConfig = PowerMockito.mock(WorkerConfig.class);
taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
PowerMockito.doNothing().when(taskCallbackService).sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand);
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
.thenReturn(taskExecutionContextCacheManager);
PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class))
.thenReturn(workerConfig);
PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class))
.thenReturn(taskCallbackService);
PowerMockito.when(workerConfig.getWorkerExecThreads()).thenReturn(5);
workerManager = new WorkerManagerThread();
PowerMockito.mockStatic(TaskManager.class);
PowerMockito.when(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService))
.thenReturn(new SimpleTask(taskExecutionContext, taskLogger));
PowerMockito.mockStatic(JSONUtils.class);
PowerMockito.when(JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class))
.thenReturn(new TaskNode());
PowerMockito.mockStatic(CommonUtils.class);
PowerMockito.when(CommonUtils.getSystemEnvPath()).thenReturn("/user_home/.bash_profile");
List<String> osUserList = Collections.singletonList("test");
PowerMockito.mockStatic(OSUtils.class);
PowerMockito.when(OSUtils.getUserList()).thenReturn(osUserList);
PowerMockito.mockStatic(Stopper.class);
PowerMockito.when(Stopper.isRunning()).thenReturn(true, false);
}
@Test
public void testSendTaskKillResponse() {
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService);
workerManager.offer(taskExecuteThread);
Assert.assertEquals(1, workerManager.getQueueSize());
workerManager.killTaskBeforeExecuteByInstanceId(1);
Assert.assertEquals(0, workerManager.getQueueSize());
}
@Test
public void testRun() {
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService);
workerManager.offer(taskExecuteThread);
Assert.assertEquals(1, workerManager.getQueueSize());
workerManager.run();
Assert.assertEquals(0, workerManager.getQueueSize());
}
private static class SimpleTask extends AbstractTask {
protected SimpleTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
// pid
this.processId = taskExecutionContext.getProcessId();
}
@Override
public AbstractParameters getParameters() {
return null;
}
@Override
public void init() {
}
@Override
public void handle() {
}
@Override
public void after() {
}
@Override
public ExecutionStatus getExitStatus() {
return ExecutionStatus.SUCCESS;
}
}
}

53
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class})
public class AbstractCommandExecutorTest {
private static final Logger logger = LoggerFactory.getLogger(AbstractCommandExecutorTest.class);
private ShellCommandExecutor shellCommandExecutor;
@Before
public void before() throws Exception {
System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
shellCommandExecutor = new ShellCommandExecutor(null);
}
@Test
public void testSetTaskResultString() {
shellCommandExecutor.setTaskResultString("shellReturn");
}
@Test
public void testGetTaskResultString() {
logger.info(shellCommandExecutor.getTaskResultString());
}
}

127
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java

@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task;
import static org.mockito.ArgumentMatchers.anyString;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask;
import org.apache.dolphinscheduler.server.worker.task.shell.ShellTaskTest;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* shell task return test.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ShellTask.class})
public class ShellTaskReturnTest {
private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class);
private ShellTask shellTask;
private ShellCommandExecutor shellCommandExecutor;
private TaskExecutionContext taskExecutionContext;
private CommandExecuteResult commandExecuteResult;
@Before
public void before() throws Exception {
System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class);
PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor);
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskName("kris test");
taskExecutionContext.setTaskType("SHELL");
taskExecutionContext.setHost("127.0.0.1:1234");
taskExecutionContext.setExecutePath("/tmp");
taskExecutionContext.setLogPath("/log");
taskExecutionContext.setTaskJson(
"{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],"
+ "\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,"
+ "\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\""
+ "tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? "
+ "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\","
+ "\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":"
+ "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"},"
+ "{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}"
+ "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,"
+ "\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":"
+ "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"SHELL\",\"workerGroup\":\"default\"}");
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setGlobalParams("[{\"direct\":\"IN\",\"prop\":\"time_gb\",\"type\":\"VARCHAR\",\"value\":\"2020-12-16 17:18:33\"}]");
taskExecutionContext.setExecutorId(1);
taskExecutionContext.setCmdTypeIfComplement(5);
taskExecutionContext.setTenantCode("roo");
taskExecutionContext.setScheduleTime(new Date());
taskExecutionContext.setQueue("default");
taskExecutionContext.setTaskParams(
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":"
+
"[{\"prop\":\"time1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR"
+ "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}");
Map<String, String> definedParams = new HashMap<>();
definedParams.put("time_gb", "2020-12-16 00:00:00");
taskExecutionContext.setDefinedParams(definedParams);
PowerMockito.mockStatic(Files.class);
PowerMockito.when(Files.exists(Paths.get(anyString()))).thenReturn(true);
commandExecuteResult = new CommandExecuteResult();
commandExecuteResult.setAppIds("appId");
commandExecuteResult.setExitStatusCode(0);
commandExecuteResult.setProcessId(1);
}
@Test
public void testShellReturnString() {
shellTask = new ShellTask(taskExecutionContext, logger);
shellTask.init();
try {
PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult);
} catch (Exception e) {
e.printStackTrace();
}
shellTask.setResult("shell return string");
logger.info("shell return string:{}", shellTask.getResultString());
}
@Test
public void testSetTaskResultString() {
shellCommandExecutor.setTaskResultString("shellReturn");
}
@Test
public void testGetTaskResultString() {
logger.info(shellCommandExecutor.getTaskResultString());
}
}

75
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java

@ -17,13 +17,22 @@
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask;
import org.apache.dolphinscheduler.server.worker.task.sql.SqlTask;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
@ -116,4 +125,70 @@ public class TaskManagerTest {
taskExecutionContext.setTaskType("XXX");
TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService);
}
@Test
public void testShellTaskReturnString() {
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskName("kris test");
taskExecutionContext.setTaskType("SHELL");
taskExecutionContext.setHost("127.0.0.1:1234");
taskExecutionContext.setExecutePath("/tmp");
taskExecutionContext.setLogPath("/log");
taskExecutionContext.setTaskJson(
"{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\""
+ "tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? "
+ "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\","
+ "\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"OUT\\\",\\\"type\\\":"
+ "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"},"
+ "{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}"
+ "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":"
+ "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}");
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setGlobalParams("[{\"direct\":\"IN\",\"prop\":\"time_gb\",\"type\":\"VARCHAR\",\"value\":\"2020-12-16 17:18:33\"}]");
taskExecutionContext.setExecutorId(1);
taskExecutionContext.setCmdTypeIfComplement(5);
taskExecutionContext.setTenantCode("roo");
taskExecutionContext.setScheduleTime(new Date());
taskExecutionContext.setQueue("default");
taskExecutionContext.setTaskParams(
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":"
+
"[{\"prop\":\"time1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR"
+ "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}");
Map<String, String> definedParams = new HashMap<>();
definedParams.put("time_gb", "2020-12-16 00:00:00");
taskExecutionContext.setDefinedParams(definedParams);
ShellTask shellTask = (ShellTask) TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService);
shellTask.setResultString("shell return");
String shellReturn = shellTask.getResultString();
shellTask.init();
shellTask.setResult(shellReturn);
Assert.assertSame(shellReturn, "shell return");
}
@Test
public void testSqlTaskReturnString() {
String params = "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\","
+ "\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}";
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskParams("{\"localParams\":[{\"prop\":\"ret\", \"direct\":\"OUT\", \"type\":\"VARCHAR\", \"value\":\"\"}],"
+ "\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\","
+ "\"sqlType\":1}");
taskExecutionContext.setExecutePath("/tmp");
taskExecutionContext.setTaskAppId("1");
taskExecutionContext.setTenantCode("root");
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setTaskTimeout(10000);
taskExecutionContext.setLogPath("/tmp/dx");
SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
sqlTaskExecutionContext.setConnectionParams(params);
taskExecutionContext.setSqlTaskExecutionContext(sqlTaskExecutionContext);
SqlTask sqlTask = new SqlTask(taskExecutionContext, logger, null);
SqlParameters sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class);
List<Property> properties = sqlParameters.getLocalParams();
sqlTask.setNonQuerySqlReturn("sql return", properties);
}
}

19
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java

@ -19,12 +19,15 @@ package org.apache.dolphinscheduler.server.worker.task.shell;
import static org.mockito.ArgumentMatchers.anyString;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.DriverManager;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@ -42,7 +45,7 @@ import org.slf4j.LoggerFactory;
* shell task test.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ShellTask.class})
@PrepareForTest(value = {ShellTask.class, DriverManager.class, SpringApplicationContext.class, ParameterUtils.class})
public class ShellTaskTest {
private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class);
@ -57,6 +60,7 @@ public class ShellTaskTest {
System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class);
PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor);
shellCommandExecutor.setTaskResultString("shellReturn");
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskName("kris test");
@ -68,7 +72,7 @@ public class ShellTaskTest {
"{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\""
+
"tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\",\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? "
+ "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\",\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":"
+ "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\",\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"OUT\\\",\\\"type\\\":"
+ "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"},{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}"
+ "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":"
+ "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}");
@ -82,7 +86,7 @@ public class ShellTaskTest {
taskExecutionContext.setTaskParams(
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":"
+
"[{\"prop\":\"time1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR"
"[{\"prop\":\"time1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR"
+ "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}");
Map<String, String> definedParams = new HashMap<>();
definedParams.put("time_gb", "2020-12-16 00:00:00");
@ -111,4 +115,13 @@ public class ShellTaskTest {
PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult);
shellTask.handle();
}
@Test
public void testSetResult() {
shellTask = new ShellTask(taskExecutionContext, logger);
shellTask.init();
String r = "return";
shellTask.setResult(r);
}
}

12
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java

@ -19,10 +19,12 @@ package org.apache.dolphinscheduler.server.worker.task.sql;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.sql.Connection;
import java.sql.DriverManager;
@ -44,7 +46,7 @@ import org.slf4j.LoggerFactory;
* sql task test
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(value = {SqlTask.class, DriverManager.class})
@PrepareForTest(value = {SqlTask.class, DriverManager.class, SpringApplicationContext.class, ParameterUtils.class})
public class SqlTaskTest {
private static final Logger logger = LoggerFactory.getLogger(SqlTaskTest.class);
@ -70,7 +72,9 @@ public class SqlTaskTest {
props.setTaskStartTime(new Date());
props.setTaskTimeout(0);
props.setTaskParams(
"{\"localParams\":[],\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\",\"sqlType\":1}");
"{\"localParams\":[{\"prop\":\"ret\", \"direct\":\"OUT\", \"type\":\"VARCHAR\", \"value\":\"\"}],"
+ "\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\","
+ "\"sqlType\":1}");
taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
PowerMockito.when(taskExecutionContext.getTaskParams()).thenReturn(props.getTaskParams());
@ -85,6 +89,8 @@ public class SqlTaskTest {
sqlTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS);
PowerMockito.when(taskExecutionContext.getSqlTaskExecutionContext()).thenReturn(sqlTaskExecutionContext);
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(Mockito.any())).thenReturn(new AlertDao());
alertClientService = PowerMockito.mock(AlertClientService.class);
sqlTask = new SqlTask(taskExecutionContext, logger, alertClientService);
sqlTask.init();
@ -95,7 +101,7 @@ public class SqlTaskTest {
Assert.assertNotNull(sqlTask.getParameters());
}
@Test(expected = Exception.class)
@Test
public void testHandle() throws Exception {
Connection connection = PowerMockito.mock(Connection.class);
PowerMockito.mockStatic(DriverManager.class);

74
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -24,6 +24,7 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PRO
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
import static java.util.stream.Collectors.toSet;
@ -32,6 +33,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
@ -89,6 +91,7 @@ import java.util.Date;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -103,6 +106,8 @@ import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.cronutils.model.Cron;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
@ -686,11 +691,6 @@ public class ProcessService {
} else {
processInstance = this.findProcessInstanceDetailById(processInstanceId);
// Recalculate global parameters after rerun.
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
getCommandTypeIfComplement(processInstance, command),
processInstance.getScheduleTime()));
}
processDefinition = processDefineMapper.selectById(processInstance.getProcessDefinitionId());
processInstance.setProcessDefinition(processDefinition);
@ -1193,10 +1193,10 @@ public class ProcessService {
*/
public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState) {
ExecutionStatus state = taskInstance.getState();
if (
// running, delayed or killed
// the task already exists in task queue
// return state
if (
state == ExecutionStatus.RUNNING_EXECUTION
|| state == ExecutionStatus.DELAY_EXECUTION
|| state == ExecutionStatus.KILL
@ -1541,15 +1541,75 @@ public class ProcessService {
int processId,
String appIds,
int taskInstId,
String varPool) {
String varPool,
String result) {
taskInstance.setPid(processId);
taskInstance.setAppLink(appIds);
taskInstance.setState(state);
taskInstance.setEndTime(endTime);
taskInstance.setVarPool(varPool);
changeOutParam(result, taskInstance);
saveTaskInstance(taskInstance);
}
public void changeOutParam(String result, TaskInstance taskInstance) {
if (StringUtils.isEmpty(result)) {
return;
}
List<Map<String, String>> workerResultParam = getListMapByString(result);
if (CollectionUtils.isEmpty(workerResultParam)) {
return;
}
//if the result more than one line,just get the first .
Map<String, String> row = workerResultParam.get(0);
if (row == null || row.size() == 0) {
return;
}
TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), TaskNode.class);
Map<String, Object> taskParams = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
Object localParams = taskParams.get(LOCAL_PARAMS);
if (localParams == null) {
return;
}
ProcessInstance processInstance = this.processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId());
List<Property> params4Property = JSONUtils.toList(processInstance.getGlobalParams(), Property.class);
Map<String, Property> allParamMap = params4Property.stream().collect(Collectors.toMap(Property::getProp, Property -> Property));
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
for (Property info : allParam) {
if (info.getDirect() == Direct.OUT) {
String paramName = info.getProp();
Property property = allParamMap.get(paramName);
if (property == null) {
continue;
}
String value = row.get(paramName);
if (StringUtils.isNotEmpty(value)) {
property.setValue(value);
info.setValue(value);
}
}
}
taskParams.put(LOCAL_PARAMS, allParam);
taskNode.setParams(JSONUtils.toJsonString(taskParams));
// task instance node json
taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
String params4ProcessString = JSONUtils.toJsonString(params4Property);
int updateCount = this.processInstanceMapper.updateGlobalParamsById(params4ProcessString, processInstance.getId());
logger.info("updateCount:{}, params4Process:{}, processInstanceId:{}", updateCount, params4ProcessString, processInstance.getId());
}
public List<Map<String, String>> getListMapByString(String json) {
List<Map<String, String>> allParams = new ArrayList<>();
ArrayNode paramsByJson = JSONUtils.parseArray(json);
Iterator<JsonNode> listIterator = paramsByJson.iterator();
while (listIterator.hasNext()) {
Map<String, String> param = JSONUtils.toMap(listIterator.next().toString(), String.class, String.class);
allParams.add(param);
}
return allParams;
}
/**
* convert integer list to string list
*

2
dolphinscheduler-service/src/main/resources/logback-zookeeper.xml

@ -32,7 +32,7 @@
<appender name="LOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.base}/dolphinscheduler-zookeeper.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.base}/dolphinscheduler-alert.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<fileNamePattern>${log.base}/dolphinscheduler-zookeeper.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxHistory>20</maxHistory>
<maxFileSize>64MB</maxFileSize>
</rollingPolicy>

22
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -442,4 +442,26 @@ public class ProcessServiceTest {
Assert.assertEquals(expect, processService.changeJson(newProcessData,oldJson));
}
@Test
public void testChangeOutParam() {
String result = "[{\"d\":\"20210203\"}]";
TaskInstance taskInstance = new TaskInstance();
taskInstance.setProcessInstanceId(62);
taskInstance.setTaskJson("{\"id\":\"tasks-86175\",\"name\":\"wew\",\"desc\":null,\"type\":\"SHELL\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,"
+ "\"retryInterval\":1,\"params\":{\"rawScript\":\"echo 20210203\",\"localParams\":[{\"prop\":\"d\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"\"}],"
+ "\"resourceList\":[]},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},"
+ "\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"workerGroupId\":null,"
+ "\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}");
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(62);
processInstance.setGlobalParams("[{\"prop\":\"sql2\",\"direct\":null,\"type\":null,\"value\":\"\"},{\"prop\":\"out\",\"direct\":null,\"type\":null,\"value\":\"\"},"
+ "{\"prop\":\"d\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
String params4ProcessString = "[{\"prop\":\"sql2\",\"direct\":null,\"type\":null,\"value\":\"\"},{\"prop\":\"out\",\"direct\":null,\"type\":null,\"value\":\"\"},"
+ "{\"prop\":\"d\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20210203\"}]";
Mockito.when(processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId())).thenReturn(processInstance);
Mockito.when(this.processInstanceMapper.updateGlobalParamsById(params4ProcessString, processInstance.getId())).thenReturn(1);
processService.changeOutParam(result,taskInstance);
}
}

BIN
dolphinscheduler-ui.zip

Binary file not shown.

19
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue

@ -178,7 +178,7 @@
:title="$t('Set the DAG diagram name')"
:visible.sync="dialogVisible"
width="auto">
<m-udp @onUdp="onUdpDialog" @close="closeDialog"></m-udp>
<m-udp ref="mUdp" @onUdp="onUdpDialog" @close="closeDialog"></m-udp>
</el-dialog>
<el-dialog
:title="$t('Please set the parameters before starting')"
@ -268,7 +268,7 @@
},
methods: {
...mapActions('dag', ['saveDAGchart', 'updateInstance', 'updateDefinition', 'getTaskState', 'switchProcessDefinitionVersion', 'getProcessDefinitionVersionsPage', 'deleteProcessDefinitionVersion']),
...mapMutations('dag', ['addTasks', 'cacheTasks', 'resetParams', 'setIsEditDag', 'setName', 'addConnects']),
...mapMutations('dag', ['addTasks', 'cacheTasks', 'resetParams', 'setIsEditDag', 'setName', 'addConnects', 'resetLocalParam']),
startRunning (item, startNodeList, sourceType) {
this.startData = item
this.startNodeList = startNodeList
@ -377,7 +377,7 @@
// remove tip state dom
$('.w').find('.state-p').html('')
const newTask = []
data.forEach(v1 => {
idArr.forEach(v2 => {
if (v2.name === v1.name) {
@ -387,6 +387,12 @@
taskList.forEach(item => {
if (item.name === v1.name) {
depState = item.state
const params = item.taskJson ? JSON.parse(item.taskJson).params : ''
let localParam = params.localParams || []
newTask.push({
id: v2.id,
localParam
})
}
})
dom.attr('data-state-id', v1.stateId)
@ -403,6 +409,9 @@
findComponentDownward(this.$root, `${this.type}-details`)._reset()
}
}
if (!isReset) {
this.resetLocalParam(newTask)
}
resolve()
})
})
@ -550,7 +559,11 @@
this.$message.warning(`${i18n.$t('Failed to create node to save')}`)
return
}
this.dialogVisible = true
this.$nextTick(() => {
this.$refs.mUdp.reloadParam()
})
},
/**
* Return to the previous child node

4
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/localParams.vue

@ -21,7 +21,7 @@
:key="item.id"
@click="_getIndex($index)">
<el-input
:disabled="isDetails"
:disabled="isDetails || item.ifFixed"
type="text"
size="small"
v-model="localParamsList[$index].prop"
@ -68,7 +68,7 @@
@blur="_handleValue()"
:style="inputStyle">
</el-input>
<span class="lt-add">
<span class="lt-add" v-show="!item.ifFixed">
<a href="javascript:" style="color:red;" @click="!isDetails && _removeUdp($index)" >
<em class="el-icon-delete" :class="_isDetails" data-toggle="tooltip" :title="$t('delete')" ></em>
</a>

2
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue

@ -46,7 +46,7 @@
ref="refLocalParams"
@on-local-params="_onLocalParams"
:udp-list="localParams"
:hide="false">
:hide="true">
</m-local-params>
</div>
</m-list-box>

50
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/udp/udp.vue

@ -142,7 +142,12 @@
return true
},
_accuStore () {
this.store.commit('dag/setGlobalParams', _.cloneDeep(this.udpList))
const udp = _.cloneDeep(this.udpList)
udp.forEach(u => {
delete u.ifFixed
})
this.store.commit('dag/setGlobalParams', udp)
this.store.commit('dag/setName', _.cloneDeep(this.name))
this.store.commit('dag/setTimeout', _.cloneDeep(this.timeout))
this.store.commit('dag/setTenantId', _.cloneDeep(this.tenantId))
@ -191,6 +196,46 @@
*/
close () {
this.$emit('close')
},
/**
* reload localParam
*/
reloadParam () {
const dag = _.cloneDeep(this.store.state.dag)
let fixedParam = []
const tasks = this.store.state.dag.tasks
for (const task of tasks) {
const localParam = task.params ? task.params.localParams : []
localParam.forEach(l => {
if (!fixedParam.some(f => { return f.prop === l.prop })) {
fixedParam.push(Object.assign({
ifFixed: true
}, l))
}
})
}
let globalParams = _.cloneDeep(dag.globalParams)
globalParams = globalParams.map(g => {
if (fixedParam.some(f => { return g.prop === f.prop })) {
fixedParam = fixedParam.filter(f => { return g.prop !== f.prop })
return Object.assign(g, {
ifFixed: true
})
} else {
return g
}
})
let udpList = [...fixedParam, ...globalParams].sort(s => {
if (s.ifFixed) {
return -1
} else {
return 1
}
})
this.udpList = udpList
this.udpListCache = udpList
}
},
watch: {
@ -203,8 +248,7 @@
},
created () {
const dag = _.cloneDeep(this.store.state.dag)
this.udpList = dag.globalParams
this.udpListCache = dag.globalParams
this.name = dag.name
this.originalName = dag.name
this.description = dag.description

11
dolphinscheduler-ui/src/js/conf/home/store/dag/mutations.js

@ -162,5 +162,16 @@ export default {
} else {
state.cacheTasks[payload.id] = payload
}
},
resetLocalParam (state, payload) {
const tasks = state.tasks
tasks.forEach((task, index) => {
payload.forEach(p => {
if (p.id === task.id) {
tasks[index].params.localParams = p.localParam
}
})
})
state.tasks = tasks
}
}

4
pom.xml

@ -913,6 +913,7 @@
<include>**/server/utils/ProcessUtilsTest.java</include>
<include>**/server/utils/SparkArgsUtilsTest.java</include>
<include>**/server/worker/processor/TaskCallbackServiceTest.java</include>
<include>**/server/worker/processor/TaskExecuteProcessorTest.java</include>
<include>**/server/worker/registry/WorkerRegistryTest.java</include>
<include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
<include>**/server/worker/sql/SqlExecutorTest.java</include>
@ -924,8 +925,11 @@
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
<include>**/server/worker/task/shell/ShellTaskTest.java</include>
<include>**/server/worker/task/TaskManagerTest.java</include>
<include>**/server/worker/task/AbstractCommandExecutorTest.java</include>
<include>**/server/worker/task/ShellTaskReturnTest.java</include>
<include>**/server/worker/EnvFileTest.java</include>
<include>**/server/worker/runner/TaskExecuteThreadTest.java</include>
<include>**/server/worker/runner/WorkerManagerThreadTest.java</include>
<include>**/service/quartz/cron/CronUtilsTest.java</include>
<include>**/service/process/ProcessServiceTest.java</include>
<include>**/service/zk/DefaultEnsembleProviderTest.java</include>

Loading…
Cancel
Save