Browse Source

[fix-9991][worker]fix statement is closed before resultSet.getMetaData() (#10014)

3.1.0-release
chuxing 3 years ago committed by GitHub
parent
commit
df04c4aef0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 22
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

22
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

@ -179,7 +179,6 @@ public class SqlTask extends AbstractTaskExecutor {
List<SqlBinds> postStatementsBinds, List<SqlBinds> postStatementsBinds,
List<String> createFuncs) throws Exception { List<String> createFuncs) throws Exception {
Connection connection = null; Connection connection = null;
ResultSet resultSet = null;
try { try {
// create connection // create connection
@ -197,8 +196,7 @@ public class SqlTask extends AbstractTaskExecutor {
// decide whether to executeQuery or executeUpdate based on sqlType // decide whether to executeQuery or executeUpdate based on sqlType
if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) { if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
// query statements need to be convert to JsonArray and inserted into Alert to send // query statements need to be convert to JsonArray and inserted into Alert to send
resultSet = executeQuery(connection, mainStatementsBinds.get(0), "main"); result = executeQuery(connection, mainStatementsBinds.get(0), "main");
result = resultProcess(resultSet);
} else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) { } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
// non query statement // non query statement
String updateResult = executeUpdate(connection, mainStatementsBinds, "main"); String updateResult = executeUpdate(connection, mainStatementsBinds, "main");
@ -213,7 +211,7 @@ public class SqlTask extends AbstractTaskExecutor {
logger.error("execute sql error: {}", e.getMessage()); logger.error("execute sql error: {}", e.getMessage());
throw e; throw e;
} finally { } finally {
close(resultSet, connection); close(connection);
} }
} }
@ -292,10 +290,11 @@ public class SqlTask extends AbstractTaskExecutor {
setTaskAlertInfo(taskAlertInfo); setTaskAlertInfo(taskAlertInfo);
} }
private ResultSet executeQuery(Connection connection, SqlBinds sqlBinds, String handlerType) throws Exception { private String executeQuery(Connection connection, SqlBinds sqlBinds, String handlerType) throws Exception {
try (PreparedStatement statement = prepareStatementAndBind(connection, sqlBinds)) { try (PreparedStatement statement = prepareStatementAndBind(connection, sqlBinds)) {
logger.info("{} statement execute query, for sql: {}", handlerType, sqlBinds.getSql()); logger.info("{} statement execute query, for sql: {}", handlerType, sqlBinds.getSql());
return statement.executeQuery(); ResultSet resultSet = statement.executeQuery();
return resultProcess(resultSet);
} }
} }
@ -329,18 +328,9 @@ public class SqlTask extends AbstractTaskExecutor {
/** /**
* close jdbc resource * close jdbc resource
* *
* @param resultSet resultSet
* @param connection connection * @param connection connection
*/ */
private void close(ResultSet resultSet, Connection connection) { private void close(Connection connection) {
if (resultSet != null) {
try {
resultSet.close();
} catch (SQLException e) {
logger.error("close result set error : {}", e.getMessage(), e);
}
}
if (connection != null) { if (connection != null) {
try { try {
connection.close(); connection.close();

Loading…
Cancel
Save