From 88a07f7b55d164b6a022d2f9df75ecbb258c280f Mon Sep 17 00:00:00 2001 From: zhuangchong <37063904+zhuangchong@users.noreply.github.com> Date: Thu, 29 Apr 2021 18:29:44 +0800 Subject: [PATCH] [Feature-4093][server] Support for stored procedures and stored function calls and data source supports DB2 (#4094) * Support for stored procedures and stored function calls and data source supports DB2. Co-authored-by: zhuangchong --- .../server/worker/task/TaskManager.java | 2 +- .../ProcedureTask.java | 230 +++++++----------- .../task/procedure/ProcedureTaskTest.java | 119 +++++++++ .../dag/_source/formModel/tasks/procedure.vue | 20 +- .../src/js/module/i18n/locale/en_US.js | 4 +- .../src/js/module/i18n/locale/zh_CN.js | 4 +- pom.xml | 1 + 7 files changed, 224 insertions(+), 156 deletions(-) rename dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/{processdure => procedure}/ProcedureTask.java (61%) create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTaskTest.java diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java index b89c8d4ca6..3ef3a41030 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java @@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask; import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask; import org.apache.dolphinscheduler.server.worker.task.http.HttpTask; import org.apache.dolphinscheduler.server.worker.task.mr.MapReduceTask; -import org.apache.dolphinscheduler.server.worker.task.processdure.ProcedureTask; +import org.apache.dolphinscheduler.server.worker.task.procedure.ProcedureTask; import org.apache.dolphinscheduler.server.worker.task.python.PythonTask; import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask; import org.apache.dolphinscheduler.server.worker.task.spark.SparkTask; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java similarity index 61% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java index 5e771deb50..d351175fca 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.task.processdure; +package org.apache.dolphinscheduler.server.worker.task.procedure; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.datasource.ConnectionParam; @@ -77,7 +77,6 @@ public class ProcedureTask extends AbstractTask { this.procedureParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ProcedureParameters.class); - // check parameters if (!procedureParameters.checkParameters()) { throw new RuntimeException("procedure task params is not valid"); @@ -108,8 +107,6 @@ public class ProcedureTask extends AbstractTask { // get jdbc connection connection = DatasourceUtil.getConnection(dbType, connectionParam); - - // combining local and global parameters Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), @@ -117,30 +114,18 @@ public class ProcedureTask extends AbstractTask { CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); - Collection userDefParamsList = null; - - if (procedureParameters.getLocalParametersMap() != null) { - userDefParamsList = procedureParameters.getLocalParametersMap().values(); - } - - String method = getCallMethod(userDefParamsList); - - logger.info("call method : {}",method); - // call method - stmt = connection.prepareCall(method); + stmt = connection.prepareCall(procedureParameters.getMethod()); // set timeout setTimeout(stmt); // outParameterMap - Map outParameterMap = getOutParameterMap(stmt, paramsMap, userDefParamsList); + Map outParameterMap = getOutParameterMap(stmt, paramsMap); stmt.executeUpdate(); - /** - * print the output parameters to the log - */ + // print the output parameters to the log printOutParameter(stmt, outParameterMap); setExitStatusCode(Constants.EXIT_CODE_SUCCESS); @@ -149,37 +134,15 @@ public class ProcedureTask extends AbstractTask { logger.error("procedure task error", e); throw e; } finally { - close(stmt, connection); + close(stmt,connection); } } - /** - * get call method - * @param userDefParamsList userDefParamsList - * @return method - */ - private String getCallMethod(Collection userDefParamsList) { - String method;// no parameters - if (CollectionUtils.isEmpty(userDefParamsList)) { - method = "{call " + procedureParameters.getMethod() + "}"; - } else { // exists parameters - int size = userDefParamsList.size(); - StringBuilder parameter = new StringBuilder(); - parameter.append("("); - for (int i = 0; i < size - 1; i++) { - parameter.append("?,"); - } - parameter.append("?)"); - method = "{call " + procedureParameters.getMethod() + parameter.toString() + "}"; - } - return method; - } - /** * print outParameter * @param stmt CallableStatement * @param outParameterMap outParameterMap - * @throws SQLException + * @throws SQLException SQLException */ private void printOutParameter(CallableStatement stmt, Map outParameterMap) throws SQLException { @@ -201,80 +164,85 @@ public class ProcedureTask extends AbstractTask { * * @param stmt CallableStatement * @param paramsMap paramsMap - * @param userDefParamsList userDefParamsList * @return outParameterMap - * @throws Exception + * @throws Exception Exception */ - private Map getOutParameterMap(CallableStatement stmt, - Map paramsMap, - Collection userDefParamsList) throws Exception { + private Map getOutParameterMap(CallableStatement stmt, Map paramsMap) throws Exception { Map outParameterMap = new HashMap<>(); - if (userDefParamsList != null && userDefParamsList.size() > 0) { - int index = 1; - for (Property property : userDefParamsList) { - logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}" - , property.getProp(), - property.getDirect(), - property.getType(), - property.getValue()); - // set parameters - if (property.getDirect().equals(Direct.IN)) { - ParameterUtils.setInParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue()); - } else if (property.getDirect().equals(Direct.OUT)) { - setOutParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue()); - property.setValue(paramsMap.get(property.getProp()).getValue()); - outParameterMap.put(index, property); - } - index++; + if (procedureParameters.getLocalParametersMap() == null) { + return outParameterMap; + } + + Collection userDefParamsList = procedureParameters.getLocalParametersMap().values(); + + if (CollectionUtils.isEmpty(userDefParamsList)) { + return outParameterMap; + } + + int index = 1; + for (Property property : userDefParamsList) { + logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}" + ,property.getProp(), + property.getDirect(), + property.getType(), + property.getValue()); + // set parameters + if (property.getDirect().equals(Direct.IN)) { + ParameterUtils.setInParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue()); + } else if (property.getDirect().equals(Direct.OUT)) { + setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue()); + property.setValue(paramsMap.get(property.getProp()).getValue()); + outParameterMap.put(index,property); } + index++; } + return outParameterMap; } /** * set timtou * @param stmt CallableStatement - * @throws SQLException + * @throws SQLException SQLException */ private void setTimeout(CallableStatement stmt) throws SQLException { Boolean failed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED; - Boolean warnfailed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED; - if (failed || warnfailed) { + Boolean warnFailed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED; + if (failed || warnFailed) { stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout()); } } /** - * close jdbc resource - * - * @param stmt - * @param connection - */ - private void close(PreparedStatement stmt, - Connection connection) { + * close jdbc resource + * + * @param stmt stmt + * @param connection connection + */ + private void close(PreparedStatement stmt, Connection connection) { if (stmt != null) { try { stmt.close(); } catch (SQLException e) { - + logger.error("close prepared statement error : {}",e.getMessage(),e); } } if (connection != null) { try { connection.close(); } catch (SQLException e) { - + logger.error("close connection error : {}",e.getMessage(),e); } } } /** * get output parameter - * @param stmt - * @param index - * @param prop - * @param dataType - * @throws SQLException + * @param stmt stmt + * @param index index + * @param prop prop + * @param dataType dataType + * @throws SQLException SQLException */ private void getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException { switch (dataType) { @@ -324,67 +292,43 @@ public class ProcedureTask extends AbstractTask { * @param value value * @throws Exception exception */ - private void setOutParameter(int index, CallableStatement stmt, DataType dataType, String value) throws Exception { - if (dataType.equals(DataType.VARCHAR)) { - if (StringUtils.isEmpty(value)) { - stmt.registerOutParameter(index, Types.VARCHAR); - } else { - stmt.registerOutParameter(index, Types.VARCHAR, value); - } - - } else if (dataType.equals(DataType.INTEGER)) { - if (StringUtils.isEmpty(value)) { - stmt.registerOutParameter(index, Types.INTEGER); - } else { - stmt.registerOutParameter(index, Types.INTEGER, value); - } - - } else if (dataType.equals(DataType.LONG)) { - if (StringUtils.isEmpty(value)) { - stmt.registerOutParameter(index, Types.INTEGER); - } else { - stmt.registerOutParameter(index, Types.INTEGER, value); - } - } else if (dataType.equals(DataType.FLOAT)) { - if (StringUtils.isEmpty(value)) { - stmt.registerOutParameter(index, Types.FLOAT); - } else { - stmt.registerOutParameter(index, Types.FLOAT, value); - } - } else if (dataType.equals(DataType.DOUBLE)) { - if (StringUtils.isEmpty(value)) { - stmt.registerOutParameter(index, Types.DOUBLE); - } else { - stmt.registerOutParameter(index, Types.DOUBLE, value); - } - - } else if (dataType.equals(DataType.DATE)) { - if (StringUtils.isEmpty(value)) { - stmt.registerOutParameter(index, Types.DATE); - } else { - stmt.registerOutParameter(index, Types.DATE, value); - } - - } else if (dataType.equals(DataType.TIME)) { - if (StringUtils.isEmpty(value)) { - stmt.registerOutParameter(index, Types.TIME); - } else { - stmt.registerOutParameter(index, Types.TIME, value); - } - - } else if (dataType.equals(DataType.TIMESTAMP)) { - if (StringUtils.isEmpty(value)) { - stmt.registerOutParameter(index, Types.TIMESTAMP); - } else { - stmt.registerOutParameter(index, Types.TIMESTAMP, value); - } + private void setOutParameter(int index,CallableStatement stmt,DataType dataType,String value)throws Exception { + int sqlType; + switch (dataType) { + case VARCHAR: + sqlType = Types.VARCHAR; + break; + case INTEGER: + case LONG: + sqlType = Types.INTEGER; + break; + case FLOAT: + sqlType = Types.FLOAT; + break; + case DOUBLE: + sqlType = Types.DOUBLE; + break; + case DATE: + sqlType = Types.DATE; + break; + case TIME: + sqlType = Types.TIME; + break; + case TIMESTAMP: + sqlType = Types.TIMESTAMP; + break; + case BOOLEAN: + sqlType = Types.BOOLEAN; + break; + default: + throw new IllegalStateException("Unexpected value: " + dataType); + } - } else if (dataType.equals(DataType.BOOLEAN)) { - if (StringUtils.isEmpty(value)) { - stmt.registerOutParameter(index, Types.BOOLEAN); - } else { - stmt.registerOutParameter(index, Types.BOOLEAN, value); - } + if (StringUtils.isEmpty(value)) { + stmt.registerOutParameter(index, sqlType); + } else { + stmt.registerOutParameter(index, sqlType, value); } + } -} \ No newline at end of file +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTaskTest.java new file mode 100644 index 0000000000..8ceeab7d3b --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTaskTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.task.procedure; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.TaskProps; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Date; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ProcedureTask.class,DriverManager.class}) +public class ProcedureTaskTest { + private static final Logger logger = LoggerFactory.getLogger(ProcedureTaskTest.class); + + private static final String CONNECTION_PARAMS = "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\"," + + "\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}"; + + private ProcedureTask procedureTask; + + private ProcessService processService; + + private ApplicationContext applicationContext; + + private TaskExecutionContext taskExecutionContext; + + @Before + public void before() throws Exception { + taskExecutionContext = new TaskExecutionContext(); + processService = PowerMockito.mock(ProcessService.class); + applicationContext = PowerMockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + PowerMockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + + TaskProps props = new TaskProps(); + props.setExecutePath("/tmp"); + props.setTaskAppId(String.valueOf(System.currentTimeMillis())); + props.setTaskInstanceId(1); + props.setTenantCode("1"); + props.setEnvFile(".dolphinscheduler_env.sh"); + props.setTaskStartTime(new Date()); + props.setTaskTimeout(0); + props.setTaskParams( + "{\"localParams\":[],\"type\":\"POSTGRESQL\",\"datasource\":1,\"method\":\"add\"}"); + + taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class); + PowerMockito.when(taskExecutionContext.getTaskParams()).thenReturn(props.getTaskParams()); + PowerMockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp"); + PowerMockito.when(taskExecutionContext.getTaskAppId()).thenReturn("1"); + PowerMockito.when(taskExecutionContext.getTenantCode()).thenReturn("root"); + PowerMockito.when(taskExecutionContext.getStartTime()).thenReturn(new Date()); + PowerMockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000); + PowerMockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dx"); + + ProcedureTaskExecutionContext procedureTaskExecutionContext = new ProcedureTaskExecutionContext(); + procedureTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS); + PowerMockito.when(taskExecutionContext.getProcedureTaskExecutionContext()).thenReturn(procedureTaskExecutionContext); + + procedureTask = new ProcedureTask(taskExecutionContext, logger); + procedureTask.init(); + } + + @Test + public void testGetParameters() { + Assert.assertNotNull(procedureTask.getParameters()); + } + + @Test + public void testHandle() throws SQLException { + + Connection connection = PowerMockito.mock(Connection.class); + PowerMockito.mockStatic(DriverManager.class); + PowerMockito.when(DriverManager.getConnection(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(connection); + CallableStatement callableStatement = PowerMockito.mock(CallableStatement.class); + PowerMockito.when(connection.prepareCall(Mockito.any())).thenReturn(callableStatement); + try { + procedureTask.handle(); + Assert.assertEquals(Constants.EXIT_CODE_SUCCESS,procedureTask.getExitStatusCode()); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + +} diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/procedure.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/procedure.vue index 790977dd5b..af12806f9e 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/procedure.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/procedure.vue @@ -22,20 +22,23 @@ -
{{$t('methods')}}
+
+ + {{$t('SQL Statement')}} + +
+ :autosize="{minRows:5}" + type="textarea" + :disabled="isDetails" + v-model="method" + :placeholder="$t('Please enter the procedure method')">
@@ -103,7 +106,7 @@ // Verification function if (!this.method) { - this.$message.warning(`${i18n.$t('Please enter method')}`) + this.$message.warning(`${i18n.$t('Please enter a SQL Statement(required)')}`) return false } @@ -111,6 +114,7 @@ if (!this.$refs.refLocalParams._verifProp()) { return false } + // storage this.$emit('on-params', { type: this.type, diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js index 7c9eb0808e..94f4ba3b68 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js @@ -99,7 +99,8 @@ export default { 'Custom template': 'Custom template', Datasource: 'Datasource', methods: 'methods', - 'Please enter method(optional)': 'Please enter method(optional)', + 'Please enter the procedure method': 'Please enter the procedure script \n\ncall procedure:{call [(,, ...)]}\n\ncall function:{?= call [(,, ...)]} ', + 'The procedure method script example': 'example:{call [(?,?, ...)]} or {?= call [(?,?, ...)]}', Script: 'Script', 'Please enter script(required)': 'Please enter script(required)', 'Deploy Mode': 'Deploy Mode', @@ -439,7 +440,6 @@ export default { 'Process instance details': 'Process instance details', 'Create Resource': 'Create Resource', 'User Center': 'User Center', - 'Please enter method': 'Please enter method', None: 'None', Name: 'Name', 'Process priority': 'Process priority', diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index e18b6a8c83..7026108646 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -99,7 +99,8 @@ export default { 'Custom template': '自定义模版', Datasource: '数据源', methods: '方法', - 'Please enter method(optional)': '请输入方法(选填)', + 'Please enter the procedure method': '请输入存储脚本 \n\n调用存储过程:{call [(,, ...)]}\n\n调用存储函数:{?= call [(,, ...)]} ', + 'The procedure method script example': '示例:{call [(?,?, ...)]} 或 {?= call [(?,?, ...)]}', Script: '脚本', 'Please enter script(required)': '请输入脚本(必填)', 'Deploy Mode': '部署方式', @@ -439,7 +440,6 @@ export default { 'Process instance details': '流程实例详情', 'Create Resource': '创建资源', 'User Center': '用户中心', - 'Please enter method': '请输入方法', None: '无', Name: '名称', 'Process priority': '流程优先级', diff --git a/pom.xml b/pom.xml index e698304dd8..ea58fec804 100644 --- a/pom.xml +++ b/pom.xml @@ -969,6 +969,7 @@ **/server/worker/task/datax/DataxTaskTest.java **/server/worker/task/sqoop/SqoopTaskTest.java + **/server/worker/task/processdure/ProcedureTaskTest.java **/server/worker/task/shell/ShellTaskTest.java **/server/worker/task/TaskManagerTest.java **/server/worker/task/AbstractCommandExecutorTest.java