diff --git a/README.md b/README.md index f306a398b5..248b9d77ce 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ Its main objectives are as follows: ### What's in DolphinScheduler - Stability | Easy to use | Features | Scalability | + Stability | Accessibility | Features | Scalability | -- | -- | -- | -- Decentralized multi-master and multi-worker | Visualization of workflow key information, such as task status, task type, retry times, task operation machine information, visual variables, and so on at a glance.  |  Support pause, recover operation | Support customized task types support HA | Visualization of all workflow operations, dragging tasks to draw DAGs, configuring data sources and resources. At the same time, for third-party systems, provide API mode operations. | Users on DolphinScheduler can achieve many-to-one or one-to-one mapping relationship through tenants and Hadoop users, which is very important for scheduling large data jobs. | The scheduler supports distributed scheduling, and the overall scheduling capability will increase linearly with the scale of the cluster. Master and Worker support dynamic adjustment. @@ -73,13 +73,19 @@ dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-${latest.release ``` ### Thanks - DolphinScheduler is based on a lot of excellent open-source projects, such as google guava, guice, grpc, netty, ali bonecp, quartz, and many open-source projects of Apache and so on. We would like to express our deep gratitude to all the open-source projects used in Dolphin Scheduler. We hope that we are not only the beneficiaries of open-source, but also give back to the community. Besides, we hope everyone who have the same enthusiasm and passion for open source could join in and contribute to the open-source community! ### Get Help 1. Submit an [[issue](https://github.com/apache/incubator-dolphinscheduler/issues/new/choose)] -1. Subscribe to the mail list: https://dolphinscheduler.apache.org/en-us/docs/development/subscribe.html, then email dev@dolphinscheduler.apache.org +1. Subscribe to this mail list: https://dolphinscheduler.apache.org/en-us/community/development/subscribe.html, then email dev@dolphinscheduler.apache.org + + +### Community +You are so much welcomed to communicate with the developers and users of Dolphin Scheduler freely. There are two ways to find them: +1. Join the slack channel by [this invitation link](https://join.slack.com/t/asf-dolphinscheduler/shared_invite/zt-l8k90ceu-wwUfobaDkJxjzMfZp4y1Ag). +2. Follow the [twitter account of Dolphin Scheduler](https://twitter.com/dolphinschedule) and get the latest news just on time. + ### How to Contribute The community welcomes everyone to participate in contributing, please refer to this website to find out more: [[How to contribute](https://dolphinscheduler.apache.org/en-us/community/development/contribute.html)] diff --git a/README_zh_CN.md b/README_zh_CN.md index 4f2a8f0bf8..0e96da8371 100644 --- a/README_zh_CN.md +++ b/README_zh_CN.md @@ -81,7 +81,11 @@ Dolphin Scheduler使用了很多优秀的开源项目,比如google的guava、g ### 获得帮助 1. 提交issue -1. 先订阅邮件开发列表:[订阅邮件列表](https://dolphinscheduler.apache.org/zh-cn/docs/development/subscribe.html), 订阅成功后发送邮件到dev@dolphinscheduler.apache.org. +2. 先订阅邮件开发列表:[订阅邮件列表](https://dolphinscheduler.apache.org/zh-cn/community/development/subscribe.html), 订阅成功后发送邮件到dev@dolphinscheduler.apache.org. + +### 社区 +1. 通过[该申请链接](https://join.slack.com/t/asf-dolphinscheduler/shared_invite/zt-l8k90ceu-wwUfobaDkJxjzMfZp4y1Ag)加入slack channel +2. 关注[Apache Dolphin Scheduler的Twitter账号](https://twitter.com/dolphinschedule)获取实时动态 ### 版权 请参考 [LICENSE](https://github.com/apache/incubator-dolphinscheduler/blob/dev/LICENSE) 文件. diff --git a/docker/README.md b/docker/README.md index e69de29bb2..dcd20981be 100644 --- a/docker/README.md +++ b/docker/README.md @@ -0,0 +1 @@ +# Dolphin Scheduler for Docker diff --git a/docker/build/Dockerfile b/docker/build/Dockerfile index a33cf490b1..121970bf5a 100644 --- a/docker/build/Dockerfile +++ b/docker/build/Dockerfile @@ -57,6 +57,6 @@ RUN dos2unix /root/checkpoint.sh && \ echo "Set disable_coredump false" >> /etc/sudo.conf # 4. expose port -EXPOSE 5678 1234 12345 50051 +EXPOSE 5678 1234 12345 50051 50052 ENTRYPOINT ["/sbin/tini", "--", "/root/startup.sh"] diff --git a/docker/build/conf/dolphinscheduler/alert.properties.tpl b/docker/build/conf/dolphinscheduler/alert.properties.tpl index 6d7450e13d..1ff6fe7adb 100644 --- a/docker/build/conf/dolphinscheduler/alert.properties.tpl +++ b/docker/build/conf/dolphinscheduler/alert.properties.tpl @@ -14,43 +14,17 @@ # See the License for the specific language governing permissions and # limitations under the License. # -#alert type is EMAIL/SMS -alert.type=EMAIL - -# alter msg template, default is html template -#alert.template=html -# mail server configuration -mail.protocol=SMTP -mail.server.host=${MAIL_SERVER_HOST} -mail.server.port=${MAIL_SERVER_PORT} -mail.sender=${MAIL_SENDER} -mail.user=${MAIL_USER} -mail.passwd=${MAIL_PASSWD} -# TLS -mail.smtp.starttls.enable=${MAIL_SMTP_STARTTLS_ENABLE} -# SSL -mail.smtp.ssl.enable=${MAIL_SMTP_SSL_ENABLE} -mail.smtp.ssl.trust=${MAIL_SMTP_SSL_TRUST} - -#xls file path,need create if not exist -xls.file.path=${XLS_FILE_PATH} - -# plugins dir -plugin.dir=${ALERT_PLUGIN_DIR} - -# Enterprise WeChat configuration -enterprise.wechat.enable=${ENTERPRISE_WECHAT_ENABLE} -enterprise.wechat.corp.id=${ENTERPRISE_WECHAT_CORP_ID} -enterprise.wechat.secret=${ENTERPRISE_WECHAT_SECRET} -enterprise.wechat.agent.id=${ENTERPRISE_WECHAT_AGENT_ID} -enterprise.wechat.users=${ENTERPRISE_WECHAT_USERS} -enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpId}&corpsecret={secret} -enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token} -enterprise.wechat.team.send.msg={\"toparty\":\"{toParty}\",\"agentid\":\"{agentId}\",\"msgtype\":\"text\",\"text\":{\"content\":\"{msg}\"},\"safe\":\"0\"} -enterprise.wechat.user.send.msg={\"touser\":\"{toUser}\",\"agentid\":\"{agentId}\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"{msg}\"}} - - +#This configuration file configures the configuration parameters related to the AlertServer. +#These parameters are only related to the AlertServer, and it has nothing to do with the specific Alert Plugin. +#eg : max retry num. +#eg : Alert Server Listener port +#alert.plugin.dir config the Alert Plugin dir . AlertServer while find and load the Alert Plugin Jar from this dir when deploy and start AlertServer on the server . +alert.plugin.dir=${ALERT_PLUGIN_DIR} +#maven.local.repository=/Users/gaojun/Documents/jianguoyun/localRepository +#alert.plugin.binding config the Alert Plugin need be load when development and run in IDE +#alert.plugin.binding=\ +# ./dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml diff --git a/docker/build/conf/dolphinscheduler/application-api.properties.tpl b/docker/build/conf/dolphinscheduler/application-api.properties.tpl index 88915923fa..f42588112b 100644 --- a/docker/build/conf/dolphinscheduler/application-api.properties.tpl +++ b/docker/build/conf/dolphinscheduler/application-api.properties.tpl @@ -24,22 +24,39 @@ server.servlet.session.timeout=7200 # servlet config server.servlet.context-path=/dolphinscheduler/ +# time zone +spring.jackson.time-zone=GMT+8 + # file size limit for upload spring.servlet.multipart.max-file-size=1024MB spring.servlet.multipart.max-request-size=1024MB +# enable response compression +server.compression.enabled=true +server.compression.mime-types=text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml + # post content -server.jetty.max-http-post-size=5000000 +server.jetty.max-http-form-post-size=5000000 # i18n spring.messages.encoding=UTF-8 -#i18n classpath folder , file prefix messages, if have many files, use "," seperator +# i18n classpath folder , file prefix messages, if have many files, use "," seperator spring.messages.basename=i18n/messages # Authentication types (supported types: PASSWORD) security.authentication.type=PASSWORD - - - +#============================================================================ +# LDAP Config +# mock ldap server from https://www.forumsys.com/tutorials/integration-how-to/ldap/online-ldap-test-server/ +#============================================================================ +# admin userId +#security.authentication.ldap.user.admin=read-only-admin +# ldap server config +#ldap.urls=ldap://ldap.forumsys.com:389/ +#ldap.base.dn=dc=example,dc=com +#ldap.username=cn=read-only-admin,dc=example,dc=com +#ldap.password=password +#ldap.user.identity.attribute=uid +#ldap.user.email.attribute=mail diff --git a/docker/build/conf/dolphinscheduler/common.properties.tpl b/docker/build/conf/dolphinscheduler/common.properties.tpl index ff74598fd4..a3ccde7c61 100644 --- a/docker/build/conf/dolphinscheduler/common.properties.tpl +++ b/docker/build/conf/dolphinscheduler/common.properties.tpl @@ -15,64 +15,64 @@ # limitations under the License. # -#============================================================================ -# System -#============================================================================ -# system env path. self configuration, please make sure the directory and file exists and have read write execute permissions -dolphinscheduler.env.path=${DOLPHINSCHEDULER_ENV_PATH} - -# user data directory path, self configuration, please make sure the directory exists and have read write permissions -data.basedir.path=${DOLPHINSCHEDULER_DATA_BASEDIR_PATH} - -# resource upload startup type : HDFS,S3,NONE +# resource storage type : HDFS, S3, NONE resource.storage.type=${RESOURCE_STORAGE_TYPE} -#============================================================================ -# HDFS -#============================================================================ # resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。"/dolphinscheduler" is recommended resource.upload.path=${RESOURCE_UPLOAD_PATH} +# user data local directory path, please make sure the directory exists and have read write permissions +data.basedir.path=${DOLPHINSCHEDULER_DATA_BASEDIR_PATH} + # whether kerberos starts -#hadoop.security.authentication.startup.state=false +hadoop.security.authentication.startup.state=false # java.security.krb5.conf path -#java.security.krb5.conf.path=/opt/krb5.conf +java.security.krb5.conf.path=/opt/krb5.conf -# loginUserFromKeytab user -#login.user.keytab.username=hdfs-mycluster@ESZ.COM +# login user from keytab username +login.user.keytab.username=hdfs-mycluster@ESZ.COM -# loginUserFromKeytab path -#login.user.keytab.path=/opt/hdfs.headless.keytab +# login user from keytab path +login.user.keytab.path=/opt/hdfs.headless.keytab #resource.view.suffixs -#resource.view.suffixs=txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties +#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js # if resource.storage.type=HDFS, the user need to have permission to create directories under the HDFS root path hdfs.root.user=hdfs -# kerberos expire time -kerberos.expire.time=7 - -#============================================================================ -# S3 -#============================================================================ -# if resource.storage.type=S3,the value like: s3a://dolphinscheduler ; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir +# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir fs.defaultFS=${FS_DEFAULT_FS} -# if resource.storage.type=S3,s3 endpoint +# if resource.storage.type=S3, s3 endpoint fs.s3a.endpoint=${FS_S3A_ENDPOINT} -# if resource.storage.type=S3,s3 access key +# if resource.storage.type=S3, s3 access key fs.s3a.access.key=${FS_S3A_ACCESS_KEY} -# if resource.storage.type=S3,s3 secret key +# if resource.storage.type=S3, s3 secret key fs.s3a.secret.key=${FS_S3A_SECRET_KEY} -# if not use hadoop resourcemanager, please keep default value; if resourcemanager HA enable, please type the HA ips ; if resourcemanager is single, make this value empty TODO +# if resourcemanager HA enable, please type the HA ips ; if resourcemanager is single, make this value empty yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx -# If resourcemanager HA enable or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ark1 to actual resourcemanager hostname. -yarn.application.status.address=http://ark1:8088/ws/v1/cluster/apps/%s +# if resourcemanager HA enable or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ds1 to actual resourcemanager hostname. +yarn.application.status.address=http://ds1:8088/ws/v1/cluster/apps/%s + +# job history status url when application number threshold is reached(default 10000,maybe it was set to 1000) +yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s + +# system env path, If you want to set your own path, you need to set this env file to an absolute path +dolphinscheduler.env.path=${DOLPHINSCHEDULER_ENV_PATH} +development.state=false + +# kerberos tgt expire time, unit is hours +kerberos.expire.time=2 +# datasource encryption salt +datasource.encryption.enable=false +datasource.encryption.salt=!@#$%^&* +# Network IP gets priority, default inner outer +#dolphin.scheduler.network.priority.strategy=default diff --git a/docker/build/conf/dolphinscheduler/datasource.properties.tpl b/docker/build/conf/dolphinscheduler/datasource.properties.tpl index f7c5ee6881..b414d21430 100644 --- a/docker/build/conf/dolphinscheduler/datasource.properties.tpl +++ b/docker/build/conf/dolphinscheduler/datasource.properties.tpl @@ -17,12 +17,21 @@ # db spring.datasource.driver-class-name=${DATABASE_DRIVER} -spring.datasource.url=jdbc:${DATABASE_TYPE}://${DATABASE_HOST}:${DATABASE_PORT}/${DATABASE_DATABASE}?${DATABASE_PARAMS} +spring.datasource.url=jdbc:${DATABASE_TYPE}://${DATABASE_HOST}:${DATABASE_PORT}/${DATABASE_DATABASE}${DATABASE_PARAMS:+?${DATABASE_PARAMS}} spring.datasource.username=${DATABASE_USERNAME} spring.datasource.password=${DATABASE_PASSWORD} -## base spring data source configuration todo need to remove -#spring.datasource.type=com.alibaba.druid.pool.DruidDataSource +# postgresql +#spring.datasource.driver-class-name=org.postgresql.Driver +#spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/dolphinscheduler +#spring.datasource.username=test +#spring.datasource.password=test + +# mysql +#spring.datasource.driver-class-name=com.mysql.jdbc.Driver +#spring.datasource.url=jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8 +#spring.datasource.username=xxxx +#spring.datasource.password=xxxx # connection configuration #spring.datasource.initialSize=5 @@ -63,4 +72,4 @@ spring.datasource.password=${DATABASE_PASSWORD} # open PSCache, specify count PSCache for every connection #spring.datasource.poolPreparedStatements=true -#spring.datasource.maxPoolPreparedStatementPerConnectionSize=20 \ No newline at end of file +#spring.datasource.maxPoolPreparedStatementPerConnectionSize=20 diff --git a/docker/build/conf/dolphinscheduler/logback/logback-api.xml b/docker/build/conf/dolphinscheduler/logback/logback-api.xml index 6d29f8af5f..9dcec7afce 100644 --- a/docker/build/conf/dolphinscheduler/logback/logback-api.xml +++ b/docker/build/conf/dolphinscheduler/logback/logback-api.xml @@ -23,12 +23,12 @@ - ${log.base}/dolphinscheduler-api-server.log + ${log.base}/dolphinscheduler-api.log INFO - ${log.base}/dolphinscheduler-api-server.%d{yyyy-MM-dd_HH}.%i.log + ${log.base}/dolphinscheduler-api.%d{yyyy-MM-dd_HH}.%i.log 168 64MB diff --git a/docker/build/conf/dolphinscheduler/logback/logback-master.xml b/docker/build/conf/dolphinscheduler/logback/logback-master.xml index d1bfb67aa1..202369c8dc 100644 --- a/docker/build/conf/dolphinscheduler/logback/logback-master.xml +++ b/docker/build/conf/dolphinscheduler/logback/logback-master.xml @@ -24,9 +24,9 @@ - + taskAppId diff --git a/docker/build/conf/dolphinscheduler/logback/logback-worker.xml b/docker/build/conf/dolphinscheduler/logback/logback-worker.xml index b7e08dd846..bf4dd46332 100644 --- a/docker/build/conf/dolphinscheduler/logback/logback-worker.xml +++ b/docker/build/conf/dolphinscheduler/logback/logback-worker.xml @@ -25,9 +25,9 @@ - + taskAppId @@ -48,10 +48,9 @@ ${log.base}/dolphinscheduler-worker.log - + ${log.base}/dolphinscheduler-worker.%d{yyyy-MM-dd_HH}.%i.log 168 diff --git a/docker/build/conf/dolphinscheduler/master.properties.tpl b/docker/build/conf/dolphinscheduler/master.properties.tpl index 17dd6f9d69..7b07dd7d62 100644 --- a/docker/build/conf/dolphinscheduler/master.properties.tpl +++ b/docker/build/conf/dolphinscheduler/master.properties.tpl @@ -21,6 +21,9 @@ master.exec.threads=${MASTER_EXEC_THREADS} # master execute task number in parallel master.exec.task.num=${MASTER_EXEC_TASK_NUM} +# master dispatch task number +#master.dispatch.task.num=3 + # master heartbeat interval master.heartbeat.interval=${MASTER_HEARTBEAT_INTERVAL} @@ -37,4 +40,4 @@ master.max.cpuload.avg=${MASTER_MAX_CPULOAD_AVG} master.reserved.memory=${MASTER_RESERVED_MEMORY} # master listen port -#master.listen.port=${MASTER_LISTEN_PORT} \ No newline at end of file +master.listen.port=${MASTER_LISTEN_PORT} diff --git a/docker/build/conf/dolphinscheduler/quartz.properties.tpl b/docker/build/conf/dolphinscheduler/quartz.properties.tpl index 25645795bb..10f18122bf 100644 --- a/docker/build/conf/dolphinscheduler/quartz.properties.tpl +++ b/docker/build/conf/dolphinscheduler/quartz.properties.tpl @@ -51,4 +51,4 @@ #============================================================================ # Configure Datasources #============================================================================ -#org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.service.quartz.DruidConnectionProvider \ No newline at end of file +#org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.service.quartz.DruidConnectionProvider diff --git a/docker/build/conf/dolphinscheduler/worker.properties.tpl b/docker/build/conf/dolphinscheduler/worker.properties.tpl index 83097dd9a4..2093da3501 100644 --- a/docker/build/conf/dolphinscheduler/worker.properties.tpl +++ b/docker/build/conf/dolphinscheduler/worker.properties.tpl @@ -21,20 +21,20 @@ worker.exec.threads=${WORKER_EXEC_THREADS} # worker heartbeat interval worker.heartbeat.interval=${WORKER_HEARTBEAT_INTERVAL} -# submit the number of tasks at a time -worker.fetch.task.num=${WORKER_FETCH_TASK_NUM} - -# only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2 +# only less than cpu avg load, worker server can work. default value -1: the number of cpu cores * 2 worker.max.cpuload.avg=${WORKER_MAX_CPULOAD_AVG} # only larger than reserved memory, worker server can work. default value : physical memory * 1/6, unit is G. worker.reserved.memory=${WORKER_RESERVED_MEMORY} # worker listener port -#worker.listen.port=${WORKER_LISTEN_PORT} +worker.listen.port=${WORKER_LISTEN_PORT} # default worker group -#worker.groups=${WORKER_GROUP} +worker.groups=${WORKER_GROUP} # default worker weight -#worker.weight=${WORKER_WEIGHT} \ No newline at end of file +worker.weight=${WORKER_WEIGHT} + +# alert server listener host +alert.listen.host=${ALERT_LISTEN_HOST} diff --git a/docker/build/conf/dolphinscheduler/zookeeper.properties.tpl b/docker/build/conf/dolphinscheduler/zookeeper.properties.tpl index 51540aa345..8e222328f8 100644 --- a/docker/build/conf/dolphinscheduler/zookeeper.properties.tpl +++ b/docker/build/conf/dolphinscheduler/zookeeper.properties.tpl @@ -26,4 +26,5 @@ zookeeper.dolphinscheduler.root=${ZOOKEEPER_ROOT} #zookeeper.connection.timeout=30000 #zookeeper.retry.base.sleep=100 #zookeeper.retry.max.sleep=30000 -#zookeeper.retry.maxtime=10 \ No newline at end of file +#zookeeper.retry.maxtime=10 +#zookeeper.max.wait.time=10000 diff --git a/docker/build/startup-init-conf.sh b/docker/build/startup-init-conf.sh index c3aadcd673..9439048587 100755 --- a/docker/build/startup-init-conf.sh +++ b/docker/build/startup-init-conf.sh @@ -20,7 +20,8 @@ set -e echo "init env variables" -# Define parameters default value. +# Define parameters default value + #============================================================================ # Database Source #============================================================================ @@ -34,7 +35,7 @@ export DATABASE_DRIVER=${DATABASE_DRIVER:-"org.postgresql.Driver"} export DATABASE_PARAMS=${DATABASE_PARAMS:-"characterEncoding=utf8"} #============================================================================ -# System +# Common #============================================================================ export DOLPHINSCHEDULER_ENV_PATH=${DOLPHINSCHEDULER_ENV_PATH:-"/opt/dolphinscheduler/conf/env/dolphinscheduler_env.sh"} export DOLPHINSCHEDULER_DATA_BASEDIR_PATH=${DOLPHINSCHEDULER_DATA_BASEDIR_PATH:-"/tmp/dolphinscheduler"} @@ -69,35 +70,17 @@ export MASTER_LISTEN_PORT=${MASTER_LISTEN_PORT:-"5678"} #============================================================================ export WORKER_EXEC_THREADS=${WORKER_EXEC_THREADS:-"100"} export WORKER_HEARTBEAT_INTERVAL=${WORKER_HEARTBEAT_INTERVAL:-"10"} -export WORKER_FETCH_TASK_NUM=${WORKER_FETCH_TASK_NUM:-"3"} export WORKER_MAX_CPULOAD_AVG=${WORKER_MAX_CPULOAD_AVG:-"100"} export WORKER_RESERVED_MEMORY=${WORKER_RESERVED_MEMORY:-"0.1"} export WORKER_LISTEN_PORT=${WORKER_LISTEN_PORT:-"1234"} export WORKER_GROUP=${WORKER_GROUP:-"default"} export WORKER_WEIGHT=${WORKER_WEIGHT:-"100"} +export ALERT_LISTEN_HOST=${ALERT_LISTEN_HOST:-"127.0.0.1"} #============================================================================ # Alert Server #============================================================================ -# alert plugin dir -export ALERT_PLUGIN_DIR=${ALERT_PLUGIN_DIR:-"/opt/dolphinscheduler"} -# xls file -export XLS_FILE_PATH=${XLS_FILE_PATH:-"/tmp/xls"} -# mail -export MAIL_SERVER_HOST=${MAIL_SERVER_HOST:-""} -export MAIL_SERVER_PORT=${MAIL_SERVER_PORT:-""} -export MAIL_SENDER=${MAIL_SENDER:-""} -export MAIL_USER=${MAIL_USER:-""} -export MAIL_PASSWD=${MAIL_PASSWD:-""} -export MAIL_SMTP_STARTTLS_ENABLE=${MAIL_SMTP_STARTTLS_ENABLE:-"true"} -export MAIL_SMTP_SSL_ENABLE=${MAIL_SMTP_SSL_ENABLE:-"false"} -export MAIL_SMTP_SSL_TRUST=${MAIL_SMTP_SSL_TRUST:-""} -# wechat -export ENTERPRISE_WECHAT_ENABLE=${ENTERPRISE_WECHAT_ENABLE:-"false"} -export ENTERPRISE_WECHAT_CORP_ID=${ENTERPRISE_WECHAT_CORP_ID:-""} -export ENTERPRISE_WECHAT_SECRET=${ENTERPRISE_WECHAT_SECRET:-""} -export ENTERPRISE_WECHAT_AGENT_ID=${ENTERPRISE_WECHAT_AGENT_ID:-""} -export ENTERPRISE_WECHAT_USERS=${ENTERPRISE_WECHAT_USERS:-""} +export ALERT_PLUGIN_DIR=${ALERT_PLUGIN_DIR:-"lib/plugin/alert"} echo "generate app config" ls ${DOLPHINSCHEDULER_HOME}/conf/ | grep ".tpl" | while read line; do diff --git a/docker/build/startup.sh b/docker/build/startup.sh index 8bd1895cc0..5c00c272c2 100755 --- a/docker/build/startup.sh +++ b/docker/build/startup.sh @@ -26,7 +26,7 @@ DOLPHINSCHEDULER_LOGS=${DOLPHINSCHEDULER_HOME}/logs waitDatabase() { echo "test ${DATABASE_TYPE} service" while ! nc -z ${DATABASE_HOST} ${DATABASE_PORT}; do - counter=$((counter+1)) + local counter=$((counter+1)) if [ $counter == 30 ]; then echo "Error: Couldn't connect to ${DATABASE_TYPE}." exit 1 @@ -57,12 +57,41 @@ initDatabase() { ${DOLPHINSCHEDULER_SCRIPT}/create-dolphinscheduler.sh } +# check ds version +checkDSVersion() { + if [ ${DATABASE_TYPE} = "mysql" ]; then + v=$(mysql -h${DATABASE_HOST} -P${DATABASE_PORT} -u${DATABASE_USERNAME} --password=${DATABASE_PASSWORD} -D ${DATABASE_DATABASE} -e "SELECT * FROM public.t_ds_version" 2>/dev/null) + else + v=$(PGPASSWORD=${DATABASE_PASSWORD} psql -h ${DATABASE_HOST} -p ${DATABASE_PORT} -U ${DATABASE_USERNAME} -d ${DATABASE_DATABASE} -tAc "SELECT * FROM public.t_ds_version" 2>/dev/null) + fi + if [ -n "$v" ]; then + echo "ds version: $v" + return 0 + else + return 1 + fi +} + +# check init database +checkInitDatabase() { + echo "check init database" + while ! checkDSVersion; do + local counter=$((counter+1)) + if [ $counter == 30 ]; then + echo "Error: Couldn't check init database." + exit 1 + fi + echo "Trying to check init database. Attempt $counter." + sleep 5 + done +} + # wait zk waitZK() { echo "connect remote zookeeper" echo "${ZOOKEEPER_QUORUM}" | awk -F ',' 'BEGIN{ i=1 }{ while( i <= NF ){ print $i; i++ } }' | while read line; do while ! nc -z ${line%:*} ${line#*:}; do - counter=$((counter+1)) + local counter=$((counter+1)) if [ $counter == 30 ]; then echo "Error: Couldn't connect to zookeeper." exit 1 @@ -133,7 +162,7 @@ case "$1" in initApiServer initAlertServer initLoggerServer - LOGFILE=${DOLPHINSCHEDULER_LOGS}/dolphinscheduler-api-server.log + LOGFILE=${DOLPHINSCHEDULER_LOGS}/dolphinscheduler-api.log ;; (master-server) waitZK @@ -153,10 +182,11 @@ case "$1" in waitDatabase initDatabase initApiServer - LOGFILE=${DOLPHINSCHEDULER_LOGS}/dolphinscheduler-api-server.log + LOGFILE=${DOLPHINSCHEDULER_LOGS}/dolphinscheduler-api.log ;; (alert-server) waitDatabase + checkInitDatabase initAlertServer LOGFILE=${DOLPHINSCHEDULER_LOGS}/dolphinscheduler-alert.log ;; diff --git a/docker/docker-swarm/docker-compose.yml b/docker/docker-swarm/docker-compose.yml index 04a9bc556d..2360c4abed 100644 --- a/docker/docker-swarm/docker-compose.yml +++ b/docker/docker-swarm/docker-compose.yml @@ -87,22 +87,11 @@ services: image: apache/dolphinscheduler:latest container_name: dolphinscheduler-alert command: alert-server + ports: + - 50052:50052 environment: TZ: Asia/Shanghai - XLS_FILE_PATH: "/tmp/xls" - MAIL_SERVER_HOST: "" - MAIL_SERVER_PORT: "" - MAIL_SENDER: "" - MAIL_USER: "" - MAIL_PASSWD: "" - MAIL_SMTP_STARTTLS_ENABLE: "false" - MAIL_SMTP_SSL_ENABLE: "false" - MAIL_SMTP_SSL_TRUST: "" - ENTERPRISE_WECHAT_ENABLE: "false" - ENTERPRISE_WECHAT_CORP_ID: "" - ENTERPRISE_WECHAT_SECRET: "" - ENTERPRISE_WECHAT_AGENT_ID: "" - ENTERPRISE_WECHAT_USERS: "" + ALERT_PLUGIN_DIR: lib/plugin/alert DATABASE_HOST: dolphinscheduler-postgresql DATABASE_PORT: 5432 DATABASE_USERNAME: root @@ -169,21 +158,12 @@ services: TZ: Asia/Shanghai WORKER_EXEC_THREADS: "100" WORKER_HEARTBEAT_INTERVAL: "10" - WORKER_FETCH_TASK_NUM: "3" WORKER_MAX_CPULOAD_AVG: "100" WORKER_RESERVED_MEMORY: "0.1" WORKER_GROUP: "default" WORKER_WEIGHT: "100" DOLPHINSCHEDULER_DATA_BASEDIR_PATH: /tmp/dolphinscheduler - XLS_FILE_PATH: "/tmp/xls" - MAIL_SERVER_HOST: "" - MAIL_SERVER_PORT: "" - MAIL_SENDER: "" - MAIL_USER: "" - MAIL_PASSWD: "" - MAIL_SMTP_STARTTLS_ENABLE: "false" - MAIL_SMTP_SSL_ENABLE: "false" - MAIL_SMTP_SSL_TRUST: "" + ALERT_LISTEN_HOST: dolphinscheduler-alert DATABASE_HOST: dolphinscheduler-postgresql DATABASE_PORT: 5432 DATABASE_USERNAME: root diff --git a/docker/docker-swarm/docker-stack.yml b/docker/docker-swarm/docker-stack.yml index 39c36347bb..7035ac3bda 100644 --- a/docker/docker-swarm/docker-stack.yml +++ b/docker/docker-swarm/docker-stack.yml @@ -84,22 +84,11 @@ services: dolphinscheduler-alert: image: apache/dolphinscheduler:latest command: alert-server + ports: + - 50052:50052 environment: TZ: Asia/Shanghai - XLS_FILE_PATH: "/tmp/xls" - MAIL_SERVER_HOST: "" - MAIL_SERVER_PORT: "" - MAIL_SENDER: "" - MAIL_USER: "" - MAIL_PASSWD: "" - MAIL_SMTP_STARTTLS_ENABLE: "false" - MAIL_SMTP_SSL_ENABLE: "false" - MAIL_SMTP_SSL_TRUST: "" - ENTERPRISE_WECHAT_ENABLE: "false" - ENTERPRISE_WECHAT_CORP_ID: "" - ENTERPRISE_WECHAT_SECRET: "" - ENTERPRISE_WECHAT_AGENT_ID: "" - ENTERPRISE_WECHAT_USERS: "" + ALERT_PLUGIN_DIR: lib/plugin/alert DATABASE_HOST: dolphinscheduler-postgresql DATABASE_PORT: 5432 DATABASE_USERNAME: root @@ -163,21 +152,12 @@ services: TZ: Asia/Shanghai WORKER_EXEC_THREADS: "100" WORKER_HEARTBEAT_INTERVAL: "10" - WORKER_FETCH_TASK_NUM: "3" WORKER_MAX_CPULOAD_AVG: "100" WORKER_RESERVED_MEMORY: "0.1" WORKER_GROUP: "default" WORKER_WEIGHT: "100" DOLPHINSCHEDULER_DATA_BASEDIR_PATH: /tmp/dolphinscheduler - XLS_FILE_PATH: "/tmp/xls" - MAIL_SERVER_HOST: "" - MAIL_SERVER_PORT: "" - MAIL_SENDER: "" - MAIL_USER: "" - MAIL_PASSWD: "" - MAIL_SMTP_STARTTLS_ENABLE: "false" - MAIL_SMTP_SSL_ENABLE: "false" - MAIL_SMTP_SSL_TRUST: "" + ALERT_LISTEN_HOST: dolphinscheduler-alert DATABASE_HOST: dolphinscheduler-postgresql DATABASE_PORT: 5432 DATABASE_USERNAME: root diff --git a/dolphinscheduler-alert/src/main/resources/alert.properties b/dolphinscheduler-alert/src/main/resources/alert.properties index 80ea87c2cd..33184cadf2 100644 --- a/dolphinscheduler-alert/src/main/resources/alert.properties +++ b/dolphinscheduler-alert/src/main/resources/alert.properties @@ -28,5 +28,3 @@ alert.plugin.dir=./lib/plugin/alert #alert.plugin.binding config the Alert Plugin need be load when development and run in IDE #alert.plugin.binding=\ # ./dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml - - diff --git a/dolphinscheduler-api/src/main/resources/application-api.properties b/dolphinscheduler-api/src/main/resources/application-api.properties index e2cabfac67..f42588112b 100644 --- a/dolphinscheduler-api/src/main/resources/application-api.properties +++ b/dolphinscheduler-api/src/main/resources/application-api.properties @@ -21,9 +21,10 @@ server.port=12345 # session config server.servlet.session.timeout=7200 +# servlet config server.servlet.context-path=/dolphinscheduler/ -# Set time zone +# time zone spring.jackson.time-zone=GMT+8 # file size limit for upload @@ -34,12 +35,13 @@ spring.servlet.multipart.max-request-size=1024MB server.compression.enabled=true server.compression.mime-types=text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml -#post content +# post content server.jetty.max-http-form-post-size=5000000 +# i18n spring.messages.encoding=UTF-8 -#i18n classpath folder , file prefix messages, if have many files, use "," seperator +# i18n classpath folder , file prefix messages, if have many files, use "," seperator spring.messages.basename=i18n/messages # Authentication types (supported types: PASSWORD) @@ -58,5 +60,3 @@ security.authentication.type=PASSWORD #ldap.password=password #ldap.user.identity.attribute=uid #ldap.user.email.attribute=mail - - diff --git a/dolphinscheduler-api/src/main/resources/logback-api.xml b/dolphinscheduler-api/src/main/resources/logback-api.xml index e5cb37afac..0f06b9cff5 100644 --- a/dolphinscheduler-api/src/main/resources/logback-api.xml +++ b/dolphinscheduler-api/src/main/resources/logback-api.xml @@ -31,12 +31,12 @@ - ${log.base}/dolphinscheduler-api-server.log + ${log.base}/dolphinscheduler-api.log INFO - ${log.base}/dolphinscheduler-api-server.%d{yyyy-MM-dd_HH}.%i.log + ${log.base}/dolphinscheduler-api.%d{yyyy-MM-dd_HH}.%i.log 168 64MB diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties index 0f21395229..b3d9156c76 100644 --- a/dolphinscheduler-common/src/main/resources/common.properties +++ b/dolphinscheduler-common/src/main/resources/common.properties @@ -15,7 +15,7 @@ # limitations under the License. # -# resource storage type : HDFS,S3,NONE +# resource storage type : HDFS, S3, NONE resource.storage.type=NONE # resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。"/dolphinscheduler" is recommended @@ -33,7 +33,7 @@ java.security.krb5.conf.path=/opt/krb5.conf # login user from keytab username login.user.keytab.username=hdfs-mycluster@ESZ.COM -# loginUserFromKeytab path +# login user from keytab path login.user.keytab.path=/opt/hdfs.headless.keytab #resource.view.suffixs @@ -42,16 +42,16 @@ login.user.keytab.path=/opt/hdfs.headless.keytab # if resource.storage.type=HDFS, the user need to have permission to create directories under the HDFS root path hdfs.root.user=hdfs -# if resource.storage.type=S3,the value like: s3a://dolphinscheduler ; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir +# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir fs.defaultFS=hdfs://mycluster:8020 -# if resource.storage.type=S3,s3 endpoint +# if resource.storage.type=S3, s3 endpoint fs.s3a.endpoint=http://192.168.xx.xx:9010 -# if resource.storage.type=S3,s3 access key +# if resource.storage.type=S3, s3 access key fs.s3a.access.key=A3DXS30FO22544RE -# if resource.storage.type=S3,s3 secret key +# if resource.storage.type=S3, s3 secret key fs.s3a.secret.key=OloCLq3n+8+sdPHUhJ21XrSxTC+JK # if resourcemanager HA enable, please type the HA ips ; if resourcemanager is single, make this value empty @@ -59,6 +59,7 @@ yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx # if resourcemanager HA enable or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ds1 to actual resourcemanager hostname. yarn.application.status.address=http://ds1:8088/ws/v1/cluster/apps/%s + # job history status url when application number threshold is reached(default 10000,maybe it was set to 1000) yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index a9ebbf000c..c110df0c9f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -221,4 +221,7 @@ public interface ProcessInstanceMapper extends BaseMapper { @Param("processDefinitionId") int processDefinitionId, @Param("states") int[] states); + int updateGlobalParamsById( + @Param("globalParams") String globalParams, + @Param("id") int id); } diff --git a/dolphinscheduler-dao/src/main/resources/datasource.properties b/dolphinscheduler-dao/src/main/resources/datasource.properties index 535b7493ce..0deb7fe00b 100644 --- a/dolphinscheduler-dao/src/main/resources/datasource.properties +++ b/dolphinscheduler-dao/src/main/resources/datasource.properties @@ -66,4 +66,4 @@ spring.datasource.password=test # open PSCache, specify count PSCache for every connection #spring.datasource.poolPreparedStatements=true -#spring.datasource.maxPoolPreparedStatementPerConnectionSize=20 +#spring.datasource.maxPoolPreparedStatementPerConnectionSize=20 \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index 39ccdebb22..432032b366 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -219,5 +219,9 @@ order by id asc - + + update t_ds_process_instance + set global_params = #{globalParams} + where id = #{id} + diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java index de5b82c729..93cc3eab12 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java @@ -68,6 +68,10 @@ public class TaskExecuteResponseCommand implements Serializable { * varPool string */ private String varPool; + /** + * task return result + */ + private String result; public void setVarPool(String varPool) { this.varPool = varPool; @@ -139,4 +143,12 @@ public class TaskExecuteResponseCommand implements Serializable { + ", appIds='" + appIds + '\'' + '}'; } + + public String getResult() { + return result; + } + + public void setResult(String result) { + this.result = result; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index 080fdd540d..186c4f35ba 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -80,7 +80,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor { responseCommand.getAppIds(), responseCommand.getTaskInstanceId(), responseCommand.getVarPool(), - channel); + channel, + responseCommand.getResult() + ); taskResponseService.addResponse(taskResponseEvent); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java index 0ca558a560..9789bccb3c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java @@ -92,6 +92,10 @@ public class TaskResponseEvent { * channel */ private Channel channel; + /** + * task return result + */ + private String result; public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, @@ -118,7 +122,8 @@ public class TaskResponseEvent { String appIds, int taskInstanceId, String varPool, - Channel channel) { + Channel channel, + String result) { TaskResponseEvent event = new TaskResponseEvent(); event.setState(state); event.setEndTime(endTime); @@ -128,6 +133,7 @@ public class TaskResponseEvent { event.setEvent(Event.RESULT); event.setVarPool(varPool); event.setChannel(channel); + event.setResult(result); return event; } @@ -226,4 +232,12 @@ public class TaskResponseEvent { public void setChannel(Channel channel) { this.channel = channel; } + + public String getResult() { + return result; + } + + public void setResult(String result) { + this.result = result; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index 1b5eddbd6f..f3f2e7f15b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -165,7 +165,8 @@ public class TaskResponseService { taskResponseEvent.getProcessId(), taskResponseEvent.getAppIds(), taskResponseEvent.getTaskInstanceId(), - taskResponseEvent.getVarPool() + taskResponseEvent.getVarPool(), + taskResponseEvent.getResult() ); } // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index ff448423fa..55832402ab 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -22,11 +22,13 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; +import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DependResult; +import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; @@ -36,6 +38,7 @@ import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.ProcessDag; +import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils; @@ -67,6 +70,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Date; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -74,6 +78,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -496,7 +501,8 @@ public class MasterExecThread implements Runnable { */ private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName, TaskNode taskNode) { - + //update processInstance for update the globalParams + this.processInstance = this.processService.findProcessInstanceById(this.processInstance.getId()); TaskInstance taskInstance = findTaskIfExists(nodeName); if (taskInstance == null) { taskInstance = new TaskInstance(); @@ -545,13 +551,57 @@ public class MasterExecThread implements Runnable { } else { taskInstance.setWorkerGroup(taskWorkerGroup); } - + //get process global + setProcessGlobal(taskNode, taskInstance); // delay execution time taskInstance.setDelayTime(taskNode.getDelayTime()); } return taskInstance; } + private void setProcessGlobal(TaskNode taskNode, TaskInstance taskInstance) { + String globalParams = this.processInstance.getGlobalParams(); + if (StringUtils.isNotEmpty(globalParams)) { + Map globalMap = getGlobalParamMap(globalParams); + if (globalMap != null && globalMap.size() != 0) { + setGlobalMapToTask(taskNode, taskInstance, globalMap); + } + } + } + + private void setGlobalMapToTask(TaskNode taskNode, TaskInstance taskInstance, Map globalMap) { + // the param save in localParams + Map result = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class); + Object localParams = result.get(LOCAL_PARAMS); + if (localParams != null) { + List allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); + for (Property info : allParam) { + if (info.getDirect().equals(Direct.IN)) { + String paramName = info.getProp(); + String value = globalMap.get(paramName); + if (StringUtils.isNotEmpty(value)) { + info.setValue(value); + } + } + } + result.put(LOCAL_PARAMS, allParam); + taskNode.setParams(JSONUtils.toJsonString(result)); + // task instance node json + taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); + } + } + + public Map getGlobalParamMap(String globalParams) { + List propList; + Map globalParamMap = new HashMap<>(); + if (StringUtils.isNotEmpty(globalParams)) { + propList = JSONUtils.toList(globalParams, Property.class); + globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); + } + + return globalParamMap; + } + private void submitPostNode(String parentNodeName) { Set submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList); List taskInstances = new ArrayList<>(); @@ -956,6 +1006,7 @@ public class MasterExecThread implements Runnable { task.getName(), task.getId(), task.getState()); // node success , post node submit if (task.getState() == ExecutionStatus.SUCCESS) { + processInstance = processService.findProcessInstanceById(processInstance.getId()); processInstance.setVarPool(task.getVarPool()); processService.updateProcessInstance(processInstance); completeTaskList.put(task.getName(), task); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 05bd8065e8..c036ac9f69 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -174,6 +174,7 @@ public class TaskExecuteThread implements Runnable, Delayed { responseCommand.setProcessId(task.getProcessId()); responseCommand.setAppIds(task.getAppIds()); responseCommand.setVarPool(task.getVarPool()); + responseCommand.setResult(task.getResultString()); logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus()); } catch (Exception e) { logger.error("task scheduler failure", e); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 443bd319ed..037bde6c73 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -54,6 +54,7 @@ import java.util.regex.Pattern; import org.slf4j.Logger; + /** * abstract command executor */ @@ -84,6 +85,11 @@ public abstract class AbstractCommandExecutor { */ protected final List logBuffer; + /** + * SHELL result string + */ + protected String taskResultString; + /** * taskExecutionContext */ @@ -104,6 +110,10 @@ public abstract class AbstractCommandExecutor { this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); } + protected AbstractCommandExecutor(List logBuffer) { + this.logBuffer = logBuffer; + } + /** * build process * @@ -223,6 +233,7 @@ public abstract class AbstractCommandExecutor { return varPool.toString(); } + /** * cancel application * @@ -355,6 +366,7 @@ public abstract class AbstractCommandExecutor { varPool.append("$VarPool$"); } else { logBuffer.add(line); + taskResultString = line; lastFlushTime = flush(lastFlushTime); } } @@ -561,4 +573,12 @@ public abstract class AbstractCommandExecutor { protected abstract String commandInterpreter(); protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException; + + public String getTaskResultString() { + return taskResultString; + } + + public void setTaskResultString(String taskResultString) { + this.taskResultString = taskResultString; + } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index de7d35f404..68152e269f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -63,6 +63,11 @@ public abstract class AbstractTask { */ protected int processId; + /** + * SHELL result string + */ + protected String resultString; + /** * other resource manager appId , for example : YARN etc */ @@ -167,6 +172,14 @@ public abstract class AbstractTask { this.processId = processId; } + public String getResultString() { + return resultString; + } + + public void setResultString(String resultString) { + this.resultString = resultString; + } + /** * get task parameters * diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java index 5e297abbf0..8f3da4537d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java @@ -56,6 +56,9 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { super(logHandler,taskExecutionContext,logger); } + public ShellCommandExecutor(List logBuffer) { + super(logBuffer); + } @Override protected String buildCommandFilePath() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index 5cbd3c151f..fb0a76cff2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -21,6 +21,7 @@ import static java.util.Calendar.DAY_OF_MONTH; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; @@ -34,6 +35,8 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; +import org.slf4j.Logger; + import java.io.File; import java.nio.file.Files; import java.nio.file.Path; @@ -41,13 +44,13 @@ import java.nio.file.StandardOpenOption; import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; +import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; -import org.slf4j.Logger; - /** * shell task */ @@ -102,6 +105,7 @@ public class ShellTask extends AbstractTask { setExitStatusCode(commandExecuteResult.getExitStatusCode()); setAppIds(commandExecuteResult.getAppIds()); setProcessId(commandExecuteResult.getProcessId()); + setResult(shellCommandExecutor.getTaskResultString()); } catch (Exception e) { logger.error("shell task error", e); setExitStatusCode(Constants.EXIT_CODE_FAILURE); @@ -183,4 +187,17 @@ public class ShellTask extends AbstractTask { } return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); } + + public void setResult(String result) { + Map localParams = shellParameters.getLocalParametersMap(); + List> outProperties = new ArrayList<>(); + Map p = new HashMap<>(); + localParams.forEach((k,v) -> { + if (v.getDirect() == Direct.OUT) { + p.put(k, result); + } + }); + outProperties.add(p); + resultString = JSONUtils.toJsonString(outProperties); + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index 411032fd1f..5a16194b3b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.task.sql; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; @@ -148,7 +149,7 @@ public class SqlTask extends AbstractTask { logger); // execute sql task - executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs); + executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs, sqlParameters.getLocalParams()); setExitStatusCode(Constants.EXIT_CODE_SUCCESS); @@ -237,7 +238,8 @@ public class SqlTask extends AbstractTask { public void executeFuncAndSql(SqlBinds mainSqlBinds, List preStatementsBinds, List postStatementsBinds, - List createFuncs) { + List createFuncs, + List properties) { Connection connection = null; PreparedStatement stmt = null; ResultSet resultSet = null; @@ -253,18 +255,21 @@ public class SqlTask extends AbstractTask { preSql(connection, preStatementsBinds); stmt = prepareStatementAndBind(connection, mainSqlBinds); + String result = null; // decide whether to executeQuery or executeUpdate based on sqlType if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) { // query statements need to be convert to JsonArray and inserted into Alert to send resultSet = stmt.executeQuery(); - resultProcess(resultSet); + result = resultProcess(resultSet); } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) { // non query statement - stmt.executeUpdate(); + String updateResult = String.valueOf(stmt.executeUpdate()); + result = setNonQuerySqlReturn(updateResult, properties); } postSql(connection, postStatementsBinds); + this.setResultString(result); } catch (Exception e) { logger.error("execute sql error", e); @@ -274,13 +279,28 @@ public class SqlTask extends AbstractTask { } } + public String setNonQuerySqlReturn(String updateResult, List properties) { + String result = null; + for (Property info :properties) { + if (Direct.OUT == info.getDirect()) { + List> updateRL = new ArrayList<>(); + Map updateRM = new HashMap<>(); + updateRM.put(info.getProp(),updateResult); + updateRL.add(updateRM); + result = JSONUtils.toJsonString(updateRL); + break; + } + } + return result; + } + /** * result process * * @param resultSet resultSet * @throws Exception Exception */ - private void resultProcess(ResultSet resultSet) throws Exception { + private String resultProcess(ResultSet resultSet) throws Exception { ArrayNode resultJSONArray = JSONUtils.createArrayNode(); ResultSetMetaData md = resultSet.getMetaData(); int num = md.getColumnCount(); @@ -297,13 +317,13 @@ public class SqlTask extends AbstractTask { } String result = JSONUtils.toJsonString(resultJSONArray); logger.debug("execute sql : {}", result); - try { sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle() : taskExecutionContext.getTaskName() + " query result sets", JSONUtils.toJsonString(resultJSONArray)); } catch (Exception e) { logger.warn("sql task sendAttachment error! msg : {} ", e.getMessage()); } + return result; } /** diff --git a/dolphinscheduler-server/src/main/resources/master.properties b/dolphinscheduler-server/src/main/resources/master.properties index 44301fb54e..01d1189507 100644 --- a/dolphinscheduler-server/src/main/resources/master.properties +++ b/dolphinscheduler-server/src/main/resources/master.properties @@ -21,9 +21,8 @@ # master execute task number in parallel #master.exec.task.num=20 - # master dispatch task number -#master.dispatch.task.num = 3 +#master.dispatch.task.num=3 # master heartbeat interval #master.heartbeat.interval=10 @@ -34,7 +33,6 @@ # master commit task interval #master.task.commit.interval=1000 - # only less than cpu avg load, master server can work. default value -1 : the number of cpu cores * 2 #master.max.cpuload.avg=-1 @@ -42,4 +40,4 @@ #master.reserved.memory=0.3 # master listen port -#master.listen.port=5678 \ No newline at end of file +#master.listen.port=5678 diff --git a/dolphinscheduler-server/src/main/resources/worker.properties b/dolphinscheduler-server/src/main/resources/worker.properties index a1236a36bd..5fdbf1d910 100644 --- a/dolphinscheduler-server/src/main/resources/worker.properties +++ b/dolphinscheduler-server/src/main/resources/worker.properties @@ -22,7 +22,7 @@ #worker.heartbeat.interval=10 # only less than cpu avg load, worker server can work. default value -1: the number of cpu cores * 2 -#worker.max.cpuload.avg= -1 +#worker.max.cpuload.avg=-1 # only larger than reserved memory, worker server can work. default value : physical memory * 1/6, unit is G. #worker.reserved.memory=0.3 diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index 5d10f849c5..ec0807cbdd 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -70,7 +70,8 @@ public class TaskResponseServiceTest { "ids", 22, "varPol", - channel); + channel, + "[{\"id\":70000,\"database_name\":\"yuul\",\"status\":-1,\"create_time\":1601202829000,\"update_time\":1601202829000,\"table_name3\":\"\",\"table_name4\":\"\"}]"); taskInstance = new TaskInstance(); taskInstance.setId(22); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index bdd723a4cd..8938f49773 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -121,6 +121,11 @@ public class TaskCallbackServiceTest { ackCommand.setStartTime(new Date()); taskCallbackService.sendAck(1, ackCommand.convert2Command()); + TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(); + String result = responseCommand.getResult(); + responseCommand.setResult("return string"); + taskCallbackService.sendResult(1, responseCommand.convert2Command()); + Stopper.stop(); nettyRemotingServer.close(); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java new file mode 100644 index 0000000000..348775cf67 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.task; + +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({SpringApplicationContext.class}) +public class AbstractCommandExecutorTest { + + private static final Logger logger = LoggerFactory.getLogger(AbstractCommandExecutorTest.class); + + private ShellCommandExecutor shellCommandExecutor; + + @Before + public void before() throws Exception { + System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString()); + shellCommandExecutor = new ShellCommandExecutor(null); + } + + @Test + public void testSetTaskResultString() { + shellCommandExecutor.setTaskResultString("shellReturn"); + } + + @Test + public void testGetTaskResultString() { + logger.info(shellCommandExecutor.getTaskResultString()); + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java new file mode 100644 index 0000000000..e5bf3bfc40 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.task; + +import static org.mockito.ArgumentMatchers.anyString; + +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; +import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; +import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask; +import org.apache.dolphinscheduler.server.worker.task.shell.ShellTaskTest; + +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * shell task return test. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({ShellTask.class}) +public class ShellTaskReturnTest { + private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class); + + private ShellTask shellTask; + private ShellCommandExecutor shellCommandExecutor; + private TaskExecutionContext taskExecutionContext; + private CommandExecuteResult commandExecuteResult; + + @Before + public void before() throws Exception { + System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString()); + shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class); + PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor); + taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setTaskInstanceId(1); + taskExecutionContext.setTaskName("kris test"); + taskExecutionContext.setTaskType("SHELL"); + taskExecutionContext.setHost("127.0.0.1:1234"); + taskExecutionContext.setExecutePath("/tmp"); + taskExecutionContext.setLogPath("/log"); + taskExecutionContext.setTaskJson( + "{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"]," + + "\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false," + + "\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"" + + "tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\"," + + "\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? " + + "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\"," + + "\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":" + + "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"}," + + "{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}" + + "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1," + + "\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":" + + "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\"," + + "\"type\":\"SHELL\",\"workerGroup\":\"default\"}"); + taskExecutionContext.setProcessInstanceId(1); + taskExecutionContext.setGlobalParams("[{\"direct\":\"IN\",\"prop\":\"time_gb\",\"type\":\"VARCHAR\",\"value\":\"2020-12-16 17:18:33\"}]"); + taskExecutionContext.setExecutorId(1); + taskExecutionContext.setCmdTypeIfComplement(5); + taskExecutionContext.setTenantCode("roo"); + taskExecutionContext.setScheduleTime(new Date()); + taskExecutionContext.setQueue("default"); + taskExecutionContext.setTaskParams( + "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":" + + + "[{\"prop\":\"time1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR" + + "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}"); + Map definedParams = new HashMap<>(); + definedParams.put("time_gb", "2020-12-16 00:00:00"); + taskExecutionContext.setDefinedParams(definedParams); + PowerMockito.mockStatic(Files.class); + PowerMockito.when(Files.exists(Paths.get(anyString()))).thenReturn(true); + commandExecuteResult = new CommandExecuteResult(); + commandExecuteResult.setAppIds("appId"); + commandExecuteResult.setExitStatusCode(0); + commandExecuteResult.setProcessId(1); + } + + @Test + public void testShellReturnString() { + shellTask = new ShellTask(taskExecutionContext, logger); + shellTask.init(); + try { + PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult); + } catch (Exception e) { + e.printStackTrace(); + } + shellTask.setResult("shell return string"); + logger.info("shell return string:{}", shellTask.getResultString()); + } + + @Test + public void testSetTaskResultString() { + shellCommandExecutor.setTaskResultString("shellReturn"); + } + + @Test + public void testGetTaskResultString() { + logger.info(shellCommandExecutor.getTaskResultString()); + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java index 6acfd180c2..24ed5b956d 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java @@ -17,13 +17,22 @@ package org.apache.dolphinscheduler.server.worker.task; +import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.task.sql.SqlParameters; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; +import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask; +import org.apache.dolphinscheduler.server.worker.task.sql.SqlTask; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.junit.Assert; import org.junit.Before; @@ -116,4 +125,70 @@ public class TaskManagerTest { taskExecutionContext.setTaskType("XXX"); TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService); } + + @Test + public void testShellTaskReturnString() { + taskExecutionContext.setTaskInstanceId(1); + taskExecutionContext.setTaskName("kris test"); + taskExecutionContext.setTaskType("SHELL"); + taskExecutionContext.setHost("127.0.0.1:1234"); + taskExecutionContext.setExecutePath("/tmp"); + taskExecutionContext.setLogPath("/log"); + taskExecutionContext.setTaskJson( + "{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," + + "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"" + + "tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\"," + + "\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? " + + "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\"," + + "\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"OUT\\\",\\\"type\\\":" + + "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"}," + + "{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}" + + "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":" + + "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}"); + taskExecutionContext.setProcessInstanceId(1); + taskExecutionContext.setGlobalParams("[{\"direct\":\"IN\",\"prop\":\"time_gb\",\"type\":\"VARCHAR\",\"value\":\"2020-12-16 17:18:33\"}]"); + taskExecutionContext.setExecutorId(1); + taskExecutionContext.setCmdTypeIfComplement(5); + taskExecutionContext.setTenantCode("roo"); + taskExecutionContext.setScheduleTime(new Date()); + taskExecutionContext.setQueue("default"); + taskExecutionContext.setTaskParams( + "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":" + + + "[{\"prop\":\"time1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR" + + "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}"); + Map definedParams = new HashMap<>(); + definedParams.put("time_gb", "2020-12-16 00:00:00"); + taskExecutionContext.setDefinedParams(definedParams); + ShellTask shellTask = (ShellTask) TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService); + shellTask.setResultString("shell return"); + String shellReturn = shellTask.getResultString(); + shellTask.init(); + shellTask.setResult(shellReturn); + Assert.assertSame(shellReturn, "shell return"); + } + + @Test + public void testSqlTaskReturnString() { + String params = "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\"," + + "\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}"; + taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setTaskParams("{\"localParams\":[{\"prop\":\"ret\", \"direct\":\"OUT\", \"type\":\"VARCHAR\", \"value\":\"\"}]," + + "\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\"," + + "\"sqlType\":1}"); + taskExecutionContext.setExecutePath("/tmp"); + taskExecutionContext.setTaskAppId("1"); + taskExecutionContext.setTenantCode("root"); + taskExecutionContext.setStartTime(new Date()); + taskExecutionContext.setTaskTimeout(10000); + taskExecutionContext.setLogPath("/tmp/dx"); + + SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext(); + sqlTaskExecutionContext.setConnectionParams(params); + taskExecutionContext.setSqlTaskExecutionContext(sqlTaskExecutionContext); + SqlTask sqlTask = new SqlTask(taskExecutionContext, logger, null); + SqlParameters sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class); + List properties = sqlParameters.getLocalParams(); + sqlTask.setNonQuerySqlReturn("sql return", properties); + } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java index c5f2de82ea..8c734af2ce 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java @@ -19,12 +19,15 @@ package org.apache.dolphinscheduler.server.worker.task.shell; import static org.mockito.ArgumentMatchers.anyString; +import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.nio.file.Files; import java.nio.file.Paths; +import java.sql.DriverManager; import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -42,7 +45,7 @@ import org.slf4j.LoggerFactory; * shell task test. */ @RunWith(PowerMockRunner.class) -@PrepareForTest({ShellTask.class}) +@PrepareForTest(value = {ShellTask.class, DriverManager.class, SpringApplicationContext.class, ParameterUtils.class}) public class ShellTaskTest { private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class); @@ -57,6 +60,7 @@ public class ShellTaskTest { System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString()); shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class); PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor); + shellCommandExecutor.setTaskResultString("shellReturn"); taskExecutionContext = new TaskExecutionContext(); taskExecutionContext.setTaskInstanceId(1); taskExecutionContext.setTaskName("kris test"); @@ -68,7 +72,7 @@ public class ShellTaskTest { "{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"" + "tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\",\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? " - + "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\",\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":" + + "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\",\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"OUT\\\",\\\"type\\\":" + "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"},{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}" + "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":" + "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}"); @@ -82,7 +86,7 @@ public class ShellTaskTest { taskExecutionContext.setTaskParams( "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":" + - "[{\"prop\":\"time1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR" + "[{\"prop\":\"time1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR" + "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}"); Map definedParams = new HashMap<>(); definedParams.put("time_gb", "2020-12-16 00:00:00"); @@ -111,4 +115,13 @@ public class ShellTaskTest { PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult); shellTask.handle(); } + + @Test + public void testSetResult() { + shellTask = new ShellTask(taskExecutionContext, logger); + shellTask.init(); + String r = "return"; + shellTask.setResult(r); + } + } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java index 64db568916..2abb91c6b6 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java @@ -19,10 +19,12 @@ package org.apache.dolphinscheduler.server.worker.task.sql; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.service.alert.AlertClientService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.sql.Connection; import java.sql.DriverManager; @@ -44,7 +46,7 @@ import org.slf4j.LoggerFactory; * sql task test */ @RunWith(PowerMockRunner.class) -@PrepareForTest(value = {SqlTask.class, DriverManager.class}) +@PrepareForTest(value = {SqlTask.class, DriverManager.class, SpringApplicationContext.class, ParameterUtils.class}) public class SqlTaskTest { private static final Logger logger = LoggerFactory.getLogger(SqlTaskTest.class); @@ -70,7 +72,9 @@ public class SqlTaskTest { props.setTaskStartTime(new Date()); props.setTaskTimeout(0); props.setTaskParams( - "{\"localParams\":[],\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\",\"sqlType\":1}"); + "{\"localParams\":[{\"prop\":\"ret\", \"direct\":\"OUT\", \"type\":\"VARCHAR\", \"value\":\"\"}]," + + "\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\"," + + "\"sqlType\":1}"); taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class); PowerMockito.when(taskExecutionContext.getTaskParams()).thenReturn(props.getTaskParams()); @@ -85,6 +89,8 @@ public class SqlTaskTest { sqlTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS); PowerMockito.when(taskExecutionContext.getSqlTaskExecutionContext()).thenReturn(sqlTaskExecutionContext); + PowerMockito.mockStatic(SpringApplicationContext.class); + PowerMockito.when(SpringApplicationContext.getBean(Mockito.any())).thenReturn(new AlertDao()); alertClientService = PowerMockito.mock(AlertClientService.class); sqlTask = new SqlTask(taskExecutionContext, logger, alertClientService); sqlTask.init(); @@ -95,7 +101,7 @@ public class SqlTaskTest { Assert.assertNotNull(sqlTask.getParameters()); } - @Test(expected = Exception.class) + @Test public void testHandle() throws Exception { Connection connection = PowerMockito.mock(Connection.class); PowerMockito.mockStatic(DriverManager.class); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index cc82844669..fd888e932d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -24,6 +24,7 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PRO import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; +import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS; import static java.util.stream.Collectors.toSet; @@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ConditionType; import org.apache.dolphinscheduler.common.enums.CycleEnum; +import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; @@ -112,6 +114,7 @@ import java.util.Date; import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -126,6 +129,8 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import com.cronutils.model.Cron; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; /** @@ -494,7 +499,7 @@ public class ProcessService { * recursive query sub process definition id by parent id. * * @param parentId parentId - * @param ids ids + * @param ids ids */ public void recurseFindSubProcessId(int parentId, List ids) { List taskNodeList = this.getTaskNodeListByDefinitionId(parentId); @@ -519,7 +524,7 @@ public class ProcessService { * create recovery waiting thread command and delete origin command at the same time. * if the recovery command is exists, only update the field update_time * - * @param originCommand originCommand + * @param originCommand originCommand * @param processInstance processInstance */ public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) { @@ -571,7 +576,7 @@ public class ProcessService { /** * get schedule time from command * - * @param command command + * @param command command * @param cmdParam cmdParam map * @return date */ @@ -587,8 +592,8 @@ public class ProcessService { * generate a new work process instance from command. * * @param processDefinition processDefinition - * @param command command - * @param cmdParam cmdParam map + * @param command command + * @param cmdParam cmdParam map * @return process instance */ private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition, @@ -662,7 +667,7 @@ public class ProcessService { * use definition creator's tenant. * * @param tenantId tenantId - * @param userId userId + * @param userId userId * @return tenant */ public Tenant getTenantForProcess(int tenantId, int userId) { @@ -685,7 +690,7 @@ public class ProcessService { /** * check command parameters is valid * - * @param command command + * @param command command * @param cmdParam cmdParam map * @return whether command param is valid */ @@ -705,7 +710,7 @@ public class ProcessService { * construct process instance according to one command. * * @param command command - * @param host host + * @param host host * @return process instance */ private ProcessInstance constructProcessInstance(Command command, String host) { @@ -747,11 +752,6 @@ public class ProcessService { } else { processInstance = this.findProcessInstanceDetailById(processInstanceId); // Recalculate global parameters after rerun. - processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - getCommandTypeIfComplement(processInstance, command), - processInstance.getScheduleTime())); } processDefinition = processDefineMapper.selectById(processInstance.getProcessDefinitionId()); processInstance.setProcessDefinition(processDefinition); @@ -868,7 +868,7 @@ public class ProcessService { * return complement data if the process start with complement data * * @param processInstance processInstance - * @param command command + * @param command command * @return command type */ private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command) { @@ -883,8 +883,8 @@ public class ProcessService { * initialize complement data parameters * * @param processDefinition processDefinition - * @param processInstance processInstance - * @param cmdParam cmdParam + * @param processInstance processInstance + * @param cmdParam cmdParam */ private void initComplementDataParam(ProcessDefinition processDefinition, ProcessInstance processInstance, @@ -956,7 +956,7 @@ public class ProcessService { * only the keys doesn't in sub process global would be joined. * * @param parentGlobalParams parentGlobalParams - * @param subGlobalParams subGlobalParams + * @param subGlobalParams subGlobalParams * @return global params join */ private String joinGlobalParams(String parentGlobalParams, String subGlobalParams) { @@ -1026,7 +1026,7 @@ public class ProcessService { * set map {parent instance id, task instance id, 0(child instance id)} * * @param parentInstance parentInstance - * @param parentTask parentTask + * @param parentTask parentTask * @return process instance map */ private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) { @@ -1055,7 +1055,7 @@ public class ProcessService { * find previous task work process map. * * @param parentProcessInstance parentProcessInstance - * @param parentTask parentTask + * @param parentTask parentTask * @return process instance map */ private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance, @@ -1081,7 +1081,7 @@ public class ProcessService { * create sub work process command * * @param parentProcessInstance parentProcessInstance - * @param task task + * @param task task */ public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) { if (!task.isSubProcess()) { @@ -1180,7 +1180,7 @@ public class ProcessService { * update sub process definition * * @param parentProcessInstance parentProcessInstance - * @param childDefinitionId childDefinitionId + * @param childDefinitionId childDefinitionId */ private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, int childDefinitionId) { ProcessDefinition fatherDefinition = this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId()); @@ -1194,7 +1194,7 @@ public class ProcessService { /** * submit task to mysql * - * @param taskInstance taskInstance + * @param taskInstance taskInstance * @param processInstance processInstance * @return task instance */ @@ -1248,7 +1248,7 @@ public class ProcessService { * return stop if work process state is ready stop * if all of above are not satisfied, return submit success * - * @param taskInstance taskInstance + * @param taskInstance taskInstance * @param processInstanceState processInstanceState * @return process instance state */ @@ -1257,9 +1257,11 @@ public class ProcessService { // running, delayed or killed // the task already exists in task queue // return state - if (state == ExecutionStatus.RUNNING_EXECUTION - || state == ExecutionStatus.DELAY_EXECUTION - || state == ExecutionStatus.KILL) { + if ( + state == ExecutionStatus.RUNNING_EXECUTION + || state == ExecutionStatus.DELAY_EXECUTION + || state == ExecutionStatus.KILL + ) { return state; } //return pasue /stop if process instance state is ready pause / stop @@ -1422,7 +1424,7 @@ public class ProcessService { * get id list by task state * * @param instanceId instanceId - * @param state state + * @param state state * @return task instance states */ public List findTaskIdByInstanceState(int instanceId, ExecutionStatus state) { @@ -1477,7 +1479,7 @@ public class ProcessService { * find work process map by parent process id and parent task id. * * @param parentWorkProcessId parentWorkProcessId - * @param parentTaskId parentTaskId + * @param parentTaskId parentTaskId * @return process instance map */ public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) { @@ -1499,7 +1501,7 @@ public class ProcessService { * find sub process instance * * @param parentProcessId parentProcessId - * @param parentTaskId parentTaskId + * @param parentTaskId parentTaskId * @return process instance */ public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId) { @@ -1531,12 +1533,12 @@ public class ProcessService { /** * change task state * - * @param state state - * @param startTime startTime - * @param host host + * @param state state + * @param startTime startTime + * @param host host * @param executePath executePath - * @param logPath logPath - * @param taskInstId taskInstId + * @param logPath logPath + * @param taskInstId taskInstId */ public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host, String executePath, @@ -1564,12 +1566,12 @@ public class ProcessService { * update the process instance * * @param processInstanceId processInstanceId - * @param processJson processJson - * @param globalParams globalParams - * @param scheduleTime scheduleTime - * @param flag flag - * @param locations locations - * @param connects connects + * @param processJson processJson + * @param globalParams globalParams + * @param scheduleTime scheduleTime + * @param flag flag + * @param locations locations + * @param connects connects * @return update process instance result */ public int updateProcessInstance(Integer processInstanceId, String processJson, @@ -1590,25 +1592,85 @@ public class ProcessService { /** * change task state * - * @param state state - * @param endTime endTime + * @param state state + * @param endTime endTime * @param taskInstId taskInstId - * @param varPool varPool + * @param varPool varPool */ public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstId, - String varPool) { + String varPool, + String result) { taskInstance.setPid(processId); taskInstance.setAppLink(appIds); taskInstance.setState(state); taskInstance.setEndTime(endTime); taskInstance.setVarPool(varPool); + changeOutParam(result, taskInstance); saveTaskInstance(taskInstance); } + public void changeOutParam(String result, TaskInstance taskInstance) { + if (StringUtils.isEmpty(result)) { + return; + } + List> workerResultParam = getListMapByString(result); + if (CollectionUtils.isEmpty(workerResultParam)) { + return; + } + //if the result more than one line,just get the first . + Map row = workerResultParam.get(0); + if (row == null || row.size() == 0) { + return; + } + TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), TaskNode.class); + Map taskParams = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class); + Object localParams = taskParams.get(LOCAL_PARAMS); + if (localParams == null) { + return; + } + ProcessInstance processInstance = this.processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId()); + List params4Property = JSONUtils.toList(processInstance.getGlobalParams(), Property.class); + Map allParamMap = params4Property.stream().collect(Collectors.toMap(Property::getProp, Property -> Property)); + + List allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); + for (Property info : allParam) { + if (info.getDirect() == Direct.OUT) { + String paramName = info.getProp(); + Property property = allParamMap.get(paramName); + if (property == null) { + continue; + } + String value = row.get(paramName); + if (StringUtils.isNotEmpty(value)) { + property.setValue(value); + info.setValue(value); + } + } + } + taskParams.put(LOCAL_PARAMS, allParam); + taskNode.setParams(JSONUtils.toJsonString(taskParams)); + // task instance node json + taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); + String params4ProcessString = JSONUtils.toJsonString(params4Property); + int updateCount = this.processInstanceMapper.updateGlobalParamsById(params4ProcessString, processInstance.getId()); + logger.info("updateCount:{}, params4Process:{}, processInstanceId:{}", updateCount, params4ProcessString, processInstance.getId()); + } + + public List> getListMapByString(String json) { + List> allParams = new ArrayList<>(); + ArrayNode paramsByJson = JSONUtils.parseArray(json); + Iterator listIterator = paramsByJson.iterator(); + while (listIterator.hasNext()) { + Map param = JSONUtils.toMap(listIterator.next().toString(), String.class, String.class); + allParams.add(param); + } + return allParams; + } + /** * convert integer list to string list * @@ -1701,7 +1763,7 @@ public class ProcessService { * update process instance state by id * * @param processInstanceId processInstanceId - * @param executionStatus executionStatus + * @param executionStatus executionStatus * @return update process result */ public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) { @@ -1738,7 +1800,7 @@ public class ProcessService { /** * find tenant code by resource name * - * @param resName resource name + * @param resName resource name * @param resourceType resource type * @return tenant code */ @@ -1762,9 +1824,9 @@ public class ProcessService { /** * get dependency cycle by work process define id and scheduler fire time * - * @param masterId masterId + * @param masterId masterId * @param processDefinitionId processDefinitionId - * @param scheduledFireTime the time the task schedule is expected to trigger + * @param scheduledFireTime the time the task schedule is expected to trigger * @return CycleDependency * @throws Exception if error throws Exception */ @@ -1777,8 +1839,8 @@ public class ProcessService { /** * get dependency cycle list by work process define id list and scheduler fire time * - * @param masterId masterId - * @param ids ids + * @param masterId masterId + * @param ids ids * @param scheduledFireTime the time the task schedule is expected to trigger * @return CycleDependency list * @throws Exception if error throws Exception @@ -1873,8 +1935,8 @@ public class ProcessService { * find last running process instance * * @param definitionId process definition id - * @param startTime start time - * @param endTime end time + * @param startTime start time + * @param endTime end time * @return process instance */ public ProcessInstance findLastRunningProcess(int definitionId, Date startTime, Date endTime) { @@ -1974,7 +2036,7 @@ public class ProcessService { /** * list unauthorized udf function * - * @param userId user id + * @param userId user id * @param needChecks data source id array * @return unauthorized udf function list */ diff --git a/dolphinscheduler-service/src/main/resources/logback-zookeeper.xml b/dolphinscheduler-service/src/main/resources/logback-zookeeper.xml index 34a15a7a5b..9bdf18f8cd 100644 --- a/dolphinscheduler-service/src/main/resources/logback-zookeeper.xml +++ b/dolphinscheduler-service/src/main/resources/logback-zookeeper.xml @@ -32,7 +32,7 @@ ${log.base}/dolphinscheduler-zookeeper.log - ${log.base}/dolphinscheduler-alert.%d{yyyy-MM-dd_HH}.%i.log + ${log.base}/dolphinscheduler-zookeeper.%d{yyyy-MM-dd_HH}.%i.log 20 64MB diff --git a/dolphinscheduler-service/src/main/resources/quartz.properties b/dolphinscheduler-service/src/main/resources/quartz.properties index 6e208f62d4..93ee71c6a3 100644 --- a/dolphinscheduler-service/src/main/resources/quartz.properties +++ b/dolphinscheduler-service/src/main/resources/quartz.properties @@ -51,4 +51,4 @@ #============================================================================ # Configure Datasources #============================================================================ -#org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.service.quartz.DruidConnectionProvider \ No newline at end of file +#org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.service.quartz.DruidConnectionProvider diff --git a/dolphinscheduler-service/src/main/resources/zookeeper.properties b/dolphinscheduler-service/src/main/resources/zookeeper.properties index 31f5f43c42..006b12036d 100644 --- a/dolphinscheduler-service/src/main/resources/zookeeper.properties +++ b/dolphinscheduler-service/src/main/resources/zookeeper.properties @@ -27,4 +27,4 @@ zookeeper.quorum=localhost:2181 #zookeeper.retry.base.sleep=100 #zookeeper.retry.max.sleep=30000 #zookeeper.retry.maxtime=10 -#zookeeper.max.wait.time=10000 \ No newline at end of file +#zookeeper.max.wait.time=10000 diff --git a/dolphinscheduler-ui.zip b/dolphinscheduler-ui.zip new file mode 100644 index 0000000000..1f11f8f048 Binary files /dev/null and b/dolphinscheduler-ui.zip differ diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue index c56d73f443..131641fccc 100755 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue @@ -178,7 +178,7 @@ :title="$t('Set the DAG diagram name')" :visible.sync="dialogVisible" width="auto"> - + { idArr.forEach(v2 => { if (v2.name === v1.name) { @@ -387,6 +387,12 @@ taskList.forEach(item => { if (item.name === v1.name) { depState = item.state + const params = item.taskJson ? JSON.parse(item.taskJson).params : '' + let localParam = params.localParams || [] + newTask.push({ + id: v2.id, + localParam + }) } }) dom.attr('data-state-id', v1.stateId) @@ -403,6 +409,9 @@ findComponentDownward(this.$root, `${this.type}-details`)._reset() } } + if (!isReset) { + this.resetLocalParam(newTask) + } resolve() }) }) @@ -550,7 +559,11 @@ this.$message.warning(`${i18n.$t('Failed to create node to save')}`) return } + this.dialogVisible = true + this.$nextTick(() => { + this.$refs.mUdp.reloadParam() + }) }, /** * Return to the previous child node diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/localParams.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/localParams.vue index c001a20176..89e0e88c54 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/localParams.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/localParams.vue @@ -21,7 +21,7 @@ :key="item.id" @click="_getIndex($index)"> - + diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue index 55b9aca071..d66c9e6819 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue @@ -46,7 +46,7 @@ ref="refLocalParams" @on-local-params="_onLocalParams" :udp-list="localParams" - :hide="false"> + :hide="true"> diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/udp/udp.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/udp/udp.vue index 50f6016191..91b2ca3566 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/udp/udp.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/udp/udp.vue @@ -142,7 +142,12 @@ return true }, _accuStore () { - this.store.commit('dag/setGlobalParams', _.cloneDeep(this.udpList)) + const udp = _.cloneDeep(this.udpList) + udp.forEach(u => { + delete u.ifFixed + }) + this.store.commit('dag/setGlobalParams', udp) + this.store.commit('dag/setName', _.cloneDeep(this.name)) this.store.commit('dag/setTimeout', _.cloneDeep(this.timeout)) this.store.commit('dag/setTenantId', _.cloneDeep(this.tenantId)) @@ -191,6 +196,46 @@ */ close () { this.$emit('close') + }, + /** + * reload localParam + */ + reloadParam () { + const dag = _.cloneDeep(this.store.state.dag) + let fixedParam = [] + const tasks = this.store.state.dag.tasks + for (const task of tasks) { + const localParam = task.params ? task.params.localParams : [] + localParam.forEach(l => { + if (!fixedParam.some(f => { return f.prop === l.prop })) { + fixedParam.push(Object.assign({ + ifFixed: true + }, l)) + } + }) + } + + let globalParams = _.cloneDeep(dag.globalParams) + + globalParams = globalParams.map(g => { + if (fixedParam.some(f => { return g.prop === f.prop })) { + fixedParam = fixedParam.filter(f => { return g.prop !== f.prop }) + return Object.assign(g, { + ifFixed: true + }) + } else { + return g + } + }) + let udpList = [...fixedParam, ...globalParams].sort(s => { + if (s.ifFixed) { + return -1 + } else { + return 1 + } + }) + this.udpList = udpList + this.udpListCache = udpList } }, watch: { @@ -203,8 +248,7 @@ }, created () { const dag = _.cloneDeep(this.store.state.dag) - this.udpList = dag.globalParams - this.udpListCache = dag.globalParams + this.name = dag.name this.originalName = dag.name this.description = dag.description diff --git a/dolphinscheduler-ui/src/js/conf/home/store/dag/mutations.js b/dolphinscheduler-ui/src/js/conf/home/store/dag/mutations.js index 93614c056a..27e06971cb 100755 --- a/dolphinscheduler-ui/src/js/conf/home/store/dag/mutations.js +++ b/dolphinscheduler-ui/src/js/conf/home/store/dag/mutations.js @@ -162,5 +162,16 @@ export default { } else { state.cacheTasks[payload.id] = payload } + }, + resetLocalParam (state, payload) { + const tasks = state.tasks + tasks.forEach((task, index) => { + payload.forEach(p => { + if (p.id === task.id) { + tasks[index].params.localParams = p.localParam + } + }) + }) + state.tasks = tasks } } diff --git a/pom.xml b/pom.xml index c6880c8b8d..7dc7c3fc55 100644 --- a/pom.xml +++ b/pom.xml @@ -925,6 +925,8 @@ **/server/worker/task/sqoop/SqoopTaskTest.java **/server/worker/task/shell/ShellTaskTest.java **/server/worker/task/TaskManagerTest.java + **/server/worker/task/AbstractCommandExecutorTest.java + **/server/worker/task/ShellTaskReturnTest.java **/server/worker/EnvFileTest.java **/server/worker/runner/TaskExecuteThreadTest.java **/server/worker/runner/WorkerManagerThreadTest.java