diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index a6eb26668e..56177e166f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.worker.task.sql; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; -import com.alibaba.fastjson.serializer.SerializerFeature; import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.alert.utils.MailUtils; import org.apache.dolphinscheduler.common.Constants; @@ -86,7 +85,7 @@ public class SqlTask extends AbstractTask { this.taskExecutionContext = taskExecutionContext; logger.info("sql task params {}", taskExecutionContext.getTaskParams()); - this.sqlParameters = JSONObject.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class); + this.sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class); if (!sqlParameters.checkParameters()) { throw new RuntimeException("sql task params is not valid"); @@ -148,8 +147,8 @@ public class SqlTask extends AbstractTask { } /** - * ready to execute SQL and parameter entity Map - * @return + * ready to execute SQL and parameter entity Map + * @return SqlBinds */ private SqlBinds getSqlAndSqlParamsMap(String sql) { Map sqlParamsMap = new HashMap<>(); @@ -176,6 +175,7 @@ public class SqlTask extends AbstractTask { logger.info("SQL title : {}",title); sqlParameters.setTitle(title); } + //new //replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job sql = ParameterUtils.replaceScheduleTime(sql, taskExecutionContext.getScheduleTime()); @@ -267,18 +267,12 @@ public class SqlTask extends AbstractTask { resultJSONArray.add(mapOfColValues); rowCount++; } - logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); - - // if there is a result set - if (!resultJSONArray.isEmpty() ) { - if (StringUtils.isNotEmpty(sqlParameters.getTitle())) { - sendAttachment(sqlParameters.getTitle(), - JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); - }else{ - sendAttachment(taskExecutionContext.getTaskName() + " query resultsets ", - JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); - } - } + String result = JSONUtils.toJsonString(resultJSONArray); + logger.debug("execute sql : {}", result); + + sendAttachment(StringUtils.isNotEmpty(sqlParameters.getTitle()) ? + sqlParameters.getTitle(): taskExecutionContext.getTaskName() + " query result sets", + JSONUtils.toJsonString(resultJSONArray)); } /** @@ -299,7 +293,7 @@ public class SqlTask extends AbstractTask { } /** - * post psql + * post sql * * @param connection connection * @param postStatementsBinds postStatementsBinds @@ -330,11 +324,12 @@ public class SqlTask extends AbstractTask { } } } + /** * create connection * * @return connection - * @throws Exception + * @throws Exception Exception */ private Connection createConnection() throws Exception{ // if hive , load connection params if exists @@ -370,17 +365,17 @@ public class SqlTask extends AbstractTask { Connection connection){ if (resultSet != null){ try { - connection.close(); + resultSet.close(); } catch (SQLException e) { - + logger.error("close result set error : {}",e.getMessage(),e); } } if (pstmt != null){ try { - connection.close(); + pstmt.close(); } catch (SQLException e) { - + logger.error("close prepared statement error : {}",e.getMessage(),e); } } @@ -388,17 +383,17 @@ public class SqlTask extends AbstractTask { try { connection.close(); } catch (SQLException e) { - + logger.error("close connection error : {}",e.getMessage(),e); } } } /** * preparedStatement bind - * @param connection - * @param sqlBinds - * @return - * @throws Exception + * @param connection connection + * @param sqlBinds sqlBinds + * @return PreparedStatement + * @throws Exception Exception */ private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) throws Exception { // is the timeout set @@ -429,34 +424,34 @@ public class SqlTask extends AbstractTask { List users = alertDao.queryUserByAlertGroupId(taskExecutionContext.getSqlTaskExecutionContext().getWarningGroupId()); // receiving group list - List receviersList = new ArrayList<>(); + List receiversList = new ArrayList<>(); for(User user:users){ - receviersList.add(user.getEmail().trim()); + receiversList.add(user.getEmail().trim()); } // custom receiver String receivers = sqlParameters.getReceivers(); if (StringUtils.isNotEmpty(receivers)){ String[] splits = receivers.split(COMMA); for (String receiver : splits){ - receviersList.add(receiver.trim()); + receiversList.add(receiver.trim()); } } // copy list - List receviersCcList = new ArrayList<>(); + List receiversCcList = new ArrayList<>(); // Custom Copier String receiversCc = sqlParameters.getReceiversCc(); if (StringUtils.isNotEmpty(receiversCc)){ String[] splits = receiversCc.split(COMMA); for (String receiverCc : splits){ - receviersCcList.add(receiverCc.trim()); + receiversCcList.add(receiverCc.trim()); } } String showTypeName = sqlParameters.getShowType().replace(COMMA,"").trim(); if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){ - Map mailResult = MailUtils.sendMails(receviersList, - receviersCcList, title, content, ShowType.valueOf(showTypeName).getDescp()); + Map mailResult = MailUtils.sendMails(receiversList, + receiversCcList, title, content, ShowType.valueOf(showTypeName).getDescp()); if(!(boolean) mailResult.get(STATUS)){ throw new RuntimeException("send mail failed!"); }