Browse Source

[Fix-3549] [Server][sqlTask]The alias column in the query SQL does not take effect (#3784)

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* The batch delete function in the workflow definition and workflow instance pages cannot be canceled if selected.

* [Fix-3549] [Server][sqlTask]The alias column in the query SQL does not take effect

* [Fix-3549] [Server][sqlTask]The alias column in the query SQL does not take effect

Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
Co-authored-by: zhuangchong <zhuangchong8@163.com>
Co-authored-by: JinyLeeChina <42576980+JinyLeeChina@users.noreply.github.com>
pull/3/MERGE
qiaozhanwei 4 years ago committed by GitHub
parent
commit
5c8171e397
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 63
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

63
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.worker.task.sql;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.alert.utils.MailUtils; import org.apache.dolphinscheduler.alert.utils.MailUtils;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
@ -86,7 +85,7 @@ public class SqlTask extends AbstractTask {
this.taskExecutionContext = taskExecutionContext; this.taskExecutionContext = taskExecutionContext;
logger.info("sql task params {}", taskExecutionContext.getTaskParams()); logger.info("sql task params {}", taskExecutionContext.getTaskParams());
this.sqlParameters = JSONObject.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class); this.sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class);
if (!sqlParameters.checkParameters()) { if (!sqlParameters.checkParameters()) {
throw new RuntimeException("sql task params is not valid"); throw new RuntimeException("sql task params is not valid");
@ -148,8 +147,8 @@ public class SqlTask extends AbstractTask {
} }
/** /**
* ready to execute SQL and parameter entity Map * ready to execute SQL and parameter entity Map
* @return * @return SqlBinds
*/ */
private SqlBinds getSqlAndSqlParamsMap(String sql) { private SqlBinds getSqlAndSqlParamsMap(String sql) {
Map<Integer,Property> sqlParamsMap = new HashMap<>(); Map<Integer,Property> sqlParamsMap = new HashMap<>();
@ -176,6 +175,7 @@ public class SqlTask extends AbstractTask {
logger.info("SQL title : {}",title); logger.info("SQL title : {}",title);
sqlParameters.setTitle(title); sqlParameters.setTitle(title);
} }
//new //new
//replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job //replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job
sql = ParameterUtils.replaceScheduleTime(sql, taskExecutionContext.getScheduleTime()); sql = ParameterUtils.replaceScheduleTime(sql, taskExecutionContext.getScheduleTime());
@ -267,18 +267,12 @@ public class SqlTask extends AbstractTask {
resultJSONArray.add(mapOfColValues); resultJSONArray.add(mapOfColValues);
rowCount++; rowCount++;
} }
logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); String result = JSONUtils.toJsonString(resultJSONArray);
logger.debug("execute sql : {}", result);
// if there is a result set
if (!resultJSONArray.isEmpty() ) { sendAttachment(StringUtils.isNotEmpty(sqlParameters.getTitle()) ?
if (StringUtils.isNotEmpty(sqlParameters.getTitle())) { sqlParameters.getTitle(): taskExecutionContext.getTaskName() + " query result sets",
sendAttachment(sqlParameters.getTitle(), JSONUtils.toJsonString(resultJSONArray));
JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
}else{
sendAttachment(taskExecutionContext.getTaskName() + " query resultsets ",
JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
}
}
} }
/** /**
@ -299,7 +293,7 @@ public class SqlTask extends AbstractTask {
} }
/** /**
* post psql * post sql
* *
* @param connection connection * @param connection connection
* @param postStatementsBinds postStatementsBinds * @param postStatementsBinds postStatementsBinds
@ -330,11 +324,12 @@ public class SqlTask extends AbstractTask {
} }
} }
} }
/** /**
* create connection * create connection
* *
* @return connection * @return connection
* @throws Exception * @throws Exception Exception
*/ */
private Connection createConnection() throws Exception{ private Connection createConnection() throws Exception{
// if hive , load connection params if exists // if hive , load connection params if exists
@ -370,17 +365,17 @@ public class SqlTask extends AbstractTask {
Connection connection){ Connection connection){
if (resultSet != null){ if (resultSet != null){
try { try {
connection.close(); resultSet.close();
} catch (SQLException e) { } catch (SQLException e) {
logger.error("close result set error : {}",e.getMessage(),e);
} }
} }
if (pstmt != null){ if (pstmt != null){
try { try {
connection.close(); pstmt.close();
} catch (SQLException e) { } catch (SQLException e) {
logger.error("close prepared statement error : {}",e.getMessage(),e);
} }
} }
@ -388,17 +383,17 @@ public class SqlTask extends AbstractTask {
try { try {
connection.close(); connection.close();
} catch (SQLException e) { } catch (SQLException e) {
logger.error("close connection error : {}",e.getMessage(),e);
} }
} }
} }
/** /**
* preparedStatement bind * preparedStatement bind
* @param connection * @param connection connection
* @param sqlBinds * @param sqlBinds sqlBinds
* @return * @return PreparedStatement
* @throws Exception * @throws Exception Exception
*/ */
private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) throws Exception { private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) throws Exception {
// is the timeout set // is the timeout set
@ -429,34 +424,34 @@ public class SqlTask extends AbstractTask {
List<User> users = alertDao.queryUserByAlertGroupId(taskExecutionContext.getSqlTaskExecutionContext().getWarningGroupId()); List<User> users = alertDao.queryUserByAlertGroupId(taskExecutionContext.getSqlTaskExecutionContext().getWarningGroupId());
// receiving group list // receiving group list
List<String> receviersList = new ArrayList<>(); List<String> receiversList = new ArrayList<>();
for(User user:users){ for(User user:users){
receviersList.add(user.getEmail().trim()); receiversList.add(user.getEmail().trim());
} }
// custom receiver // custom receiver
String receivers = sqlParameters.getReceivers(); String receivers = sqlParameters.getReceivers();
if (StringUtils.isNotEmpty(receivers)){ if (StringUtils.isNotEmpty(receivers)){
String[] splits = receivers.split(COMMA); String[] splits = receivers.split(COMMA);
for (String receiver : splits){ for (String receiver : splits){
receviersList.add(receiver.trim()); receiversList.add(receiver.trim());
} }
} }
// copy list // copy list
List<String> receviersCcList = new ArrayList<>(); List<String> receiversCcList = new ArrayList<>();
// Custom Copier // Custom Copier
String receiversCc = sqlParameters.getReceiversCc(); String receiversCc = sqlParameters.getReceiversCc();
if (StringUtils.isNotEmpty(receiversCc)){ if (StringUtils.isNotEmpty(receiversCc)){
String[] splits = receiversCc.split(COMMA); String[] splits = receiversCc.split(COMMA);
for (String receiverCc : splits){ for (String receiverCc : splits){
receviersCcList.add(receiverCc.trim()); receiversCcList.add(receiverCc.trim());
} }
} }
String showTypeName = sqlParameters.getShowType().replace(COMMA,"").trim(); String showTypeName = sqlParameters.getShowType().replace(COMMA,"").trim();
if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){ if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){
Map<String, Object> mailResult = MailUtils.sendMails(receviersList, Map<String, Object> mailResult = MailUtils.sendMails(receiversList,
receviersCcList, title, content, ShowType.valueOf(showTypeName).getDescp()); receiversCcList, title, content, ShowType.valueOf(showTypeName).getDescp());
if(!(boolean) mailResult.get(STATUS)){ if(!(boolean) mailResult.get(STATUS)){
throw new RuntimeException("send mail failed!"); throw new RuntimeException("send mail failed!");
} }

Loading…
Cancel
Save