From 5185d283f9bea4d0a345b256a080b8fa5ee913f6 Mon Sep 17 00:00:00 2001 From: break60 <790061044@qq.com> Date: Tue, 14 Apr 2020 14:28:03 +0800 Subject: [PATCH 1/8] Change the name of the shell node and modify the parameter transmission method of spark, mr, python, and flink nodes (#2416) * Fix the problem of data echo in script edit box * Optimize resource tree * Change the name of the shell node and modify the parameter transmission method of spark, mr, python, and flink nodes Co-authored-by: dailidong --- .../dag/_source/formModel/tasks/flink.vue | 107 +++++++++++++++- .../pages/dag/_source/formModel/tasks/mr.vue | 118 +++++++++++++++-- .../dag/_source/formModel/tasks/python.vue | 119 ++++++++++++++++-- .../dag/_source/formModel/tasks/shell.vue | 4 +- .../dag/_source/formModel/tasks/spark.vue | 107 +++++++++++++++- .../src/js/module/i18n/locale/en_US.js | 2 +- .../src/js/module/i18n/locale/zh_CN.js | 2 +- 7 files changed, 425 insertions(+), 34 deletions(-) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue index 50866afc42..30b644dd69 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue @@ -211,7 +211,9 @@ return { label: node.name } - } + }, + allNoResources: [], + noRes: [], } }, props: { @@ -300,6 +302,12 @@ return false } + // noRes + if (this.noRes.length>0) { + this.$message.warning(`${i18n.$t('Please delete all non-existent resources')}`) + return false + } + // localParams Subcomponent verification if (!this.$refs.refLocalParams._verifProp()) { return false @@ -339,6 +347,67 @@ } delete item.children }, + searchTree(element, id) { + // 根据id查找节点 + if (element.id == id) { + return element; + } else if (element.children != null) { + var i; + var result = null; + for (i = 0; result == null && i < element.children.length; i++) { + result = this.searchTree(element.children[i], id); + } + return result; + } + return null; + }, + dataProcess(backResource) { + let isResourceId = [] + let resourceIdArr = [] + if(this.resourceList.length>0) { + this.resourceList.forEach(v=>{ + this.mainJarList.forEach(v1=>{ + if(this.searchTree(v1,v)) { + isResourceId.push(this.searchTree(v1,v)) + } + }) + }) + resourceIdArr = isResourceId.map(item=>{ + return item.id + }) + Array.prototype.diff = function(a) { + return this.filter(function(i) {return a.indexOf(i) < 0;}); + }; + let diffSet = this.resourceList.diff(resourceIdArr); + let optionsCmp = [] + if(diffSet.length>0) { + diffSet.forEach(item=>{ + backResource.forEach(item1=>{ + if(item==item1.id || item==item1.res) { + optionsCmp.push(item1) + } + }) + }) + } + let noResources = [{ + id: -1, + name: $t('No resources'), + fullName: '/'+$t('No resources'), + children: [] + }] + if(optionsCmp.length>0) { + this.allNoResources = optionsCmp + optionsCmp = optionsCmp.map(item=>{ + return {id: item.id,name: item.name,fullName: item.res} + }) + optionsCmp.forEach(item=>{ + item.isNew = true + }) + noResources[0].children = optionsCmp + this.mainJarList = this.mainJarList.concat(noResources) + } + } + }, }, watch: { // Listening type @@ -354,15 +423,37 @@ }, computed: { cacheParams () { + let isResourceId = [] + let resourceIdArr = [] + if(this.resourceList.length>0) { + this.resourceList.forEach(v=>{ + this.mainJarList.forEach(v1=>{ + if(this.searchTree(v1,v)) { + isResourceId.push(this.searchTree(v1,v)) + } + }) + }) + resourceIdArr = isResourceId.map(item=>{ + return {id: item.id,name: item.name,res: item.fullName} + }) + } + let result = [] + resourceIdArr.forEach(item=>{ + this.allNoResources.forEach(item1=>{ + if(item.id==item1.id) { + // resultBool = true + result.push(item1) + } + }) + }) + this.noRes = result return { mainClass: this.mainClass, mainJar: { id: this.mainJar }, deployMode: this.deployMode, - resourceList: _.map(this.resourceList, v => { - return {id: v} - }), + resourceList: resourceIdArr, localParams: this.localParams, slot: this.slot, taskManager: this.taskManager, @@ -404,20 +495,24 @@ this.programType = o.params.programType || 'SCALA' // backfill resourceList + let backResource = o.params.resourceList || [] let resourceList = o.params.resourceList || [] if (resourceList.length) { _.map(resourceList, v => { - if(v.res) { + if(!v.id) { this.store.dispatch('dag/getResourceId',{ type: 'FILE', fullName: '/'+v.res }).then(res => { this.resourceList.push(res.id) + this.dataProcess(backResource) }).catch(e => { - this.$message.error(e.msg || '') + this.resourceList.push(v.res) + this.dataProcess(backResource) }) } else { this.resourceList.push(v.id) + this.dataProcess(backResource) } }) this.cacheResourceList = resourceList diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue index 121147d7e5..1df1f93ba9 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue @@ -44,7 +44,7 @@
{{$t('Main jar package')}}
- +
{{ node.raw.fullName }}
@@ -109,6 +109,7 @@ name: 'mr', data () { return { + valueConsistsOf: 'LEAF_PRIORITY', // Main function class mainClass: '', // Master jar package @@ -134,7 +135,9 @@ return { label: node.name } - } + }, + allNoResources: [], + noRes: [] } }, props: { @@ -176,9 +179,76 @@ diGuiTree(item) { // Recursive convenience tree structure item.forEach(item => { item.children === '' || item.children === undefined || item.children === null || item.children.length === 0?         - delete item.children : this.diGuiTree(item.children); + this.operationTree(item) : this.diGuiTree(item.children); }) }, + operationTree(item) { + if(item.dirctory) { + item.isDisabled =true + } + delete item.children + }, + searchTree(element, id) { + // 根据id查找节点 + if (element.id == id) { + return element; + } else if (element.children != null) { + var i; + var result = null; + for (i = 0; result == null && i < element.children.length; i++) { + result = this.searchTree(element.children[i], id); + } + return result; + } + return null; + }, + dataProcess(backResource) { + let isResourceId = [] + let resourceIdArr = [] + if(this.resourceList.length>0) { + this.resourceList.forEach(v=>{ + this.mainJarList.forEach(v1=>{ + if(this.searchTree(v1,v)) { + isResourceId.push(this.searchTree(v1,v)) + } + }) + }) + resourceIdArr = isResourceId.map(item=>{ + return item.id + }) + Array.prototype.diff = function(a) { + return this.filter(function(i) {return a.indexOf(i) < 0;}); + }; + let diffSet = this.resourceList.diff(resourceIdArr); + let optionsCmp = [] + if(diffSet.length>0) { + diffSet.forEach(item=>{ + backResource.forEach(item1=>{ + if(item==item1.id || item==item1.res) { + optionsCmp.push(item1) + } + }) + }) + } + let noResources = [{ + id: -1, + name: $t('No resources'), + fullName: '/'+$t('No resources'), + children: [] + }] + if(optionsCmp.length>0) { + this.allNoResources = optionsCmp + optionsCmp = optionsCmp.map(item=>{ + return {id: item.id,name: item.name,fullName: item.res} + }) + optionsCmp.forEach(item=>{ + item.isNew = true + }) + noResources[0].children = optionsCmp + this.mainJarList = this.mainJarList.concat(noResources) + } + } + }, /** * verification */ @@ -193,6 +263,12 @@ return false } + // noRes + if (this.noRes.length>0) { + this.$message.warning(`${i18n.$t('Please delete all non-existent resources')}`) + return false + } + // localParams Subcomponent verification if (!this.$refs.refLocalParams._verifProp()) { return false @@ -231,14 +307,36 @@ }, computed: { cacheParams () { + let isResourceId = [] + let resourceIdArr = [] + if(this.resourceList.length>0) { + this.resourceList.forEach(v=>{ + this.mainJarList.forEach(v1=>{ + if(this.searchTree(v1,v)) { + isResourceId.push(this.searchTree(v1,v)) + } + }) + }) + resourceIdArr = isResourceId.map(item=>{ + return {id: item.id,name: item.name,res: item.fullName} + }) + } + let result = [] + resourceIdArr.forEach(item=>{ + this.allNoResources.forEach(item1=>{ + if(item.id==item1.id) { + // resultBool = true + result.push(item1) + } + }) + }) + this.noRes = result return { mainClass: this.mainClass, mainJar: { id: this.mainJar }, - resourceList: _.map(this.resourceList, v => { - return {id: v} - }), + resourceList: resourceIdArr, localParams: this.localParams, mainArgs: this.mainArgs, others: this.others, @@ -273,23 +371,27 @@ let resourceList = o.params.resourceList || [] if (resourceList.length) { _.map(resourceList, v => { - if(v.res) { + if(!v.id) { this.store.dispatch('dag/getResourceId',{ type: 'FILE', fullName: '/'+v.res }).then(res => { this.resourceList.push(res.id) + this.dataProcess(backResource) }).catch(e => { - this.$message.error(e.msg || '') + this.resourceList.push(v.res) + this.dataProcess(backResource) }) } else { this.resourceList.push(v.id) + this.dataProcess(backResource) } }) this.cacheResourceList = resourceList } // backfill localParams + let backResource = o.params.resourceList || [] let localParams = o.params.localParams || [] if (localParams.length) { this.localParams = localParams diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue index 67669b4654..3e2abcbb00 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue @@ -80,6 +80,13 @@ // Cache ResourceList cacheResourceList: [], resourceOptions: [], + normalizer(node) { + return { + label: node.name + } + }, + allNoResources: [], + noRes: [] } }, mixins: [disabledState], @@ -96,9 +103,9 @@ /** * return resourceList */ - _onResourcesData (a) { - this.resourceList = a - }, + // _onResourcesData (a) { + // this.resourceList = a + // }, /** * cache resourceList */ @@ -120,6 +127,12 @@ return false } + // noRes + if (this.noRes.length>0) { + this.$message.warning(`${i18n.$t('Please delete all non-existent resources')}`) + return false + } + // storage this.$emit('on-params', { resourceList: _.map(this.resourceList, v => { @@ -166,6 +179,67 @@ item.isDisabled =true } delete item.children + }, + searchTree(element, id) { + // 根据id查找节点 + if (element.id == id) { + return element; + } else if (element.children != null) { + var i; + var result = null; + for (i = 0; result == null && i < element.children.length; i++) { + result = this.searchTree(element.children[i], id); + } + return result; + } + return null; + }, + dataProcess(backResource) { + let isResourceId = [] + let resourceIdArr = [] + if(this.resourceList.length>0) { + this.resourceList.forEach(v=>{ + this.resourceOptions.forEach(v1=>{ + if(this.searchTree(v1,v)) { + isResourceId.push(this.searchTree(v1,v)) + } + }) + }) + resourceIdArr = isResourceId.map(item=>{ + return item.id + }) + Array.prototype.diff = function(a) { + return this.filter(function(i) {return a.indexOf(i) < 0;}); + }; + let diffSet = this.resourceList.diff(resourceIdArr); + let optionsCmp = [] + if(diffSet.length>0) { + diffSet.forEach(item=>{ + backResource.forEach(item1=>{ + if(item==item1.id || item==item1.res) { + optionsCmp.push(item1) + } + }) + }) + } + let noResources = [{ + id: -1, + name: $t('No resources'), + fullName: '/'+$t('No resources'), + children: [] + }] + if(optionsCmp.length>0) { + this.allNoResources = optionsCmp + optionsCmp = optionsCmp.map(item=>{ + return {id: item.id,name: item.name,fullName: item.res} + }) + optionsCmp.forEach(item=>{ + item.isNew = true + }) + noResources[0].children = optionsCmp + this.resourceOptions = this.resourceOptions.concat(noResources) + } + } } }, watch: { @@ -176,10 +250,32 @@ }, computed: { cacheParams () { + let isResourceId = [] + let resourceIdArr = [] + if(this.resourceList.length>0) { + this.resourceList.forEach(v=>{ + this.resourceOptions.forEach(v1=>{ + if(this.searchTree(v1,v)) { + isResourceId.push(this.searchTree(v1,v)) + } + }) + }) + resourceIdArr = isResourceId.map(item=>{ + return {id: item.id,name: item.name,res: item.fullName} + }) + } + let result = [] + resourceIdArr.forEach(item=>{ + this.allNoResources.forEach(item1=>{ + if(item.id==item1.id) { + // resultBool = true + result.push(item1) + } + }) + }) + this.noRes = result return { - resourceList: _.map(this.resourceList, v => { - return {id: v} - }), + resourceList: resourceIdArr, localParams: this.localParams } } @@ -187,7 +283,7 @@ created () { let item = this.store.state.dag.resourcesListS this.diGuiTree(item) - this.options = item + this.resourceOptions = item let o = this.backfillItem // Non-null objects represent backfill @@ -195,17 +291,20 @@ this.rawScript = o.params.rawScript || '' // backfill resourceList + let backResource = o.params.resourceList || [] let resourceList = o.params.resourceList || [] if (resourceList.length) { _.map(resourceList, v => { - if(v.res) { + if(!v.id) { this.store.dispatch('dag/getResourceId',{ type: 'FILE', fullName: '/'+v.res }).then(res => { this.resourceList.push(res.id) + this.dataProcess(backResource) }).catch(e => { - this.$message.error(e.msg || '') + this.resourceList.push(v.res) + this.dataProcess(backResource) }) } else { this.resourceList.push(v.id) @@ -230,6 +329,6 @@ editor.toTextArea() // Uninstall editor.off($('.code-python-mirror'), 'keypress', this.keypress) }, - components: { mLocalParams, mListBox, mResources } + components: { mLocalParams, mListBox, mResources,Treeselect } } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue index df315a14f1..b627602e04 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue @@ -266,8 +266,8 @@ } let noResources = [{ id: -1, - name: $t('No resources'), - fullName: '/'+$t('No resources'), + name: $t('Unauthorized or deleted resources'), + fullName: '/'+$t('Unauthorized or deleted resources'), children: [] }] if(optionsCmp.length>0) { diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue index 7a00528149..2304ab1d20 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue @@ -254,7 +254,9 @@ return { label: node.name } - } + }, + allNoResources: [], + noRes: [] } }, props: { @@ -305,6 +307,67 @@ } delete item.children }, + searchTree(element, id) { + // 根据id查找节点 + if (element.id == id) { + return element; + } else if (element.children != null) { + var i; + var result = null; + for (i = 0; result == null && i < element.children.length; i++) { + result = this.searchTree(element.children[i], id); + } + return result; + } + return null; + }, + dataProcess(backResource) { + let isResourceId = [] + let resourceIdArr = [] + if(this.resourceList.length>0) { + this.resourceList.forEach(v=>{ + this.mainJarList.forEach(v1=>{ + if(this.searchTree(v1,v)) { + isResourceId.push(this.searchTree(v1,v)) + } + }) + }) + resourceIdArr = isResourceId.map(item=>{ + return item.id + }) + Array.prototype.diff = function(a) { + return this.filter(function(i) {return a.indexOf(i) < 0;}); + }; + let diffSet = this.resourceList.diff(resourceIdArr); + let optionsCmp = [] + if(diffSet.length>0) { + diffSet.forEach(item=>{ + backResource.forEach(item1=>{ + if(item==item1.id || item==item1.res) { + optionsCmp.push(item1) + } + }) + }) + } + let noResources = [{ + id: -1, + name: $t('No resources'), + fullName: '/'+$t('No resources'), + children: [] + }] + if(optionsCmp.length>0) { + this.allNoResources = optionsCmp + optionsCmp = optionsCmp.map(item=>{ + return {id: item.id,name: item.name,fullName: item.res} + }) + optionsCmp.forEach(item=>{ + item.isNew = true + }) + noResources[0].children = optionsCmp + this.mainJarList = this.mainJarList.concat(noResources) + } + } + }, /** * verification */ @@ -324,6 +387,12 @@ return false } + // noRes + if (this.noRes.length>0) { + this.$message.warning(`${i18n.$t('Please delete all non-existent resources')}`) + return false + } + if (!Number.isInteger(parseInt(this.numExecutors))) { this.$message.warning(`${i18n.$t('The number of Executors should be a positive integer')}`) return false @@ -400,15 +469,37 @@ }, computed: { cacheParams () { + let isResourceId = [] + let resourceIdArr = [] + if(this.resourceList.length>0) { + this.resourceList.forEach(v=>{ + this.mainJarList.forEach(v1=>{ + if(this.searchTree(v1,v)) { + isResourceId.push(this.searchTree(v1,v)) + } + }) + }) + resourceIdArr = isResourceId.map(item=>{ + return {id: item.id,name: item.name,res: item.fullName} + }) + } + let result = [] + resourceIdArr.forEach(item=>{ + this.allNoResources.forEach(item1=>{ + if(item.id==item1.id) { + // resultBool = true + result.push(item1) + } + }) + }) + this.noRes = result return { mainClass: this.mainClass, mainJar: { id: this.mainJar }, deployMode: this.deployMode, - resourceList: _.map(this.resourceList, v => { - return {id: v} - }), + resourceList: resourceIdArr, localParams: this.localParams, driverCores: this.driverCores, driverMemory: this.driverMemory, @@ -453,20 +544,24 @@ this.sparkVersion = o.params.sparkVersion || 'SPARK2' // backfill resourceList + let backResource = o.params.resourceList || [] let resourceList = o.params.resourceList || [] if (resourceList.length) { _.map(resourceList, v => { - if(v.res) { + if(!v.id) { this.store.dispatch('dag/getResourceId',{ type: 'FILE', fullName: '/'+v.res }).then(res => { this.resourceList.push(res.id) + this.dataProcess(backResource) }).catch(e => { - this.$message.error(e.msg || '') + this.resourceList.push(v.res) + this.dataProcess(backResource) }) } else { this.resourceList.push(v.id) + this.dataProcess(backResource) } }) this.cacheResourceList = resourceList diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js index 53dce962d8..db19e1fc0e 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js @@ -588,6 +588,6 @@ export default { 'Branch flow': 'Branch flow', 'Cannot select the same node for successful branch flow and failed branch flow': 'Cannot select the same node for successful branch flow and failed branch flow', 'Successful branch flow and failed branch flow are required': 'Successful branch flow and failed branch flow are required', - 'No resources': 'No resources', + 'Unauthorized or deleted resources': 'Unauthorized or deleted resources', 'Please delete all non-existent resources': 'Please delete all non-existent resources', } diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index 5e0880ee38..80b45c7a11 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -588,6 +588,6 @@ export default { 'Branch flow': '分支流转', 'Cannot select the same node for successful branch flow and failed branch flow': '成功分支流转和失败分支流转不能选择同一个节点', 'Successful branch flow and failed branch flow are required': '成功分支流转和失败分支流转必填', - 'No resources': '未授权或已删除资源', + 'Unauthorized or deleted resources': '未授权或已删除资源', 'Please delete all non-existent resources': '请删除所有未授权或已删除资源', } From c881c9517e4348acc53c315ea6767d5c6d86f48d Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Wed, 15 Apr 2020 10:22:02 +0800 Subject: [PATCH 2/8] It's to remove `static` of method dataSource,If not remove `static` the transaction will not work. (#2422) * It's to remove `static` of method dataSource,If not remove `static` the transaction will not work. * update testQueryDetailsById because it didn't run success --- .../datasource/SpringConnectionFactory.java | 2 +- .../dao/mapper/UserMapperTest.java | 29 +++++++++++++++---- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java index 1d6dc5f51a..9e27d949aa 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java @@ -64,7 +64,7 @@ public class SpringConnectionFactory { * @return druid dataSource */ @Bean(destroyMethod="") - public static DruidDataSource dataSource() { + public DruidDataSource dataSource() { DruidDataSource druidDataSource = new DruidDataSource(); diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java index 651ca93f27..7b1849ef4d 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java @@ -16,11 +16,11 @@ */ package org.apache.dolphinscheduler.dao.mapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.DateUtils; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.dolphinscheduler.dao.entity.*; import org.junit.Assert; import org.junit.Test; @@ -179,6 +179,23 @@ public class UserMapperTest { return tenant; } + /** + * insert one Tenant + * @return Tenant + */ + private Tenant insertOneTenant(Queue queue){ + Tenant tenant = new Tenant(); + tenant.setTenantCode("dolphin"); + tenant.setTenantName("dolphin test"); + tenant.setDescription("dolphin user use"); + tenant.setQueueId(queue.getId()); + tenant.setQueue(queue.getQueue()); + tenant.setCreateTime(new Date()); + tenant.setUpdateTime(new Date()); + tenantMapper.insert(tenant); + return tenant; + } + /** * insert one Queue * @return Queue @@ -291,11 +308,13 @@ public class UserMapperTest { */ @Test public void testQueryDetailsById() { - //insertOne - User user = insertOne(); + //insertOneQueue and insertOneTenant + Queue queue = insertOneQueue(); + Tenant tenant = insertOneTenant(queue); + User user = insertOne(queue,tenant); //queryDetailsById User queryUser = userMapper.queryDetailsById(user.getId()); - Assert.assertEquals(queryUser.getUserName(), queryUser.getUserName()); + Assert.assertEquals(user.getUserName(), queryUser.getUserName()); } /** From 0bad7c2f539ef4aea8956610146cd70a7b8267ee Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Wed, 15 Apr 2020 13:50:06 +0800 Subject: [PATCH 3/8] add worker_group field and remove worker_group_id field of the table t_ds_command and t_ds_error_command (#2428) --- .../mysql/dolphinscheduler_ddl.sql | 79 +++++++++++++++++ .../postgresql/dolphinscheduler_ddl.sql | 84 +++++++++++++++++++ 2 files changed, 163 insertions(+) diff --git a/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql b/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql index 6a8665f199..33d801d4ec 100644 --- a/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql @@ -277,4 +277,83 @@ delimiter ; CALL dc_dolphin_T_t_ds_schedules_D_worker_group_id; DROP PROCEDURE dc_dolphin_T_t_ds_schedules_D_worker_group_id; +-- ac_dolphin_T_t_ds_command_A_worker_group +drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_command_A_worker_group; +delimiter d// +CREATE PROCEDURE ac_dolphin_T_t_ds_command_A_worker_group() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_command' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='worker_group') + THEN + ALTER TABLE t_ds_command ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group'; + END IF; + END; + +d// + +delimiter ; +CALL ac_dolphin_T_t_ds_command_A_worker_group; +DROP PROCEDURE ac_dolphin_T_t_ds_command_A_worker_group; + +-- dc_dolphin_T_t_ds_command_D_worker_group_id +drop PROCEDURE if EXISTS dc_dolphin_T_t_ds_command_D_worker_group_id; +delimiter d// +CREATE PROCEDURE dc_dolphin_T_t_ds_command_D_worker_group_id() + BEGIN + IF EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_command' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='worker_group_id') + THEN + ALTER TABLE t_ds_command DROP COLUMN worker_group_id; + END IF; + END; + +d// + +delimiter ; +CALL dc_dolphin_T_t_ds_command_D_worker_group_id; +DROP PROCEDURE dc_dolphin_T_t_ds_command_D_worker_group_id; + +-- ac_dolphin_T_t_ds_error_command_A_worker_group +drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_error_command_A_worker_group; +delimiter d// +CREATE PROCEDURE ac_dolphin_T_t_ds_error_command_A_worker_group() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_error_command' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='worker_group') + THEN + ALTER TABLE t_ds_error_command ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group'; + END IF; + END; + +d// + +delimiter ; +CALL ac_dolphin_T_t_ds_error_command_A_worker_group; +DROP PROCEDURE ac_dolphin_T_t_ds_error_command_A_worker_group; + +-- dc_dolphin_T_t_ds_error_command_D_worker_group_id +drop PROCEDURE if EXISTS dc_dolphin_T_t_ds_error_command_D_worker_group_id; +delimiter d// +CREATE PROCEDURE dc_dolphin_T_t_ds_error_command_D_worker_group_id() + BEGIN + IF EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_error_command' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='worker_group_id') + THEN + ALTER TABLE t_ds_error_command DROP COLUMN worker_group_id; + END IF; + END; + +d// + +delimiter ; +CALL dc_dolphin_T_t_ds_error_command_D_worker_group_id; +DROP PROCEDURE dc_dolphin_T_t_ds_error_command_D_worker_group_id; diff --git a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql index dd332533a6..a4fdc2d6bf 100644 --- a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql @@ -279,4 +279,88 @@ delimiter ; select dc_dolphin_T_t_ds_schedules_D_worker_group_id(); DROP FUNCTION dc_dolphin_T_t_ds_schedules_D_worker_group_id(); +-- ac_dolphin_T_t_ds_command_A_worker_group +delimiter ; +DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_command_A_worker_group(); +delimiter d// +CREATE FUNCTION ac_dolphin_T_t_ds_command_A_worker_group() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_command' + AND COLUMN_NAME ='worker_group') + THEN + ALTER TABLE t_ds_command ADD COLUMN worker_group varchar(255) DEFAULT null; + END IF; +END; +$$ LANGUAGE plpgsql; +d// +delimiter ; +select ac_dolphin_T_t_ds_command_A_worker_group(); +DROP FUNCTION ac_dolphin_T_t_ds_command_A_worker_group(); + +-- dc_dolphin_T_t_ds_command_D_worker_group_id +delimiter ; +DROP FUNCTION IF EXISTS dc_dolphin_T_t_ds_command_D_worker_group_id(); +delimiter d// +CREATE FUNCTION dc_dolphin_T_t_ds_command_D_worker_group_id() RETURNS void AS $$ +BEGIN + IF EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_command' + AND COLUMN_NAME ='worker_group_id') + THEN + ALTER TABLE t_ds_command DROP COLUMN worker_group_id; + END IF; +END; +$$ LANGUAGE plpgsql; +d// +delimiter ; +select dc_dolphin_T_t_ds_command_D_worker_group_id(); +DROP FUNCTION dc_dolphin_T_t_ds_command_D_worker_group_id(); + +-- ac_dolphin_T_t_ds_error_command_A_worker_group +delimiter ; +DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_error_command_A_worker_group(); +delimiter d// +CREATE FUNCTION ac_dolphin_T_t_ds_error_command_A_worker_group() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_error_command' + AND COLUMN_NAME ='worker_group') + THEN + ALTER TABLE t_ds_error_command ADD COLUMN worker_group varchar(255) DEFAULT null; + END IF; +END; +$$ LANGUAGE plpgsql; +d// +delimiter ; +select ac_dolphin_T_t_ds_error_command_A_worker_group(); +DROP FUNCTION ac_dolphin_T_t_ds_error_command_A_worker_group(); + +-- dc_dolphin_T_t_ds_error_command_D_worker_group_id +delimiter ; +DROP FUNCTION IF EXISTS dc_dolphin_T_t_ds_error_command_D_worker_group_id(); +delimiter d// +CREATE FUNCTION dc_dolphin_T_t_ds_error_command_D_worker_group_id() RETURNS void AS $$ +BEGIN + IF EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_error_command' + AND COLUMN_NAME ='worker_group_id') + THEN + ALTER TABLE t_ds_error_command DROP COLUMN worker_group_id; + END IF; +END; +$$ LANGUAGE plpgsql; +d// +delimiter ; +select dc_dolphin_T_t_ds_error_command_D_worker_group_id(); +DROP FUNCTION dc_dolphin_T_t_ds_error_command_D_worker_group_id(); + From 96835ebda2d4f9c5aa13b39bdd1e90f5d702b2f6 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Wed, 15 Apr 2020 16:19:42 +0800 Subject: [PATCH 4/8] =?UTF-8?q?1=EF=BC=8Cno=20worker=20condition=20,=20mas?= =?UTF-8?q?ter=20will=20while=20ture=20wait=20for=20worker=20startup=202?= =?UTF-8?q?=EF=BC=8Cworker=20response=20task=20status=20sync=20wait=20for?= =?UTF-8?q?=20result=20(#2420)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * dispatch task fail will set task status failed * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result Co-authored-by: qiaozhanwei --- .../consumer/TaskPriorityQueueConsumer.java | 27 +++-- .../master/processor/TaskAckProcessor.java | 28 +++++- .../processor/TaskResponseProcessor.java | 25 +++++ .../worker/processor/TaskCallbackService.java | 15 ++- .../processor/TaskCallbackServiceTest.java | 98 +++++++++++++++++-- 5 files changed, 176 insertions(+), 17 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 50c851c483..b2cf53a575 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -53,6 +53,8 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.dolphinscheduler.common.Constants.*; + /** * TaskUpdateQueue consumer */ @@ -68,7 +70,7 @@ public class TaskPriorityQueueConsumer extends Thread{ * taskUpdateQueue */ @Autowired - private TaskPriorityQueue taskUpdateQueue; + private TaskPriorityQueue taskPriorityQueue; /** * processService @@ -93,7 +95,7 @@ public class TaskPriorityQueueConsumer extends Thread{ while (Stopper.isRunning()){ try { // if not task , blocking here - String taskPriorityInfo = taskUpdateQueue.take(); + String taskPriorityInfo = taskPriorityQueue.take(); TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo); @@ -114,13 +116,22 @@ public class TaskPriorityQueueConsumer extends Thread{ private Boolean dispatch(int taskInstanceId){ TaskExecutionContext context = getTaskExecutionContext(taskInstanceId); ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup()); - try { - return dispatcher.dispatch(executionContext); - } catch (ExecuteException e) { - logger.error("execute exception", e); - return false; - } + Boolean result = false; + while (Stopper.isRunning()){ + try { + result = dispatcher.dispatch(executionContext); + } catch (ExecuteException e) { + logger.error("dispatch error",e); + try { + Thread.sleep(SLEEP_TIME_MILLIS); + } catch (InterruptedException e1) {} + } + if (result){ + break; + } + } + return result; } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java index 1eb40db152..7af9cdc2cc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java @@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.Preconditions; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; @@ -31,9 +33,12 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.dolphinscheduler.common.Constants.*; + /** * task ack processor */ @@ -51,9 +56,16 @@ public class TaskAckProcessor implements NettyRequestProcessor { */ private final TaskInstanceCacheManager taskInstanceCacheManager; + + /** + * processService + */ + private ProcessService processService; + public TaskAckProcessor(){ this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); + this.processService = SpringApplicationContext.getBean(ProcessService.class); } /** @@ -71,8 +83,10 @@ public class TaskAckProcessor implements NettyRequestProcessor { String workerAddress = ChannelUtils.toAddress(channel).getAddress(); + ExecutionStatus ackStatus = ExecutionStatus.of(taskAckCommand.getStatus()); + // TaskResponseEvent - TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.of(taskAckCommand.getStatus()), + TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ackStatus, taskAckCommand.getStartTime(), workerAddress, taskAckCommand.getExecutePath(), @@ -81,6 +95,18 @@ public class TaskAckProcessor implements NettyRequestProcessor { taskResponseService.addResponse(taskResponseEvent); + while (Stopper.isRunning()){ + TaskInstance taskInstance = processService.findTaskInstanceById(taskAckCommand.getTaskInstanceId()); + + if (taskInstance != null && ackStatus.typeIsRunning()){ + break; + } + + try { + Thread.sleep(SLEEP_TIME_MILLIS); + } catch (InterruptedException e) {} + } + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index 36b382313b..ecb8646ad0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.Preconditions; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; @@ -30,9 +32,12 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.dolphinscheduler.common.Constants.*; + /** * task response processor */ @@ -50,9 +55,15 @@ public class TaskResponseProcessor implements NettyRequestProcessor { */ private final TaskInstanceCacheManager taskInstanceCacheManager; + /** + * processService + */ + private ProcessService processService; + public TaskResponseProcessor(){ this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); + this.processService = SpringApplicationContext.getBean(ProcessService.class); } /** @@ -71,6 +82,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor { taskInstanceCacheManager.cacheTaskInstance(responseCommand); + ExecutionStatus responseStatus = ExecutionStatus.of(responseCommand.getStatus()); + // TaskResponseEvent TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()), responseCommand.getEndTime(), @@ -79,6 +92,18 @@ public class TaskResponseProcessor implements NettyRequestProcessor { responseCommand.getTaskInstanceId()); taskResponseService.addResponse(taskResponseEvent); + + while (Stopper.isRunning()){ + TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); + + if (taskInstance != null && responseStatus.typeIsFinished()){ + break; + } + + try { + Thread.sleep(SLEEP_TIME_MILLIS); + } catch (InterruptedException e) {} + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index f966591df4..7cd25cba65 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -18,9 +18,11 @@ package org.apache.dolphinscheduler.server.worker.processor; +import com.alibaba.fastjson.JSONObject; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; @@ -93,8 +95,17 @@ public class TaskCallbackService { } logger.warn("original master : {} is not reachable, random select master", nettyRemoteChannel.getHost()); Set masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly(); - if(CollectionUtils.isEmpty(masterNodes)){ - throw new IllegalStateException("no available master node exception"); + while (Stopper.isRunning()) { + if (CollectionUtils.isEmpty(masterNodes)) { + logger.error("no available master node"); + try { + Thread.sleep(1000); + }catch (Exception e){ + + } + }else { + break; + } } for(String masterNode : masterNodes){ newChannel = nettyRemotingClient.getChannel(Host.of(masterNode)); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index 5f44e1cee2..a0fee7c36e 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -17,21 +17,26 @@ package org.apache.dolphinscheduler.server.worker.processor; import io.netty.channel.Channel; +import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; +import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.zk.SpringZKServer; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.junit.Test; @@ -47,9 +52,10 @@ import java.util.Date; * test task call back service */ @RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, MasterRegistry.class, WorkerRegistry.class, +@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, SpringApplicationContext.class, MasterRegistry.class, WorkerRegistry.class, ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, - ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class}) + ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class, + TaskResponseService.class, TaskAckProcessor.class,TaskResponseProcessor.class}) public class TaskCallbackServiceTest { @Autowired @@ -58,12 +64,22 @@ public class TaskCallbackServiceTest { @Autowired private MasterRegistry masterRegistry; + @Autowired + private TaskAckProcessor taskAckProcessor; + + @Autowired + private TaskResponseProcessor taskResponseProcessor; + + /** + * send ack test + * @throws Exception + */ @Test - public void testSendAck(){ + public void testSendAck() throws Exception{ final NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(30000); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); - nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class)); + nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); nettyRemotingServer.start(); final NettyClientConfig clientConfig = new NettyClientConfig(); @@ -75,22 +91,64 @@ public class TaskCallbackServiceTest { ackCommand.setStartTime(new Date()); taskCallbackService.sendAck(1, ackCommand.convert2Command()); + Thread.sleep(5000); + + Stopper.stop(); + + Thread.sleep(5000); + nettyRemotingServer.close(); nettyRemotingClient.close(); } + /** + * send result test + * @throws Exception + */ + @Test + public void testSendResult() throws Exception{ + final NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(30000); + NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); + nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor); + nettyRemotingServer.start(); + + final NettyClientConfig clientConfig = new NettyClientConfig(); + NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig); + Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000")); + taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1)); + TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(); + responseCommand.setTaskInstanceId(1); + responseCommand.setEndTime(new Date()); + + taskCallbackService.sendResult(1, responseCommand.convert2Command()); + + Thread.sleep(5000); + + Stopper.stop(); + + Thread.sleep(5000); + + nettyRemotingServer.close(); + nettyRemotingClient.close(); + } + + + @Test(expected = IllegalArgumentException.class) public void testSendAckWithIllegalArgumentException(){ TaskExecuteAckCommand ackCommand = Mockito.mock(TaskExecuteAckCommand.class); taskCallbackService.sendAck(1, ackCommand.convert2Command()); + Stopper.stop(); } @Test(expected = IllegalStateException.class) public void testSendAckWithIllegalStateException1(){ + masterRegistry.registry(); final NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(30000); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); - nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class)); + nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); nettyRemotingServer.start(); final NettyClientConfig clientConfig = new NettyClientConfig(); @@ -103,7 +161,21 @@ public class TaskCallbackServiceTest { ackCommand.setStartTime(new Date()); nettyRemotingServer.close(); + taskCallbackService.sendAck(1, ackCommand.convert2Command()); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + Stopper.stop(); + + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } @Test(expected = IllegalStateException.class) @@ -112,7 +184,7 @@ public class TaskCallbackServiceTest { final NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(30000); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); - nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class)); + nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); nettyRemotingServer.start(); final NettyClientConfig clientConfig = new NettyClientConfig(); @@ -125,6 +197,20 @@ public class TaskCallbackServiceTest { ackCommand.setStartTime(new Date()); nettyRemotingServer.close(); + taskCallbackService.sendAck(1, ackCommand.convert2Command()); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + Stopper.stop(); + + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } } From c4acbdcfd0159df3c3336ff9b491b9fc7387116f Mon Sep 17 00:00:00 2001 From: break60 <790061044@qq.com> Date: Wed, 15 Apr 2020 17:00:31 +0800 Subject: [PATCH 5/8] Repair naming (#2426) * Fix the problem of data echo in script edit box * Optimize resource tree * Change the name of the shell node and modify the parameter transmission method of spark, mr, python, and flink nodes * Repair naming * Modify list style Co-authored-by: dailidong Co-authored-by: qiaozhanwei --- .../conf/home/pages/dag/_source/formModel/tasks/flink.vue | 4 ++-- .../js/conf/home/pages/dag/_source/formModel/tasks/mr.vue | 4 ++-- .../conf/home/pages/dag/_source/formModel/tasks/python.vue | 5 +++-- .../conf/home/pages/dag/_source/formModel/tasks/spark.vue | 4 ++-- .../projects/pages/instance/pages/list/_source/list.vue | 4 ++-- .../home/pages/projects/pages/taskInstance/_source/list.vue | 6 +++--- 6 files changed, 14 insertions(+), 13 deletions(-) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue index 30b644dd69..195e3c64f9 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue @@ -391,8 +391,8 @@ } let noResources = [{ id: -1, - name: $t('No resources'), - fullName: '/'+$t('No resources'), + name: $t('Unauthorized or deleted resources'), + fullName: '/'+$t('Unauthorized or deleted resources'), children: [] }] if(optionsCmp.length>0) { diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue index 1df1f93ba9..112e47dc4f 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue @@ -232,8 +232,8 @@ } let noResources = [{ id: -1, - name: $t('No resources'), - fullName: '/'+$t('No resources'), + name: $t('Unauthorized or deleted resources'), + fullName: '/'+$t('Unauthorized or deleted resources'), children: [] }] if(optionsCmp.length>0) { diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue index 3e2abcbb00..dd7ea942dd 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue @@ -224,8 +224,8 @@ } let noResources = [{ id: -1, - name: $t('No resources'), - fullName: '/'+$t('No resources'), + name: $t('Unauthorized or deleted resources'), + fullName: '/'+$t('Unauthorized or deleted resources'), children: [] }] if(optionsCmp.length>0) { @@ -308,6 +308,7 @@ }) } else { this.resourceList.push(v.id) + this.dataProcess(backResource) } }) this.cacheResourceList = resourceList diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue index 2304ab1d20..628d58ab0c 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue @@ -351,8 +351,8 @@ } let noResources = [{ id: -1, - name: $t('No resources'), - fullName: '/'+$t('No resources'), + name: $t('Unauthorized or deleted resources'), + fullName: '/'+$t('Unauthorized or deleted resources'), children: [] }] if(optionsCmp.length>0) { diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue index 513b8ec6dd..641c7aaa63 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue @@ -49,10 +49,10 @@ {{$t('Run Times')}} - + {{$t('host')}} - + {{$t('fault-tolerant sign')}} diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue index f7be553568..f014874175 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue @@ -43,13 +43,13 @@ {{$t('Start Time')}} - + {{$t('End Time')}} - + {{$t('host')}} - + {{$t('Duration')}}(s) From f21837076e0647103fc901e604013d0e21139460 Mon Sep 17 00:00:00 2001 From: zixi0825 <649790970@qq.com> Date: Wed, 15 Apr 2020 17:39:12 +0800 Subject: [PATCH 6/8] Solve the failure to execute non-query sql (#2413) Co-authored-by: sunchaohe Co-authored-by: qiaozhanwei Co-authored-by: lgcareer <18610854716@163.com> --- .../dolphinscheduler/server/worker/task/sql/SqlTask.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index 61d93259df..22fa91dc1d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -214,11 +214,8 @@ public class SqlTask extends AbstractTask { try { // if upload resource is HDFS and kerberos startup CommonUtils.loadKerberosConf(); - - // create connection connection = createConnection(); - // create temp function if (CollectionUtils.isNotEmpty(createFuncs)) { createTempFunction(connection,createFuncs); @@ -226,13 +223,12 @@ public class SqlTask extends AbstractTask { // pre sql preSql(connection,preStatementsBinds); - - stmt = prepareStatementAndBind(connection, mainSqlBinds); - resultSet = stmt.executeQuery(); + // decide whether to executeQuery or executeUpdate based on sqlType if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) { // query statements need to be convert to JsonArray and inserted into Alert to send + resultSet = stmt.executeQuery(); resultProcess(resultSet); } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) { From 347f05f08c3203c9e0890b244134196c79a27b9c Mon Sep 17 00:00:00 2001 From: xingchun-chen <55787491+xingchun-chen@users.noreply.github.com> Date: Wed, 15 Apr 2020 18:30:31 +0800 Subject: [PATCH 7/8] Update worker_group_id to worker_group (#2433) * add LoginTest license * Delete useless packages * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * Update worker_group_id to worker_group in init.sql * Update worker_group_id to worker_group in init.sql * Update worker_group_id to worker_group Co-authored-by: chenxingchun <438044805@qq.com> --- .../docker-entrypoint-initdb/init.sql | 6 ++-- .../dao/entity/ErrorCommand.java | 35 +++++++++---------- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/docker/postgres/docker-entrypoint-initdb/init.sql b/docker/postgres/docker-entrypoint-initdb/init.sql index b48ddde042..b26520e29c 100755 --- a/docker/postgres/docker-entrypoint-initdb/init.sql +++ b/docker/postgres/docker-entrypoint-initdb/init.sql @@ -234,7 +234,7 @@ CREATE TABLE t_ds_command ( dependence varchar(255) DEFAULT NULL , update_time timestamp DEFAULT NULL , process_instance_priority int DEFAULT NULL , - worker_group_id int DEFAULT '-1' , + worker_group varchar(64), PRIMARY KEY (id) ) ; @@ -275,7 +275,7 @@ CREATE TABLE t_ds_error_command ( update_time timestamp DEFAULT NULL , dependence text , process_instance_priority int DEFAULT NULL , - worker_group_id int DEFAULT '-1' , + worker_group varchar(64), message text , PRIMARY KEY (id) ); @@ -748,7 +748,7 @@ CREATE SEQUENCE t_ds_worker_server_id_sequence; ALTER TABLE t_ds_worker_server ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_server_id_sequence'); --- Records of t_ds_user,user : admin , password : dolphinscheduler123 +-- Records of t_ds_user?user : admin , password : dolphinscheduler123 INSERT INTO t_ds_user(user_name,user_password,user_type,email,phone,tenant_id,create_time,update_time) VALUES ('admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22'); -- Records of t_ds_alertgroup,dolphinscheduler warning group diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java index 7f3eb38760..127c5b7322 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java @@ -101,9 +101,9 @@ public class ErrorCommand { private String message; /** - * worker group id + * worker group */ - private int workerGroupId; + private String workerGroup; public ErrorCommand(){} @@ -257,17 +257,25 @@ public class ErrorCommand { this.updateTime = updateTime; } - public int getWorkerGroupId() { - return workerGroupId; + public String getWorkerGroup() { + return workerGroup; } - public void setWorkerGroupId(int workerGroupId) { - this.workerGroupId = workerGroupId; + public void setWorkerGroup(String workerGroup) { + this.workerGroup = workerGroup; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; } @Override public String toString() { - return "Command{" + + return "ErrorCommand{" + "id=" + id + ", commandType=" + commandType + ", processDefinitionId=" + processDefinitionId + @@ -281,17 +289,8 @@ public class ErrorCommand { ", startTime=" + startTime + ", processInstancePriority=" + processInstancePriority + ", updateTime=" + updateTime + - ", message=" + message + + ", message='" + message + '\'' + + ", workerGroup='" + workerGroup + '\'' + '}'; } - - public String getMessage() { - return message; - } - - public void setMessage(String message) { - this.message = message; - } - - } From 8b4eb20b5cd27121cfe1c4bd57bb2369fef884d8 Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Wed, 15 Apr 2020 22:28:49 +0800 Subject: [PATCH 8/8] fix NPE when grant resource or save process definition (#2434) Co-authored-by: dailidong --- .../api/service/ProcessDefinitionService.java | 6 ++++-- .../api/service/UsersService.java | 21 ++++++++++++------- .../mysql/dolphinscheduler_ddl.sql | 10 ++++----- .../postgresql/dolphinscheduler_ddl.sql | 10 ++++----- 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index eed9c78e74..368492388d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -173,8 +173,10 @@ public class ProcessDefinitionService extends BaseDAGService { for(TaskNode taskNode : tasks){ String taskParameter = taskNode.getParams(); AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(),taskParameter); - Set tempSet = params.getResourceFilesList().stream().map(t->t.getId()).collect(Collectors.toSet()); - resourceIds.addAll(tempSet); + if (CollectionUtils.isNotEmpty(params.getResourceFilesList())) { + Set tempSet = params.getResourceFilesList().stream().map(t->t.getId()).collect(Collectors.toSet()); + resourceIds.addAll(tempSet); + } } StringBuilder sb = new StringBuilder(); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java index 4671188d28..220b4fc4d0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java @@ -423,6 +423,7 @@ public class UsersService extends BaseService { * @param projectIds project id array * @return grant result code */ + @Transactional(rollbackFor = Exception.class) public Map grantProject(User loginUser, int userId, String projectIds) { Map result = new HashMap<>(5); result.put(Constants.STATUS, false); @@ -472,6 +473,7 @@ public class UsersService extends BaseService { * @param resourceIds resource id array * @return grant result code */ + @Transactional(rollbackFor = Exception.class) public Map grantResources(User loginUser, int userId, String resourceIds) { Map result = new HashMap<>(5); //only admin can operate @@ -484,17 +486,20 @@ public class UsersService extends BaseService { return result; } - String[] resourceFullIdArr = resourceIds.split(","); - // need authorize resource id set Set needAuthorizeResIds = new HashSet(); - for (String resourceFullId : resourceFullIdArr) { - String[] resourceIdArr = resourceFullId.split("-"); - for (int i=0;i<=resourceIdArr.length-1;i++) { - int resourceIdValue = Integer.parseInt(resourceIdArr[i]); - needAuthorizeResIds.add(resourceIdValue); + if (StringUtils.isNotBlank(resourceIds)) { + String[] resourceFullIdArr = resourceIds.split(","); + // need authorize resource id set + for (String resourceFullId : resourceFullIdArr) { + String[] resourceIdArr = resourceFullId.split("-"); + for (int i=0;i<=resourceIdArr.length-1;i++) { + int resourceIdValue = Integer.parseInt(resourceIdArr[i]); + needAuthorizeResIds.add(resourceIdValue); + } } } + //get the authorized resource id list by user id List oldAuthorizedRes = resourceMapper.queryAuthorizedResourceList(userId); //if resource type is UDF,need check whether it is bound by UDF functon @@ -565,6 +570,7 @@ public class UsersService extends BaseService { * @param udfIds udf id array * @return grant result code */ + @Transactional(rollbackFor = Exception.class) public Map grantUDFFunction(User loginUser, int userId, String udfIds) { Map result = new HashMap<>(5); @@ -611,6 +617,7 @@ public class UsersService extends BaseService { * @param datasourceIds data source id array * @return grant result code */ + @Transactional(rollbackFor = Exception.class) public Map grantDataSource(User loginUser, int userId, String datasourceIds) { Map result = new HashMap<>(5); result.put(Constants.STATUS, false); diff --git a/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql b/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql index 33d801d4ec..7432396825 100644 --- a/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql @@ -167,7 +167,7 @@ CREATE PROCEDURE ac_dolphin_T_t_ds_process_instance_A_worker_group() AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_process_instance ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group'; + ALTER TABLE t_ds_process_instance ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group'; END IF; END; @@ -207,7 +207,7 @@ CREATE PROCEDURE ac_dolphin_T_t_ds_task_instance_A_worker_group() AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_task_instance ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group'; + ALTER TABLE t_ds_task_instance ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group'; END IF; END; @@ -247,7 +247,7 @@ CREATE PROCEDURE ac_dolphin_T_t_ds_schedules_A_worker_group() AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_schedules ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group'; + ALTER TABLE t_ds_schedules ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group'; END IF; END; @@ -287,7 +287,7 @@ CREATE PROCEDURE ac_dolphin_T_t_ds_command_A_worker_group() AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_command ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group'; + ALTER TABLE t_ds_command ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group'; END IF; END; @@ -327,7 +327,7 @@ CREATE PROCEDURE ac_dolphin_T_t_ds_error_command_A_worker_group() AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_error_command ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group'; + ALTER TABLE t_ds_error_command ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group'; END IF; END; diff --git a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql index a4fdc2d6bf..5312d895c8 100644 --- a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql @@ -164,7 +164,7 @@ BEGIN AND TABLE_NAME='t_ds_process_instance' AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_process_instance ADD COLUMN worker_group varchar(255) DEFAULT null; + ALTER TABLE t_ds_process_instance ADD COLUMN worker_group varchar(64) DEFAULT null; END IF; END; $$ LANGUAGE plpgsql; @@ -207,7 +207,7 @@ BEGIN AND TABLE_NAME='t_ds_task_instance' AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_task_instance ADD COLUMN worker_group varchar(255) DEFAULT null; + ALTER TABLE t_ds_task_instance ADD COLUMN worker_group varchar(64) DEFAULT null; END IF; END; $$ LANGUAGE plpgsql; @@ -249,7 +249,7 @@ BEGIN AND TABLE_NAME='t_ds_schedules' AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_schedules ADD COLUMN worker_group varchar(255) DEFAULT null; + ALTER TABLE t_ds_schedules ADD COLUMN worker_group varchar(64) DEFAULT null; END IF; END; $$ LANGUAGE plpgsql; @@ -291,7 +291,7 @@ BEGIN AND TABLE_NAME='t_ds_command' AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_command ADD COLUMN worker_group varchar(255) DEFAULT null; + ALTER TABLE t_ds_command ADD COLUMN worker_group varchar(64) DEFAULT null; END IF; END; $$ LANGUAGE plpgsql; @@ -333,7 +333,7 @@ BEGIN AND TABLE_NAME='t_ds_error_command' AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_error_command ADD COLUMN worker_group varchar(255) DEFAULT null; + ALTER TABLE t_ds_error_command ADD COLUMN worker_group varchar(64) DEFAULT null; END IF; END; $$ LANGUAGE plpgsql;