From 6467875a87d3c19bde205e83ea5c0dd40b132298 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E8=81=9A=E9=98=B3?= Date: Sat, 15 Aug 2020 05:04:58 +0800 Subject: [PATCH 01/18] delay execution ui --- .../js/conf/home/pages/dag/_source/config.js | 7 + .../pages/dag/_source/formModel/formModel.vue | 21 ++- .../pages/list/_source/createDataSource.vue | 7 - .../_source/instanceConditions/common.js | 3 + .../pages/index/_source/taskStatusCount.vue | 147 ++++++++++++++++++ .../home/pages/projects/pages/index/index.vue | 8 +- .../js/conf/home/store/datasource/actions.js | 4 +- .../js/conf/home/store/projects/actions.js | 2 +- .../src/js/module/i18n/locale/en_US.js | 2 + .../src/js/module/i18n/locale/zh_CN.js | 2 + 10 files changed, 188 insertions(+), 15 deletions(-) create mode 100644 dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/taskStatusCount.vue diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js index a4960f7ac5..2e60929577 100755 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js @@ -229,6 +229,13 @@ const tasksState = { color: '#5101be', icoUnicode: 'ans-icon-dependence', isSpin: false + }, + DELAY_EXECUTION: { + id: 12, + desc: `${i18n.$t('Delay execution')}`, + color: '#5102ce', + icoUnicode: 'ans-icon-coin', + isSpin: false } } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue index 6f07f97f02..8444863aea 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue @@ -109,6 +109,20 @@ ({{$t('Minute')}}) + + +
+
+ {{$t('Delay execution time')}} +
+
+ + + ({{$t('Minute')}}) +
+
+ +
{{$t('State')}} @@ -127,7 +141,6 @@
-
{{$t('State')}} @@ -339,6 +352,8 @@ maxRetryTimes: '0', // Failure retry interval retryInterval: '1', + // Delay execution time + delayTime: '0', // Task timeout alarm timeout: {}, // Task priority @@ -466,6 +481,7 @@ dependence: this.cacheDependence, maxRetryTimes: this.maxRetryTimes, retryInterval: this.retryInterval, + delayTime: this.delayTime, timeout: this.timeout, taskInstancePriority: this.taskInstancePriority, workerGroup: this.workerGroup, @@ -544,6 +560,7 @@ dependence: this.dependence, maxRetryTimes: this.maxRetryTimes, retryInterval: this.retryInterval, + delayTime: this.delayTime, timeout: this.timeout, taskInstancePriority: this.taskInstancePriority, workerGroup: this.workerGroup, @@ -634,6 +651,7 @@ this.description = o.description this.maxRetryTimes = o.maxRetryTimes this.retryInterval = o.retryInterval + this.delayTime = o.delayTime if(o.conditionResult) { this.successBranch = o.conditionResult.successNode[0] this.failedBranch = o.conditionResult.failedNode[0] @@ -699,6 +717,7 @@ dependence: this.cacheDependence, maxRetryTimes: this.maxRetryTimes, retryInterval: this.retryInterval, + delayTime: this.delayTime, timeout: this.timeout, taskInstancePriority: this.taskInstancePriority, workerGroup: this.workerGroup, diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue b/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue index adf4753f4f..1e15688c5d 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue @@ -227,10 +227,6 @@ { value: 'DB2', label: 'DB2' - }, - { - value: 'PRESTO', - label: 'PRESTO' } ] } @@ -437,9 +433,6 @@ case 'DB2': defaultPort = '50000' break - case 'PRESTO': - defaultPort = '8080' - break default: break diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/instanceConditions/common.js b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/instanceConditions/common.js index 694d04748c..8a13aeacb4 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/instanceConditions/common.js +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/instanceConditions/common.js @@ -60,6 +60,9 @@ const stateType = [ }, { code: 'WAITTING_DEPEND', label: `${i18n.$t('Waiting for dependency to complete')}` + }, { + code: 'DELAY_EXECUTION', + label: `${i18n.$t('Delay execution')}` } ] diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/taskStatusCount.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/taskStatusCount.vue new file mode 100644 index 0000000000..f24b0f3402 --- /dev/null +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/taskStatusCount.vue @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + + diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/index.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/index.vue index 2b5cbbc017..7ca6e3a0f6 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/index.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/index.vue @@ -35,8 +35,8 @@ {{$t('Task status statistics')}}
- - + +
@@ -68,7 +68,7 @@ import dayjs from 'dayjs' import mDefineUserCount from './_source/defineUserCount' import mCommandStateCount from './_source/commandStateCount' - import mTaskCtatusCount from './_source/taskCtatusCount' + import mTaskStatusCount from './_source/taskStatusCount' import mProcessStateCount from './_source/processStateCount' import mQueueCount from './_source/queueCount' import localStore from '@/module/util/localStorage' @@ -105,7 +105,7 @@ mListConstruction, mDefineUserCount, mCommandStateCount, - mTaskCtatusCount, + mTaskStatusCount, mProcessStateCount, mQueueCount } diff --git a/dolphinscheduler-ui/src/js/conf/home/store/datasource/actions.js b/dolphinscheduler-ui/src/js/conf/home/store/datasource/actions.js index 0743621e21..f8166d610c 100644 --- a/dolphinscheduler-ui/src/js/conf/home/store/datasource/actions.js +++ b/dolphinscheduler-ui/src/js/conf/home/store/datasource/actions.js @@ -20,7 +20,7 @@ import io from '@/module/io' export default { /** * Data source creation - * @param "type": string,//MYSQL, POSTGRESQL, HIVE, SPARK, CLICKHOUSE, ORACLE, SQLSERVER, PRESTO + * @param "type": string,//MYSQL, POSTGRESQL, HIVE, SPARK, CLICKHOUSE, ORACLE, SQLSERVER * @param "name": string, * @param "desc": string, * @param "parameter":string //{"address":"jdbc:hive2://192.168.220.189:10000","autoReconnect":"true","characterEncoding":"utf8","database":"default","initialTimeout":3000,"jdbcUrl":"jdbc:hive2://192.168.220.189:10000/default","maxReconnect":10,"password":"","useUnicode":true,"user":"hive"} @@ -49,7 +49,7 @@ export default { }, /** * Query data source list - no paging - * @param "type": string//MYSQL, POSTGRESQL, HIVE, SPARK, CLICKHOUSE, ORACLE, SQLSERVER, PRESTO + * @param "type": string//MYSQL, POSTGRESQL, HIVE, SPARK, CLICKHOUSE, ORACLE, SQLSERVER */ getDatasourcesList ({ state }, payload) { return new Promise((resolve, reject) => { diff --git a/dolphinscheduler-ui/src/js/conf/home/store/projects/actions.js b/dolphinscheduler-ui/src/js/conf/home/store/projects/actions.js index 43273de9e2..6a18fdaf9c 100644 --- a/dolphinscheduler-ui/src/js/conf/home/store/projects/actions.js +++ b/dolphinscheduler-ui/src/js/conf/home/store/projects/actions.js @@ -69,7 +69,7 @@ export default { /** * Task status statistics */ - getTaskCtatusCount ({ state }, payload) { + getTaskStatusCount ({ state }, payload) { return new Promise((resolve, reject) => { io.get('projects/analysis/task-state-count', payload, res => { resolve(res) diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js index 0ef5340488..e5e96131f5 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js @@ -42,6 +42,8 @@ export default { Times: 'Times', 'Failed retry interval': 'Failed retry interval', Minute: 'Minute', + 'Delay execution time': 'Delay execution time', + 'Delay execution': 'Delay execution', Cancel: 'Cancel', 'Confirm add': 'Confirm add', 'The newly created sub-Process has not yet been executed and cannot enter the sub-Process': 'The newly created sub-Process has not yet been executed and cannot enter the sub-Process', diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index a352183fca..26326d3fe2 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -43,6 +43,7 @@ export default { Times: '次', 'Failed retry interval': '失败重试间隔', Minute: '分', + 'Delay execution time': '延时执行时间', Cancel: '取消', 'Confirm add': '确认添加', 'The newly created sub-Process has not yet been executed and cannot enter the sub-Process': '新创建子工作流还未执行,不能进入子工作流', @@ -425,6 +426,7 @@ export default { hour: '时', Running: '正在运行', 'Waiting for dependency to complete': '等待依赖完成', + 'Delay execution': '延时执行', Selected: '已选', CurrentHour: '当前小时', Last1Hour: '前1小时', From 8c0180d2118f8258802d32559ea40c90b5ed23b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E8=81=9A=E9=98=B3?= Date: Sat, 15 Aug 2020 09:46:07 +0800 Subject: [PATCH 02/18] delay execution ui[fixed] --- .../datasource/pages/list/_source/createDataSource.vue | 9 ++++++++- .../src/js/conf/home/store/datasource/actions.js | 6 +++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue b/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue index 1e15688c5d..5377db7d25 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue @@ -227,6 +227,10 @@ { value: 'DB2', label: 'DB2' + }, + { + value: 'PRESTO', + label: 'PRESTO' } ] } @@ -433,6 +437,9 @@ case 'DB2': defaultPort = '50000' break + case 'PRESTO': + defaultPort = '8080' + break default: break @@ -536,4 +543,4 @@ } - + \ No newline at end of file diff --git a/dolphinscheduler-ui/src/js/conf/home/store/datasource/actions.js b/dolphinscheduler-ui/src/js/conf/home/store/datasource/actions.js index f8166d610c..3a80cc3a29 100644 --- a/dolphinscheduler-ui/src/js/conf/home/store/datasource/actions.js +++ b/dolphinscheduler-ui/src/js/conf/home/store/datasource/actions.js @@ -20,7 +20,7 @@ import io from '@/module/io' export default { /** * Data source creation - * @param "type": string,//MYSQL, POSTGRESQL, HIVE, SPARK, CLICKHOUSE, ORACLE, SQLSERVER + * @param "type": string,//MYSQL, POSTGRESQL, HIVE, SPARK, CLICKHOUSE, ORACLE, SQLSERVER, PRESTO * @param "name": string, * @param "desc": string, * @param "parameter":string //{"address":"jdbc:hive2://192.168.220.189:10000","autoReconnect":"true","characterEncoding":"utf8","database":"default","initialTimeout":3000,"jdbcUrl":"jdbc:hive2://192.168.220.189:10000/default","maxReconnect":10,"password":"","useUnicode":true,"user":"hive"} @@ -49,7 +49,7 @@ export default { }, /** * Query data source list - no paging - * @param "type": string//MYSQL, POSTGRESQL, HIVE, SPARK, CLICKHOUSE, ORACLE, SQLSERVER + * @param "type": string//MYSQL, POSTGRESQL, HIVE, SPARK, CLICKHOUSE, ORACLE, SQLSERVER, PRESTO */ getDatasourcesList ({ state }, payload) { return new Promise((resolve, reject) => { @@ -126,4 +126,4 @@ export default { }) }) } -} +} \ No newline at end of file From 3b723c0cc51c40b7b8023ce41ea1834fd3a2a7b5 Mon Sep 17 00:00:00 2001 From: yinancx Date: Sat, 15 Aug 2020 11:20:32 +0800 Subject: [PATCH 03/18] restore --- .../pages/datasource/pages/list/_source/createDataSource.vue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue b/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue index 5377db7d25..adf4753f4f 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue @@ -543,4 +543,4 @@ } - \ No newline at end of file + From faeb97e70f7bac996dc89ac0da9165722feccfd7 Mon Sep 17 00:00:00 2001 From: yinancx Date: Sat, 15 Aug 2020 11:20:58 +0800 Subject: [PATCH 04/18] restore --- .../src/js/conf/home/store/datasource/actions.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dolphinscheduler-ui/src/js/conf/home/store/datasource/actions.js b/dolphinscheduler-ui/src/js/conf/home/store/datasource/actions.js index 3a80cc3a29..0743621e21 100644 --- a/dolphinscheduler-ui/src/js/conf/home/store/datasource/actions.js +++ b/dolphinscheduler-ui/src/js/conf/home/store/datasource/actions.js @@ -126,4 +126,4 @@ export default { }) }) } -} \ No newline at end of file +} From f9f910187042f6a6043ffaad87e34e8ceb3d38b8 Mon Sep 17 00:00:00 2001 From: yinancx Date: Sat, 15 Aug 2020 11:36:27 +0800 Subject: [PATCH 05/18] update an annotation --- .../home/pages/projects/pages/index/_source/taskStatusCount.vue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/taskStatusCount.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/taskStatusCount.vue index f24b0f3402..90ae53f4c9 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/taskStatusCount.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/taskStatusCount.vue @@ -93,7 +93,7 @@ const myChart = Chart.pie('#task-status-pie', this.taskStatusList, { title: '' }) myChart.echart.setOption(pie) - // 首页不允许跳转 + // Jump forbidden in index page if (this.searchParams.projectId) { myChart.echart.on('click', e => { this._goTask(e.data.name) From 42c88a7c68a89af7e3d7f060fd65d329a13c33d0 Mon Sep 17 00:00:00 2001 From: yangruochen Date: Mon, 28 Sep 2020 18:18:52 +0800 Subject: [PATCH 06/18] [FIX-Bug #3845][Ambari Plugin] Start Ambari report an error: Table 't_ds_process_definition_version' already exists (#3846) * [Bug][Ambari Plugin] Start Ambari report an error: Table 't_ds_process_definition_version' already exists #3845 * Revert "[Bug][Ambari Plugin] Start Ambari report an error: Table 't_ds_process_definition_version' already exists #3845" This reverts commit 56e030e6 * [Bug][Ambari Plugin] Start Ambari report an error: Table 't_ds_process_definition_version' already exists #3845 --- sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql index ae66da914f..f7b3bbcabb 100644 --- a/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql @@ -101,7 +101,7 @@ drop PROCEDURE if EXISTS ct_dolphin_T_t_ds_process_definition_version; delimiter d// CREATE PROCEDURE ct_dolphin_T_t_ds_process_definition_version() BEGIN - CREATE TABLE `t_ds_process_definition_version` ( + CREATE TABLE IF NOT EXISTS `t_ds_process_definition_version` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key', `process_definition_id` int(11) NOT NULL COMMENT 'process definition id', `version` int(11) DEFAULT NULL COMMENT 'process definition version', From 2a927bcad049efd2a187af5070f9b6307d7162e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E8=81=9A=E9=98=B3?= Date: Tue, 29 Sep 2020 10:58:14 +0800 Subject: [PATCH 07/18] Dependent Timeout --- .../formModel/_source/dependentTimeout.vue | 203 ++++++++++++++++++ .../pages/dag/_source/formModel/formModel.vue | 33 ++- .../src/js/module/i18n/locale/en_US.js | 2 + .../src/js/module/i18n/locale/zh_CN.js | 2 + 4 files changed, 238 insertions(+), 2 deletions(-) create mode 100644 dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/dependentTimeout.vue diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/dependentTimeout.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/dependentTimeout.vue new file mode 100644 index 0000000000..076a536b44 --- /dev/null +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/dependentTimeout.vue @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + \ No newline at end of file diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue index 7cd63c07db..b6c48fe5e8 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue @@ -162,10 +162,18 @@ + + + Date: Tue, 29 Sep 2020 14:22:56 +0800 Subject: [PATCH 08/18] add check interval --- .../_source/formModel/_source/dependentTimeout.vue | 14 ++++++++++++-- .../src/js/module/i18n/locale/en_US.js | 1 + .../src/js/module/i18n/locale/zh_CN.js | 1 + 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/dependentTimeout.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/dependentTimeout.vue index 076a536b44..235fb08412 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/dependentTimeout.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/dependentTimeout.vue @@ -49,6 +49,12 @@ {{$t('Minute')}} + + {{$t('Check interval')}} + + + {{$t('Minute')}} + {{$t('Timeout strategy')}} @@ -110,7 +116,8 @@ // Timeout strategy strategy: ['FAILED'], // Timeout period - interval: null + interval: null, + checkInterval: null }, waitCompleteTimeout: { enable: false, @@ -131,6 +138,7 @@ // p = 0 for timeout switch; p = 1 for wait start timeout switch; p = 2 for wait complete timeout switch. if (p === 1 || p === 0) { this.waitStartTimeout.interval = is ? 30 : null + this.waitStartTimeout.checkInterval = is ? 1 : null } if (p === 2 || p === 0) { this.waitCompleteTimeout.strategy = is ? ['WARN'] : [] @@ -149,7 +157,7 @@ const reg = /^[1-9]\d*$/ if (this.enable && (this.waitCompleteTimeout.enable && !reg.test(this.waitCompleteTimeout.interval)) - || (this.waitStartTimeout.enable && !reg.test(this.waitStartTimeout.interval))) { + || (this.waitStartTimeout.enable && (!reg.test(this.waitStartTimeout.interval || !reg.test(this.waitStartTimeout.checkInterval))))) { this.$message.warning(`${this.$t('Timeout must be a positive integer')}`) return false } @@ -157,6 +165,7 @@ waitStartTimeout: { strategy: 'FAILED', interval: parseInt(this.waitStartTimeout.interval), + checkInterval: parseInt(this.waitStartTimeout.checkInterval), enable: this.waitStartTimeout.enable }, waitCompleteTimeout: { @@ -193,6 +202,7 @@ this.waitStartTimeout.enable = o.waitStartTimeout.enable || false this.waitStartTimeout.strategy = ['FAILED'] this.waitStartTimeout.interval = o.waitStartTimeout.interval || null + this.waitStartTimeout.checkInterval = o.waitStartTimeout.checkInterval || null } } }, diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js index 2052d3e2a0..50d03ef555 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js @@ -428,6 +428,7 @@ export default { 'Timeout period': 'Timeout period', 'Waiting Dependent complete': 'Waiting Dependent complete', 'Waiting Dependent start': 'Waiting Dependent start', + 'Check interval': 'Check interval', 'Timeout strategy must be selected': 'Timeout strategy must be selected', 'Timeout must be a positive integer': 'Timeout must be a positive integer', 'Add dependency': 'Add dependency', diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index d46fdf551c..3f4be67b1e 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -422,6 +422,7 @@ export default { 'Timeout period': '超时时长', 'Waiting Dependent complete': '等待依赖完成', 'Waiting Dependent start': '等待依赖启动', + 'Check interval': '检查间隔', 'Timeout strategy must be selected': '超时策略必须选一个', 'Timeout must be a positive integer': '超时时长必须为正整数', 'Add dependency': '添加依赖', From 61fcad772299ce369b1c130c5f7fb0516b1bdc92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E8=81=9A=E9=98=B3?= Date: Tue, 29 Sep 2020 14:49:33 +0800 Subject: [PATCH 09/18] add timeout verification --- .../pages/dag/_source/formModel/_source/dependentTimeout.vue | 5 +++++ dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js | 1 + dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js | 2 ++ 3 files changed, 8 insertions(+) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/dependentTimeout.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/dependentTimeout.vue index 235fb08412..2b2ed78ccc 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/dependentTimeout.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/dependentTimeout.vue @@ -161,6 +161,11 @@ this.$message.warning(`${this.$t('Timeout must be a positive integer')}`) return false } + // Verify timeout duration longer than check interval + if (this.enable && this.waitStartTimeout.enable && this.waitStartTimeout.checkInterval >= this.waitStartTimeout.interval) { + this.$message.warning(`${this.$t('Timeout must be longer than check interval')}`) + return false + } this.$emit('on-timeout', { waitStartTimeout: { strategy: 'FAILED', diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js index 50d03ef555..5148849d3b 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js @@ -429,6 +429,7 @@ export default { 'Waiting Dependent complete': 'Waiting Dependent complete', 'Waiting Dependent start': 'Waiting Dependent start', 'Check interval': 'Check interval', + 'Timeout must be longer than check interval': 'Timeout must be longer than check interval', 'Timeout strategy must be selected': 'Timeout strategy must be selected', 'Timeout must be a positive integer': 'Timeout must be a positive integer', 'Add dependency': 'Add dependency', diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index 3f4be67b1e..9341d4fd2b 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -425,6 +425,7 @@ export default { 'Check interval': '检查间隔', 'Timeout strategy must be selected': '超时策略必须选一个', 'Timeout must be a positive integer': '超时时长必须为正整数', + 'Timeout must be longer than check interval': '超时时间必须比检查间隔长', 'Add dependency': '添加依赖', and: '且', or: '或', @@ -435,6 +436,7 @@ export default { Running: '正在运行', 'Waiting for dependency to complete': '等待依赖完成', 'Delay execution': '延时执行', + 'Forced success': '强制成功过', Selected: '已选', CurrentHour: '当前小时', Last1Hour: '前1小时', From df33d1a28ee0055ae8b64335aa9234a55e65f651 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E8=81=9A=E9=98=B3?= Date: Tue, 29 Sep 2020 23:06:53 +0800 Subject: [PATCH 10/18] Force success --- .../js/conf/home/pages/dag/_source/config.js | 7 ++++++ .../_source/instanceConditions/common.js | 3 +++ .../pages/taskInstance/_source/list.vue | 23 +++++++++++++++++++ .../src/js/conf/home/store/dag/actions.js | 12 ++++++++++ .../src/js/module/i18n/locale/en_US.js | 2 ++ .../src/js/module/i18n/locale/zh_CN.js | 1 + 6 files changed, 48 insertions(+) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js index 2e60929577..18fbd94341 100755 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js @@ -236,6 +236,13 @@ const tasksState = { color: '#5102ce', icoUnicode: 'ans-icon-coin', isSpin: false + }, + FORCED_SUCCESS: { + id: 13, + desc: `${i18n.$t('Forced success')}`, + color: '#5102ce', + icoUnicode: 'ans-icon-success-solid', + isSpin: false } } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/instanceConditions/common.js b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/instanceConditions/common.js index 8a13aeacb4..e917cf3af0 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/instanceConditions/common.js +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/instanceConditions/common.js @@ -63,6 +63,9 @@ const stateType = [ }, { code: 'DELAY_EXECUTION', label: `${i18n.$t('Delay execution')}` + }, { + code: 'FORCED_SUCCESS', + label: `${i18n.$t('Forced success')}` } ] diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue index cdcf0b0785..a6c7de1a1f 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue @@ -91,6 +91,16 @@ {{item.duration}} {{item.retryTimes}} + + ` @@ -156,6 +168,17 @@ } }) }, + _forceSuccess (item) { + this.forceTaskSuccess({taskInstanceId: item.id}).then(res => { + if (res.code === 0) { + this.$message.success(res.msg) + } else { + this.$message.error(res.msg) + } + }).catch(e => { + this.$message.error(e.msg) + }) + }, _go (item) { this.$router.push({ path: `/projects/instance/list/${item.processInstanceId}` }) }, diff --git a/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js b/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js index 93fab5a224..b35d07052a 100644 --- a/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js +++ b/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js @@ -734,6 +734,18 @@ export default { }) }) }, + /** + * Force fail/kill/need_fault_tolerance task success + */ + forceTaskSuccess ({ state }, payload) { + return new Promise((resolve, reject) => { + io.post(`projects/${state.projectName}/task-instance/force-success`, payload, res => { + resolve(res) + }).catch(e => { + reject(e) + }) + }) + }, /** * Query task record list */ diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js index 5148849d3b..0f59aecc5a 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js @@ -32,6 +32,7 @@ export default { 'Current node settings': 'Current node settings', 'View history': 'View history', 'View log': 'View log', + 'Force success': 'Force success', 'Enter this child node': 'Enter this child node', 'Node name': 'Node name', 'Run flag': 'Run flag', @@ -432,6 +433,7 @@ export default { 'Timeout must be longer than check interval': 'Timeout must be longer than check interval', 'Timeout strategy must be selected': 'Timeout strategy must be selected', 'Timeout must be a positive integer': 'Timeout must be a positive integer', + 'Forced success': 'Forced success', 'Add dependency': 'Add dependency', and: 'and', or: 'or', diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index 9341d4fd2b..0481cd0137 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -32,6 +32,7 @@ export default { 'Current node settings': '当前节点设置', 'View history': '查看历史', 'View log': '查看日志', + 'Force success': '强制成功', 'Enter this child node': '进入该子节点', 'Node name': '节点名称', 'Please enter name (required)': '请输入名称(必填)', From 1c96ae09448d59bfd1ef445af4df42a74cddfa37 Mon Sep 17 00:00:00 2001 From: Yarlung Date: Thu, 8 Oct 2020 17:08:26 +0800 Subject: [PATCH 11/18] [Fix-3840][server] Fix When the tenant code is a number, it should not be saved successfully (#3867) * bug fixed #3840 * fix bug #3840 * bug fixed #3840 * bug fixed issue #3840 * bug fix #3840 * bug fix #3540 Co-authored-by: gechunfa --- .../api/controller/TaskRecordController.java | 4 +- .../dolphinscheduler/api/enums/Status.java | 4 +- .../api/service/impl/TenantServiceImpl.java | 26 ++++++---- .../api/utils/RegexUtils.java | 47 +++++++++++++++++++ .../api/utils/RegexUtilsTest.java | 39 +++++++++++++++ 5 files changed, 108 insertions(+), 12 deletions(-) create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java create mode 100644 dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegexUtilsTest.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskRecordController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskRecordController.java index e20c845d42..4ff769dce6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskRecordController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskRecordController.java @@ -22,11 +22,13 @@ import org.apache.dolphinscheduler.api.service.TaskRecordService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.dao.entity.User; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.*; + import springfox.documentation.annotations.ApiIgnore; import java.util.Map; @@ -59,7 +61,7 @@ public class TaskRecordController extends BaseController { * @param taskDate task date * @param startTime start time * @param endTime end time - * @param pageNo page numbere + * @param pageNo page number * @param pageSize page size * @return task record list */ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 43c03f09d8..4fda99c37d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -193,7 +193,9 @@ public enum Status { BATCH_COPY_PROCESS_DEFINITION_ERROR(10159, "batch copy process definition error", "复制工作流错误"), BATCH_MOVE_PROCESS_DEFINITION_ERROR(10160, "batch move process definition error", "移动工作流错误"), QUERY_WORKFLOW_LINEAGE_ERROR(10161, "query workflow lineage error", "查询血缘失败"), - DELETE_PROCESS_DEFINITION_BY_ID_FAIL(10162,"delete process definition by id fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"), + DELETE_PROCESS_DEFINITION_BY_ID_FAIL(10162, "delete process definition by id fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"), + CHECK_TENANT_CODE_ERROR(10163, "Please enter the English tenant code", "请输入英文租户编码"), + UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java index 3a267bcc8c..52f0d79ead 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.BaseService; import org.apache.dolphinscheduler.api.service.TenantService; import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.api.utils.RegexUtils; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.CollectionUtils; @@ -73,11 +74,11 @@ public class TenantServiceImpl extends BaseService implements TenantService { /** * create tenant * - * @param loginUser login user + * @param loginUser login user * @param tenantCode tenant code * @param tenantName tenant name - * @param queueId queue id - * @param desc description + * @param queueId queue id + * @param desc description * @return create result code * @throws Exception exception */ @@ -94,6 +95,11 @@ public class TenantServiceImpl extends BaseService implements TenantService { return result; } + if (RegexUtils.isNumeric(tenantCode)) { + putMsg(result, Status.CHECK_TENANT_CODE_ERROR); + return result; + } + if (checkTenantExists(tenantCode)) { putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, tenantCode); return result; @@ -131,8 +137,8 @@ public class TenantServiceImpl extends BaseService implements TenantService { * * @param loginUser login user * @param searchVal search value - * @param pageNo page number - * @param pageSize page size + * @param pageNo page number + * @param pageSize page size * @return tenant list page */ public Map queryTenantList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) { @@ -157,12 +163,12 @@ public class TenantServiceImpl extends BaseService implements TenantService { /** * updateProcessInstance tenant * - * @param loginUser login user - * @param id tennat id + * @param loginUser login user + * @param id tennat id * @param tenantCode tennat code * @param tenantName tennat name - * @param queueId queue id - * @param desc description + * @param queueId queue id + * @param desc description * @return update result code * @throws Exception exception */ @@ -229,7 +235,7 @@ public class TenantServiceImpl extends BaseService implements TenantService { * delete tenant * * @param loginUser login user - * @param id tenant id + * @param id tenant id * @return delete result code * @throws Exception exception */ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java new file mode 100644 index 0000000000..9ff7fac463 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.utils; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * This is Regex expression utils. + */ +public class RegexUtils { + + /** + * check number regex expression + */ + private static final String CHECK_NUMBER = "^-?\\d+(\\.\\d+)?$"; + + private RegexUtils() { + } + + /** + * check if the input is number + * + * @param str input + * @return + */ + public static boolean isNumeric(String str) { + Pattern pattern = Pattern.compile(CHECK_NUMBER); + Matcher isNum = pattern.matcher(str); + return isNum.matches(); + } +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegexUtilsTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegexUtilsTest.java new file mode 100644 index 0000000000..5b62d51b07 --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegexUtilsTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.utils; + +import org.junit.Assert; +import org.junit.Test; + +/** + * RegexUtils test case + */ +public class RegexUtilsTest { + + @Test + public void testIsNumeric() { + String num1 = "123467854678"; + boolean numeric = RegexUtils.isNumeric(num1); + Assert.assertTrue(numeric); + + String num2 = "0.0.01"; + boolean numeric2 = RegexUtils.isNumeric(num2); + Assert.assertFalse(numeric2); + } + +} \ No newline at end of file From 7c1ff53a7428b9edb8d179c01a6d715920745f1e Mon Sep 17 00:00:00 2001 From: yangruochen Date: Thu, 8 Oct 2020 17:23:40 +0800 Subject: [PATCH 12/18] =?UTF-8?q?[Bug-3845]postgresql=20need=20to=20add=20?= =?UTF-8?q?IF=20NOT=20EXISTS=20for=20table=20t=5Fds=5Fprocess=E2=80=A6=20(?= =?UTF-8?q?#3852)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [Bug-3845]postgresql need to add IF NOT EXISTS for table t_ds_process_definition_version too. * restart unit test * rerun unit test --- sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql index 3351cac88c..477cb3bf60 100644 --- a/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql @@ -91,7 +91,7 @@ DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_var_pool(); delimiter d// CREATE OR REPLACE FUNCTION ct_dolphin_T_t_ds_process_definition_version() RETURNS void AS $$ BEGIN -CREATE TABLE t_ds_process_definition_version ( +CREATE TABLE IF NOT EXISTS t_ds_process_definition_version ( id int NOT NULL , process_definition_id int NOT NULL , version int DEFAULT NULL , @@ -140,5 +140,3 @@ DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_resources_un(); - - From 1a257b10771adef0bca6595b0751e8d0ba838c35 Mon Sep 17 00:00:00 2001 From: AhahaGe Date: Fri, 9 Oct 2020 21:00:02 +0800 Subject: [PATCH 13/18] [Feature][api]Export and import workflow add processDefinitionDescription #3808 (#3847) * add .asf.yaml file * export processDefinition add desc * clear conentent in .asf.yaml * delete .asf.yaml file Co-authored-by: guirong.ggr --- .../api/service/impl/ProcessDefinitionServiceImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index cade36a1d6..7b68003440 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -683,6 +683,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements exportProcessMeta.setProjectName(processDefinition.getProjectName()); exportProcessMeta.setProcessDefinitionName(processDefinition.getName()); exportProcessMeta.setProcessDefinitionJson(processDefinition.getProcessDefinitionJson()); + exportProcessMeta.setProcessDefinitionDescription(processDefinition.getDescription()); exportProcessMeta.setProcessDefinitionLocations(processDefinition.getLocations()); exportProcessMeta.setProcessDefinitionConnects(processDefinition.getConnects()); From c4be3b57493fe75f5a5dbb9f258a0430d0363cc6 Mon Sep 17 00:00:00 2001 From: hailin0 Date: Sat, 10 Oct 2020 11:14:32 +0800 Subject: [PATCH 14/18] [Fix][API] Log dependency package conflict (#3833) Co-authored-by: lisiying --- dolphinscheduler-api/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/dolphinscheduler-api/pom.xml b/dolphinscheduler-api/pom.xml index 76dd8980b7..cd6e04fc26 100644 --- a/dolphinscheduler-api/pom.xml +++ b/dolphinscheduler-api/pom.xml @@ -162,6 +162,12 @@ org.apache.hadoop hadoop-client + + + org.slf4j + slf4j-log4j12 + + From 39411ce03b864bc770da220ad6f81df47bd2487b Mon Sep 17 00:00:00 2001 From: bao liang <29528966+lenboo@users.noreply.github.com> Date: Sat, 10 Oct 2020 15:05:56 +0800 Subject: [PATCH 15/18] [FIX-3617][Service]fix 2 tasks instance are generated when fault tolerance (#3873) * fix 2 tasks instance are generated when fault tolerance * update code style * update code style * update code style * update code style * update code style * update code style * update code style * update code style * update code style * update code style * update code style * update ut * update code style Co-authored-by: baoliang --- .../server/zk/ZKMasterClient.java | 604 +++++++++--------- .../service/process/ProcessService.java | 167 +++-- .../service/process/ProcessServiceTest.java | 116 ++++ pom.xml | 1 + 4 files changed, 520 insertions(+), 368 deletions(-) create mode 100644 dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 6abb381583..49bfb5f9a8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -20,6 +20,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.locks.InterProcessMutex; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ZKNodeType; @@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.zk.AbstractZKClient; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -45,309 +47,309 @@ import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; /** - * zookeeper master client - * - * single instance + * zookeeper master client + *

+ * single instance */ @Component public class ZKMasterClient extends AbstractZKClient { - /** - * logger - */ - private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class); - - /** - * process service - */ - @Autowired - private ProcessService processService; - - public void start() { - - InterProcessMutex mutex = null; - try { - // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master - String znodeLock = getMasterStartUpLockPath(); - mutex = new InterProcessMutex(getZkClient(), znodeLock); - mutex.acquire(); - - // init system znode - this.initSystemZNode(); - - while (!checkZKNodeExists(NetUtils.getHost(), ZKNodeType.MASTER)){ - ThreadUtils.sleep(SLEEP_TIME_MILLIS); - } - - - // self tolerant - if (getActiveMasterNum() == 1) { - failoverWorker(null, true); - failoverMaster(null); - } - - }catch (Exception e){ - logger.error("master start up exception",e); - }finally { - releaseMutex(mutex); - } - } - - @Override - public void close(){ - super.close(); - } - - /** - * handle path events that this class cares about - * @param client zkClient - * @param event path event - * @param path zk path - */ - @Override - protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { - //monitor master - if(path.startsWith(getZNodeParentPath(ZKNodeType.MASTER)+Constants.SINGLE_SLASH)){ - handleMasterEvent(event,path); - }else if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){ - //monitor worker - handleWorkerEvent(event,path); - } - } - - /** - * remove zookeeper node path - * - * @param path zookeeper node path - * @param zkNodeType zookeeper node type - * @param failover is failover - */ - private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) { - logger.info("{} node deleted : {}", zkNodeType.toString(), path); - InterProcessMutex mutex = null; - try { - String failoverPath = getFailoverLockPath(zkNodeType); - // create a distributed lock - mutex = new InterProcessMutex(getZkClient(), failoverPath); - mutex.acquire(); - - String serverHost = getHostByEventDataPath(path); - // handle dead server - handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP); - //failover server - if(failover){ - failoverServerWhenDown(serverHost, zkNodeType); - } - }catch (Exception e){ - logger.error("{} server failover failed.", zkNodeType.toString()); - logger.error("failover exception ",e); - } - finally { - releaseMutex(mutex); - } - } - - /** - * failover server when server down - * - * @param serverHost server host - * @param zkNodeType zookeeper node type - * @throws Exception exception - */ - private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception { - if(StringUtils.isEmpty(serverHost) || serverHost.startsWith(NetUtils.getHost())){ - return ; - } - switch (zkNodeType){ - case MASTER: - failoverMaster(serverHost); - break; - case WORKER: - failoverWorker(serverHost, true); - default: - break; - } - } - - /** - * get failover lock path - * - * @param zkNodeType zookeeper node type - * @return fail over lock path - */ - private String getFailoverLockPath(ZKNodeType zkNodeType){ - - switch (zkNodeType){ - case MASTER: - return getMasterFailoverLockPath(); - case WORKER: - return getWorkerFailoverLockPath(); - default: - return ""; - } - } - - /** - * monitor master - * @param event event - * @param path path - */ - public void handleMasterEvent(TreeCacheEvent event, String path){ - switch (event.getType()) { - case NODE_ADDED: - logger.info("master node added : {}", path); - break; - case NODE_REMOVED: - removeZKNodePath(path, ZKNodeType.MASTER, true); - break; - default: - break; - } - } - - /** - * monitor worker - * @param event event - * @param path path - */ - public void handleWorkerEvent(TreeCacheEvent event, String path){ - switch (event.getType()) { - case NODE_ADDED: - logger.info("worker node added : {}", path); - break; - case NODE_REMOVED: - logger.info("worker node deleted : {}", path); - removeZKNodePath(path, ZKNodeType.WORKER, true); - break; - default: - break; - } - } - - /** - * task needs failover if task start before worker starts - * - * @param taskInstance task instance - * @return true if task instance need fail over - */ - private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception { - - boolean taskNeedFailover = true; - - //now no host will execute this task instance,so no need to failover the task - if(taskInstance.getHost() == null){ - return false; - } - - // if the worker node exists in zookeeper, we must check the task starts after the worker - if(checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)){ - //if task start after worker starts, there is no need to failover the task. - if(checkTaskAfterWorkerStart(taskInstance)){ - taskNeedFailover = false; - } - } - return taskNeedFailover; - } - - /** - * check task start after the worker server starts. - * - * @param taskInstance task instance - * @return true if task instance start time after worker server start date - */ - private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) { - if(StringUtils.isEmpty(taskInstance.getHost())){ - return false; - } - Date workerServerStartDate = null; - List workerServers = getServersList(ZKNodeType.WORKER); - for(Server workerServer : workerServers){ - if(taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())){ - workerServerStartDate = workerServer.getCreateTime(); - break; - } - } - - if(workerServerStartDate != null){ - return taskInstance.getStartTime().after(workerServerStartDate); - }else{ - return false; - } - } - - /** - * failover worker tasks - * - * 1. kill yarn job if there are yarn jobs in tasks. - * 2. change task state from running to need failover. - * 3. failover all tasks when workerHost is null - * @param workerHost worker host - */ - - /** - * failover worker tasks - * - * 1. kill yarn job if there are yarn jobs in tasks. - * 2. change task state from running to need failover. - * 3. failover all tasks when workerHost is null - * @param workerHost worker host - * @param needCheckWorkerAlive need check worker alive - * @throws Exception exception - */ - private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception { - logger.info("start worker[{}] failover ...", workerHost); - - List needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); - for(TaskInstance taskInstance : needFailoverTaskInstanceList){ - if(needCheckWorkerAlive){ - if(!checkTaskInstanceNeedFailover(taskInstance)){ - continue; - } - } - - ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); - if(processInstance != null){ - taskInstance.setProcessInstance(processInstance); - } - - TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() - .buildTaskInstanceRelatedInfo(taskInstance) - .buildProcessInstanceRelatedInfo(processInstance) - .create(); - // only kill yarn job if exists , the local thread has exited - ProcessUtils.killYarnJob(taskExecutionContext); - - taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); - processService.saveTaskInstance(taskInstance); - } - logger.info("end worker[{}] failover ...", workerHost); - } - - /** - * failover master tasks - * - * @param masterHost master host - */ - private void failoverMaster(String masterHost) { - logger.info("start master failover ..."); - - List needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost); - - //updateProcessInstance host is null and insert into command - for(ProcessInstance processInstance : needFailoverProcessInstanceList){ - if(Constants.NULL.equals(processInstance.getHost()) ){ - continue; - } - processService.processNeedFailoverProcessInstances(processInstance); - } - - logger.info("master failover end"); - } - - public InterProcessMutex blockAcquireMutex() throws Exception { - InterProcessMutex mutex = new InterProcessMutex(getZkClient(), getMasterLockPath()); - mutex.acquire(); - return mutex; - } - + /** + * logger + */ + private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class); + + /** + * process service + */ + @Autowired + private ProcessService processService; + + public void start() { + + InterProcessMutex mutex = null; + try { + // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master + String znodeLock = getMasterStartUpLockPath(); + mutex = new InterProcessMutex(getZkClient(), znodeLock); + mutex.acquire(); + + // init system znode + this.initSystemZNode(); + + while (!checkZKNodeExists(NetUtils.getHost(), ZKNodeType.MASTER)) { + ThreadUtils.sleep(SLEEP_TIME_MILLIS); + } + + // self tolerant + if (getActiveMasterNum() == 1) { + failoverWorker(null, true); + failoverMaster(null); + } + + } catch (Exception e) { + logger.error("master start up exception", e); + } finally { + releaseMutex(mutex); + } + } + + @Override + public void close() { + super.close(); + } + + /** + * handle path events that this class cares about + * + * @param client zkClient + * @param event path event + * @param path zk path + */ + @Override + protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { + //monitor master + if (path.startsWith(getZNodeParentPath(ZKNodeType.MASTER) + Constants.SINGLE_SLASH)) { + handleMasterEvent(event, path); + } else if (path.startsWith(getZNodeParentPath(ZKNodeType.WORKER) + Constants.SINGLE_SLASH)) { + //monitor worker + handleWorkerEvent(event, path); + } + } + + /** + * remove zookeeper node path + * + * @param path zookeeper node path + * @param zkNodeType zookeeper node type + * @param failover is failover + */ + private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) { + logger.info("{} node deleted : {}", zkNodeType.toString(), path); + InterProcessMutex mutex = null; + try { + String failoverPath = getFailoverLockPath(zkNodeType); + // create a distributed lock + mutex = new InterProcessMutex(getZkClient(), failoverPath); + mutex.acquire(); + + String serverHost = getHostByEventDataPath(path); + // handle dead server + handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP); + //failover server + if (failover) { + failoverServerWhenDown(serverHost, zkNodeType); + } + } catch (Exception e) { + logger.error("{} server failover failed.", zkNodeType.toString()); + logger.error("failover exception ", e); + } finally { + releaseMutex(mutex); + } + } + + /** + * failover server when server down + * + * @param serverHost server host + * @param zkNodeType zookeeper node type + * @throws Exception exception + */ + private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception { + if (StringUtils.isEmpty(serverHost)) { + return; + } + switch (zkNodeType) { + case MASTER: + failoverMaster(serverHost); + break; + case WORKER: + failoverWorker(serverHost, true); + break; + default: + break; + } + } + + /** + * get failover lock path + * + * @param zkNodeType zookeeper node type + * @return fail over lock path + */ + private String getFailoverLockPath(ZKNodeType zkNodeType) { + + switch (zkNodeType) { + case MASTER: + return getMasterFailoverLockPath(); + case WORKER: + return getWorkerFailoverLockPath(); + default: + return ""; + } + } + + /** + * monitor master + * + * @param event event + * @param path path + */ + public void handleMasterEvent(TreeCacheEvent event, String path) { + switch (event.getType()) { + case NODE_ADDED: + logger.info("master node added : {}", path); + break; + case NODE_REMOVED: + removeZKNodePath(path, ZKNodeType.MASTER, true); + break; + default: + break; + } + } + + /** + * monitor worker + * + * @param event event + * @param path path + */ + public void handleWorkerEvent(TreeCacheEvent event, String path) { + switch (event.getType()) { + case NODE_ADDED: + logger.info("worker node added : {}", path); + break; + case NODE_REMOVED: + logger.info("worker node deleted : {}", path); + removeZKNodePath(path, ZKNodeType.WORKER, true); + break; + default: + break; + } + } + + /** + * task needs failover if task start before worker starts + * + * @param taskInstance task instance + * @return true if task instance need fail over + */ + private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception { + + boolean taskNeedFailover = true; + + //now no host will execute this task instance,so no need to failover the task + if (taskInstance.getHost() == null) { + return false; + } + + // if the worker node exists in zookeeper, we must check the task starts after the worker + if (checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)) { + //if task start after worker starts, there is no need to failover the task. + if (checkTaskAfterWorkerStart(taskInstance)) { + taskNeedFailover = false; + } + } + return taskNeedFailover; + } + + /** + * check task start after the worker server starts. + * + * @param taskInstance task instance + * @return true if task instance start time after worker server start date + */ + private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) { + if (StringUtils.isEmpty(taskInstance.getHost())) { + return false; + } + Date workerServerStartDate = null; + List workerServers = getServersList(ZKNodeType.WORKER); + for (Server workerServer : workerServers) { + if (taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())) { + workerServerStartDate = workerServer.getCreateTime(); + break; + } + } + if (workerServerStartDate != null) { + return taskInstance.getStartTime().after(workerServerStartDate); + } + return false; + } + + /** + * failover worker tasks + * + * 1. kill yarn job if there are yarn jobs in tasks. + * 2. change task state from running to need failover. + * 3. failover all tasks when workerHost is null + * @param workerHost worker host + */ + + /** + * failover worker tasks + *

+ * 1. kill yarn job if there are yarn jobs in tasks. + * 2. change task state from running to need failover. + * 3. failover all tasks when workerHost is null + * + * @param workerHost worker host + * @param needCheckWorkerAlive need check worker alive + * @throws Exception exception + */ + private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception { + logger.info("start worker[{}] failover ...", workerHost); + + List needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); + for (TaskInstance taskInstance : needFailoverTaskInstanceList) { + if (needCheckWorkerAlive) { + if (!checkTaskInstanceNeedFailover(taskInstance)) { + continue; + } + } + + ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); + if (processInstance != null) { + taskInstance.setProcessInstance(processInstance); + } + + TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() + .buildTaskInstanceRelatedInfo(taskInstance) + .buildProcessInstanceRelatedInfo(processInstance) + .create(); + // only kill yarn job if exists , the local thread has exited + ProcessUtils.killYarnJob(taskExecutionContext); + + taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); + processService.saveTaskInstance(taskInstance); + } + logger.info("end worker[{}] failover ...", workerHost); + } + + /** + * failover master tasks + * + * @param masterHost master host + */ + private void failoverMaster(String masterHost) { + logger.info("start master failover ..."); + + List needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost); + + //updateProcessInstance host is null and insert into command + for (ProcessInstance processInstance : needFailoverProcessInstanceList) { + if (Constants.NULL.equals(processInstance.getHost())) { + continue; + } + processService.processNeedFailoverProcessInstances(processInstance); + } + + logger.info("master failover end"); + } + + public InterProcessMutex blockAcquireMutex() throws Exception { + InterProcessMutex mutex = new InterProcessMutex(getZkClient(), getMasterLockPath()); + mutex.acquire(); + return mutex; + } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 7344cf13e5..7fca37470d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -896,7 +896,7 @@ public class ProcessService { return task; } if(!task.getState().typeIsFinished()){ - createSubWorkProcessCommand(processInstance, task); + createSubWorkProcess(processInstance, task); } logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {} ", @@ -906,20 +906,22 @@ public class ProcessService { /** * set work process instance map + * consider o + * repeat running does not generate new sub process instance + * set map {parent instance id, task instance id, 0(child instance id)} * @param parentInstance parentInstance * @param parentTask parentTask * @return process instance map */ private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask){ ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId()); - if(processMap != null){ + if (processMap != null) { return processMap; - }else if(parentInstance.getCommandType() == CommandType.REPEAT_RUNNING - || parentInstance.isComplementData()){ + } + if (parentInstance.getCommandType() == CommandType.REPEAT_RUNNING) { // update current task id to map - // repeat running does not generate new sub process instance processMap = findPreviousTaskProcessMap(parentInstance, parentTask); - if(processMap!= null){ + if (processMap != null) { processMap.setParentTaskInstanceId(parentTask.getId()); updateWorkProcessInstanceMap(processMap); return processMap; @@ -944,11 +946,11 @@ public class ProcessService { Integer preTaskId = 0; List preTaskList = this.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId()); - for(TaskInstance task : preTaskList){ - if(task.getName().equals(parentTask.getName())){ + for (TaskInstance task : preTaskList) { + if (task.getName().equals(parentTask.getName())) { preTaskId = task.getId(); ProcessInstanceMap map = findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId); - if(map!=null){ + if (map != null) { return map; } } @@ -960,66 +962,111 @@ public class ProcessService { /** * create sub work process command + * * @param parentProcessInstance parentProcessInstance - * @param task task + * @param task task */ - private void createSubWorkProcessCommand(ProcessInstance parentProcessInstance, - TaskInstance task){ - if(!task.isSubProcess()){ + public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) { + if (!task.isSubProcess()) { return; } - ProcessInstanceMap instanceMap = setProcessInstanceMap(parentProcessInstance, task); - TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class); - Map subProcessParam = JSONUtils.toMap(taskNode.getParams()); - Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID)); - - ProcessInstance childInstance = findSubProcessInstance(parentProcessInstance.getId(), task.getId()); - - CommandType fatherType = parentProcessInstance.getCommandType(); - CommandType commandType = fatherType; - if(childInstance == null){ - String fatherHistoryCommand = parentProcessInstance.getHistoryCmd(); - // sub process must begin with schedule/complement data - // if father begin with scheduler/complement data - if(fatherHistoryCommand.startsWith(CommandType.SCHEDULER.toString()) || - fatherHistoryCommand.startsWith(CommandType.COMPLEMENT_DATA.toString())){ - commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]); - } + //check create sub work flow firstly + ProcessInstanceMap instanceMap = findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId()); + if (null != instanceMap && CommandType.RECOVER_TOLERANCE_FAULT_PROCESS == parentProcessInstance.getCommandType()) { + // recover failover tolerance would not create a new command when the sub command already have been created + return; } - - if(childInstance != null){ - childInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); - updateProcessInstance(childInstance); + instanceMap = setProcessInstanceMap(parentProcessInstance, task); + ProcessInstance childInstance = null; + if (instanceMap.getProcessInstanceId() != 0) { + childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId()); } + Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task); + updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionId()); + initSubInstanceState(childInstance); + createCommand(subProcessCommand); + logger.info("sub process command created: {} ", subProcessCommand); + } + + /** + * complement data needs transform parent parameter to child. + * @param instanceMap + * @param parentProcessInstance + * @return + */ + private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance) { // set sub work process command String processMapStr = JSONUtils.toJsonString(instanceMap); Map cmdParam = JSONUtils.toMap(processMapStr); - - if(commandType == CommandType.COMPLEMENT_DATA || - (childInstance != null && childInstance.isComplementData())){ + if (parentProcessInstance.isComplementData()) { Map parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam()); - String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE); - String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE); + String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE); + String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime); processMapStr = JSONUtils.toJsonString(cmdParam); } + return processMapStr; + } - updateSubProcessDefinitionByParent(parentProcessInstance, childDefineId); + /** + * create sub work process command + * @param parentProcessInstance + * @param childInstance + * @param instanceMap + * @param task + */ + public Command createSubProcessCommand(ProcessInstance parentProcessInstance, + ProcessInstance childInstance, + ProcessInstanceMap instanceMap, + TaskInstance task) { + CommandType commandType = getSubCommandType(parentProcessInstance, childInstance); + TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class); + Map subProcessParam = JSONUtils.toMap(taskNode.getParams()); + Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID)); + String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance); + + return new Command( + commandType, + TaskDependType.TASK_POST, + parentProcessInstance.getFailureStrategy(), + parentProcessInstance.getExecutorId(), + childDefineId, + processParam, + parentProcessInstance.getWarningType(), + parentProcessInstance.getWarningGroupId(), + parentProcessInstance.getScheduleTime(), + parentProcessInstance.getProcessInstancePriority() + ); + } + + /** + * initialize sub work flow state + * child instance state would be initialized when 'recovery from pause/stop/failure' + * @param childInstance + */ + private void initSubInstanceState(ProcessInstance childInstance) { + if (childInstance != null) { + childInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + updateProcessInstance(childInstance); + } + } - Command command = new Command(); - command.setWarningType(parentProcessInstance.getWarningType()); - command.setWarningGroupId(parentProcessInstance.getWarningGroupId()); - command.setFailureStrategy(parentProcessInstance.getFailureStrategy()); - command.setProcessDefinitionId(childDefineId); - command.setScheduleTime(parentProcessInstance.getScheduleTime()); - command.setExecutorId(parentProcessInstance.getExecutorId()); - command.setCommandParam(processMapStr); - command.setCommandType(commandType); - command.setProcessInstancePriority(parentProcessInstance.getProcessInstancePriority()); - command.setWorkerGroup(parentProcessInstance.getWorkerGroup()); - createCommand(command); - logger.info("sub process command created: {} ", command.toString()); + /** + * get sub work flow command type + * child instance exist: child command = fatherCommand + * child instance not exists: child command = fatherCommand[0] + * + * @param parentProcessInstance + * @return + */ + private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) { + CommandType commandType = parentProcessInstance.getCommandType(); + if (childInstance == null) { + String fatherHistoryCommand = parentProcessInstance.getHistoryCmd(); + commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]); + } + return commandType; } /** @@ -1497,20 +1544,6 @@ public class ProcessService { return result; } - /** - * update pid and app links field by task instance id - * @param taskInstId taskInstId - * @param pid pid - * @param appLinks appLinks - */ - public void updatePidByTaskInstId(int taskInstId, int pid,String appLinks) { - - TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId); - taskInstance.setPid(pid); - taskInstance.setAppLink(appLinks); - saveTaskInstance(taskInstance); - } - /** * query schedule by id * @param id id diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java new file mode 100644 index 0000000000..74b52bb316 --- /dev/null +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.process; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +import com.fasterxml.jackson.databind.JsonNode; + +/** + * process service test + */ +public class ProcessServiceTest { + + @Test + public void testCreateSubCommand() { + ProcessService processService = new ProcessService(); + ProcessInstance parentInstance = new ProcessInstance(); + parentInstance.setProcessDefinitionId(1); + parentInstance.setWarningType(WarningType.SUCCESS); + parentInstance.setWarningGroupId(0); + + TaskInstance task = new TaskInstance(); + task.setTaskJson("{\"params\":{\"processDefinitionId\":100}}"); + task.setId(10); + + ProcessInstance childInstance = null; + ProcessInstanceMap instanceMap = new ProcessInstanceMap(); + instanceMap.setParentProcessInstanceId(1); + instanceMap.setParentTaskInstanceId(10); + Command command = null; + + //father history: start; child null == command type: start + parentInstance.setHistoryCmd("START_PROCESS"); + parentInstance.setCommandType(CommandType.START_PROCESS); + command = processService.createSubProcessCommand( + parentInstance, childInstance, instanceMap, task + ); + Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType()); + + //father history: start,start failure; child null == command type: start + parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); + parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS"); + command = processService.createSubProcessCommand( + parentInstance, childInstance, instanceMap, task + ); + Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType()); + + //father history: scheduler,start failure; child null == command type: scheduler + parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); + parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS"); + command = processService.createSubProcessCommand( + parentInstance, childInstance, instanceMap, task + ); + Assert.assertEquals(CommandType.SCHEDULER, command.getCommandType()); + + //father history: complement,start failure; child null == command type: complement + + String startString = "2020-01-01 00:00:00"; + String endString = "2020-01-10 00:00:00"; + parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); + parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS"); + Map complementMap = new HashMap<>(); + complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE, startString); + complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE, endString); + parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap)); + command = processService.createSubProcessCommand( + parentInstance, childInstance, instanceMap, task + ); + Assert.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType()); + + JsonNode complementDate = JSONUtils.parseObject(command.getCommandParam()); + Date start = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE).asText()); + Date end = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE).asText()); + Assert.assertEquals(startString, DateUtils.dateToString(start)); + Assert.assertEquals(endString, DateUtils.dateToString(end)); + + //father history: start,failure,start failure; child not null == command type: start failure + childInstance = new ProcessInstance(); + parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); + parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS"); + command = processService.createSubProcessCommand( + parentInstance, childInstance, instanceMap, task + ); + Assert.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType()); + } +} diff --git a/pom.xml b/pom.xml index e895b01d89..9e4934e833 100644 --- a/pom.xml +++ b/pom.xml @@ -853,6 +853,7 @@ **/server/worker/EnvFileTest.java **/server/worker/runner/TaskExecuteThreadTest.java **/service/quartz/cron/CronUtilsTest.java + **/service/process/ProcessServiceTest.java **/service/zk/DefaultEnsembleProviderTest.java **/service/zk/ZKServerTest.java **/service/zk/CuratorZookeeperClientTest.java From 9de7d3c7725588393b4e90bef28697dda4b20f5d Mon Sep 17 00:00:00 2001 From: bao liang <29528966+lenboo@users.noreply.github.com> Date: Mon, 12 Oct 2020 10:23:44 +0800 Subject: [PATCH 16/18] [FIX-3573][DAO] potential horizontal unauthorized access (#3880) * fix bug[#3573] potential horizontal unauthorized access * fix bug[#3573] potential horizontal unauthorized access Co-authored-by: baoliang --- .../dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index 831c4a9c23..83864b5163 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -167,7 +167,7 @@ + select pd.* + from t_ds_process_definition pd + WHERE pd.project_id = #{projectId} + and pd.name = #{processDefinitionName} +