diff --git a/.gitignore b/.gitignore index 7a99e2e4b0..7cf1d4d7db 100644 --- a/.gitignore +++ b/.gitignore @@ -4,25 +4,27 @@ .zip .gz .DS_Store +.idea .idea/ -dist/ -all-dependencies.txt -self-modules.txt -third-party-dependencies.txt -**/target/ +.idea/* +.target +.target/ +**/**/target/** +target/* +*/target +*/target/* .settings .nbproject .classpath .project -**/*.iml +*.iml *.ipr *.iws *.tgz .*.swp -.factorypath .vim .tmp -**/node_modules +node_modules npm-debug.log .vscode logs/* @@ -39,10 +41,110 @@ dolphinscheduler-alert/logs/ dolphinscheduler-alert/src/main/resources/alert.properties_bak dolphinscheduler-alert/src/main/resources/logback.xml dolphinscheduler-server/src/main/resources/logback.xml -dolphinscheduler-ui/dist/ +dolphinscheduler-ui/dist dolphinscheduler-ui/node -dolphinscheduler-dao/src/main/resources/dao/data_source.properties +dolphinscheduler-ui/dist/css/common.16ac5d9.css +dolphinscheduler-ui/dist/css/home/index.b444b91.css +dolphinscheduler-ui/dist/css/login/index.5866c64.css +dolphinscheduler-ui/dist/js/0.ac94e5d.js +dolphinscheduler-ui/dist/js/0.ac94e5d.js.map +dolphinscheduler-ui/dist/js/1.0b043a3.js +dolphinscheduler-ui/dist/js/1.0b043a3.js.map +dolphinscheduler-ui/dist/js/10.1bce3dc.js +dolphinscheduler-ui/dist/js/10.1bce3dc.js.map +dolphinscheduler-ui/dist/js/11.79f04d8.js +dolphinscheduler-ui/dist/js/11.79f04d8.js.map +dolphinscheduler-ui/dist/js/12.420daa5.js +dolphinscheduler-ui/dist/js/12.420daa5.js.map +dolphinscheduler-ui/dist/js/13.e5bae1c.js +dolphinscheduler-ui/dist/js/13.e5bae1c.js.map +dolphinscheduler-ui/dist/js/14.f2a0dca.js +dolphinscheduler-ui/dist/js/14.f2a0dca.js.map +dolphinscheduler-ui/dist/js/15.45373e8.js +dolphinscheduler-ui/dist/js/15.45373e8.js.map +dolphinscheduler-ui/dist/js/16.fecb0fc.js +dolphinscheduler-ui/dist/js/16.fecb0fc.js.map +dolphinscheduler-ui/dist/js/17.84be279.js +dolphinscheduler-ui/dist/js/17.84be279.js.map +dolphinscheduler-ui/dist/js/18.307ea70.js +dolphinscheduler-ui/dist/js/18.307ea70.js.map +dolphinscheduler-ui/dist/js/19.144db9c.js +dolphinscheduler-ui/dist/js/19.144db9c.js.map +dolphinscheduler-ui/dist/js/2.8b4ef29.js +dolphinscheduler-ui/dist/js/2.8b4ef29.js.map +dolphinscheduler-ui/dist/js/20.4c527e9.js +dolphinscheduler-ui/dist/js/20.4c527e9.js.map +dolphinscheduler-ui/dist/js/21.831b2a2.js +dolphinscheduler-ui/dist/js/21.831b2a2.js.map +dolphinscheduler-ui/dist/js/22.2b4bb2a.js +dolphinscheduler-ui/dist/js/22.2b4bb2a.js.map +dolphinscheduler-ui/dist/js/23.81467ef.js +dolphinscheduler-ui/dist/js/23.81467ef.js.map +dolphinscheduler-ui/dist/js/24.54a00e4.js +dolphinscheduler-ui/dist/js/24.54a00e4.js.map +dolphinscheduler-ui/dist/js/25.8d7bd36.js +dolphinscheduler-ui/dist/js/25.8d7bd36.js.map +dolphinscheduler-ui/dist/js/26.2ec5e78.js +dolphinscheduler-ui/dist/js/26.2ec5e78.js.map +dolphinscheduler-ui/dist/js/27.3ab48c2.js +dolphinscheduler-ui/dist/js/27.3ab48c2.js.map +dolphinscheduler-ui/dist/js/28.363088a.js +dolphinscheduler-ui/dist/js/28.363088a.js.map +dolphinscheduler-ui/dist/js/29.6c5853a.js +dolphinscheduler-ui/dist/js/29.6c5853a.js.map +dolphinscheduler-ui/dist/js/3.a0edb5b.js +dolphinscheduler-ui/dist/js/3.a0edb5b.js.map +dolphinscheduler-ui/dist/js/30.940fdd3.js +dolphinscheduler-ui/dist/js/30.940fdd3.js.map +dolphinscheduler-ui/dist/js/31.168a460.js +dolphinscheduler-ui/dist/js/31.168a460.js.map +dolphinscheduler-ui/dist/js/32.8df6594.js +dolphinscheduler-ui/dist/js/32.8df6594.js.map +dolphinscheduler-ui/dist/js/33.4480bbe.js +dolphinscheduler-ui/dist/js/33.4480bbe.js.map +dolphinscheduler-ui/dist/js/34.b407fe1.js +dolphinscheduler-ui/dist/js/34.b407fe1.js.map +dolphinscheduler-ui/dist/js/35.f340b0a.js +dolphinscheduler-ui/dist/js/35.f340b0a.js.map +dolphinscheduler-ui/dist/js/36.8880c2d.js +dolphinscheduler-ui/dist/js/36.8880c2d.js.map +dolphinscheduler-ui/dist/js/37.ea2a25d.js +dolphinscheduler-ui/dist/js/37.ea2a25d.js.map +dolphinscheduler-ui/dist/js/38.98a59ee.js +dolphinscheduler-ui/dist/js/38.98a59ee.js.map +dolphinscheduler-ui/dist/js/39.a5e958a.js +dolphinscheduler-ui/dist/js/39.a5e958a.js.map +dolphinscheduler-ui/dist/js/4.4ca44db.js +dolphinscheduler-ui/dist/js/4.4ca44db.js.map +dolphinscheduler-ui/dist/js/40.e187b1e.js +dolphinscheduler-ui/dist/js/40.e187b1e.js.map +dolphinscheduler-ui/dist/js/41.0e89182.js +dolphinscheduler-ui/dist/js/41.0e89182.js.map +dolphinscheduler-ui/dist/js/42.341047c.js +dolphinscheduler-ui/dist/js/42.341047c.js.map +dolphinscheduler-ui/dist/js/43.27b8228.js +dolphinscheduler-ui/dist/js/43.27b8228.js.map +dolphinscheduler-ui/dist/js/44.e8869bc.js +dolphinscheduler-ui/dist/js/44.e8869bc.js.map +dolphinscheduler-ui/dist/js/45.8d54901.js +dolphinscheduler-ui/dist/js/45.8d54901.js.map +dolphinscheduler-ui/dist/js/5.e1ed7f3.js +dolphinscheduler-ui/dist/js/5.e1ed7f3.js.map +dolphinscheduler-ui/dist/js/6.241ba07.js +dolphinscheduler-ui/dist/js/6.241ba07.js.map +dolphinscheduler-ui/dist/js/7.ab2e297.js +dolphinscheduler-ui/dist/js/7.ab2e297.js.map +dolphinscheduler-ui/dist/js/8.83ff814.js +dolphinscheduler-ui/dist/js/8.83ff814.js.map +dolphinscheduler-ui/dist/js/9.39cb29f.js +dolphinscheduler-ui/dist/js/9.39cb29f.js.map +dolphinscheduler-ui/dist/js/common.733e342.js +dolphinscheduler-ui/dist/js/common.733e342.js.map +dolphinscheduler-ui/dist/js/home/index.78a5d12.js +dolphinscheduler-ui/dist/js/home/index.78a5d12.js.map +dolphinscheduler-ui/dist/js/login/index.291b8e3.js +dolphinscheduler-ui/dist/js/login/index.291b8e3.js.map +dolphinscheduler-ui/dist/lib/external/ +dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/index.vue +/dolphinscheduler-dao/src/main/resources/dao/data_source.properties -.mvn/wrapper/*.jar - -!/zookeeper_data/ diff --git a/.mvn/wrapper/maven-wrapper.jar b/.mvn/wrapper/maven-wrapper.jar new file mode 100644 index 0000000000..2cc7d4a55c Binary files /dev/null and b/.mvn/wrapper/maven-wrapper.jar differ diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/configuration/dolphin-application.xml b/ambari_plugin/common-services/DOLPHIN/1.2.1/configuration/dolphin-application.xml deleted file mode 100644 index 6e50a1b649..0000000000 --- a/ambari_plugin/common-services/DOLPHIN/1.2.1/configuration/dolphin-application.xml +++ /dev/null @@ -1,467 +0,0 @@ - - - - spring.datasource.initialSize - 5 - - Init connection number - - - int - - - - - spring.datasource.minIdle - 5 - - Min connection number - - - int - - - - - spring.datasource.maxActive - 50 - - Max connection number - - - int - - - - - spring.datasource.maxWait - 60000 - - Max wait time for get a connection in milliseconds. - If configuring maxWait, fair locks are enabled by default and concurrency efficiency decreases. - If necessary, unfair locks can be used by configuring the useUnfairLock attribute to true. - - - int - - - - - spring.datasource.timeBetweenEvictionRunsMillis - 60000 - - Milliseconds for check to close free connections - - - int - - - - - spring.datasource.timeBetweenConnectErrorMillis - 60000 - - The Destroy thread detects the connection interval and closes the physical connection in milliseconds - if the connection idle time is greater than or equal to minEvictableIdleTimeMillis. - - - int - - - - - spring.datasource.minEvictableIdleTimeMillis - 300000 - - The longest time a connection remains idle without being evicted, in milliseconds - - - int - - - - - spring.datasource.validationQuery - SELECT 1 - - The SQL used to check whether the connection is valid requires a query statement. - If validation Query is null, testOnBorrow, testOnReturn, and testWhileIdle will not work. - - - - - spring.datasource.validationQueryTimeout - 3 - - int - - - Check whether the connection is valid for timeout, in seconds - - - - - spring.datasource.testWhileIdle - true - - boolean - - - When applying for a connection, - if it is detected that the connection is idle longer than time Between Eviction Runs Millis, - validation Query is performed to check whether the connection is valid - - - - - spring.datasource.testOnBorrow - true - - boolean - - - Execute validation to check if the connection is valid when applying for a connection - - - - - spring.datasource.testOnReturn - false - - boolean - - - Execute validation to check if the connection is valid when the connection is returned - - - - - spring.datasource.defaultAutoCommit - true - - boolean - - - - - - - spring.datasource.keepAlive - false - - boolean - - - - - - - - spring.datasource.poolPreparedStatements - true - - boolean - - - Open PSCache, specify count PSCache for every connection - - - - - spring.datasource.maxPoolPreparedStatementPerConnectionSize - 20 - - int - - - - - - spring.datasource.spring.datasource.filters - stat,wall,log4j - - - - - spring.datasource.connectionProperties - druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 - - - - - - mybatis-plus.mapper-locations - classpath*:/org.apache.dolphinscheduler.dao.mapper/*.xml - - - - - mybatis-plus.typeEnumsPackage - org.apache.dolphinscheduler.*.enums - - - - - mybatis-plus.typeAliasesPackage - org.apache.dolphinscheduler.dao.entity - - Entity scan, where multiple packages are separated by a comma or semicolon - - - - - mybatis-plus.global-config.db-config.id-type - AUTO - - value-list - - - AUTO - - - - INPUT - - - - ID_WORKER - - - - UUID - - - - 1 - - - Primary key type AUTO:" database ID AUTO ", - INPUT:" user INPUT ID", - ID_WORKER:" global unique ID (numeric type unique ID)", - UUID:" global unique ID UUID"; - - - - - mybatis-plus.global-config.db-config.field-strategy - NOT_NULL - - value-list - - - IGNORED - - - - NOT_NULL - - - - NOT_EMPTY - - - - 1 - - - Field policy IGNORED:" ignore judgment ", - NOT_NULL:" not NULL judgment "), - NOT_EMPTY:" not NULL judgment" - - - - - mybatis-plus.global-config.db-config.column-underline - true - - boolean - - - - - - mybatis-plus.global-config.db-config.logic-delete-value - 1 - - int - - - - - - mybatis-plus.global-config.db-config.logic-not-delete-value - 0 - - int - - - - - - mybatis-plus.global-config.db-config.banner - true - - boolean - - - - - - - mybatis-plus.configuration.map-underscore-to-camel-case - true - - boolean - - - - - - mybatis-plus.configuration.cache-enabled - false - - boolean - - - - - - mybatis-plus.configuration.call-setters-on-nulls - true - - boolean - - - - - - mybatis-plus.configuration.jdbc-type-for-null - null - - - - - master.exec.threads - 100 - - int - - - - - - master.exec.task.num - 20 - - int - - - - - - master.heartbeat.interval - 10 - - int - - - - - - master.task.commit.retryTimes - 5 - - int - - - - - - master.task.commit.interval - 1000 - - int - - - - - - master.max.cpuload.avg - 100 - - int - - - - - - master.reserved.memory - 0.1 - - float - - - - - - worker.exec.threads - 100 - - int - - - - - - worker.heartbeat.interval - 10 - - int - - - - - - worker.fetch.task.num - 3 - - int - - - - - - worker.max.cpuload.avg - 100 - - int - - - - - - worker.reserved.memory - 0.1 - - float - - - - - - \ No newline at end of file diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/alerts.json b/ambari_plugin/common-services/DOLPHIN/1.3.3/alerts.json similarity index 95% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/alerts.json rename to ambari_plugin/common-services/DOLPHIN/1.3.3/alerts.json index 769245b366..184f021ac3 100644 --- a/ambari_plugin/common-services/DOLPHIN/1.2.1/alerts.json +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/alerts.json @@ -65,7 +65,7 @@ "enabled": true, "source": { "type": "SCRIPT", - "path": "DOLPHIN/1.2.1/package/alerts/alert_dolphin_scheduler_status.py", + "path": "DOLPHIN/1.3.3/package/alerts/alert_dolphin_scheduler_status.py", "parameters": [ { @@ -98,7 +98,7 @@ "enabled": true, "source": { "type": "SCRIPT", - "path": "DOLPHIN/1.2.1/package/alerts/alert_dolphin_scheduler_status.py", + "path": "DOLPHIN/1.3.3/package/alerts/alert_dolphin_scheduler_status.py", "parameters": [ { @@ -131,7 +131,7 @@ "enabled": true, "source": { "type": "SCRIPT", - "path": "DOLPHIN/1.2.1/package/alerts/alert_dolphin_scheduler_status.py", + "path": "DOLPHIN/1.3.3/package/alerts/alert_dolphin_scheduler_status.py", "parameters": [ { diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/configuration/dolphin-alert.xml b/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-alert.xml similarity index 95% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/configuration/dolphin-alert.xml rename to ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-alert.xml index 5b82230148..32abcc791d 100644 --- a/ambari_plugin/common-services/DOLPHIN/1.2.1/configuration/dolphin-alert.xml +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-alert.xml @@ -90,13 +90,6 @@ - - xls.file.path - /tmp/xls - - - - enterprise.wechat.enable false diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/configuration/dolphin-application-api.xml b/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-application-api.xml similarity index 82% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/configuration/dolphin-application-api.xml rename to ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-application-api.xml index ea4cb82afd..766c0f477d 100644 --- a/ambari_plugin/common-services/DOLPHIN/1.2.1/configuration/dolphin-application-api.xml +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-application-api.xml @@ -34,6 +34,12 @@ + + server.servlet.context-path + /dolphinscheduler/ + + + spring.servlet.multipart.max-file-size 1024 @@ -68,4 +74,14 @@ UTF-8 + + spring.messages.basename + i18n/messages + + + + security.authentication.type + PASSWORD + + \ No newline at end of file diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/configuration/dolphin-common.xml b/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-common.xml similarity index 100% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/configuration/dolphin-common.xml rename to ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-common.xml diff --git a/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-datasource.xml b/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-datasource.xml new file mode 100644 index 0000000000..02d8de0482 --- /dev/null +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-datasource.xml @@ -0,0 +1,206 @@ + + + + spring.datasource.initialSize + 5 + + Init connection number + + + int + + + + + spring.datasource.minIdle + 5 + + Min connection number + + + int + + + + + spring.datasource.maxActive + 50 + + Max connection number + + + int + + + + + spring.datasource.maxWait + 60000 + + Max wait time for get a connection in milliseconds. + If configuring maxWait, fair locks are enabled by default and concurrency efficiency decreases. + If necessary, unfair locks can be used by configuring the useUnfairLock attribute to true. + + + int + + + + + spring.datasource.timeBetweenEvictionRunsMillis + 60000 + + Milliseconds for check to close free connections + + + int + + + + + spring.datasource.timeBetweenConnectErrorMillis + 60000 + + The Destroy thread detects the connection interval and closes the physical connection in milliseconds + if the connection idle time is greater than or equal to minEvictableIdleTimeMillis. + + + int + + + + + spring.datasource.minEvictableIdleTimeMillis + 300000 + + The longest time a connection remains idle without being evicted, in milliseconds + + + int + + + + + spring.datasource.validationQuery + SELECT 1 + + The SQL used to check whether the connection is valid requires a query statement. + If validation Query is null, testOnBorrow, testOnReturn, and testWhileIdle will not work. + + + + + spring.datasource.validationQueryTimeout + 3 + + int + + + Check whether the connection is valid for timeout, in seconds + + + + + spring.datasource.testWhileIdle + true + + boolean + + + When applying for a connection, + if it is detected that the connection is idle longer than time Between Eviction Runs Millis, + validation Query is performed to check whether the connection is valid + + + + + spring.datasource.testOnBorrow + true + + boolean + + + Execute validation to check if the connection is valid when applying for a connection + + + + + spring.datasource.testOnReturn + false + + boolean + + + Execute validation to check if the connection is valid when the connection is returned + + + + + spring.datasource.defaultAutoCommit + true + + boolean + + + + + + + spring.datasource.keepAlive + false + + boolean + + + + + + + + spring.datasource.poolPreparedStatements + true + + boolean + + + Open PSCache, specify count PSCache for every connection + + + + + spring.datasource.maxPoolPreparedStatementPerConnectionSize + 20 + + int + + + + + + spring.datasource.spring.datasource.filters + stat,wall,log4j + + + + + spring.datasource.connectionProperties + druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 + + + + \ No newline at end of file diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/configuration/dolphin-env.xml b/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-env.xml similarity index 100% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/configuration/dolphin-env.xml rename to ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-env.xml diff --git a/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-master.xml b/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-master.xml new file mode 100644 index 0000000000..c8eec047fc --- /dev/null +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-master.xml @@ -0,0 +1,88 @@ + + + + master.exec.threads + 100 + + int + + master execute thread num + + + + master.exec.task.num + 20 + + int + + master execute task number in parallel + + + + master.heartbeat.interval + 10 + + int + + master heartbeat interval + + + + master.task.commit.retryTimes + 5 + + int + + master commit task retry times + + + + master.task.commit.interval + 1000 + + int + + master commit task interval + + + + master.max.cpuload.avg + 100 + + int + + only less than cpu avg load, master server can work. default value : the number of cpu cores * 2 + + + + master.reserved.memory + 0.3 + only larger than reserved memory, master server can work. default value : physical memory * 1/10, unit is G. + + + + + master.listen.port + 5678 + + int + + master listen port + + + \ No newline at end of file diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/configuration/dolphin-quartz.xml b/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-quartz.xml similarity index 91% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/configuration/dolphin-quartz.xml rename to ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-quartz.xml index 82b59d8827..7a0c68b051 100644 --- a/ambari_plugin/common-services/DOLPHIN/1.2.1/configuration/dolphin-quartz.xml +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-quartz.xml @@ -106,26 +106,21 @@ - org.quartz.jobStore.dataSource - myDs - - - - org.quartz.dataSource.myDs.connectionProvider.class - org.apache.dolphinscheduler.server.quartz.DruidConnectionProvider + org.quartz.jobStore.acquireTriggersWithinLock + true + + boolean + - org.quartz.dataSource.myDs.maxConnections - 10 - - int - + org.quartz.jobStore.dataSource + myDs - org.quartz.dataSource.myDs.validationQuery - select 1 + org.quartz.dataSource.myDs.connectionProvider.class + org.apache.dolphinscheduler.service.quartz.DruidConnectionProvider \ No newline at end of file diff --git a/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-worker.xml b/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-worker.xml new file mode 100644 index 0000000000..1ae7a1a765 --- /dev/null +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-worker.xml @@ -0,0 +1,67 @@ + + + + worker.exec.threads + 100 + + int + + worker execute thread num + + + + worker.heartbeat.interval + 10 + + int + + worker heartbeat interval + + + + worker.max.cpuload.avg + 100 + + int + + only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2 + + + + worker.reserved.memory + 0.3 + only larger than reserved memory, worker server can work. default value : physical memory * 1/10, unit is G. + + + + + worker.listen.port + 1234 + + int + + worker listen port + + + + worker.groups + default + default worker group + + + \ No newline at end of file diff --git a/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-zookeeper.xml b/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-zookeeper.xml new file mode 100644 index 0000000000..e89962d900 --- /dev/null +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-zookeeper.xml @@ -0,0 +1,76 @@ + + + + zookeeper.dolphinscheduler.root + /dolphinscheduler + + dolphinscheduler root directory + + + + + zookeeper.session.timeout + 300 + + int + + + + + + + zookeeper.connection.timeout + 300 + + int + + + + + + + zookeeper.retry.base.sleep + 100 + + int + + + + + + + zookeeper.retry.max.sleep + 30000 + + int + + + + + + + zookeeper.retry.maxtime + 5 + + int + + + + + + \ No newline at end of file diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/metainfo.xml b/ambari_plugin/common-services/DOLPHIN/1.3.3/metainfo.xml similarity index 98% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/metainfo.xml rename to ambari_plugin/common-services/DOLPHIN/1.3.3/metainfo.xml index 0d2bbe3163..074306d5cb 100644 --- a/ambari_plugin/common-services/DOLPHIN/1.2.1/metainfo.xml +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/metainfo.xml @@ -22,7 +22,7 @@ DOLPHIN Dolphin Scheduler 分布式易扩展的可视化DAG工作流任务调度系统 - 1.2.1 + 1.3.3 DOLPHIN_MASTER @@ -103,7 +103,7 @@ any - apache-dolphinscheduler-incubating-1.2.1* + apache-dolphinscheduler-incubating* @@ -134,4 +134,4 @@ - + \ No newline at end of file diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/alerts/alert_dolphin_scheduler_status.py b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/alerts/alert_dolphin_scheduler_status.py similarity index 100% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/package/alerts/alert_dolphin_scheduler_status.py rename to ambari_plugin/common-services/DOLPHIN/1.3.3/package/alerts/alert_dolphin_scheduler_status.py diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/dolphin_alert_service.py b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/dolphin_alert_service.py similarity index 92% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/dolphin_alert_service.py rename to ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/dolphin_alert_service.py index 62255a3432..e78c38d272 100644 --- a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/dolphin_alert_service.py +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/dolphin_alert_service.py @@ -26,7 +26,8 @@ class DolphinAlertService(Script): import params env.set_params(params) self.install_packages(env) - Execute(('chmod', '-R', '777', params.dolphin_home), user=params.dolphin_user, sudo=True) + Execute(('chmod', '-R', '777', params.dolphin_home)) + Execute(('chown', '-R', params.dolphin_user + ":" + params.dolphin_group, params.dolphin_home)) def configure(self, env): import params diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/dolphin_api_service.py b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/dolphin_api_service.py similarity index 93% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/dolphin_api_service.py rename to ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/dolphin_api_service.py index bdc18fb602..5a28924a9a 100644 --- a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/dolphin_api_service.py +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/dolphin_api_service.py @@ -26,7 +26,8 @@ class DolphinApiService(Script): import params env.set_params(params) self.install_packages(env) - Execute(('chmod', '-R', '777', params.dolphin_home), user=params.dolphin_user, sudo=True) + Execute(('chmod', '-R', '777', params.dolphin_home)) + Execute(('chown', '-R', params.dolphin_user + ":" + params.dolphin_group, params.dolphin_home)) def configure(self, env): import params diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/dolphin_env.py b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/dolphin_env.py similarity index 78% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/dolphin_env.py rename to ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/dolphin_env.py index 235605894f..1661d76c75 100644 --- a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/dolphin_env.py +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/dolphin_env.py @@ -42,31 +42,12 @@ def dolphin_env(): create_parents=True ) - - Directory(params.dolphin_alert_map['xls.file.path'], - mode=0777, - owner=params.dolphin_user, - group=params.dolphin_group, - create_parents=True - ) Directory(params.dolphin_common_map['data.basedir.path'], mode=0777, owner=params.dolphin_user, group=params.dolphin_group, create_parents=True ) - Directory(params.dolphin_common_map['data.download.basedir.path'], - mode=0777, - owner=params.dolphin_user, - group=params.dolphin_group, - create_parents=True - ) - Directory(params.dolphin_common_map['process.exec.basepath'], - mode=0777, - owner=params.dolphin_user, - group=params.dolphin_group, - create_parents=True - ) File(format(params.dolphin_env_path), @@ -79,11 +60,25 @@ def dolphin_env(): File(format(params.dolphin_bin_dir + "/dolphinscheduler-daemon.sh"), mode=0755, - content=Template("dolphin-daemon.j2"), + content=Template("dolphin-daemon.sh.j2"), owner=params.dolphin_user, group=params.dolphin_group ) + File(format(params.dolphin_conf_dir + "/master.properties"), + mode=0755, + content=Template("master.properties.j2"), + owner=params.dolphin_user, + group=params.dolphin_group + ) + + File(format(params.dolphin_conf_dir + "/worker.properties"), + mode=0755, + content=Template("worker.properties.j2"), + owner=params.dolphin_user, + group=params.dolphin_group + ) + File(format(params.dolphin_conf_dir + "/alert.properties"), mode=0755, @@ -92,9 +87,9 @@ def dolphin_env(): group=params.dolphin_group ) - File(format(params.dolphin_conf_dir + "/application.properties"), + File(format(params.dolphin_conf_dir + "/datasource.properties"), mode=0755, - content=Template("application.properties.j2"), + content=Template("datasource.properties.j2"), owner=params.dolphin_user, group=params.dolphin_group ) @@ -119,3 +114,10 @@ def dolphin_env(): owner=params.dolphin_user, group=params.dolphin_group ) + + File(format(params.dolphin_conf_dir + "/zookeeper.properties"), + mode=0755, + content=Template("zookeeper.properties.j2"), + owner=params.dolphin_user, + group=params.dolphin_group + ) diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/dolphin_logger_service.py b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/dolphin_logger_service.py similarity index 92% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/dolphin_logger_service.py rename to ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/dolphin_logger_service.py index f1c19bd66f..fb47e132e1 100644 --- a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/dolphin_logger_service.py +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/dolphin_logger_service.py @@ -26,8 +26,8 @@ class DolphinLoggerService(Script): import params env.set_params(params) self.install_packages(env) - Execute(('chmod', '-R', '777', params.dolphin_home), user=params.dolphin_user, sudo=True) - + Execute(('chmod', '-R', '777', params.dolphin_home)) + Execute(('chown', '-R', params.dolphin_user + ":" + params.dolphin_group, params.dolphin_home)) def configure(self, env): import params params.pika_slave = True diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/dolphin_master_service.py b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/dolphin_master_service.py similarity index 92% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/dolphin_master_service.py rename to ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/dolphin_master_service.py index 6ee7ecfcf3..8d64935d26 100644 --- a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/dolphin_master_service.py +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/dolphin_master_service.py @@ -27,7 +27,8 @@ class DolphinMasterService(Script): import params env.set_params(params) self.install_packages(env) - Execute(('chmod', '-R', '777', params.dolphin_home), user=params.dolphin_user, sudo=True) + Execute(('chmod', '-R', '777', params.dolphin_home)) + Execute(('chown', '-R', params.dolphin_user + ":" + params.dolphin_group, params.dolphin_home)) def configure(self, env): import params diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/dolphin_worker_service.py b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/dolphin_worker_service.py similarity index 92% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/dolphin_worker_service.py rename to ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/dolphin_worker_service.py index 2d145ee730..1f542c06c2 100644 --- a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/dolphin_worker_service.py +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/dolphin_worker_service.py @@ -26,7 +26,8 @@ class DolphinWorkerService(Script): import params env.set_params(params) self.install_packages(env) - Execute(('chmod', '-R', '777', params.dolphin_home), user=params.dolphin_user, sudo=True) + Execute(('chmod', '-R', '777', params.dolphin_home)) + Execute(('chown', '-R', params.dolphin_user + ":" + params.dolphin_group, params.dolphin_home)) def configure(self, env): import params diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/params.py b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/params.py similarity index 68% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/params.py rename to ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/params.py index 049b2cf3ae..5a9994f559 100644 --- a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/params.py +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/params.py @@ -54,11 +54,8 @@ dolphin_env_content = dolphin_env_map['dolphinscheduler-env-content'] # database config dolphin_database_config = {} dolphin_database_config['dolphin_database_type'] = dolphin_env_map['dolphin.database.type'] -dolphin_database_config['dolphin_database_host'] = dolphin_env_map['dolphin.database.host'] -dolphin_database_config['dolphin_database_port'] = dolphin_env_map['dolphin.database.port'] dolphin_database_config['dolphin_database_username'] = dolphin_env_map['dolphin.database.username'] dolphin_database_config['dolphin_database_password'] = dolphin_env_map['dolphin.database.password'] - if 'mysql' == dolphin_database_config['dolphin_database_type']: dolphin_database_config['dolphin_database_driver'] = 'com.mysql.jdbc.Driver' dolphin_database_config['driverDelegateClass'] = 'org.quartz.impl.jdbcjobstore.StdJDBCDelegate' @@ -72,6 +69,10 @@ else: + ':' + dolphin_env_map['dolphin.database.port'] \ + '/dolphinscheduler' + + + + # application-alert.properties dolphin_alert_map = {} wechat_push_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$token' @@ -79,27 +80,22 @@ wechat_token_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$corpId& wechat_team_send_msg = '{\"toparty\":\"{toParty}\",\"agentid\":\"{agentId}\",\"msgtype\":\"text\",\"text\":{\"content\":\"{msg}\"},\"safe\":\"0\"}' wechat_user_send_msg = '{\"touser\":\"{toUser}\",\"agentid\":\"{agentId}\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"{msg}\"}}' -dolphin_alert_map['enterprise.wechat.push.ur'] = wechat_push_url -dolphin_alert_map['enterprise.wechat.token.url'] = wechat_token_url -dolphin_alert_map['enterprise.wechat.team.send.msg'] = wechat_team_send_msg -dolphin_alert_map['enterprise.wechat.user.send.msg'] = wechat_user_send_msg -dolphin_alert_map.update(config['configurations']['dolphin-alert']) +dolphin_alert_config_map = config['configurations']['dolphin-alert'] + +if dolphin_alert_config_map['enterprise.wechat.enable']: + dolphin_alert_map['enterprise.wechat.push.ur'] = wechat_push_url + dolphin_alert_map['enterprise.wechat.token.url'] = wechat_token_url + dolphin_alert_map['enterprise.wechat.team.send.msg'] = wechat_team_send_msg + dolphin_alert_map['enterprise.wechat.user.send.msg'] = wechat_user_send_msg + +dolphin_alert_map.update(dolphin_alert_config_map) + + # application-api.properties dolphin_app_api_map = {} -dolphin_app_api_map['logging.config'] = 'classpath:apiserver_logback.xml' -dolphin_app_api_map['spring.messages.basename'] = 'i18n/messages' -dolphin_app_api_map['server.servlet.context-path'] = '/dolphinscheduler/' dolphin_app_api_map.update(config['configurations']['dolphin-application-api']) -# application-dao.properties -dolphin_application_map = {} -dolphin_application_map['spring.datasource.type'] = 'com.alibaba.druid.pool.DruidDataSource' -dolphin_application_map['spring.datasource.driver-class-name'] = dolphin_database_config['dolphin_database_driver'] -dolphin_application_map['spring.datasource.url'] = dolphin_database_config['dolphin_database_url'] -dolphin_application_map['spring.datasource.username'] = dolphin_database_config['dolphin_database_username'] -dolphin_application_map['spring.datasource.password'] = dolphin_database_config['dolphin_database_password'] -dolphin_application_map.update(config['configurations']['dolphin-application']) # common.properties dolphin_common_map = {} @@ -118,33 +114,42 @@ else: dolphin_common_map_tmp = config['configurations']['dolphin-common'] data_basedir_path = dolphin_common_map_tmp['data.basedir.path'] -process_exec_basepath = data_basedir_path + '/exec' -data_download_basedir_path = data_basedir_path + '/download' -dolphin_common_map['process.exec.basepath'] = process_exec_basepath -dolphin_common_map['data.download.basedir.path'] = data_download_basedir_path dolphin_common_map['dolphinscheduler.env.path'] = dolphin_env_path +dolphin_common_map.update(config['configurations']['dolphin-common']) -zookeeperHosts = default("/clusterHostInfo/zookeeper_hosts", []) -if len(zookeeperHosts) > 0 and "clientPort" in config['configurations']['zoo.cfg']: - clientPort = config['configurations']['zoo.cfg']['clientPort'] - zookeeperPort = ":" + clientPort + "," - dolphin_common_map['zookeeper.quorum'] = zookeeperPort.join(zookeeperHosts) + ":" + clientPort +# datasource.properties +dolphin_datasource_map = {} +dolphin_datasource_map['spring.datasource.type'] = 'com.alibaba.druid.pool.DruidDataSource' +dolphin_datasource_map['spring.datasource.driver-class-name'] = dolphin_database_config['dolphin_database_driver'] +dolphin_datasource_map['spring.datasource.url'] = dolphin_database_config['dolphin_database_url'] +dolphin_datasource_map['spring.datasource.username'] = dolphin_database_config['dolphin_database_username'] +dolphin_datasource_map['spring.datasource.password'] = dolphin_database_config['dolphin_database_password'] +dolphin_datasource_map.update(config['configurations']['dolphin-datasource']) -dolphin_common_map.update(config['configurations']['dolphin-common']) +# master.properties +dolphin_master_map = config['configurations']['dolphin-master'] # quartz.properties dolphin_quartz_map = {} dolphin_quartz_map['org.quartz.jobStore.driverDelegateClass'] = dolphin_database_config['driverDelegateClass'] -dolphin_quartz_map['org.quartz.dataSource.myDs.driver'] = dolphin_database_config['dolphin_database_driver'] -dolphin_quartz_map['org.quartz.dataSource.myDs.URL'] = dolphin_database_config['dolphin_database_url'] -dolphin_quartz_map['org.quartz.dataSource.myDs.user'] = dolphin_database_config['dolphin_database_username'] -dolphin_quartz_map['org.quartz.dataSource.myDs.password'] = dolphin_database_config['dolphin_database_password'] dolphin_quartz_map.update(config['configurations']['dolphin-quartz']) -# if 'ganglia_server_host' in config['clusterHostInfo'] and \ -# len(config['clusterHostInfo']['ganglia_server_host'])>0: -# ganglia_installed = True -# ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0] -# ganglia_report_interval = 60 -# else: -# ganglia_installed = False +# worker.properties +dolphin_worker_map = config['configurations']['dolphin-worker'] + +# zookeeper.properties +dolphin_zookeeper_map={} +zookeeperHosts = default("/clusterHostInfo/zookeeper_hosts", []) +if len(zookeeperHosts) > 0 and "clientPort" in config['configurations']['zoo.cfg']: + clientPort = config['configurations']['zoo.cfg']['clientPort'] + zookeeperPort = ":" + clientPort + "," + dolphin_zookeeper_map['zookeeper.quorum'] = zookeeperPort.join(zookeeperHosts) + ":" + clientPort +dolphin_zookeeper_map.update(config['configurations']['dolphin-zookeeper']) +if 'spring.servlet.multipart.max-file-size' in dolphin_app_api_map: + file_size = dolphin_app_api_map['spring.servlet.multipart.max-file-size'] + dolphin_app_api_map['spring.servlet.multipart.max-file-size'] = file_size + "MB" +if 'spring.servlet.multipart.max-request-size' in dolphin_app_api_map: + request_size = dolphin_app_api_map['spring.servlet.multipart.max-request-size'] + dolphin_app_api_map['spring.servlet.multipart.max-request-size'] = request_size + "MB" + + diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/service_check.py b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/service_check.py similarity index 100% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/service_check.py rename to ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/service_check.py diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/status_params.py b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/status_params.py similarity index 100% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/package/scripts/status_params.py rename to ambari_plugin/common-services/DOLPHIN/1.3.3/package/scripts/status_params.py diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/templates/alert.properties.j2 b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/alert.properties.j2 similarity index 100% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/package/templates/alert.properties.j2 rename to ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/alert.properties.j2 diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/templates/application-api.properties.j2 b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/application-api.properties.j2 similarity index 100% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/package/templates/application-api.properties.j2 rename to ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/application-api.properties.j2 diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/templates/common.properties.j2 b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/common.properties.j2 similarity index 100% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/package/templates/common.properties.j2 rename to ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/common.properties.j2 diff --git a/ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/datasource.properties.j2 b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/datasource.properties.j2 new file mode 100644 index 0000000000..40aed83543 --- /dev/null +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/datasource.properties.j2 @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +{% for key, value in dolphin_datasource_map.iteritems() -%} + {{key}}={{value}} +{% endfor %} \ No newline at end of file diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/templates/dolphin-daemon.j2 b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/dolphin-daemon.sh.j2 similarity index 83% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/package/templates/dolphin-daemon.j2 rename to ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/dolphin-daemon.sh.j2 index 1dc4bac0ab..c5cc11fb62 100644 --- a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/templates/dolphin-daemon.j2 +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/dolphin-daemon.sh.j2 @@ -48,22 +48,19 @@ pid={{dolphin_pidfile_dir}}/$command.pid cd $DOLPHINSCHEDULER_HOME if [ "$command" = "api-server" ]; then - LOG_FILE="-Dlogging.config={{dolphin_conf_dir}}/apiserver_logback.xml -Dspring.profiles.active=api" + LOG_FILE="-Dlogging.config={{dolphin_conf_dir}}/logback-api.xml -Dspring.profiles.active=api" CLASS=org.apache.dolphinscheduler.api.ApiApplicationServer elif [ "$command" = "master-server" ]; then - LOG_FILE="-Dlogging.config={{dolphin_conf_dir}}/master_logback.xml -Ddruid.mysql.usePingMethod=false" + LOG_FILE="-Dlogging.config={{dolphin_conf_dir}}/logback-master.xml -Ddruid.mysql.usePingMethod=false" CLASS=org.apache.dolphinscheduler.server.master.MasterServer elif [ "$command" = "worker-server" ]; then - LOG_FILE="-Dlogging.config={{dolphin_conf_dir}}/worker_logback.xml -Ddruid.mysql.usePingMethod=false" + LOG_FILE="-Dlogging.config={{dolphin_conf_dir}}/logback-worker.xml -Ddruid.mysql.usePingMethod=false" CLASS=org.apache.dolphinscheduler.server.worker.WorkerServer elif [ "$command" = "alert-server" ]; then - LOG_FILE="-Dlogback.configurationFile={{dolphin_conf_dir}}/alert_logback.xml" + LOG_FILE="-Dlogging.config={{dolphin_conf_dir}}/logback-alert.xml" CLASS=org.apache.dolphinscheduler.alert.AlertServer elif [ "$command" = "logger-server" ]; then - CLASS=org.apache.dolphinscheduler.server.rpc.LoggerServer -elif [ "$command" = "combined-server" ]; then - LOG_FILE="-Dlogging.config={{dolphin_conf_dir}}/combined_logback.xml -Dspring.profiles.active=api -Dserver.is-combined-server=true" - CLASS=org.apache.dolphinscheduler.api.CombinedApplicationServer + CLASS=org.apache.dolphinscheduler.server.log.LoggerServer else echo "Error: No command named \`$command' was found." exit 1 diff --git a/ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/master.properties.j2 b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/master.properties.j2 new file mode 100644 index 0000000000..d9b85e14cf --- /dev/null +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/master.properties.j2 @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +{% for key, value in dolphin_master_map.iteritems() -%} + {{key}}={{value}} +{% endfor %} \ No newline at end of file diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/templates/quartz.properties.j2 b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/quartz.properties.j2 similarity index 100% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/package/templates/quartz.properties.j2 rename to ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/quartz.properties.j2 diff --git a/ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/worker.properties.j2 b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/worker.properties.j2 new file mode 100644 index 0000000000..a008b74084 --- /dev/null +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/worker.properties.j2 @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +{% for key, value in dolphin_worker_map.iteritems() -%} + {{key}}={{value}} +{% endfor %} \ No newline at end of file diff --git a/ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/zookeeper.properties.j2 b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/zookeeper.properties.j2 new file mode 100644 index 0000000000..9eb14eaef3 --- /dev/null +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/package/templates/zookeeper.properties.j2 @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +{% for key, value in dolphin_zookeeper_map.iteritems() -%} + {{key}}={{value}} +{% endfor %} \ No newline at end of file diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/quicklinks/quicklinks.json b/ambari_plugin/common-services/DOLPHIN/1.3.3/quicklinks/quicklinks.json old mode 100755 new mode 100644 similarity index 100% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/quicklinks/quicklinks.json rename to ambari_plugin/common-services/DOLPHIN/1.3.3/quicklinks/quicklinks.json diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/themes/theme.json b/ambari_plugin/common-services/DOLPHIN/1.3.3/themes/theme.json similarity index 86% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/themes/theme.json rename to ambari_plugin/common-services/DOLPHIN/1.3.3/themes/theme.json index 23e46076aa..953e2323f8 100644 --- a/ambari_plugin/common-services/DOLPHIN/1.2.1/themes/theme.json +++ b/ambari_plugin/common-services/DOLPHIN/1.3.3/themes/theme.json @@ -151,18 +151,40 @@ "subsection-name": "env-row1-col2" }, { - "config": "dolphin-common/res.upload.startup.type", + "config": "dolphin-common/resource.storage.type", "subsection-name": "dynamic-row1-col1" }, + { + "config": "dolphin-common/resource.upload.path", + "subsection-name": "dynamic-row1-col1", + "depends-on": [ + { + "configs":[ + "dolphin-common/resource.storage.type" + ], + "if": "${dolphin-common/resource.storage.type} === HDFS || ${dolphin-common/resource.storage.type} === S3", + "then": { + "property_value_attributes": { + "visible": true + } + }, + "else": { + "property_value_attributes": { + "visible": false + } + } + } + ] + }, { "config": "dolphin-common/hdfs.root.user", "subsection-name": "dynamic-row1-col1", "depends-on": [ { "configs":[ - "dolphin-common/res.upload.startup.type" + "dolphin-common/resource.storage.type" ], - "if": "${dolphin-common/res.upload.startup.type} === HDFS", + "if": "${dolphin-common/resource.storage.type} === HDFS", "then": { "property_value_attributes": { "visible": true @@ -182,9 +204,9 @@ "depends-on": [ { "configs":[ - "dolphin-common/res.upload.startup.type" + "dolphin-common/resource.storage.type" ], - "if": "${dolphin-common/res.upload.startup.type} === HDFS", + "if": "${dolphin-common/resource.storage.type} === HDFS", "then": { "property_value_attributes": { "visible": true @@ -204,9 +226,9 @@ "depends-on": [ { "configs":[ - "dolphin-common/res.upload.startup.type" + "dolphin-common/resource.storage.type" ], - "if": "${dolphin-common/res.upload.startup.type} === HDFS", + "if": "${dolphin-common/resource.storage.type} === HDFS", "then": { "property_value_attributes": { "visible": true @@ -226,9 +248,9 @@ "depends-on": [ { "configs":[ - "dolphin-common/res.upload.startup.type" + "dolphin-common/resource.storage.type" ], - "if": "${dolphin-common/res.upload.startup.type} === S3", + "if": "${dolphin-common/resource.storage.type} === S3", "then": { "property_value_attributes": { "visible": true @@ -248,9 +270,9 @@ "depends-on": [ { "configs":[ - "dolphin-common/res.upload.startup.type" + "dolphin-common/resource.storage.type" ], - "if": "${dolphin-common/res.upload.startup.type} === S3", + "if": "${dolphin-common/resource.storage.type} === S3", "then": { "property_value_attributes": { "visible": true @@ -270,9 +292,9 @@ "depends-on": [ { "configs":[ - "dolphin-common/res.upload.startup.type" + "dolphin-common/resource.storage.type" ], - "if": "${dolphin-common/res.upload.startup.type} === S3", + "if": "${dolphin-common/resource.storage.type} === S3", "then": { "property_value_attributes": { "visible": true @@ -356,6 +378,28 @@ } ] }, + { + "config": "dolphin-common/kerberos.expire.time", + "subsection-name": "dynamic-row1-col2", + "depends-on": [ + { + "configs":[ + "dolphin-common/hadoop.security.authentication.startup.state" + ], + "if": "${dolphin-common/hadoop.security.authentication.startup.state}", + "then": { + "property_value_attributes": { + "visible": true + } + }, + "else": { + "property_value_attributes": { + "visible": false + } + } + } + ] + }, { "config": "dolphin-alert/enterprise.wechat.enable", "subsection-name": "dynamic-row1-col3" @@ -505,11 +549,17 @@ } }, { - "config": "dolphin-common/res.upload.startup.type", + "config": "dolphin-common/resource.storage.type", "widget": { "type": "combo" } }, + { + "config": "dolphin-common/resource.upload.path", + "widget": { + "type": "text-field" + } + }, { "config": "dolphin-common/hdfs.root.user", "widget": { @@ -570,6 +620,12 @@ "type": "text-field" } }, + { + "config": "dolphin-common/kerberos.expire.time", + "widget": { + "type": "text-field" + } + }, { "config": "dolphin-alert/enterprise.wechat.enable", "widget": { diff --git a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/templates/application.properties.j2 b/docker/kubernetes/dolphinscheduler/requirements.yaml similarity index 76% rename from ambari_plugin/common-services/DOLPHIN/1.2.1/package/templates/application.properties.j2 rename to docker/kubernetes/dolphinscheduler/requirements.yaml index 7bb9f8aff3..e219975995 100644 --- a/ambari_plugin/common-services/DOLPHIN/1.2.1/package/templates/application.properties.j2 +++ b/docker/kubernetes/dolphinscheduler/requirements.yaml @@ -14,7 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. # - -{% for key, value in dolphin_application_map.iteritems() -%} - {{key}}={{value}} -{% endfor %} \ No newline at end of file +dependencies: +- name: postgresql + version: 8.x.x + repository: https://charts.bitnami.com/bitnami + condition: postgresql.enabled +- name: zookeeper + version: 5.x.x + repository: https://charts.bitnami.com/bitnami + condition: redis.enabled \ No newline at end of file diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java index 4b094ea494..e9b25250a8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java @@ -95,6 +95,25 @@ public class BaseService { } + /** + * check + * + * @param result result + * @param bool bool + * @param userNoOperationPerm status + * @return check result + */ + protected boolean check(Map result, boolean bool, Status userNoOperationPerm) { + //only admin can operate + if (bool) { + result.put(Constants.STATUS, userNoOperationPerm); + result.put(Constants.MSG, userNoOperationPerm.getMsg()); + return true; + } + return false; + } + + /** * get cookie info by name * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java index 54151d902f..da85621041 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java @@ -86,9 +86,14 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe * @param token token string * @return create result code */ - public Map createToken(int userId, String expireTime, String token) { + public Map createToken(User loginUser, int userId, String expireTime, String token) { Map result = new HashMap<>(5); + if (!hasPerm(loginUser,userId)){ + putMsg(result, Status.USER_NO_OPERATION_PERM); + return result; + } + if (userId <= 0) { throw new IllegalArgumentException("User id should not less than or equals to 0."); } @@ -118,8 +123,12 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe * @param expireTime token expire time * @return token string */ - public Map generateToken(int userId, String expireTime) { + public Map generateToken(User loginUser, int userId, String expireTime) { Map result = new HashMap<>(5); + if (!hasPerm(loginUser,userId)){ + putMsg(result, Status.USER_NO_OPERATION_PERM); + return result; + } String token = EncryptionUtils.getMd5(userId + expireTime + System.currentTimeMillis()); result.put(Constants.DATA_LIST, token); putMsg(result, Status.SUCCESS); @@ -144,8 +153,8 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe return result; } - if (loginUser.getId() != accessToken.getUserId() && - loginUser.getUserType() != UserType.ADMIN_USER) { + + if (!hasPerm(loginUser,accessToken.getUserId())){ putMsg(result, Status.USER_NO_OPERATION_PERM); return result; } @@ -164,9 +173,12 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe * @param token token string * @return update result code */ - public Map updateToken(int id, int userId, String expireTime, String token) { + public Map updateToken(User loginUser, int id, int userId, String expireTime, String token) { Map result = new HashMap<>(5); - + if (!hasPerm(loginUser,userId)){ + putMsg(result, Status.USER_NO_OPERATION_PERM); + return result; + } AccessToken accessToken = accessTokenMapper.selectById(id); if (accessToken == null) { logger.error("access token not exist, access token id {}", id); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java index c71f2980f5..1574e7f0e7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java @@ -130,7 +130,6 @@ public class LoggerServiceImpl implements LoggerService { logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath())); } - /** * get host * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index b7ab24bb13..7f1db3ff18 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -49,6 +49,7 @@ import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; +import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; @@ -166,7 +167,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements String processDefinitionJson, String desc, String locations, - String connects) throws JsonProcessingException { + String connects) { Map result = new HashMap<>(); Project project = projectMapper.queryByName(projectName); @@ -229,23 +230,34 @@ public class ProcessDefinitionServiceImpl extends BaseService implements /** * get resource ids - * * @param processData process data * @return resource ids */ private String getResourceIds(ProcessData processData) { - return Optional.ofNullable(processData.getTasks()) - .orElse(Collections.emptyList()) - .stream() - .map(taskNode -> TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams())) - .filter(Objects::nonNull) - .flatMap(parameters -> parameters.getResourceFilesList().stream()) - .map(ResourceInfo::getId) - .distinct() - .map(Objects::toString) - .collect(Collectors.joining(",")); - } + List tasks = processData.getTasks(); + Set resourceIds = new HashSet<>(); + for (TaskNode taskNode : tasks) { + String taskParameter = taskNode.getParams(); + AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(), taskParameter); + if (CollectionUtils.isNotEmpty(params.getResourceFilesList())) { + Set tempSet = params.getResourceFilesList(). + stream() + .filter(t -> t.getId() != 0) + .map(ResourceInfo::getId) + .collect(Collectors.toSet()); + resourceIds.addAll(tempSet); + } + } + StringBuilder sb = new StringBuilder(); + for (int i : resourceIds) { + if (sb.length() > 0) { + sb.append(","); + } + sb.append(i); + } + return sb.toString(); + } /** * query process definition list * @@ -255,7 +267,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements */ public Map queryProcessDefinitionList(User loginUser, String projectName) { - HashMap result = new HashMap<>(); + HashMap result = new HashMap<>(5); Project project = projectMapper.queryByName(projectName); Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); @@ -348,10 +360,15 @@ public class ProcessDefinitionServiceImpl extends BaseService implements * @param connects connects for nodes * @return update result code */ - public Map updateProcessDefinition(User loginUser, String projectName, int id, String name, - String processDefinitionJson, String desc, - String locations, String connects) { - Map result = new HashMap<>(); + public Map updateProcessDefinition(User loginUser, + String projectName, + int id, + String name, + String processDefinitionJson, + String desc, + String locations, + String connects) { + Map result = new HashMap<>(5); Project project = projectMapper.queryByName(projectName); Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); @@ -462,7 +479,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements @Transactional(rollbackFor = RuntimeException.class) public Map deleteProcessDefinitionById(User loginUser, String projectName, Integer processDefinitionId) { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); Project project = projectMapper.queryByName(projectName); Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); @@ -674,6 +691,17 @@ public class ProcessDefinitionServiceImpl extends BaseService implements } } + /** + * get export process metadata string + * @param processDefinitionId process definition id + * @param processDefinition process definition + * @return export process metadata string + */ + public String exportProcessMetaDataStr(Integer processDefinitionId, ProcessDefinition processDefinition) { + //create workflow json file + return JSONUtils.toJsonString(exportProcessMetaData(processDefinitionId, processDefinition)); + } + /** * get export process metadata string * @@ -758,7 +786,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements */ @Transactional(rollbackFor = RuntimeException.class) public Map importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); String processMetaJson = FileUtils.file2String(file); List processMetaList = JSONUtils.toList(processMetaJson, ProcessMeta.class); @@ -927,7 +955,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements } //recursive sub-process parameter correction map key for old process id value for new process id - Map subProcessIdMap = new HashMap<>(); + Map subProcessIdMap = new HashMap<>(20); List subProcessList = StreamUtils.asStream(jsonArray.elements()) .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).path("type").asText())) @@ -1215,7 +1243,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements */ public Map queryProcessDefinitionAllByProjectId(Integer projectId) { - HashMap result = new HashMap<>(); + HashMap result = new HashMap<>(5); List resourceList = processDefineMapper.queryAllDefinitionList(projectId); result.put(Constants.DATA_LIST, resourceList); @@ -1425,7 +1453,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements Integer processId, Project targetProject) throws JsonProcessingException { - Map result = new HashMap<>(); + Map result = new HashMap<>(5); ProcessDefinition processDefinition = processDefineMapper.selectById(processId); if (processDefinition == null) { @@ -1444,6 +1472,41 @@ public class ProcessDefinitionServiceImpl extends BaseService implements } } + /** + * copy process definition + * + * @param loginUser login user + * @param projectName project name + * @param processId process definition id + * @return copy result code + */ + public Map copyProcessDefinition(User loginUser, String projectName, Integer processId) { + + Map result = new HashMap<>(5); + Project project = projectMapper.queryByName(projectName); + + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultStatus = (Status) checkResult.get(Constants.STATUS); + if (resultStatus != Status.SUCCESS) { + return checkResult; + } + + ProcessDefinition processDefinition = processDefineMapper.selectById(processId); + if (processDefinition == null) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); + return result; + } else { + return createProcessDefinition( + loginUser, + projectName, + processDefinition.getName() + "_copy_" + System.currentTimeMillis(), + processDefinition.getProcessDefinitionJson(), + processDefinition.getDescription(), + processDefinition.getLocations(), + processDefinition.getConnects()); + } + } + /** * batch copy process definition * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java index 52f0d79ead..52899f5c09 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java @@ -291,6 +291,25 @@ public class TenantServiceImpl extends BaseService implements TenantService { return processInstanceMapper.queryByTenantIdAndStatus(tenant.getId(), Constants.NOT_TERMINATED_STATES); } + /** + * query tenant list + * + * @param tenantCode tenant code + * @return tenant list + */ + public Map queryTenantList(String tenantCode) { + + Map result = new HashMap<>(5); + + List resourceList = tenantMapper.queryByTenantCode(tenantCode); + if (CollectionUtils.isNotEmpty(resourceList)) { + result.put(Constants.DATA_LIST, resourceList); + putMsg(result, Status.SUCCESS); + } else { + putMsg(result, Status.TENANT_NOT_EXIST); + } + } + /** * query tenant list * diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java index 9ec24bbb50..884e9b6b36 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java @@ -149,9 +149,11 @@ public class UdfFuncServiceTest { } @Test - public void testQueryResourceList(){ - Mockito.when(udfFuncMapper.getUdfFuncByType(1, 1)).thenReturn(getList()); - Map result = udfFuncService.queryResourceList(getLoginUser(),1); + public void testQueryUdfFuncList(){ + User user = getLoginUser(); + user.setUserType(UserType.GENERAL_USER); + Mockito.when(udfFuncMapper.getUdfFuncByType(user.getId(), UdfType.HIVE.ordinal())).thenReturn(getList()); + Map result = udfFuncService.queryUdfFuncList(user,UdfType.HIVE.ordinal()); logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS)); List udfFuncList = (List) result.get(Constants.DATA_LIST); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java new file mode 100644 index 0000000000..9cec2766f1 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.enums; + +public enum Event { + ACK, + RESULT; +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java index a7fc0839eb..287f7267bc 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java @@ -42,6 +42,4 @@ public class ResourceInfo { public void setRes(String res) { this.res = res; } - - } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java index 231dd33146..1b1f0a6c5d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.task.flink; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; import java.util.ArrayList; import java.util.List; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java index 4e58201bf3..32a2a6b05d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java @@ -216,7 +216,7 @@ public class SparkParameters extends AbstractParameters { @Override public boolean checkParameters() { - return mainJar != null && programType != null && sparkVersion != null; + return mainJar != null && programType != null; } @Override diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java index 7d52dc93f3..cba0151828 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java @@ -131,6 +131,7 @@ public class Command { WarningType warningType, int warningGroupId, Date scheduleTime, + String workerGroup, Priority processInstancePriority) { this.commandType = commandType; this.executorId = executorId; @@ -143,6 +144,7 @@ public class Command { this.failureStrategy = failureStrategy; this.startTime = new Date(); this.updateTime = new Date(); + this.workerGroup = workerGroup; this.processInstancePriority = processInstancePriority; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index 1c521ce91e..a9ebbf000c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -82,6 +82,20 @@ public interface ProcessInstanceMapper extends BaseMapper { * @param endTime endTime * @return process instance IPage */ + + /** + * process instance page + * @param page page + * @param projectId projectId + * @param processDefinitionId processDefinitionId + * @param searchVal searchVal + * @param executorId executorId + * @param statusArray statusArray + * @param host host + * @param startTime startTime + * @param endTime endTime + * @return process instance page + */ IPage queryProcessInstanceListPaging(Page page, @Param("projectId") int projectId, @Param("processDefinitionId") Integer processDefinitionId, diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java new file mode 100644 index 0000000000..b47971e1f0 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.upgrade; + +import org.apache.dolphinscheduler.common.utils.ConnectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.HashMap; +import java.util.Map; + +/** + * resource dao + */ +public class ResourceDao { + public static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionDao.class); + + /** + * list all resources + * @param conn connection + * @return map that key is full_name and value is id + */ + Map listAllResources(Connection conn){ + Map resourceMap = new HashMap<>(); + + String sql = String.format("SELECT id,full_name FROM t_ds_resources"); + ResultSet rs = null; + PreparedStatement pstmt = null; + try { + pstmt = conn.prepareStatement(sql); + rs = pstmt.executeQuery(); + + while (rs.next()){ + Integer id = rs.getInt(1); + String fullName = rs.getString(2); + resourceMap.put(fullName,id); + } + + } catch (Exception e) { + logger.error(e.getMessage(),e); + throw new RuntimeException("sql: " + sql, e); + } finally { + ConnectionUtils.releaseResource(rs, pstmt, conn); + } + + return resourceMap; + } + +} diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java index 47d8d89b40..9399c51f3e 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java @@ -33,6 +33,7 @@ import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.transaction.annotation.Transactional; +import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; @@ -303,4 +304,16 @@ public class UdfFuncMapperTest { authorizedUdfFunc = udfFuncMapper.listAuthorizedUdfFunc(generalUser1.getId(), udfFuncIds); Assert.assertTrue(authorizedUdfFunc.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(udfFuncIds))); } + + @Test + public void batchUpdateUdfFuncTest(){ + //create general user + User generalUser1 = createGeneralUser("user1"); + UdfFunc udfFunc = insertOne(generalUser1); + udfFunc.setResourceName("/updateTest"); + List udfFuncList = new ArrayList<>(); + udfFuncList.add(udfFunc); + Assert.assertTrue(udfFuncMapper.batchUpdateUdfFunc(udfFuncList)>0); + + } } \ No newline at end of file diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java index 14dfe0b750..9803b4b903 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.dao.utils; import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -27,6 +28,7 @@ import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.TaskInstance; + import org.junit.Assert; import org.junit.Test; @@ -34,6 +36,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; + /** * dag helper test @@ -41,15 +45,17 @@ import java.util.Map; public class DagHelperTest { /** * test task node can submit + * * @throws JsonProcessingException if error throws JsonProcessingException */ @Test public void testTaskNodeCanSubmit() throws JsonProcessingException { - //1->2->3->5 - //4->3 + //1->2->3->5->7 + //4->3->6 DAG dag = generateDag(); TaskNode taskNode3 = dag.getNode("3"); - Map completeTaskList = new HashMap<>(); + Map completeTaskList = new HashMap<>(); + Map skipNodeList = new HashMap<>(); completeTaskList.putIfAbsent("1", new TaskInstance()); Boolean canSubmit = false; @@ -58,27 +64,199 @@ public class DagHelperTest { node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); TaskNode nodex = dag.getNode("4"); nodex.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); - canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList); + canSubmit = DagHelper.allDependsForbiddenOrEnd(taskNode3, dag, skipNodeList, completeTaskList); Assert.assertEquals(canSubmit, true); // 2forbidden, 3 cannot be submit completeTaskList.putIfAbsent("2", new TaskInstance()); TaskNode nodey = dag.getNode("4"); nodey.setRunFlag(""); - canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList); + canSubmit = DagHelper.allDependsForbiddenOrEnd(taskNode3, dag, skipNodeList, completeTaskList); Assert.assertEquals(canSubmit, false); // 2/3 forbidden submit 5 TaskNode node3 = dag.getNode("3"); node3.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); + TaskNode node8 = dag.getNode("8"); + node8.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); TaskNode node5 = dag.getNode("5"); - canSubmit = DagHelper.taskNodeCanSubmit(node5, dag, completeTaskList); + canSubmit = DagHelper.allDependsForbiddenOrEnd(node5, dag, skipNodeList, completeTaskList); Assert.assertEquals(canSubmit, true); - } + } + + /** + * test parse post node list + */ + @Test + public void testParsePostNodeList() throws JsonProcessingException { + DAG dag = generateDag(); + Map completeTaskList = new HashMap<>(); + Map skipNodeList = new HashMap<>(); + + Set postNodes = null; + //complete : null + // expect post: 1/4 + postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); + Assert.assertEquals(2, postNodes.size()); + Assert.assertTrue(postNodes.contains("1")); + Assert.assertTrue(postNodes.contains("4")); + + //complete : 1 + // expect post: 2/4 + completeTaskList.put("1", new TaskInstance()); + postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); + Assert.assertEquals(2, postNodes.size()); + Assert.assertTrue(postNodes.contains("2")); + Assert.assertTrue(postNodes.contains("4")); + + // complete : 1/2 + // expect post: 4 + completeTaskList.put("2", new TaskInstance()); + postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); + Assert.assertEquals(2, postNodes.size()); + Assert.assertTrue(postNodes.contains("4")); + Assert.assertTrue(postNodes.contains("8")); + + // complete : 1/2/4 + // expect post: 3 + completeTaskList.put("4", new TaskInstance()); + postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); + Assert.assertEquals(2, postNodes.size()); + Assert.assertTrue(postNodes.contains("3")); + Assert.assertTrue(postNodes.contains("8")); + + // complete : 1/2/4/3 + // expect post: 8/6 + completeTaskList.put("3", new TaskInstance()); + postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); + Assert.assertEquals(2, postNodes.size()); + Assert.assertTrue(postNodes.contains("8")); + Assert.assertTrue(postNodes.contains("6")); + + // complete : 1/2/4/3/8 + // expect post: 6/5 + completeTaskList.put("8", new TaskInstance()); + postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); + Assert.assertEquals(2, postNodes.size()); + Assert.assertTrue(postNodes.contains("5")); + Assert.assertTrue(postNodes.contains("6")); + // complete : 1/2/4/3/5/6/8 + // expect post: 7 + completeTaskList.put("6", new TaskInstance()); + completeTaskList.put("5", new TaskInstance()); + postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); + Assert.assertEquals(1, postNodes.size()); + Assert.assertTrue(postNodes.contains("7")); + } + + /** + * test forbidden post node + * @throws JsonProcessingException + */ + @Test + public void testForbiddenPostNode() throws JsonProcessingException { + DAG dag = generateDag(); + Map completeTaskList = new HashMap<>(); + Map skipNodeList = new HashMap<>(); + Set postNodes = null; + // dag: 1-2-3-5-7 4-3-6 2-8-5-7 + // forbid:2 complete:1 post:4/8 + completeTaskList.put("1", new TaskInstance()); + TaskNode node2 = dag.getNode("2"); + node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); + postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); + Assert.assertEquals(2, postNodes.size()); + Assert.assertTrue(postNodes.contains("4")); + Assert.assertTrue(postNodes.contains("8")); + + //forbid:2/4 complete:1 post:3/8 + TaskNode node4 = dag.getNode("4"); + node4.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); + postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); + Assert.assertEquals(2, postNodes.size()); + Assert.assertTrue(postNodes.contains("3")); + Assert.assertTrue(postNodes.contains("8")); + + //forbid:2/4/5 complete:1/8 post:3 + completeTaskList.put("8", new TaskInstance()); + TaskNode node5 = dag.getNode("5"); + node5.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); + postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); + Assert.assertEquals(1, postNodes.size()); + Assert.assertTrue(postNodes.contains("3")); + } + + /** + * test condition post node + * @throws JsonProcessingException + */ + @Test + public void testConditionPostNode() throws JsonProcessingException { + DAG dag = generateDag(); + Map completeTaskList = new HashMap<>(); + Map skipNodeList = new HashMap<>(); + Set postNodes = null; + // dag: 1-2-3-5-7 4-3-6 2-8-5-7 + // 3-if + completeTaskList.put("1", new TaskInstance()); + completeTaskList.put("2", new TaskInstance()); + completeTaskList.put("4", new TaskInstance()); + TaskNode node3 = dag.getNode("3"); + node3.setType("CONDITIONS"); + node3.setConditionResult("{\n" + + " \"successNode\": [5\n" + + " ],\n" + + " \"failedNode\": [6\n" + + " ]\n" + + " }"); + completeTaskList.remove("3"); + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setState(ExecutionStatus.SUCCESS); + //complete 1/2/3/4 expect:8 + completeTaskList.put("3", taskInstance); + postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); + Assert.assertEquals(1, postNodes.size()); + Assert.assertTrue(postNodes.contains("8")); + + //2.complete 1/2/3/4/8 expect:5 skip:6 + completeTaskList.put("8", new TaskInstance()); + postNodes = DagHelper.parsePostNodes(null ,skipNodeList, dag, completeTaskList); + Assert.assertTrue(postNodes.contains("5")); + Assert.assertEquals(1, skipNodeList.size()); + Assert.assertTrue(skipNodeList.containsKey("6")); + + // 3.complete 1/2/3/4/5/8 expect post:7 skip:6 + skipNodeList.clear(); + TaskInstance taskInstance1 = new TaskInstance(); + taskInstance.setState(ExecutionStatus.SUCCESS); + completeTaskList.put("5", taskInstance1); + postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); + Assert.assertEquals(1, postNodes.size()); + Assert.assertTrue(postNodes.contains("7")); + Assert.assertEquals(1, skipNodeList.size()); + Assert.assertTrue(skipNodeList.containsKey("6")); + + // dag: 1-2-3-5-7 4-3-6 + // 3-if , complete:1/2/3/4 + // 1.failure:3 expect post:6 skip:5/7 + skipNodeList.clear(); + completeTaskList.remove("3"); + taskInstance = new TaskInstance(); + taskInstance.setState(ExecutionStatus.FAILURE); + completeTaskList.put("3", taskInstance); + postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); + Assert.assertEquals(1, postNodes.size()); + Assert.assertTrue(postNodes.contains("6")); + Assert.assertEquals(2, skipNodeList.size()); + Assert.assertTrue(skipNodeList.containsKey("5")); + Assert.assertTrue(skipNodeList.containsKey("7")); + } /** - * 1->2->3->5 - * 4->3 + * 1->2->3->5->7 + * 4->3->6 + * 2->8->5->7 + * * @return dag * @throws JsonProcessingException if error throws JsonProcessingException */ @@ -87,11 +265,13 @@ public class DagHelperTest { TaskNode node1 = new TaskNode(); node1.setId("1"); node1.setName("1"); + node1.setType("SHELL"); taskNodeList.add(node1); TaskNode node2 = new TaskNode(); node2.setId("2"); node2.setName("2"); + node2.setType("SHELL"); List dep2 = new ArrayList<>(); dep2.add("1"); node2.setDepList(dep2); @@ -101,11 +281,13 @@ public class DagHelperTest { TaskNode node4 = new TaskNode(); node4.setId("4"); node4.setName("4"); + node4.setType("SHELL"); taskNodeList.add(node4); TaskNode node3 = new TaskNode(); node3.setId("3"); node3.setName("3"); + node3.setType("SHELL"); List dep3 = new ArrayList<>(); dep3.add("2"); dep3.add("4"); @@ -115,20 +297,48 @@ public class DagHelperTest { TaskNode node5 = new TaskNode(); node5.setId("5"); node5.setName("5"); + node5.setType("SHELL"); List dep5 = new ArrayList<>(); dep5.add("3"); + dep5.add("8"); node5.setDepList(dep5); taskNodeList.add(node5); + TaskNode node6 = new TaskNode(); + node6.setId("6"); + node6.setName("6"); + node6.setType("SHELL"); + List dep6 = new ArrayList<>(); + dep6.add("3"); + node6.setDepList(dep6); + taskNodeList.add(node6); + + TaskNode node7 = new TaskNode(); + node7.setId("7"); + node7.setName("7"); + node7.setType("SHELL"); + List dep7 = new ArrayList<>(); + dep7.add("5"); + node7.setDepList(dep7); + taskNodeList.add(node7); + + TaskNode node8 = new TaskNode(); + node8.setId("8"); + node8.setName("8"); + node8.setType("SHELL"); + List dep8 = new ArrayList<>(); + dep8.add("2"); + node8.setDepList(dep8); + taskNodeList.add(node8); + List startNodes = new ArrayList<>(); - List recoveryNodes = new ArrayList<>(); + List recoveryNodes = new ArrayList<>(); List destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList, startNodes, recoveryNodes, TaskDependType.TASK_POST); - List taskNodeRelations =DagHelper.generateRelationListByFlowNodes(destTaskNodeList); + List taskNodeRelations = DagHelper.generateRelationListByFlowNodes(destTaskNodeList); ProcessDag processDag = new ProcessDag(); processDag.setEdges(taskNodeRelations); processDag.setNodes(destTaskNodeList); - return DagHelper.buildDagGraph(processDag); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java new file mode 100644 index 0000000000..f37eb979fc --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; + +import java.io.Serializable; + +/** + * db task ack request command + */ +public class DBTaskAckCommand implements Serializable { + + private int taskInstanceId; + private int status; + + public DBTaskAckCommand(int status,int taskInstanceId) { + this.status = status; + this.taskInstanceId = taskInstanceId; + } + + public int getTaskInstanceId() { + return taskInstanceId; + } + + public void setTaskInstanceId(int taskInstanceId) { + this.taskInstanceId = taskInstanceId; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + /** + * package response command + * @return command + */ + public Command convert2Command(){ + Command command = new Command(); + command.setType(CommandType.DB_TASK_ACK); + byte[] body = FastJsonSerializer.serialize(this); + command.setBody(body); + return command; + } + + + @Override + public String toString() { + return "DBTaskAckCommand{" + + "taskInstanceId=" + taskInstanceId + + ", status=" + status + + '}'; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java new file mode 100644 index 0000000000..a64029822c --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; + +import java.io.Serializable; + +/** + * db task final result response command + */ +public class DBTaskResponseCommand implements Serializable { + + private int taskInstanceId; + private int status; + + public DBTaskResponseCommand(int status,int taskInstanceId) { + this.status = status; + this.taskInstanceId = taskInstanceId; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public int getTaskInstanceId() { + return taskInstanceId; + } + + public void setTaskInstanceId(int taskInstanceId) { + this.taskInstanceId = taskInstanceId; + } + + /** + * package response command + * @return command + */ + public Command convert2Command(){ + Command command = new Command(); + command.setType(CommandType.DB_TASK_RESPONSE); + byte[] body = FastJsonSerializer.serialize(this); + command.setBody(body); + return command; + } + + @Override + public String toString() { + return "DBTaskResponseCommand{" + + "taskInstanceId=" + taskInstanceId + + ", status=" + status + + '}'; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogAppender.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogAppender.java new file mode 100644 index 0000000000..0819b9fce3 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogAppender.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.log; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.FileAppender; +import org.slf4j.Marker; + +import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER; + +/** + * Task log appender + */ +public class TaskLogAppender extends FileAppender{ + @Override + protected void append(ILoggingEvent event) { + Marker marker = event.getMarker(); + if (marker !=null) { + if (marker.equals(FINALIZE_SESSION_MARKER)) { + stop(); + } + } + super.subAppend(event); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java deleted file mode 100644 index 021f10d444..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.server.master.runner; - -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.DependResult; -import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.model.DependentItem; -import org.apache.dolphinscheduler.common.model.DependentTaskModel; -import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; -import org.apache.dolphinscheduler.common.utils.DependentUtils; -import org.apache.dolphinscheduler.common.utils.*; -import org.apache.dolphinscheduler.common.utils.LoggerUtils; -import org.apache.dolphinscheduler.common.utils.NetUtils; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.utils.LogUtils; - -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class ConditionsTaskExecThread extends MasterBaseTaskExecThread { - - - /** - * dependent parameters - */ - private DependentParameters dependentParameters; - - /** - * complete task map - */ - private Map completeTaskList = new ConcurrentHashMap<>(); - - /** - * condition result - */ - private DependResult conditionResult; - - /** - * constructor of MasterBaseTaskExecThread - * - * @param taskInstance task instance - */ - public ConditionsTaskExecThread(TaskInstance taskInstance) { - super(taskInstance); - taskInstance.setStartTime(new Date()); - } - - @Override - public Boolean submitWaitComplete() { - try{ - this.taskInstance = submit(); - logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, - taskInstance.getProcessDefinitionId(), - taskInstance.getProcessInstanceId(), - taskInstance.getId())); - String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance)); - Thread.currentThread().setName(threadLoggerInfoName); - initTaskParameters(); - logger.info("dependent task start"); - waitTaskQuit(); - updateTaskState(); - }catch (Exception e){ - logger.error("conditions task run exception" , e); - } - return true; - } - - private void waitTaskQuit() { - List taskInstances = processService.findValidTaskListByProcessId( - taskInstance.getProcessInstanceId() - ); - for(TaskInstance task : taskInstances){ - completeTaskList.putIfAbsent(task.getName(), task.getState()); - } - - List modelResultList = new ArrayList<>(); - for(DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()){ - - List itemDependResult = new ArrayList<>(); - for(DependentItem item : dependentTaskModel.getDependItemList()){ - itemDependResult.add(getDependResultForItem(item)); - } - DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult); - modelResultList.add(modelResult); - } - conditionResult = DependentUtils.getDependResultForRelation( - dependentParameters.getRelation(), modelResultList - ); - logger.info("the conditions task depend result : {}", conditionResult); - } - - /** - * - */ - private void updateTaskState() { - ExecutionStatus status; - if(this.cancel){ - status = ExecutionStatus.KILL; - }else{ - status = (conditionResult == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE; - } - taskInstance.setState(status); - taskInstance.setEndTime(new Date()); - processService.updateTaskInstance(taskInstance); - } - - private void initTaskParameters() { - this.taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance)); - this.taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort()); - taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); - taskInstance.setStartTime(new Date()); - this.processService.saveTaskInstance(taskInstance); - - this.dependentParameters = JSONUtils.parseObject(this.taskInstance.getDependency(), DependentParameters.class); - } - - - /** - * depend result for depend item - * @param item - * @return - */ - private DependResult getDependResultForItem(DependentItem item){ - - DependResult dependResult = DependResult.SUCCESS; - if(!completeTaskList.containsKey(item.getDepTasks())){ - logger.info("depend item: {} have not completed yet.", item.getDepTasks()); - dependResult = DependResult.FAILED; - return dependResult; - } - ExecutionStatus executionStatus = completeTaskList.get(item.getDepTasks()); - if(executionStatus != item.getStatus()){ - logger.info("depend item : {} expect status: {}, actual status: {}" ,item.getDepTasks(), item.getStatus(), executionStatus); - dependResult = DependResult.FAILED; - } - logger.info("dependent item complete {} {},{}", - Constants.DEPENDENT_SPLIT, item.getDepTasks(), dependResult); - return dependResult; - } - - -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java index 2b1508dc44..74b1c2f271 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java @@ -130,19 +130,20 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread { while (Stopper.isRunning()) { // waiting for subflow process instance establishment if (subProcessInstance == null) { - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - if(!setTaskInstanceState()){ continue; } } subProcessInstance = processService.findProcessInstanceById(subProcessInstance.getId()); + if (checkTaskTimeout()) { + this.checkTimeoutFlag = !alertTimeout(); + handleTimeoutFailed(); + } updateParentProcessState(); if (subProcessInstance.getState().typeIsFinished()){ break; } - if(this.processInstance.getState() == ExecutionStatus.READY_PAUSE){ // parent process "ready to pause" , child process "pause" pauseSubProcess(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java index 71c7d959e1..7f76baaa52 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java @@ -104,9 +104,7 @@ public class DependentExecute { ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(), dateInterval); if(processInstance == null){ - logger.error("cannot find the right process instance: definition id:{}, start:{}, end:{}", - dependentItem.getDefinitionId(), dateInterval.getStartTime(), dateInterval.getEndTime() ); - return DependResult.FAILED; + return DependResult.WAITING; } // need to check workflow for updates, so get all task and check the task state if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java new file mode 100644 index 0000000000..3639b8eba3 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.cache; + +import org.apache.dolphinscheduler.common.enums.Event; +import org.apache.dolphinscheduler.remote.command.Command; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Responce Cache : cache worker send master result + */ +public class ResponceCache { + + private static final ResponceCache instance = new ResponceCache(); + + private ResponceCache(){} + + public static ResponceCache get(){ + return instance; + } + + private Map ackCache = new ConcurrentHashMap<>(); + private Map responseCache = new ConcurrentHashMap<>(); + + + /** + * cache response + * @param taskInstanceId taskInstanceId + * @param command command + * @param event event ACK/RESULT + */ + public void cache(Integer taskInstanceId, Command command, Event event){ + switch (event){ + case ACK: + ackCache.put(taskInstanceId,command); + break; + case RESULT: + responseCache.put(taskInstanceId,command); + break; + default: + throw new IllegalArgumentException("invalid event type : " + event); + } + } + + + /** + * remove ack cache + * @param taskInstanceId taskInstanceId + */ + public void removeAckCache(Integer taskInstanceId){ + ackCache.remove(taskInstanceId); + } + + /** + * remove reponse cache + * @param taskInstanceId taskInstanceId + */ + public void removeResponseCache(Integer taskInstanceId){ + responseCache.remove(taskInstanceId); + } + + /** + * getAckCache + * @return getAckCache + */ + public Map getAckCache(){ + return ackCache; + } + + /** + * getResponseCache + * @return getResponseCache + */ + public Map getResponseCache(){ + return responseCache; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java new file mode 100644 index 0000000000..1ccd092554 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.processor; + +import io.netty.channel.Channel; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.utils.Preconditions; +import org.apache.dolphinscheduler.remote.command.*; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.remote.utils.JsonSerializer; +import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * db task ack processor + */ +public class DBTaskAckProcessor implements NettyRequestProcessor { + + private final Logger logger = LoggerFactory.getLogger(DBTaskAckProcessor.class); + + + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.DB_TASK_ACK == command.getType(), + String.format("invalid command type : %s", command.getType())); + + DBTaskAckCommand taskAckCommand = JsonSerializer.deserialize( + command.getBody(), DBTaskAckCommand.class); + + if (taskAckCommand == null){ + return; + } + + if (taskAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){ + ResponceCache.get().removeAckCache(taskAckCommand.getTaskInstanceId()); + } + } + + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java new file mode 100644 index 0000000000..52536ab7be --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.processor; + +import io.netty.channel.Channel; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.utils.Preconditions; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.remote.utils.JsonSerializer; +import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * db task response processor + */ +public class DBTaskResponseProcessor implements NettyRequestProcessor { + + private final Logger logger = LoggerFactory.getLogger(DBTaskResponseProcessor.class); + + + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.DB_TASK_RESPONSE == command.getType(), + String.format("invalid command type : %s", command.getType())); + + DBTaskResponseCommand taskResponseCommand = JsonSerializer.deserialize( + command.getBody(), DBTaskResponseCommand.class); + + if (taskResponseCommand == null){ + return; + } + + if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){ + ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId()); + } + } + + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java new file mode 100644 index 0000000000..ec79238d39 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import org.apache.dolphinscheduler.common.thread.Stopper; + +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * Retry Report Task Status Thread + */ +@Component +public class RetryReportTaskStatusThread implements Runnable { + + private final Logger logger = LoggerFactory.getLogger(RetryReportTaskStatusThread.class); + + /** + * every 5 minutes + */ + private static long RETRY_REPORT_TASK_STATUS_INTERVAL = 5 * 60 * 1000L; + /** + * task callback service + */ + private final TaskCallbackService taskCallbackService; + + public void start(){ + Thread thread = new Thread(this,"RetryReportTaskStatusThread"); + thread.start(); + } + + public RetryReportTaskStatusThread(){ + this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); + } + + /** + * retry ack/response + */ + @Override + public void run() { + ResponceCache responceCache = ResponceCache.get(); + + while (Stopper.isRunning()){ + + // sleep 5 minutes + ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL); + + try { + if (!responceCache.getAckCache().isEmpty()){ + Map ackCache = responceCache.getAckCache(); + for (Map.Entry entry : ackCache.entrySet()){ + Integer taskInstanceId = entry.getKey(); + Command ackCommand = entry.getValue(); + taskCallbackService.sendAck(taskInstanceId,ackCommand); + } + } + + if (!responceCache.getResponseCache().isEmpty()){ + Map responseCache = responceCache.getResponseCache(); + for (Map.Entry entry : responseCache.entrySet()){ + Integer taskInstanceId = entry.getKey(); + Command responseCommand = entry.getValue(); + taskCallbackService.sendResult(taskInstanceId,responseCommand); + } + } + }catch (Exception e){ + logger.warn("retry report task status error", e); + } + } + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index dddd1a64b7..662dc13414 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -16,46 +16,34 @@ */ package org.apache.dolphinscheduler.server.worker.task; -import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_FAILURE; -import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_SUCCESS; - import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; -import org.apache.dolphinscheduler.common.utils.LoggerUtils; -import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.common.utils.process.ProcessBuilderForWin32; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.slf4j.Logger; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStreamReader; +import java.io.*; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.slf4j.Logger; - -import com.sun.jna.platform.win32.Kernel32; -import com.sun.jna.platform.win32.WinNT; +import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_SUCCESS; /** * abstract command executor @@ -117,45 +105,24 @@ public abstract class AbstractCommandExecutor { // setting up user to run commands List command = new LinkedList<>(); - if (OSUtils.isWindows()) { - //init process builder - ProcessBuilderForWin32 processBuilder = new ProcessBuilderForWin32(); - // setting up a working directory - processBuilder.directory(new File(taskExecutionContext.getExecutePath())); - // setting up a username and password - processBuilder.user(taskExecutionContext.getTenantCode(), StringUtils.EMPTY); - // merge error information to standard output stream - processBuilder.redirectErrorStream(true); - - // setting up user to run commands - command.add(commandInterpreter()); - command.add("/c"); - command.addAll(commandOptions()); - command.add(commandFile); - - // setting commands - processBuilder.command(command); - process = processBuilder.start(); - } else { - //init process builder - ProcessBuilder processBuilder = new ProcessBuilder(); - // setting up a working directory - processBuilder.directory(new File(taskExecutionContext.getExecutePath())); - // merge error information to standard output stream - processBuilder.redirectErrorStream(true); - - // setting up user to run commands - command.add("sudo"); - command.add("-u"); - command.add(taskExecutionContext.getTenantCode()); - command.add(commandInterpreter()); - command.addAll(commandOptions()); - command.add(commandFile); - - // setting commands - processBuilder.command(command); - process = processBuilder.start(); - } + //init process builder + ProcessBuilder processBuilder = new ProcessBuilder(); + // setting up a working directory + processBuilder.directory(new File(taskExecutionContext.getExecutePath())); + // merge error information to standard output stream + processBuilder.redirectErrorStream(true); + + // setting up user to run commands + command.add("sudo"); + command.add("-u"); + command.add(taskExecutionContext.getTenantCode()); + command.add(commandInterpreter()); + command.addAll(commandOptions()); + command.add(commandFile); + + // setting commands + processBuilder.command(command); + process = processBuilder.start(); // print command printCommand(command); @@ -235,10 +202,7 @@ public abstract class AbstractCommandExecutor { return result; } - public String getVarPool() { - return varPool.toString(); - } - + /** * cancel application * @throws Exception exception @@ -316,20 +280,30 @@ public abstract class AbstractCommandExecutor { * @param commands process builder */ private void printCommand(List commands) { - String cmdStr = ProcessUtils.buildCommandStr(commands); - logger.info("task run command:\n{}", cmdStr); + String cmdStr; + + try { + cmdStr = ProcessUtils.buildCommandStr(commands); + logger.info("task run command:\n{}", cmdStr); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } } /** * clear */ private void clear() { + + List markerList = new ArrayList<>(); + markerList.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString()); + if (!logBuffer.isEmpty()) { // log handle logHandler.accept(logBuffer); - logBuffer.clear(); } + logHandler.accept(markerList); } /** @@ -479,12 +453,13 @@ public abstract class AbstractCommandExecutor { /** - * get remain time?s? + * get remain time(s) * * @return remain time */ private long getRemaintime() { - long remainTime = DateUtils.getRemainTime(taskExecutionContext.getStartTime(), taskExecutionContext.getTaskTimeout()); + long usedTime = (System.currentTimeMillis() - taskExecutionContext.getStartTime().getTime()) / 1000; + long remainTime = taskExecutionContext.getTaskTimeout() - usedTime; if (remainTime < 0) { throw new RuntimeException("task execution time out"); @@ -506,12 +481,7 @@ public abstract class AbstractCommandExecutor { Field f = process.getClass().getDeclaredField(Constants.PID); f.setAccessible(true); - if (OSUtils.isWindows()) { - WinNT.HANDLE handle = (WinNT.HANDLE) f.get(process); - processId = Kernel32.INSTANCE.GetProcessId(handle); - } else { - processId = f.getInt(process); - } + processId = f.getInt(process); } catch (Throwable e) { logger.error(e.getMessage(), e); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index 1a66349817..fe60f4a82c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -17,7 +17,10 @@ package org.apache.dolphinscheduler.server.worker.task; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.TaskRecordStatus; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters; @@ -30,18 +33,18 @@ import org.apache.dolphinscheduler.common.task.shell.ShellParameters; import org.apache.dolphinscheduler.common.task.spark.SparkParameters; import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.TaskRecordDao; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; +import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER; + /** * executive task */ @@ -51,7 +54,7 @@ public abstract class AbstractTask { * varPool string */ protected String varPool; - + /** * taskExecutionContext **/ @@ -123,17 +126,20 @@ public abstract class AbstractTask { */ public void logHandle(List logs) { // note that the "new line" is added here to facilitate log parsing - logger.info(" -> {}", String.join("\n\t", logs)); + if (logs.contains(FINALIZE_SESSION_MARKER.toString())) { + logger.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString()); + } else { + logger.info(" -> {}", String.join("\n\t", logs)); + } } public void setVarPool(String varPool) { this.varPool = varPool; } - public String getVarPool() { return varPool; } - + /** * get exit status code * @return exit status code diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java index d541f43a3b..d402afcee2 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java @@ -111,29 +111,4 @@ public class MasterCommandTest { } - @Test - public void testDagHelper(){ - - ProcessDefinition processDefinition = processDefinitionMapper.selectById(19); - - try { - ProcessDag processDag = DagHelper.generateFlowDag(processDefinition.getProcessDefinitionJson(), - new ArrayList<>(), new ArrayList<>(), TaskDependType.TASK_POST); - - DAG dag = DagHelper.buildDagGraph(processDag); - Collection start = DagHelper.getStartVertex("1", dag, null); - - System.out.println(start.toString()); - - Map forbidden = DagHelper.getForbiddenTaskNodeMaps(processDefinition.getProcessDefinitionJson()); - System.out.println(forbidden); - } catch (Exception e) { - e.printStackTrace(); - } - - } - - - - } diff --git a/dolphinscheduler-ui/build/webpack.config.prod.js b/dolphinscheduler-ui/build/webpack.config.prod.js index 4bb90d54d1..1024ac6724 100644 --- a/dolphinscheduler-ui/build/webpack.config.prod.js +++ b/dolphinscheduler-ui/build/webpack.config.prod.js @@ -51,12 +51,7 @@ const config = merge.smart(baseConfig, { minimizer: [ new TerserPlugin({ terserOptions: { - compress: { - warnings: false, - drop_console: true, - drop_debugger: true, - pure_funcs: ['console.log'] - } + compress: {} }, cache: true, parallel: true, diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue index 7874b53885..838f429bd3 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue @@ -303,25 +303,27 @@ color: #0097e0; font-size: 12px; margin-left: 10px; - i { + em { + text-decoration: none !important; vertical-align: middle; } } .clock { - >i { + >em { font-size: 20px; vertical-align: middle; transform: scale(1); } } .refresh-log { - >i { + >em { + text-decoration: none; font-size: 20px; vertical-align: middle; transform: scale(1); } &.active { - >i { + >em { -webkit-transition-property: -webkit-transform; -webkit-transition-duration: 1s; -moz-transition-property: -moz-transform; @@ -368,5 +370,16 @@ } } } - + @-webkit-keyframes rotateloading{from{-webkit-transform: rotate(0deg)} + to{-webkit-transform: rotate(360deg)} + } + @-moz-keyframes rotateloading{from{-moz-transform: rotate(0deg)} + to{-moz-transform: rotate(359deg)} + } + @-o-keyframes rotateloading{from{-o-transform: rotate(0deg)} + to{-o-transform: rotate(359deg)} + } + @keyframes rotateloading{from{transform: rotate(0deg)} + to{transform: rotate(359deg)} + } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/processStateCount.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/processStateCount.vue index d121034932..974d57ff95 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/processStateCount.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/processStateCount.vue @@ -19,7 +19,7 @@
-
+
@@ -31,7 +31,10 @@ {{$index+1}} - {{item.value}} + + {{item.value}} + {{item.value}} + {{item.key}} @@ -49,6 +52,8 @@ import { mapActions } from 'vuex' import { pie } from './chartConfig' import Chart from '@/module/ana-charts' + import echarts from 'echarts' + import store from '@/conf/home/store' import mNoData from '@/module/components/noData/noData' import { stateType } from '@/conf/home/pages/projects/pages/_source/instanceConditions/common' export default { @@ -57,7 +62,8 @@ return { isSpin: true, msg: '', - processStateList: [] + processStateList: [], + currentName: '' } }, props: { @@ -83,7 +89,7 @@ value: v.count } }) - const myChart = Chart.pie('#process-state-pie', this.processStateList, { title: '' }) + const myChart = Chart.pie('#process-state-pie', this.processStateList, { title: '' }) myChart.echart.setOption(pie) // 首页不允许跳转 if (this.searchParams.projectId) { @@ -108,11 +114,15 @@ this.isSpin = false }) } + }, + '$store.state.projects.sideBar': function() { + echarts.init(document.getElementById('process-state-pie')).resize() } }, beforeCreate () { }, created () { + this.currentName = this.$router.currentRoute.name }, beforeMount () { }, @@ -132,7 +142,4 @@ diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/taskCtatusCount.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/taskCtatusCount.vue index 3e56c344f7..60e9413b2e 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/taskCtatusCount.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/taskCtatusCount.vue @@ -19,7 +19,7 @@
-
+
@@ -32,8 +32,9 @@ {{$index+1}} - - {{item.value}} + {{item.value}} + + {{item.value}} {{item.key}} @@ -53,6 +54,8 @@ import { mapActions } from 'vuex' import { pie } from './chartConfig' import Chart from '@/module/ana-charts' + import echarts from 'echarts' + import store from '@/conf/home/store' import mNoData from '@/module/components/noData/noData' import { stateType } from '@/conf/home/pages/projects/pages/_source/instanceConditions/common' @@ -62,7 +65,8 @@ return { isSpin: true, msg: '', - taskCtatusList: [] + taskCtatusList: [], + currentName: '' } }, props: { @@ -115,11 +119,15 @@ this.isSpin = false }) } + }, + '$store.state.projects.sideBar': function() { + echarts.init(document.getElementById('task-status-pie')).resize() } }, beforeCreate () { }, created () { + this.currentName = this.$router.currentRoute.name }, beforeMount () { }, @@ -139,7 +147,4 @@ diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue index a6c7de1a1f..9ad0938009 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue @@ -108,6 +108,7 @@ data-toggle="tooltip" :title="$t('View log')" icon="ans-icon-log" + :disabled="item.taskType==='SUB_PROCESS'? true: false" @click="_refreshLog(item)"> diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/details/index.vue b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/details/index.vue index b4ee720d12..06626b6dcf 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/details/index.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/details/index.vue @@ -232,7 +232,7 @@ position: absolute; right: 0; top: 0; - >i { + >em { font-size: 20px; color: #2d8cf0; cursor: pointer; diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/list.vue index d77f55722e..8320a17516 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/list.vue @@ -94,11 +94,11 @@ v-ps