From ae6ae039755ddb2f3442c0dbba784a049c575cbb Mon Sep 17 00:00:00 2001 From: Baoqi Date: Mon, 22 Apr 2019 12:11:01 +0800 Subject: [PATCH] close #85 Add Pre/Post Statement support in SQL Task --- .../escheduler/common/task/sql/SqlBinds.java | 42 +++++ .../common/task/sql/SqlParameters.java | 25 +++ .../server/worker/task/sql/SqlTask.java | 137 ++++++++++------- .../formModel/tasks/_source/statementList.vue | 143 ++++++++++++++++++ .../pages/dag/_source/formModel/tasks/sql.vue | 57 ++++++- .../src/js/module/i18n/locale/en_US.js | 3 + .../src/js/module/i18n/locale/zh_CN.js | 5 +- 7 files changed, 357 insertions(+), 55 deletions(-) create mode 100644 escheduler-common/src/main/java/cn/escheduler/common/task/sql/SqlBinds.java create mode 100644 escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/statementList.vue diff --git a/escheduler-common/src/main/java/cn/escheduler/common/task/sql/SqlBinds.java b/escheduler-common/src/main/java/cn/escheduler/common/task/sql/SqlBinds.java new file mode 100644 index 0000000000..50975df3ed --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/task/sql/SqlBinds.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.task.sql; + +import cn.escheduler.common.process.Property; + +import java.util.Map; + +/** + * Used to contains both prepared sql string and its to-be-bind parameters + */ +public class SqlBinds { + private final String sql; + private final Map paramsMap; + + public SqlBinds(String sql, Map paramsMap) { + this.sql = sql; + this.paramsMap = paramsMap; + } + + public String getSql() { + return sql; + } + + public Map getParamsMap() { + return paramsMap; + } +} diff --git a/escheduler-common/src/main/java/cn/escheduler/common/task/sql/SqlParameters.java b/escheduler-common/src/main/java/cn/escheduler/common/task/sql/SqlParameters.java index 4feb7037f0..1c85a81d96 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/task/sql/SqlParameters.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/task/sql/SqlParameters.java @@ -64,6 +64,14 @@ public class SqlParameters extends AbstractParameters { * SQL connection parameters */ private String connParams; + /** + * Pre Statements + */ + private List preStatements; + /** + * Post Statements + */ + private List postStatements; public String getType() { return type; @@ -121,6 +129,21 @@ public class SqlParameters extends AbstractParameters { this.connParams = connParams; } + public List getPreStatements() { + return preStatements; + } + + public void setPreStatements(List preStatements) { + this.preStatements = preStatements; + } + + public List getPostStatements() { + return postStatements; + } + + public void setPostStatements(List postStatements) { + this.postStatements = postStatements; + } @Override public boolean checkParameters() { @@ -142,6 +165,8 @@ public class SqlParameters extends AbstractParameters { ", udfs='" + udfs + '\'' + ", showType='" + showType + '\'' + ", connParams='" + connParams + '\'' + + ", preStatements=" + preStatements + + ", postStatements=" + postStatements + '}'; } } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java index bab755ba67..cd9a4cde23 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java @@ -25,6 +25,7 @@ import cn.escheduler.common.enums.UdfType; import cn.escheduler.common.job.db.*; import cn.escheduler.common.process.Property; import cn.escheduler.common.task.AbstractParameters; +import cn.escheduler.common.task.sql.SqlBinds; import cn.escheduler.common.task.sql.SqlParameters; import cn.escheduler.common.task.sql.SqlType; import cn.escheduler.common.utils.CollectionUtils; @@ -48,6 +49,7 @@ import java.sql.*; import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; /** * sql task @@ -131,11 +133,17 @@ public class SqlTask extends AbstractTask { Class.forName(Constants.JDBC_SQLSERVER_CLASS_NAME); } - Map sqlParamMap = new HashMap(); - StringBuilder sqlBuilder = new StringBuilder(); // ready to execute SQL and parameter entity Map - setSqlAndSqlParamsMap(sqlBuilder,sqlParamMap); + SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql()); + List preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements()).orElse(new ArrayList<>()) + .stream() + .map(this::getSqlAndSqlParamsMap) + .collect(Collectors.toList()); + List postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPostStatements()).orElse(new ArrayList<>()) + .stream() + .map(this::getSqlAndSqlParamsMap) + .collect(Collectors.toList()); if(EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType()) && StringUtils.isNotEmpty(sqlParameters.getUdfs())){ List udfFuncList = processDao.queryUdfFunListByids(sqlParameters.getUdfs()); @@ -143,7 +151,7 @@ public class SqlTask extends AbstractTask { } // execute sql task - con = executeFuncAndSql(baseDataSource,sqlBuilder.toString(),sqlParamMap,createFuncs); + con = executeFuncAndSql(baseDataSource, mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs); } finally { if (con != null) { @@ -162,9 +170,9 @@ public class SqlTask extends AbstractTask { * ready to execute SQL and parameter entity Map * @return */ - private void setSqlAndSqlParamsMap(StringBuilder sqlBuilder,Map sqlParamsMap) { - - String sql = sqlParameters.getSql(); + private SqlBinds getSqlAndSqlParamsMap(String sql) { + Map sqlParamsMap = new HashMap<>(); + StringBuilder sqlBuilder = new StringBuilder(); // find process instance by task id ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); @@ -178,7 +186,7 @@ public class SqlTask extends AbstractTask { // spell SQL according to the final user-defined variable if(paramsMap == null){ sqlBuilder.append(sql); - return; + return new SqlBinds(sqlBuilder.toString(), sqlParamsMap); } // special characters need to be escaped, ${} needs to be escaped @@ -191,6 +199,7 @@ public class SqlTask extends AbstractTask { // print repalce sql printReplacedSql(sql,formatSql,rgex,sqlParamsMap); + return new SqlBinds(sqlBuilder.toString(), sqlParamsMap); } @Override @@ -201,10 +210,16 @@ public class SqlTask extends AbstractTask { /** * execute sql * @param baseDataSource - * @param sql - * @param params + * @param mainSqlBinds + * @param preStatementsBinds + * @param postStatementsBinds + * @param createFuncs */ - public Connection executeFuncAndSql(BaseDataSource baseDataSource, String sql, Map params, List createFuncs){ + public Connection executeFuncAndSql(BaseDataSource baseDataSource, + SqlBinds mainSqlBinds, + List preStatementsBinds, + List postStatementsBinds, + List createFuncs){ Connection connection = null; try { @@ -223,66 +238,86 @@ public class SqlTask extends AbstractTask { baseDataSource.getUser(), baseDataSource.getPassword()); } - Statement funcStmt = connection.createStatement(); // create temp function - if (createFuncs != null) { - for (String createFunc : createFuncs) { - logger.info("hive create function sql: {}", createFunc); - funcStmt.execute(createFunc); + if (CollectionUtils.isNotEmpty(createFuncs)) { + try (Statement funcStmt = connection.createStatement()) { + for (String createFunc : createFuncs) { + logger.info("hive create function sql: {}", createFunc); + funcStmt.execute(createFunc); + } } } - PreparedStatement stmt = connection.prepareStatement(sql); - if(taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED){ - stmt.setQueryTimeout(taskProps.getTaskTimeout()); - } - if(params != null){ - for(Integer key : params.keySet()){ - Property prop = params.get(key); - ParameterUtils.setInParameter(key,stmt,prop.getType(),prop.getValue()); + for (SqlBinds sqlBind: preStatementsBinds) { + try (PreparedStatement stmt = prepareStatementAndBind(connection, sqlBind)) { + int result = stmt.executeUpdate(); + logger.info("pre statement execute result: " + result + ", for sql: " + sqlBind.getSql()); } } - // decide whether to executeQuery or executeUpdate based on sqlType - if(sqlParameters.getSqlType() == SqlType.QUERY.ordinal()){ - // query statements need to be convert to JsonArray and inserted into Alert to send - JSONArray array=new JSONArray(); - ResultSet resultSet = stmt.executeQuery(); - ResultSetMetaData md=resultSet.getMetaData(); - int num=md.getColumnCount(); - - while(resultSet.next()){ - JSONObject mapOfColValues=new JSONObject(true); - for(int i=1;i<=num;i++){ - mapOfColValues.put(md.getColumnName(i), resultSet.getObject(i)); + + try (PreparedStatement stmt = prepareStatementAndBind(connection, mainSqlBinds)) { + // decide whether to executeQuery or executeUpdate based on sqlType + if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) { + // query statements need to be convert to JsonArray and inserted into Alert to send + JSONArray array = new JSONArray(); + ResultSet resultSet = stmt.executeQuery(); + ResultSetMetaData md = resultSet.getMetaData(); + int num = md.getColumnCount(); + + while (resultSet.next()) { + JSONObject mapOfColValues = new JSONObject(true); + for (int i = 1; i <= num; i++) { + mapOfColValues.put(md.getColumnName(i), resultSet.getObject(i)); + } + array.add(mapOfColValues); } - array.add(mapOfColValues); - } - logger.info("execute sql : {}",JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue)); + logger.info("execute sql : {}", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue)); - // send as an attachment - if(StringUtils.isEmpty(sqlParameters.getShowType())){ - logger.info("showType is empty,don't need send email"); - }else{ - if(array.size() > 0 ){ - sendAttachment(taskProps.getNodeName() + " query resultsets ",JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue)); + // send as an attachment + if (StringUtils.isEmpty(sqlParameters.getShowType())) { + logger.info("showType is empty,don't need send email"); + } else { + if (array.size() > 0) { + sendAttachment(taskProps.getNodeName() + " query resultsets ", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue)); + } } - } - exitStatusCode = 0; + exitStatusCode = 0; - }else if(sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()){ - // non query statement - int result = stmt.executeUpdate(); - exitStatusCode = 0; + } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) { + // non query statement + int result = stmt.executeUpdate(); + exitStatusCode = 0; + } } + for (SqlBinds sqlBind: postStatementsBinds) { + try (PreparedStatement stmt = prepareStatementAndBind(connection, sqlBind)) { + int result = stmt.executeUpdate(); + logger.info("post statement execute result: " + result + ", for sql: " + sqlBind.getSql()); + } + } } catch (Exception e) { logger.error(e.getMessage(),e); } return connection; } + private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) throws Exception { + PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql()); + if(taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED){ + stmt.setQueryTimeout(taskProps.getTaskTimeout()); + } + Map params = sqlBinds.getParamsMap(); + if(params != null){ + for(Integer key : params.keySet()){ + Property prop = params.get(key); + ParameterUtils.setInParameter(key,stmt,prop.getType(),prop.getValue()); + } + } + return stmt; + } /** * send mail as an attachment diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/statementList.vue b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/statementList.vue new file mode 100644 index 0000000000..bd9a7d2382 --- /dev/null +++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/statementList.vue @@ -0,0 +1,143 @@ + + + + diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue index c474cb97fb..6691704478 100644 --- a/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue +++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue @@ -72,6 +72,26 @@ + +
{{$t('Pre Statement')}}
+
+ + +
+
+ +
{{$t('Post Statement')}}
+
+ + +
+
diff --git a/escheduler-ui/src/js/module/i18n/locale/en_US.js b/escheduler-ui/src/js/module/i18n/locale/en_US.js index 150c6b013e..a651bd3e29 100644 --- a/escheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/escheduler-ui/src/js/module/i18n/locale/en_US.js @@ -409,4 +409,7 @@ export default { 'Queue manage': 'Queue manage', 'Create queue': 'Create queue', 'Edit queue': 'Edit queue', + 'Pre Statement': 'Pre Statement', + 'Post Statement': 'Post Statement', + 'Statement cannot be empty': 'Statement cannot be empty', } \ No newline at end of file diff --git a/escheduler-ui/src/js/module/i18n/locale/zh_CN.js b/escheduler-ui/src/js/module/i18n/locale/zh_CN.js index 6f153957b7..4ffe32926a 100644 --- a/escheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/escheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -408,4 +408,7 @@ export default { 'Queue manage': '队列管理', 'Create queue': '创建队列', 'Edit queue': '编辑队列', -} \ No newline at end of file + 'Pre Statement': '前置sql', + 'Post Statement': '后置sql', + 'Statement cannot be empty': '语句不能为空', +}