From 5185d283f9bea4d0a345b256a080b8fa5ee913f6 Mon Sep 17 00:00:00 2001 From: break60 <790061044@qq.com> Date: Tue, 14 Apr 2020 14:28:03 +0800 Subject: [PATCH 01/10] Change the name of the shell node and modify the parameter transmission method of spark, mr, python, and flink nodes (#2416) * Fix the problem of data echo in script edit box * Optimize resource tree * Change the name of the shell node and modify the parameter transmission method of spark, mr, python, and flink nodes Co-authored-by: dailidong --- .../dag/_source/formModel/tasks/flink.vue | 107 +++++++++++++++- .../pages/dag/_source/formModel/tasks/mr.vue | 118 +++++++++++++++-- .../dag/_source/formModel/tasks/python.vue | 119 ++++++++++++++++-- .../dag/_source/formModel/tasks/shell.vue | 4 +- .../dag/_source/formModel/tasks/spark.vue | 107 +++++++++++++++- .../src/js/module/i18n/locale/en_US.js | 2 +- .../src/js/module/i18n/locale/zh_CN.js | 2 +- 7 files changed, 425 insertions(+), 34 deletions(-) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue index 50866afc42..30b644dd69 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue @@ -211,7 +211,9 @@ return { label: node.name } - } + }, + allNoResources: [], + noRes: [], } }, props: { @@ -300,6 +302,12 @@ return false } + // noRes + if (this.noRes.length>0) { + this.$message.warning(`${i18n.$t('Please delete all non-existent resources')}`) + return false + } + // localParams Subcomponent verification if (!this.$refs.refLocalParams._verifProp()) { return false @@ -339,6 +347,67 @@ } delete item.children }, + searchTree(element, id) { + // 根据id查找节点 + if (element.id == id) { + return element; + } else if (element.children != null) { + var i; + var result = null; + for (i = 0; result == null && i < element.children.length; i++) { + result = this.searchTree(element.children[i], id); + } + return result; + } + return null; + }, + dataProcess(backResource) { + let isResourceId = [] + let resourceIdArr = [] + if(this.resourceList.length>0) { + this.resourceList.forEach(v=>{ + this.mainJarList.forEach(v1=>{ + if(this.searchTree(v1,v)) { + isResourceId.push(this.searchTree(v1,v)) + } + }) + }) + resourceIdArr = isResourceId.map(item=>{ + return item.id + }) + Array.prototype.diff = function(a) { + return this.filter(function(i) {return a.indexOf(i) < 0;}); + }; + let diffSet = this.resourceList.diff(resourceIdArr); + let optionsCmp = [] + if(diffSet.length>0) { + diffSet.forEach(item=>{ + backResource.forEach(item1=>{ + if(item==item1.id || item==item1.res) { + optionsCmp.push(item1) + } + }) + }) + } + let noResources = [{ + id: -1, + name: $t('No resources'), + fullName: '/'+$t('No resources'), + children: [] + }] + if(optionsCmp.length>0) { + this.allNoResources = optionsCmp + optionsCmp = optionsCmp.map(item=>{ + return {id: item.id,name: item.name,fullName: item.res} + }) + optionsCmp.forEach(item=>{ + item.isNew = true + }) + noResources[0].children = optionsCmp + this.mainJarList = this.mainJarList.concat(noResources) + } + } + }, }, watch: { // Listening type @@ -354,15 +423,37 @@ }, computed: { cacheParams () { + let isResourceId = [] + let resourceIdArr = [] + if(this.resourceList.length>0) { + this.resourceList.forEach(v=>{ + this.mainJarList.forEach(v1=>{ + if(this.searchTree(v1,v)) { + isResourceId.push(this.searchTree(v1,v)) + } + }) + }) + resourceIdArr = isResourceId.map(item=>{ + return {id: item.id,name: item.name,res: item.fullName} + }) + } + let result = [] + resourceIdArr.forEach(item=>{ + this.allNoResources.forEach(item1=>{ + if(item.id==item1.id) { + // resultBool = true + result.push(item1) + } + }) + }) + this.noRes = result return { mainClass: this.mainClass, mainJar: { id: this.mainJar }, deployMode: this.deployMode, - resourceList: _.map(this.resourceList, v => { - return {id: v} - }), + resourceList: resourceIdArr, localParams: this.localParams, slot: this.slot, taskManager: this.taskManager, @@ -404,20 +495,24 @@ this.programType = o.params.programType || 'SCALA' // backfill resourceList + let backResource = o.params.resourceList || [] let resourceList = o.params.resourceList || [] if (resourceList.length) { _.map(resourceList, v => { - if(v.res) { + if(!v.id) { this.store.dispatch('dag/getResourceId',{ type: 'FILE', fullName: '/'+v.res }).then(res => { this.resourceList.push(res.id) + this.dataProcess(backResource) }).catch(e => { - this.$message.error(e.msg || '') + this.resourceList.push(v.res) + this.dataProcess(backResource) }) } else { this.resourceList.push(v.id) + this.dataProcess(backResource) } }) this.cacheResourceList = resourceList diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue index 121147d7e5..1df1f93ba9 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue @@ -44,7 +44,7 @@
{{$t('Main jar package')}}
- +
{{ node.raw.fullName }}
@@ -109,6 +109,7 @@ name: 'mr', data () { return { + valueConsistsOf: 'LEAF_PRIORITY', // Main function class mainClass: '', // Master jar package @@ -134,7 +135,9 @@ return { label: node.name } - } + }, + allNoResources: [], + noRes: [] } }, props: { @@ -176,9 +179,76 @@ diGuiTree(item) { // Recursive convenience tree structure item.forEach(item => { item.children === '' || item.children === undefined || item.children === null || item.children.length === 0?         - delete item.children : this.diGuiTree(item.children); + this.operationTree(item) : this.diGuiTree(item.children); }) }, + operationTree(item) { + if(item.dirctory) { + item.isDisabled =true + } + delete item.children + }, + searchTree(element, id) { + // 根据id查找节点 + if (element.id == id) { + return element; + } else if (element.children != null) { + var i; + var result = null; + for (i = 0; result == null && i < element.children.length; i++) { + result = this.searchTree(element.children[i], id); + } + return result; + } + return null; + }, + dataProcess(backResource) { + let isResourceId = [] + let resourceIdArr = [] + if(this.resourceList.length>0) { + this.resourceList.forEach(v=>{ + this.mainJarList.forEach(v1=>{ + if(this.searchTree(v1,v)) { + isResourceId.push(this.searchTree(v1,v)) + } + }) + }) + resourceIdArr = isResourceId.map(item=>{ + return item.id + }) + Array.prototype.diff = function(a) { + return this.filter(function(i) {return a.indexOf(i) < 0;}); + }; + let diffSet = this.resourceList.diff(resourceIdArr); + let optionsCmp = [] + if(diffSet.length>0) { + diffSet.forEach(item=>{ + backResource.forEach(item1=>{ + if(item==item1.id || item==item1.res) { + optionsCmp.push(item1) + } + }) + }) + } + let noResources = [{ + id: -1, + name: $t('No resources'), + fullName: '/'+$t('No resources'), + children: [] + }] + if(optionsCmp.length>0) { + this.allNoResources = optionsCmp + optionsCmp = optionsCmp.map(item=>{ + return {id: item.id,name: item.name,fullName: item.res} + }) + optionsCmp.forEach(item=>{ + item.isNew = true + }) + noResources[0].children = optionsCmp + this.mainJarList = this.mainJarList.concat(noResources) + } + } + }, /** * verification */ @@ -193,6 +263,12 @@ return false } + // noRes + if (this.noRes.length>0) { + this.$message.warning(`${i18n.$t('Please delete all non-existent resources')}`) + return false + } + // localParams Subcomponent verification if (!this.$refs.refLocalParams._verifProp()) { return false @@ -231,14 +307,36 @@ }, computed: { cacheParams () { + let isResourceId = [] + let resourceIdArr = [] + if(this.resourceList.length>0) { + this.resourceList.forEach(v=>{ + this.mainJarList.forEach(v1=>{ + if(this.searchTree(v1,v)) { + isResourceId.push(this.searchTree(v1,v)) + } + }) + }) + resourceIdArr = isResourceId.map(item=>{ + return {id: item.id,name: item.name,res: item.fullName} + }) + } + let result = [] + resourceIdArr.forEach(item=>{ + this.allNoResources.forEach(item1=>{ + if(item.id==item1.id) { + // resultBool = true + result.push(item1) + } + }) + }) + this.noRes = result return { mainClass: this.mainClass, mainJar: { id: this.mainJar }, - resourceList: _.map(this.resourceList, v => { - return {id: v} - }), + resourceList: resourceIdArr, localParams: this.localParams, mainArgs: this.mainArgs, others: this.others, @@ -273,23 +371,27 @@ let resourceList = o.params.resourceList || [] if (resourceList.length) { _.map(resourceList, v => { - if(v.res) { + if(!v.id) { this.store.dispatch('dag/getResourceId',{ type: 'FILE', fullName: '/'+v.res }).then(res => { this.resourceList.push(res.id) + this.dataProcess(backResource) }).catch(e => { - this.$message.error(e.msg || '') + this.resourceList.push(v.res) + this.dataProcess(backResource) }) } else { this.resourceList.push(v.id) + this.dataProcess(backResource) } }) this.cacheResourceList = resourceList } // backfill localParams + let backResource = o.params.resourceList || [] let localParams = o.params.localParams || [] if (localParams.length) { this.localParams = localParams diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue index 67669b4654..3e2abcbb00 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue @@ -80,6 +80,13 @@ // Cache ResourceList cacheResourceList: [], resourceOptions: [], + normalizer(node) { + return { + label: node.name + } + }, + allNoResources: [], + noRes: [] } }, mixins: [disabledState], @@ -96,9 +103,9 @@ /** * return resourceList */ - _onResourcesData (a) { - this.resourceList = a - }, + // _onResourcesData (a) { + // this.resourceList = a + // }, /** * cache resourceList */ @@ -120,6 +127,12 @@ return false } + // noRes + if (this.noRes.length>0) { + this.$message.warning(`${i18n.$t('Please delete all non-existent resources')}`) + return false + } + // storage this.$emit('on-params', { resourceList: _.map(this.resourceList, v => { @@ -166,6 +179,67 @@ item.isDisabled =true } delete item.children + }, + searchTree(element, id) { + // 根据id查找节点 + if (element.id == id) { + return element; + } else if (element.children != null) { + var i; + var result = null; + for (i = 0; result == null && i < element.children.length; i++) { + result = this.searchTree(element.children[i], id); + } + return result; + } + return null; + }, + dataProcess(backResource) { + let isResourceId = [] + let resourceIdArr = [] + if(this.resourceList.length>0) { + this.resourceList.forEach(v=>{ + this.resourceOptions.forEach(v1=>{ + if(this.searchTree(v1,v)) { + isResourceId.push(this.searchTree(v1,v)) + } + }) + }) + resourceIdArr = isResourceId.map(item=>{ + return item.id + }) + Array.prototype.diff = function(a) { + return this.filter(function(i) {return a.indexOf(i) < 0;}); + }; + let diffSet = this.resourceList.diff(resourceIdArr); + let optionsCmp = [] + if(diffSet.length>0) { + diffSet.forEach(item=>{ + backResource.forEach(item1=>{ + if(item==item1.id || item==item1.res) { + optionsCmp.push(item1) + } + }) + }) + } + let noResources = [{ + id: -1, + name: $t('No resources'), + fullName: '/'+$t('No resources'), + children: [] + }] + if(optionsCmp.length>0) { + this.allNoResources = optionsCmp + optionsCmp = optionsCmp.map(item=>{ + return {id: item.id,name: item.name,fullName: item.res} + }) + optionsCmp.forEach(item=>{ + item.isNew = true + }) + noResources[0].children = optionsCmp + this.resourceOptions = this.resourceOptions.concat(noResources) + } + } } }, watch: { @@ -176,10 +250,32 @@ }, computed: { cacheParams () { + let isResourceId = [] + let resourceIdArr = [] + if(this.resourceList.length>0) { + this.resourceList.forEach(v=>{ + this.resourceOptions.forEach(v1=>{ + if(this.searchTree(v1,v)) { + isResourceId.push(this.searchTree(v1,v)) + } + }) + }) + resourceIdArr = isResourceId.map(item=>{ + return {id: item.id,name: item.name,res: item.fullName} + }) + } + let result = [] + resourceIdArr.forEach(item=>{ + this.allNoResources.forEach(item1=>{ + if(item.id==item1.id) { + // resultBool = true + result.push(item1) + } + }) + }) + this.noRes = result return { - resourceList: _.map(this.resourceList, v => { - return {id: v} - }), + resourceList: resourceIdArr, localParams: this.localParams } } @@ -187,7 +283,7 @@ created () { let item = this.store.state.dag.resourcesListS this.diGuiTree(item) - this.options = item + this.resourceOptions = item let o = this.backfillItem // Non-null objects represent backfill @@ -195,17 +291,20 @@ this.rawScript = o.params.rawScript || '' // backfill resourceList + let backResource = o.params.resourceList || [] let resourceList = o.params.resourceList || [] if (resourceList.length) { _.map(resourceList, v => { - if(v.res) { + if(!v.id) { this.store.dispatch('dag/getResourceId',{ type: 'FILE', fullName: '/'+v.res }).then(res => { this.resourceList.push(res.id) + this.dataProcess(backResource) }).catch(e => { - this.$message.error(e.msg || '') + this.resourceList.push(v.res) + this.dataProcess(backResource) }) } else { this.resourceList.push(v.id) @@ -230,6 +329,6 @@ editor.toTextArea() // Uninstall editor.off($('.code-python-mirror'), 'keypress', this.keypress) }, - components: { mLocalParams, mListBox, mResources } + components: { mLocalParams, mListBox, mResources,Treeselect } } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue index df315a14f1..b627602e04 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue @@ -266,8 +266,8 @@ } let noResources = [{ id: -1, - name: $t('No resources'), - fullName: '/'+$t('No resources'), + name: $t('Unauthorized or deleted resources'), + fullName: '/'+$t('Unauthorized or deleted resources'), children: [] }] if(optionsCmp.length>0) { diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue index 7a00528149..2304ab1d20 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue @@ -254,7 +254,9 @@ return { label: node.name } - } + }, + allNoResources: [], + noRes: [] } }, props: { @@ -305,6 +307,67 @@ } delete item.children }, + searchTree(element, id) { + // 根据id查找节点 + if (element.id == id) { + return element; + } else if (element.children != null) { + var i; + var result = null; + for (i = 0; result == null && i < element.children.length; i++) { + result = this.searchTree(element.children[i], id); + } + return result; + } + return null; + }, + dataProcess(backResource) { + let isResourceId = [] + let resourceIdArr = [] + if(this.resourceList.length>0) { + this.resourceList.forEach(v=>{ + this.mainJarList.forEach(v1=>{ + if(this.searchTree(v1,v)) { + isResourceId.push(this.searchTree(v1,v)) + } + }) + }) + resourceIdArr = isResourceId.map(item=>{ + return item.id + }) + Array.prototype.diff = function(a) { + return this.filter(function(i) {return a.indexOf(i) < 0;}); + }; + let diffSet = this.resourceList.diff(resourceIdArr); + let optionsCmp = [] + if(diffSet.length>0) { + diffSet.forEach(item=>{ + backResource.forEach(item1=>{ + if(item==item1.id || item==item1.res) { + optionsCmp.push(item1) + } + }) + }) + } + let noResources = [{ + id: -1, + name: $t('No resources'), + fullName: '/'+$t('No resources'), + children: [] + }] + if(optionsCmp.length>0) { + this.allNoResources = optionsCmp + optionsCmp = optionsCmp.map(item=>{ + return {id: item.id,name: item.name,fullName: item.res} + }) + optionsCmp.forEach(item=>{ + item.isNew = true + }) + noResources[0].children = optionsCmp + this.mainJarList = this.mainJarList.concat(noResources) + } + } + }, /** * verification */ @@ -324,6 +387,12 @@ return false } + // noRes + if (this.noRes.length>0) { + this.$message.warning(`${i18n.$t('Please delete all non-existent resources')}`) + return false + } + if (!Number.isInteger(parseInt(this.numExecutors))) { this.$message.warning(`${i18n.$t('The number of Executors should be a positive integer')}`) return false @@ -400,15 +469,37 @@ }, computed: { cacheParams () { + let isResourceId = [] + let resourceIdArr = [] + if(this.resourceList.length>0) { + this.resourceList.forEach(v=>{ + this.mainJarList.forEach(v1=>{ + if(this.searchTree(v1,v)) { + isResourceId.push(this.searchTree(v1,v)) + } + }) + }) + resourceIdArr = isResourceId.map(item=>{ + return {id: item.id,name: item.name,res: item.fullName} + }) + } + let result = [] + resourceIdArr.forEach(item=>{ + this.allNoResources.forEach(item1=>{ + if(item.id==item1.id) { + // resultBool = true + result.push(item1) + } + }) + }) + this.noRes = result return { mainClass: this.mainClass, mainJar: { id: this.mainJar }, deployMode: this.deployMode, - resourceList: _.map(this.resourceList, v => { - return {id: v} - }), + resourceList: resourceIdArr, localParams: this.localParams, driverCores: this.driverCores, driverMemory: this.driverMemory, @@ -453,20 +544,24 @@ this.sparkVersion = o.params.sparkVersion || 'SPARK2' // backfill resourceList + let backResource = o.params.resourceList || [] let resourceList = o.params.resourceList || [] if (resourceList.length) { _.map(resourceList, v => { - if(v.res) { + if(!v.id) { this.store.dispatch('dag/getResourceId',{ type: 'FILE', fullName: '/'+v.res }).then(res => { this.resourceList.push(res.id) + this.dataProcess(backResource) }).catch(e => { - this.$message.error(e.msg || '') + this.resourceList.push(v.res) + this.dataProcess(backResource) }) } else { this.resourceList.push(v.id) + this.dataProcess(backResource) } }) this.cacheResourceList = resourceList diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js index 53dce962d8..db19e1fc0e 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js @@ -588,6 +588,6 @@ export default { 'Branch flow': 'Branch flow', 'Cannot select the same node for successful branch flow and failed branch flow': 'Cannot select the same node for successful branch flow and failed branch flow', 'Successful branch flow and failed branch flow are required': 'Successful branch flow and failed branch flow are required', - 'No resources': 'No resources', + 'Unauthorized or deleted resources': 'Unauthorized or deleted resources', 'Please delete all non-existent resources': 'Please delete all non-existent resources', } diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index 5e0880ee38..80b45c7a11 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -588,6 +588,6 @@ export default { 'Branch flow': '分支流转', 'Cannot select the same node for successful branch flow and failed branch flow': '成功分支流转和失败分支流转不能选择同一个节点', 'Successful branch flow and failed branch flow are required': '成功分支流转和失败分支流转必填', - 'No resources': '未授权或已删除资源', + 'Unauthorized or deleted resources': '未授权或已删除资源', 'Please delete all non-existent resources': '请删除所有未授权或已删除资源', } From c881c9517e4348acc53c315ea6767d5c6d86f48d Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Wed, 15 Apr 2020 10:22:02 +0800 Subject: [PATCH 02/10] It's to remove `static` of method dataSource,If not remove `static` the transaction will not work. (#2422) * It's to remove `static` of method dataSource,If not remove `static` the transaction will not work. * update testQueryDetailsById because it didn't run success --- .../datasource/SpringConnectionFactory.java | 2 +- .../dao/mapper/UserMapperTest.java | 29 +++++++++++++++---- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java index 1d6dc5f51a..9e27d949aa 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java @@ -64,7 +64,7 @@ public class SpringConnectionFactory { * @return druid dataSource */ @Bean(destroyMethod="") - public static DruidDataSource dataSource() { + public DruidDataSource dataSource() { DruidDataSource druidDataSource = new DruidDataSource(); diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java index 651ca93f27..7b1849ef4d 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java @@ -16,11 +16,11 @@ */ package org.apache.dolphinscheduler.dao.mapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.DateUtils; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.dolphinscheduler.dao.entity.*; import org.junit.Assert; import org.junit.Test; @@ -179,6 +179,23 @@ public class UserMapperTest { return tenant; } + /** + * insert one Tenant + * @return Tenant + */ + private Tenant insertOneTenant(Queue queue){ + Tenant tenant = new Tenant(); + tenant.setTenantCode("dolphin"); + tenant.setTenantName("dolphin test"); + tenant.setDescription("dolphin user use"); + tenant.setQueueId(queue.getId()); + tenant.setQueue(queue.getQueue()); + tenant.setCreateTime(new Date()); + tenant.setUpdateTime(new Date()); + tenantMapper.insert(tenant); + return tenant; + } + /** * insert one Queue * @return Queue @@ -291,11 +308,13 @@ public class UserMapperTest { */ @Test public void testQueryDetailsById() { - //insertOne - User user = insertOne(); + //insertOneQueue and insertOneTenant + Queue queue = insertOneQueue(); + Tenant tenant = insertOneTenant(queue); + User user = insertOne(queue,tenant); //queryDetailsById User queryUser = userMapper.queryDetailsById(user.getId()); - Assert.assertEquals(queryUser.getUserName(), queryUser.getUserName()); + Assert.assertEquals(user.getUserName(), queryUser.getUserName()); } /** From 0bad7c2f539ef4aea8956610146cd70a7b8267ee Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Wed, 15 Apr 2020 13:50:06 +0800 Subject: [PATCH 03/10] add worker_group field and remove worker_group_id field of the table t_ds_command and t_ds_error_command (#2428) --- .../mysql/dolphinscheduler_ddl.sql | 79 +++++++++++++++++ .../postgresql/dolphinscheduler_ddl.sql | 84 +++++++++++++++++++ 2 files changed, 163 insertions(+) diff --git a/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql b/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql index 6a8665f199..33d801d4ec 100644 --- a/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql @@ -277,4 +277,83 @@ delimiter ; CALL dc_dolphin_T_t_ds_schedules_D_worker_group_id; DROP PROCEDURE dc_dolphin_T_t_ds_schedules_D_worker_group_id; +-- ac_dolphin_T_t_ds_command_A_worker_group +drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_command_A_worker_group; +delimiter d// +CREATE PROCEDURE ac_dolphin_T_t_ds_command_A_worker_group() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_command' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='worker_group') + THEN + ALTER TABLE t_ds_command ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group'; + END IF; + END; + +d// + +delimiter ; +CALL ac_dolphin_T_t_ds_command_A_worker_group; +DROP PROCEDURE ac_dolphin_T_t_ds_command_A_worker_group; + +-- dc_dolphin_T_t_ds_command_D_worker_group_id +drop PROCEDURE if EXISTS dc_dolphin_T_t_ds_command_D_worker_group_id; +delimiter d// +CREATE PROCEDURE dc_dolphin_T_t_ds_command_D_worker_group_id() + BEGIN + IF EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_command' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='worker_group_id') + THEN + ALTER TABLE t_ds_command DROP COLUMN worker_group_id; + END IF; + END; + +d// + +delimiter ; +CALL dc_dolphin_T_t_ds_command_D_worker_group_id; +DROP PROCEDURE dc_dolphin_T_t_ds_command_D_worker_group_id; + +-- ac_dolphin_T_t_ds_error_command_A_worker_group +drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_error_command_A_worker_group; +delimiter d// +CREATE PROCEDURE ac_dolphin_T_t_ds_error_command_A_worker_group() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_error_command' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='worker_group') + THEN + ALTER TABLE t_ds_error_command ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group'; + END IF; + END; + +d// + +delimiter ; +CALL ac_dolphin_T_t_ds_error_command_A_worker_group; +DROP PROCEDURE ac_dolphin_T_t_ds_error_command_A_worker_group; + +-- dc_dolphin_T_t_ds_error_command_D_worker_group_id +drop PROCEDURE if EXISTS dc_dolphin_T_t_ds_error_command_D_worker_group_id; +delimiter d// +CREATE PROCEDURE dc_dolphin_T_t_ds_error_command_D_worker_group_id() + BEGIN + IF EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_error_command' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='worker_group_id') + THEN + ALTER TABLE t_ds_error_command DROP COLUMN worker_group_id; + END IF; + END; + +d// + +delimiter ; +CALL dc_dolphin_T_t_ds_error_command_D_worker_group_id; +DROP PROCEDURE dc_dolphin_T_t_ds_error_command_D_worker_group_id; diff --git a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql index dd332533a6..a4fdc2d6bf 100644 --- a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql @@ -279,4 +279,88 @@ delimiter ; select dc_dolphin_T_t_ds_schedules_D_worker_group_id(); DROP FUNCTION dc_dolphin_T_t_ds_schedules_D_worker_group_id(); +-- ac_dolphin_T_t_ds_command_A_worker_group +delimiter ; +DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_command_A_worker_group(); +delimiter d// +CREATE FUNCTION ac_dolphin_T_t_ds_command_A_worker_group() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_command' + AND COLUMN_NAME ='worker_group') + THEN + ALTER TABLE t_ds_command ADD COLUMN worker_group varchar(255) DEFAULT null; + END IF; +END; +$$ LANGUAGE plpgsql; +d// +delimiter ; +select ac_dolphin_T_t_ds_command_A_worker_group(); +DROP FUNCTION ac_dolphin_T_t_ds_command_A_worker_group(); + +-- dc_dolphin_T_t_ds_command_D_worker_group_id +delimiter ; +DROP FUNCTION IF EXISTS dc_dolphin_T_t_ds_command_D_worker_group_id(); +delimiter d// +CREATE FUNCTION dc_dolphin_T_t_ds_command_D_worker_group_id() RETURNS void AS $$ +BEGIN + IF EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_command' + AND COLUMN_NAME ='worker_group_id') + THEN + ALTER TABLE t_ds_command DROP COLUMN worker_group_id; + END IF; +END; +$$ LANGUAGE plpgsql; +d// +delimiter ; +select dc_dolphin_T_t_ds_command_D_worker_group_id(); +DROP FUNCTION dc_dolphin_T_t_ds_command_D_worker_group_id(); + +-- ac_dolphin_T_t_ds_error_command_A_worker_group +delimiter ; +DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_error_command_A_worker_group(); +delimiter d// +CREATE FUNCTION ac_dolphin_T_t_ds_error_command_A_worker_group() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_error_command' + AND COLUMN_NAME ='worker_group') + THEN + ALTER TABLE t_ds_error_command ADD COLUMN worker_group varchar(255) DEFAULT null; + END IF; +END; +$$ LANGUAGE plpgsql; +d// +delimiter ; +select ac_dolphin_T_t_ds_error_command_A_worker_group(); +DROP FUNCTION ac_dolphin_T_t_ds_error_command_A_worker_group(); + +-- dc_dolphin_T_t_ds_error_command_D_worker_group_id +delimiter ; +DROP FUNCTION IF EXISTS dc_dolphin_T_t_ds_error_command_D_worker_group_id(); +delimiter d// +CREATE FUNCTION dc_dolphin_T_t_ds_error_command_D_worker_group_id() RETURNS void AS $$ +BEGIN + IF EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_error_command' + AND COLUMN_NAME ='worker_group_id') + THEN + ALTER TABLE t_ds_error_command DROP COLUMN worker_group_id; + END IF; +END; +$$ LANGUAGE plpgsql; +d// +delimiter ; +select dc_dolphin_T_t_ds_error_command_D_worker_group_id(); +DROP FUNCTION dc_dolphin_T_t_ds_error_command_D_worker_group_id(); + From 96835ebda2d4f9c5aa13b39bdd1e90f5d702b2f6 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Wed, 15 Apr 2020 16:19:42 +0800 Subject: [PATCH 04/10] =?UTF-8?q?1=EF=BC=8Cno=20worker=20condition=20,=20m?= =?UTF-8?q?aster=20will=20while=20ture=20wait=20for=20worker=20startup=202?= =?UTF-8?q?=EF=BC=8Cworker=20response=20task=20status=20sync=20wait=20for?= =?UTF-8?q?=20result=20(#2420)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * dispatch task fail will set task status failed * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result Co-authored-by: qiaozhanwei --- .../consumer/TaskPriorityQueueConsumer.java | 27 +++-- .../master/processor/TaskAckProcessor.java | 28 +++++- .../processor/TaskResponseProcessor.java | 25 +++++ .../worker/processor/TaskCallbackService.java | 15 ++- .../processor/TaskCallbackServiceTest.java | 98 +++++++++++++++++-- 5 files changed, 176 insertions(+), 17 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 50c851c483..b2cf53a575 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -53,6 +53,8 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.dolphinscheduler.common.Constants.*; + /** * TaskUpdateQueue consumer */ @@ -68,7 +70,7 @@ public class TaskPriorityQueueConsumer extends Thread{ * taskUpdateQueue */ @Autowired - private TaskPriorityQueue taskUpdateQueue; + private TaskPriorityQueue taskPriorityQueue; /** * processService @@ -93,7 +95,7 @@ public class TaskPriorityQueueConsumer extends Thread{ while (Stopper.isRunning()){ try { // if not task , blocking here - String taskPriorityInfo = taskUpdateQueue.take(); + String taskPriorityInfo = taskPriorityQueue.take(); TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo); @@ -114,13 +116,22 @@ public class TaskPriorityQueueConsumer extends Thread{ private Boolean dispatch(int taskInstanceId){ TaskExecutionContext context = getTaskExecutionContext(taskInstanceId); ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup()); - try { - return dispatcher.dispatch(executionContext); - } catch (ExecuteException e) { - logger.error("execute exception", e); - return false; - } + Boolean result = false; + while (Stopper.isRunning()){ + try { + result = dispatcher.dispatch(executionContext); + } catch (ExecuteException e) { + logger.error("dispatch error",e); + try { + Thread.sleep(SLEEP_TIME_MILLIS); + } catch (InterruptedException e1) {} + } + if (result){ + break; + } + } + return result; } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java index 1eb40db152..7af9cdc2cc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java @@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.Preconditions; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; @@ -31,9 +33,12 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.dolphinscheduler.common.Constants.*; + /** * task ack processor */ @@ -51,9 +56,16 @@ public class TaskAckProcessor implements NettyRequestProcessor { */ private final TaskInstanceCacheManager taskInstanceCacheManager; + + /** + * processService + */ + private ProcessService processService; + public TaskAckProcessor(){ this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); + this.processService = SpringApplicationContext.getBean(ProcessService.class); } /** @@ -71,8 +83,10 @@ public class TaskAckProcessor implements NettyRequestProcessor { String workerAddress = ChannelUtils.toAddress(channel).getAddress(); + ExecutionStatus ackStatus = ExecutionStatus.of(taskAckCommand.getStatus()); + // TaskResponseEvent - TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.of(taskAckCommand.getStatus()), + TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ackStatus, taskAckCommand.getStartTime(), workerAddress, taskAckCommand.getExecutePath(), @@ -81,6 +95,18 @@ public class TaskAckProcessor implements NettyRequestProcessor { taskResponseService.addResponse(taskResponseEvent); + while (Stopper.isRunning()){ + TaskInstance taskInstance = processService.findTaskInstanceById(taskAckCommand.getTaskInstanceId()); + + if (taskInstance != null && ackStatus.typeIsRunning()){ + break; + } + + try { + Thread.sleep(SLEEP_TIME_MILLIS); + } catch (InterruptedException e) {} + } + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index 36b382313b..ecb8646ad0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.Preconditions; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; @@ -30,9 +32,12 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.dolphinscheduler.common.Constants.*; + /** * task response processor */ @@ -50,9 +55,15 @@ public class TaskResponseProcessor implements NettyRequestProcessor { */ private final TaskInstanceCacheManager taskInstanceCacheManager; + /** + * processService + */ + private ProcessService processService; + public TaskResponseProcessor(){ this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); + this.processService = SpringApplicationContext.getBean(ProcessService.class); } /** @@ -71,6 +82,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor { taskInstanceCacheManager.cacheTaskInstance(responseCommand); + ExecutionStatus responseStatus = ExecutionStatus.of(responseCommand.getStatus()); + // TaskResponseEvent TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()), responseCommand.getEndTime(), @@ -79,6 +92,18 @@ public class TaskResponseProcessor implements NettyRequestProcessor { responseCommand.getTaskInstanceId()); taskResponseService.addResponse(taskResponseEvent); + + while (Stopper.isRunning()){ + TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); + + if (taskInstance != null && responseStatus.typeIsFinished()){ + break; + } + + try { + Thread.sleep(SLEEP_TIME_MILLIS); + } catch (InterruptedException e) {} + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index f966591df4..7cd25cba65 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -18,9 +18,11 @@ package org.apache.dolphinscheduler.server.worker.processor; +import com.alibaba.fastjson.JSONObject; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; @@ -93,8 +95,17 @@ public class TaskCallbackService { } logger.warn("original master : {} is not reachable, random select master", nettyRemoteChannel.getHost()); Set masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly(); - if(CollectionUtils.isEmpty(masterNodes)){ - throw new IllegalStateException("no available master node exception"); + while (Stopper.isRunning()) { + if (CollectionUtils.isEmpty(masterNodes)) { + logger.error("no available master node"); + try { + Thread.sleep(1000); + }catch (Exception e){ + + } + }else { + break; + } } for(String masterNode : masterNodes){ newChannel = nettyRemotingClient.getChannel(Host.of(masterNode)); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index 5f44e1cee2..a0fee7c36e 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -17,21 +17,26 @@ package org.apache.dolphinscheduler.server.worker.processor; import io.netty.channel.Channel; +import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; +import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.zk.SpringZKServer; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.junit.Test; @@ -47,9 +52,10 @@ import java.util.Date; * test task call back service */ @RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, MasterRegistry.class, WorkerRegistry.class, +@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, SpringApplicationContext.class, MasterRegistry.class, WorkerRegistry.class, ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, - ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class}) + ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class, + TaskResponseService.class, TaskAckProcessor.class,TaskResponseProcessor.class}) public class TaskCallbackServiceTest { @Autowired @@ -58,12 +64,22 @@ public class TaskCallbackServiceTest { @Autowired private MasterRegistry masterRegistry; + @Autowired + private TaskAckProcessor taskAckProcessor; + + @Autowired + private TaskResponseProcessor taskResponseProcessor; + + /** + * send ack test + * @throws Exception + */ @Test - public void testSendAck(){ + public void testSendAck() throws Exception{ final NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(30000); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); - nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class)); + nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); nettyRemotingServer.start(); final NettyClientConfig clientConfig = new NettyClientConfig(); @@ -75,22 +91,64 @@ public class TaskCallbackServiceTest { ackCommand.setStartTime(new Date()); taskCallbackService.sendAck(1, ackCommand.convert2Command()); + Thread.sleep(5000); + + Stopper.stop(); + + Thread.sleep(5000); + nettyRemotingServer.close(); nettyRemotingClient.close(); } + /** + * send result test + * @throws Exception + */ + @Test + public void testSendResult() throws Exception{ + final NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(30000); + NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); + nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor); + nettyRemotingServer.start(); + + final NettyClientConfig clientConfig = new NettyClientConfig(); + NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig); + Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000")); + taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1)); + TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(); + responseCommand.setTaskInstanceId(1); + responseCommand.setEndTime(new Date()); + + taskCallbackService.sendResult(1, responseCommand.convert2Command()); + + Thread.sleep(5000); + + Stopper.stop(); + + Thread.sleep(5000); + + nettyRemotingServer.close(); + nettyRemotingClient.close(); + } + + + @Test(expected = IllegalArgumentException.class) public void testSendAckWithIllegalArgumentException(){ TaskExecuteAckCommand ackCommand = Mockito.mock(TaskExecuteAckCommand.class); taskCallbackService.sendAck(1, ackCommand.convert2Command()); + Stopper.stop(); } @Test(expected = IllegalStateException.class) public void testSendAckWithIllegalStateException1(){ + masterRegistry.registry(); final NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(30000); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); - nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class)); + nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); nettyRemotingServer.start(); final NettyClientConfig clientConfig = new NettyClientConfig(); @@ -103,7 +161,21 @@ public class TaskCallbackServiceTest { ackCommand.setStartTime(new Date()); nettyRemotingServer.close(); + taskCallbackService.sendAck(1, ackCommand.convert2Command()); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + Stopper.stop(); + + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } @Test(expected = IllegalStateException.class) @@ -112,7 +184,7 @@ public class TaskCallbackServiceTest { final NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(30000); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); - nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class)); + nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); nettyRemotingServer.start(); final NettyClientConfig clientConfig = new NettyClientConfig(); @@ -125,6 +197,20 @@ public class TaskCallbackServiceTest { ackCommand.setStartTime(new Date()); nettyRemotingServer.close(); + taskCallbackService.sendAck(1, ackCommand.convert2Command()); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + Stopper.stop(); + + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } } From c4acbdcfd0159df3c3336ff9b491b9fc7387116f Mon Sep 17 00:00:00 2001 From: break60 <790061044@qq.com> Date: Wed, 15 Apr 2020 17:00:31 +0800 Subject: [PATCH 05/10] Repair naming (#2426) * Fix the problem of data echo in script edit box * Optimize resource tree * Change the name of the shell node and modify the parameter transmission method of spark, mr, python, and flink nodes * Repair naming * Modify list style Co-authored-by: dailidong Co-authored-by: qiaozhanwei --- .../conf/home/pages/dag/_source/formModel/tasks/flink.vue | 4 ++-- .../js/conf/home/pages/dag/_source/formModel/tasks/mr.vue | 4 ++-- .../conf/home/pages/dag/_source/formModel/tasks/python.vue | 5 +++-- .../conf/home/pages/dag/_source/formModel/tasks/spark.vue | 4 ++-- .../projects/pages/instance/pages/list/_source/list.vue | 4 ++-- .../home/pages/projects/pages/taskInstance/_source/list.vue | 6 +++--- 6 files changed, 14 insertions(+), 13 deletions(-) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue index 30b644dd69..195e3c64f9 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue @@ -391,8 +391,8 @@ } let noResources = [{ id: -1, - name: $t('No resources'), - fullName: '/'+$t('No resources'), + name: $t('Unauthorized or deleted resources'), + fullName: '/'+$t('Unauthorized or deleted resources'), children: [] }] if(optionsCmp.length>0) { diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue index 1df1f93ba9..112e47dc4f 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue @@ -232,8 +232,8 @@ } let noResources = [{ id: -1, - name: $t('No resources'), - fullName: '/'+$t('No resources'), + name: $t('Unauthorized or deleted resources'), + fullName: '/'+$t('Unauthorized or deleted resources'), children: [] }] if(optionsCmp.length>0) { diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue index 3e2abcbb00..dd7ea942dd 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue @@ -224,8 +224,8 @@ } let noResources = [{ id: -1, - name: $t('No resources'), - fullName: '/'+$t('No resources'), + name: $t('Unauthorized or deleted resources'), + fullName: '/'+$t('Unauthorized or deleted resources'), children: [] }] if(optionsCmp.length>0) { @@ -308,6 +308,7 @@ }) } else { this.resourceList.push(v.id) + this.dataProcess(backResource) } }) this.cacheResourceList = resourceList diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue index 2304ab1d20..628d58ab0c 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue @@ -351,8 +351,8 @@ } let noResources = [{ id: -1, - name: $t('No resources'), - fullName: '/'+$t('No resources'), + name: $t('Unauthorized or deleted resources'), + fullName: '/'+$t('Unauthorized or deleted resources'), children: [] }] if(optionsCmp.length>0) { diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue index 513b8ec6dd..641c7aaa63 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue @@ -49,10 +49,10 @@ {{$t('Run Times')}} - + {{$t('host')}} - + {{$t('fault-tolerant sign')}} diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue index f7be553568..f014874175 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue @@ -43,13 +43,13 @@ {{$t('Start Time')}} - + {{$t('End Time')}} - + {{$t('host')}} - + {{$t('Duration')}}(s) From f21837076e0647103fc901e604013d0e21139460 Mon Sep 17 00:00:00 2001 From: zixi0825 <649790970@qq.com> Date: Wed, 15 Apr 2020 17:39:12 +0800 Subject: [PATCH 06/10] Solve the failure to execute non-query sql (#2413) Co-authored-by: sunchaohe Co-authored-by: qiaozhanwei Co-authored-by: lgcareer <18610854716@163.com> --- .../dolphinscheduler/server/worker/task/sql/SqlTask.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index 61d93259df..22fa91dc1d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -214,11 +214,8 @@ public class SqlTask extends AbstractTask { try { // if upload resource is HDFS and kerberos startup CommonUtils.loadKerberosConf(); - - // create connection connection = createConnection(); - // create temp function if (CollectionUtils.isNotEmpty(createFuncs)) { createTempFunction(connection,createFuncs); @@ -226,13 +223,12 @@ public class SqlTask extends AbstractTask { // pre sql preSql(connection,preStatementsBinds); - - stmt = prepareStatementAndBind(connection, mainSqlBinds); - resultSet = stmt.executeQuery(); + // decide whether to executeQuery or executeUpdate based on sqlType if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) { // query statements need to be convert to JsonArray and inserted into Alert to send + resultSet = stmt.executeQuery(); resultProcess(resultSet); } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) { From 347f05f08c3203c9e0890b244134196c79a27b9c Mon Sep 17 00:00:00 2001 From: xingchun-chen <55787491+xingchun-chen@users.noreply.github.com> Date: Wed, 15 Apr 2020 18:30:31 +0800 Subject: [PATCH 07/10] Update worker_group_id to worker_group (#2433) * add LoginTest license * Delete useless packages * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * Update worker_group_id to worker_group in init.sql * Update worker_group_id to worker_group in init.sql * Update worker_group_id to worker_group Co-authored-by: chenxingchun <438044805@qq.com> --- .../docker-entrypoint-initdb/init.sql | 6 ++-- .../dao/entity/ErrorCommand.java | 35 +++++++++---------- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/docker/postgres/docker-entrypoint-initdb/init.sql b/docker/postgres/docker-entrypoint-initdb/init.sql index b48ddde042..b26520e29c 100755 --- a/docker/postgres/docker-entrypoint-initdb/init.sql +++ b/docker/postgres/docker-entrypoint-initdb/init.sql @@ -234,7 +234,7 @@ CREATE TABLE t_ds_command ( dependence varchar(255) DEFAULT NULL , update_time timestamp DEFAULT NULL , process_instance_priority int DEFAULT NULL , - worker_group_id int DEFAULT '-1' , + worker_group varchar(64), PRIMARY KEY (id) ) ; @@ -275,7 +275,7 @@ CREATE TABLE t_ds_error_command ( update_time timestamp DEFAULT NULL , dependence text , process_instance_priority int DEFAULT NULL , - worker_group_id int DEFAULT '-1' , + worker_group varchar(64), message text , PRIMARY KEY (id) ); @@ -748,7 +748,7 @@ CREATE SEQUENCE t_ds_worker_server_id_sequence; ALTER TABLE t_ds_worker_server ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_server_id_sequence'); --- Records of t_ds_user,user : admin , password : dolphinscheduler123 +-- Records of t_ds_user?user : admin , password : dolphinscheduler123 INSERT INTO t_ds_user(user_name,user_password,user_type,email,phone,tenant_id,create_time,update_time) VALUES ('admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22'); -- Records of t_ds_alertgroup,dolphinscheduler warning group diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java index 7f3eb38760..127c5b7322 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java @@ -101,9 +101,9 @@ public class ErrorCommand { private String message; /** - * worker group id + * worker group */ - private int workerGroupId; + private String workerGroup; public ErrorCommand(){} @@ -257,17 +257,25 @@ public class ErrorCommand { this.updateTime = updateTime; } - public int getWorkerGroupId() { - return workerGroupId; + public String getWorkerGroup() { + return workerGroup; } - public void setWorkerGroupId(int workerGroupId) { - this.workerGroupId = workerGroupId; + public void setWorkerGroup(String workerGroup) { + this.workerGroup = workerGroup; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; } @Override public String toString() { - return "Command{" + + return "ErrorCommand{" + "id=" + id + ", commandType=" + commandType + ", processDefinitionId=" + processDefinitionId + @@ -281,17 +289,8 @@ public class ErrorCommand { ", startTime=" + startTime + ", processInstancePriority=" + processInstancePriority + ", updateTime=" + updateTime + - ", message=" + message + + ", message='" + message + '\'' + + ", workerGroup='" + workerGroup + '\'' + '}'; } - - public String getMessage() { - return message; - } - - public void setMessage(String message) { - this.message = message; - } - - } From 8b4eb20b5cd27121cfe1c4bd57bb2369fef884d8 Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Wed, 15 Apr 2020 22:28:49 +0800 Subject: [PATCH 08/10] fix NPE when grant resource or save process definition (#2434) Co-authored-by: dailidong --- .../api/service/ProcessDefinitionService.java | 6 ++++-- .../api/service/UsersService.java | 21 ++++++++++++------- .../mysql/dolphinscheduler_ddl.sql | 10 ++++----- .../postgresql/dolphinscheduler_ddl.sql | 10 ++++----- 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index eed9c78e74..368492388d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -173,8 +173,10 @@ public class ProcessDefinitionService extends BaseDAGService { for(TaskNode taskNode : tasks){ String taskParameter = taskNode.getParams(); AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(),taskParameter); - Set tempSet = params.getResourceFilesList().stream().map(t->t.getId()).collect(Collectors.toSet()); - resourceIds.addAll(tempSet); + if (CollectionUtils.isNotEmpty(params.getResourceFilesList())) { + Set tempSet = params.getResourceFilesList().stream().map(t->t.getId()).collect(Collectors.toSet()); + resourceIds.addAll(tempSet); + } } StringBuilder sb = new StringBuilder(); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java index 4671188d28..220b4fc4d0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java @@ -423,6 +423,7 @@ public class UsersService extends BaseService { * @param projectIds project id array * @return grant result code */ + @Transactional(rollbackFor = Exception.class) public Map grantProject(User loginUser, int userId, String projectIds) { Map result = new HashMap<>(5); result.put(Constants.STATUS, false); @@ -472,6 +473,7 @@ public class UsersService extends BaseService { * @param resourceIds resource id array * @return grant result code */ + @Transactional(rollbackFor = Exception.class) public Map grantResources(User loginUser, int userId, String resourceIds) { Map result = new HashMap<>(5); //only admin can operate @@ -484,17 +486,20 @@ public class UsersService extends BaseService { return result; } - String[] resourceFullIdArr = resourceIds.split(","); - // need authorize resource id set Set needAuthorizeResIds = new HashSet(); - for (String resourceFullId : resourceFullIdArr) { - String[] resourceIdArr = resourceFullId.split("-"); - for (int i=0;i<=resourceIdArr.length-1;i++) { - int resourceIdValue = Integer.parseInt(resourceIdArr[i]); - needAuthorizeResIds.add(resourceIdValue); + if (StringUtils.isNotBlank(resourceIds)) { + String[] resourceFullIdArr = resourceIds.split(","); + // need authorize resource id set + for (String resourceFullId : resourceFullIdArr) { + String[] resourceIdArr = resourceFullId.split("-"); + for (int i=0;i<=resourceIdArr.length-1;i++) { + int resourceIdValue = Integer.parseInt(resourceIdArr[i]); + needAuthorizeResIds.add(resourceIdValue); + } } } + //get the authorized resource id list by user id List oldAuthorizedRes = resourceMapper.queryAuthorizedResourceList(userId); //if resource type is UDF,need check whether it is bound by UDF functon @@ -565,6 +570,7 @@ public class UsersService extends BaseService { * @param udfIds udf id array * @return grant result code */ + @Transactional(rollbackFor = Exception.class) public Map grantUDFFunction(User loginUser, int userId, String udfIds) { Map result = new HashMap<>(5); @@ -611,6 +617,7 @@ public class UsersService extends BaseService { * @param datasourceIds data source id array * @return grant result code */ + @Transactional(rollbackFor = Exception.class) public Map grantDataSource(User loginUser, int userId, String datasourceIds) { Map result = new HashMap<>(5); result.put(Constants.STATUS, false); diff --git a/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql b/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql index 33d801d4ec..7432396825 100644 --- a/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql @@ -167,7 +167,7 @@ CREATE PROCEDURE ac_dolphin_T_t_ds_process_instance_A_worker_group() AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_process_instance ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group'; + ALTER TABLE t_ds_process_instance ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group'; END IF; END; @@ -207,7 +207,7 @@ CREATE PROCEDURE ac_dolphin_T_t_ds_task_instance_A_worker_group() AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_task_instance ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group'; + ALTER TABLE t_ds_task_instance ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group'; END IF; END; @@ -247,7 +247,7 @@ CREATE PROCEDURE ac_dolphin_T_t_ds_schedules_A_worker_group() AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_schedules ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group'; + ALTER TABLE t_ds_schedules ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group'; END IF; END; @@ -287,7 +287,7 @@ CREATE PROCEDURE ac_dolphin_T_t_ds_command_A_worker_group() AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_command ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group'; + ALTER TABLE t_ds_command ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group'; END IF; END; @@ -327,7 +327,7 @@ CREATE PROCEDURE ac_dolphin_T_t_ds_error_command_A_worker_group() AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_error_command ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group'; + ALTER TABLE t_ds_error_command ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group'; END IF; END; diff --git a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql index a4fdc2d6bf..5312d895c8 100644 --- a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql @@ -164,7 +164,7 @@ BEGIN AND TABLE_NAME='t_ds_process_instance' AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_process_instance ADD COLUMN worker_group varchar(255) DEFAULT null; + ALTER TABLE t_ds_process_instance ADD COLUMN worker_group varchar(64) DEFAULT null; END IF; END; $$ LANGUAGE plpgsql; @@ -207,7 +207,7 @@ BEGIN AND TABLE_NAME='t_ds_task_instance' AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_task_instance ADD COLUMN worker_group varchar(255) DEFAULT null; + ALTER TABLE t_ds_task_instance ADD COLUMN worker_group varchar(64) DEFAULT null; END IF; END; $$ LANGUAGE plpgsql; @@ -249,7 +249,7 @@ BEGIN AND TABLE_NAME='t_ds_schedules' AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_schedules ADD COLUMN worker_group varchar(255) DEFAULT null; + ALTER TABLE t_ds_schedules ADD COLUMN worker_group varchar(64) DEFAULT null; END IF; END; $$ LANGUAGE plpgsql; @@ -291,7 +291,7 @@ BEGIN AND TABLE_NAME='t_ds_command' AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_command ADD COLUMN worker_group varchar(255) DEFAULT null; + ALTER TABLE t_ds_command ADD COLUMN worker_group varchar(64) DEFAULT null; END IF; END; $$ LANGUAGE plpgsql; @@ -333,7 +333,7 @@ BEGIN AND TABLE_NAME='t_ds_error_command' AND COLUMN_NAME ='worker_group') THEN - ALTER TABLE t_ds_error_command ADD COLUMN worker_group varchar(255) DEFAULT null; + ALTER TABLE t_ds_error_command ADD COLUMN worker_group varchar(64) DEFAULT null; END IF; END; $$ LANGUAGE plpgsql; From d6798c6db7fe50189cd83e4b2a619587c7c5a47a Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Thu, 16 Apr 2020 18:16:19 +0800 Subject: [PATCH 09/10] =?UTF-8?q?1=EF=BC=8Ctask=20status=20statistics=20an?= =?UTF-8?q?d=20process=20status=20statistics=20bug=20fix=20(#2357)=20=202?= =?UTF-8?q?=EF=BC=8Cworker=20group=20bug=20fix=20(#2430)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * dispatch task fail will set task status failed * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix Co-authored-by: qiaozhanwei --- .../api/service/DataAnalysisService.java | 14 +++++--------- .../api/service/DataAnalysisServiceTest.java | 7 ------- .../common/thread/ThreadUtils.java | 10 ++++++++++ .../dolphinscheduler/dao/entity/Command.java | 2 +- .../dao/mapper/CommandMapperTest.java | 13 ++++--------- .../master/consumer/TaskPriorityQueueConsumer.java | 5 ++--- .../server/master/processor/TaskAckProcessor.java | 6 ++---- .../master/processor/TaskResponseProcessor.java | 6 ++---- .../worker/processor/TaskCallbackService.java | 10 ++++------ sql/dolphinscheduler-postgre.sql | 6 +++--- sql/dolphinscheduler_mysql.sql | 4 ++-- 11 files changed, 35 insertions(+), 48 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java index 0c93e00a80..39bec56357 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java @@ -106,14 +106,12 @@ public class DataAnalysisService extends BaseService{ List taskInstanceStateCounts = taskInstanceMapper.countTaskInstanceStateByUser(start, end, projectIds); - if (taskInstanceStateCounts != null && !taskInstanceStateCounts.isEmpty()) { + if (taskInstanceStateCounts != null) { TaskCountDto taskCountResult = new TaskCountDto(taskInstanceStateCounts); result.put(Constants.DATA_LIST, taskCountResult); putMsg(result, Status.SUCCESS); - } else { - putMsg(result, Status.TASK_INSTANCE_STATE_COUNT_ERROR); } - return result; + return result; } private void putErrorRequestParamsMsg(Map result) { @@ -153,14 +151,12 @@ public class DataAnalysisService extends BaseService{ processInstanceMapper.countInstanceStateByUser(start, end, projectIdArray); - if (processInstanceStateCounts != null && !processInstanceStateCounts.isEmpty()) { + if (processInstanceStateCounts != null) { TaskCountDto taskCountResult = new TaskCountDto(processInstanceStateCounts); result.put(Constants.DATA_LIST, taskCountResult); putMsg(result, Status.SUCCESS); - } else { - putMsg(result, Status.COUNT_PROCESS_INSTANCE_STATE_ERROR); } - return result; + return result; } @@ -234,7 +230,7 @@ public class DataAnalysisService extends BaseService{ // count error command state List errorCommandStateCounts = errorCommandMapper.countCommandState( - start, end, projectIdArray); + start, end, projectIdArray); // Map> dataMap = new HashMap<>(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java index 35cc6ae9a6..14612fcef8 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java @@ -114,9 +114,6 @@ public class DataAnalysisServiceTest { Map result = dataAnalysisService.countTaskStateByProject(user, 2, startDate, endDate); Assert.assertTrue(result.isEmpty()); - // task instance state count error - result = dataAnalysisService.countTaskStateByProject(user, 1, startDate, endDate); - Assert.assertEquals(Status.TASK_INSTANCE_STATE_COUNT_ERROR,result.get(Constants.STATUS)); //SUCCESS Mockito.when(taskInstanceMapper.countTaskInstanceStateByUser(DateUtils.getScheduleDate(startDate), @@ -137,10 +134,6 @@ public class DataAnalysisServiceTest { Map result = dataAnalysisService.countProcessInstanceStateByProject(user,2,startDate,endDate); Assert.assertTrue(result.isEmpty()); - //COUNT_PROCESS_INSTANCE_STATE_ERROR - result = dataAnalysisService.countProcessInstanceStateByProject(user,1,startDate,endDate); - Assert.assertEquals(Status.COUNT_PROCESS_INSTANCE_STATE_ERROR,result.get(Constants.STATUS)); - //SUCCESS Mockito.when(processInstanceMapper.countInstanceStateByUser(DateUtils.getScheduleDate(startDate), DateUtils.getScheduleDate(endDate), new Integer[]{1})).thenReturn(getTaskInstanceStateCounts()); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java index e348835363..a9a124547a 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java @@ -223,4 +223,14 @@ public class ThreadUtils { } return id + " (" + name + ")"; } + + /** + * sleep + * @param millis millis + */ + public static void sleep(final long millis) { + try { + Thread.sleep(millis); + } catch (final InterruptedException ignore) {} + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java index 5a6974803c..7d52dc93f3 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java @@ -111,7 +111,7 @@ public class Command { /** * worker group */ - @TableField(exist = false) + @TableField("worker_group") private String workerGroup; public Command() { diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java index 468824bf6e..297ea66c94 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java @@ -76,7 +76,8 @@ public class CommandMapperTest { //query Command actualCommand = commandMapper.selectById(expectedCommand.getId()); - assertEquals(expectedCommand, actualCommand); + assertNotNull(actualCommand); + assertEquals(expectedCommand.getProcessDefinitionId(), actualCommand.getProcessDefinitionId()); } /** @@ -94,7 +95,8 @@ public class CommandMapperTest { Command actualCommand = commandMapper.selectById(expectedCommand.getId()); - assertEquals(expectedCommand,actualCommand); + assertNotNull(actualCommand); + assertEquals(expectedCommand.getUpdateTime(),actualCommand.getUpdateTime()); } @@ -127,13 +129,6 @@ public class CommandMapperTest { List actualCommands = commandMapper.selectList(null); assertThat(actualCommands.size(), greaterThanOrEqualTo(count)); - - for (Command actualCommand : actualCommands){ - Command expectedCommand = commandMap.get(actualCommand.getId()); - if (expectedCommand != null){ - assertEquals(expectedCommand,actualCommand); - } - } } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index b2cf53a575..cdd9ff2219 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter; import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; @@ -122,9 +123,7 @@ public class TaskPriorityQueueConsumer extends Thread{ result = dispatcher.dispatch(executionContext); } catch (ExecuteException e) { logger.error("dispatch error",e); - try { - Thread.sleep(SLEEP_TIME_MILLIS); - } catch (InterruptedException e1) {} + ThreadUtils.sleep(SLEEP_TIME_MILLIS); } if (result){ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java index 7af9cdc2cc..3460248dfb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.Command; @@ -101,10 +102,7 @@ public class TaskAckProcessor implements NettyRequestProcessor { if (taskInstance != null && ackStatus.typeIsRunning()){ break; } - - try { - Thread.sleep(SLEEP_TIME_MILLIS); - } catch (InterruptedException e) {} + ThreadUtils.sleep(SLEEP_TIME_MILLIS); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index ecb8646ad0..721b146d86 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.Command; @@ -99,10 +100,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor { if (taskInstance != null && responseStatus.typeIsFinished()){ break; } - - try { - Thread.sleep(SLEEP_TIME_MILLIS); - } catch (InterruptedException e) {} + ThreadUtils.sleep(SLEEP_TIME_MILLIS); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 7cd25cba65..ecae9edc1c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -18,11 +18,11 @@ package org.apache.dolphinscheduler.server.worker.processor; -import com.alibaba.fastjson.JSONObject; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; @@ -37,6 +37,8 @@ import org.springframework.stereotype.Service; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; + /** * taks callback service */ @@ -98,11 +100,7 @@ public class TaskCallbackService { while (Stopper.isRunning()) { if (CollectionUtils.isEmpty(masterNodes)) { logger.error("no available master node"); - try { - Thread.sleep(1000); - }catch (Exception e){ - - } + ThreadUtils.sleep(SLEEP_TIME_MILLIS); }else { break; } diff --git a/sql/dolphinscheduler-postgre.sql b/sql/dolphinscheduler-postgre.sql index b48ddde042..b26520e29c 100644 --- a/sql/dolphinscheduler-postgre.sql +++ b/sql/dolphinscheduler-postgre.sql @@ -234,7 +234,7 @@ CREATE TABLE t_ds_command ( dependence varchar(255) DEFAULT NULL , update_time timestamp DEFAULT NULL , process_instance_priority int DEFAULT NULL , - worker_group_id int DEFAULT '-1' , + worker_group varchar(64), PRIMARY KEY (id) ) ; @@ -275,7 +275,7 @@ CREATE TABLE t_ds_error_command ( update_time timestamp DEFAULT NULL , dependence text , process_instance_priority int DEFAULT NULL , - worker_group_id int DEFAULT '-1' , + worker_group varchar(64), message text , PRIMARY KEY (id) ); @@ -748,7 +748,7 @@ CREATE SEQUENCE t_ds_worker_server_id_sequence; ALTER TABLE t_ds_worker_server ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_server_id_sequence'); --- Records of t_ds_user,user : admin , password : dolphinscheduler123 +-- Records of t_ds_user?user : admin , password : dolphinscheduler123 INSERT INTO t_ds_user(user_name,user_password,user_type,email,phone,tenant_id,create_time,update_time) VALUES ('admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22'); -- Records of t_ds_alertgroup,dolphinscheduler warning group diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql index 559e684133..c0c81dcb21 100644 --- a/sql/dolphinscheduler_mysql.sql +++ b/sql/dolphinscheduler_mysql.sql @@ -333,7 +333,7 @@ CREATE TABLE `t_ds_command` ( `dependence` varchar(255) DEFAULT NULL COMMENT 'dependence', `update_time` datetime DEFAULT NULL COMMENT 'update time', `process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance priority: 0 Highest,1 High,2 Medium,3 Low,4 Lowest', - `worker_group_id` int(11) DEFAULT '-1' COMMENT 'worker group id', + `worker_group` varchar(64) COMMENT 'worker group', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; @@ -380,7 +380,7 @@ CREATE TABLE `t_ds_error_command` ( `update_time` datetime DEFAULT NULL COMMENT 'update time', `dependence` text COMMENT 'dependence', `process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance priority, 0 Highest,1 High,2 Medium,3 Low,4 Lowest', - `worker_group_id` int(11) DEFAULT '-1' COMMENT 'worker group id', + `worker_group` varchar(64) COMMENT 'worker group', `message` text COMMENT 'message', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC; From f929c6f3b7a43b78db977206c3425d88ae3c357b Mon Sep 17 00:00:00 2001 From: songgg <1172417734@qq.com> Date: Thu, 16 Apr 2020 22:50:26 +0800 Subject: [PATCH 10/10] =?UTF-8?q?[optimization]=EF=BC=9Aprevent=20repeated?= =?UTF-8?q?=20database=20updates=20(#2396)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * sqlTask failed to run * prevent repeated database updates * prevent repeated database updates * prevent repeated database updates Co-authored-by: songqh Co-authored-by: dailidong --- .../api/service/ExecutorService.java | 25 +++++++++---------- .../master/runner/MasterExecThread.java | 2 +- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 7ce7497e98..d290886572 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -225,20 +225,14 @@ public class ExecutorService extends BaseService{ if (processInstance.getState() == ExecutionStatus.READY_STOP) { putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState()); } else { - processInstance.setCommandType(CommandType.STOP); - processInstance.addHistoryCmd(CommandType.STOP); - processService.updateProcessInstance(processInstance); - result = updateProcessInstanceState(processInstanceId, ExecutionStatus.READY_STOP); + result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP); } break; case PAUSE: if (processInstance.getState() == ExecutionStatus.READY_PAUSE) { putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState()); } else { - processInstance.setCommandType(CommandType.PAUSE); - processInstance.addHistoryCmd(CommandType.PAUSE); - processService.updateProcessInstance(processInstance); - result = updateProcessInstanceState(processInstanceId, ExecutionStatus.READY_PAUSE); + result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE); } break; default: @@ -308,22 +302,27 @@ public class ExecutorService extends BaseService{ } /** - * update process instance state + * prepare to update process instance command type and status * - * @param processInstanceId process instance id + * @param processInstance process instance + * @param commandType command type * @param executionStatus execute status * @return update result */ - private Map updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) { + private Map updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) { Map result = new HashMap<>(5); - int update = processService.updateProcessInstanceState(processInstanceId, executionStatus); + processInstance.setCommandType(commandType); + processInstance.addHistoryCmd(commandType); + processInstance.setState(executionStatus); + int update = processService.updateProcessInstance(processInstance); + + // determine whether the process is normal if (update > 0) { putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR); } - return result; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index b1ac73cb54..e1e8c090fd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -914,7 +914,7 @@ public class MasterExecThread implements Runnable { processInstance.getId(), processInstance.getName(), processInstance.getState(), state, processInstance.getCommandType()); - processInstance.setState(state); + ProcessInstance instance = processService.findProcessInstanceById(processInstance.getId()); instance.setState(state); instance.setProcessDefinition(processInstance.getProcessDefinition());