Browse Source

Set maxRows in SqlTask (#15342)

(cherry picked from commit b6b88e3e62d8245e08408fb60c48d23c0bd0bb46)
3.2.1-prepare
Wenjun Ruan 11 months ago committed by GitHub
parent
commit
f921457a55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

12
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

@ -248,25 +248,17 @@ public class SqlTask extends AbstractTask {
ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount();
int rowCount = 0;
int limit = sqlParameters.getLimit() == 0 ? QUERY_LIMIT : sqlParameters.getLimit();
while (resultSet.next()) {
if (rowCount == limit) {
log.info("sql result limit : {} exceeding results are filtered", limit);
break;
}
ObjectNode mapOfColValues = JSONUtils.createObjectNode();
for (int i = 1; i <= num; i++) {
mapOfColValues.set(md.getColumnLabel(i), JSONUtils.toJsonNode(resultSet.getObject(i)));
}
resultJSONArray.add(mapOfColValues);
rowCount++;
}
int displayRows = sqlParameters.getDisplayRows() > 0 ? sqlParameters.getDisplayRows()
: TaskConstants.DEFAULT_DISPLAY_ROWS;
displayRows = Math.min(displayRows, rowCount);
displayRows = Math.min(displayRows, resultJSONArray.size());
log.info("display sql result {} rows as follows:", displayRows);
for (int i = 0; i < displayRows; i++) {
String row = JSONUtils.toJsonString(resultJSONArray.get(i));
@ -383,6 +375,7 @@ public class SqlTask extends AbstractTask {
*/
private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) {
// is the timeout set
// todo: we need control the timeout at master side.
boolean timeoutFlag = taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED
|| taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED;
try {
@ -390,6 +383,7 @@ public class SqlTask extends AbstractTask {
if (timeoutFlag) {
stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
}
stmt.setMaxRows(sqlParameters.getLimit() <= 0 ? QUERY_LIMIT : sqlParameters.getLimit());
Map<Integer, Property> params = sqlBinds.getParamsMap();
if (params != null) {
for (Map.Entry<Integer, Property> entry : params.entrySet()) {

Loading…
Cancel
Save