|
|
|
@ -16,7 +16,6 @@
|
|
|
|
|
*/ |
|
|
|
|
package org.apache.dolphinscheduler.server.worker.task.sql; |
|
|
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSON; |
|
|
|
|
import com.alibaba.fastjson.JSONArray; |
|
|
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
|
|
import com.alibaba.fastjson.serializer.SerializerFeature; |
|
|
|
@ -24,7 +23,6 @@ import org.apache.commons.lang.StringUtils;
|
|
|
|
|
import org.apache.dolphinscheduler.alert.utils.MailUtils; |
|
|
|
|
import org.apache.dolphinscheduler.common.Constants; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.*; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.AuthorizationType; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.DbType; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.ShowType; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; |
|
|
|
@ -37,7 +35,6 @@ import org.apache.dolphinscheduler.common.utils.*;
|
|
|
|
|
import org.apache.dolphinscheduler.dao.AlertDao; |
|
|
|
|
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; |
|
|
|
|
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; |
|
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
|
|
|
|
import org.apache.dolphinscheduler.dao.entity.User; |
|
|
|
|
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; |
|
|
|
|
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; |
|
|
|
@ -78,6 +75,10 @@ public class SqlTask extends AbstractTask {
|
|
|
|
|
*/ |
|
|
|
|
private TaskExecutionContext taskExecutionContext; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* default query sql limit |
|
|
|
|
*/ |
|
|
|
|
private static final int LIMIT = 10000; |
|
|
|
|
|
|
|
|
|
public SqlTask(TaskExecutionContext taskExecutionContext, Logger logger) { |
|
|
|
|
super(taskExecutionContext, logger); |
|
|
|
@ -257,12 +258,15 @@ public class SqlTask extends AbstractTask {
|
|
|
|
|
ResultSetMetaData md = resultSet.getMetaData(); |
|
|
|
|
int num = md.getColumnCount(); |
|
|
|
|
|
|
|
|
|
while (resultSet.next()) { |
|
|
|
|
int rowCount = 0; |
|
|
|
|
|
|
|
|
|
while (rowCount < LIMIT && resultSet.next()) { |
|
|
|
|
JSONObject mapOfColValues = new JSONObject(true); |
|
|
|
|
for (int i = 1; i <= num; i++) { |
|
|
|
|
mapOfColValues.put(md.getColumnName(i), resultSet.getObject(i)); |
|
|
|
|
} |
|
|
|
|
resultJSONArray.add(mapOfColValues); |
|
|
|
|
rowCount++; |
|
|
|
|
} |
|
|
|
|
logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); |
|
|
|
|
|
|
|
|
|