|
|
|
@ -16,14 +16,9 @@
|
|
|
|
|
*/ |
|
|
|
|
package org.apache.dolphinscheduler.server.worker.task.sql; |
|
|
|
|
|
|
|
|
|
import static org.apache.dolphinscheduler.common.Constants.COMMA; |
|
|
|
|
import static org.apache.dolphinscheduler.common.Constants.HIVE_CONF; |
|
|
|
|
import static org.apache.dolphinscheduler.common.Constants.PASSWORD; |
|
|
|
|
import static org.apache.dolphinscheduler.common.Constants.SEMICOLON; |
|
|
|
|
import static org.apache.dolphinscheduler.common.Constants.STATUS; |
|
|
|
|
import static org.apache.dolphinscheduler.common.Constants.USER; |
|
|
|
|
import static org.apache.dolphinscheduler.common.enums.DbType.HIVE; |
|
|
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSONArray; |
|
|
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
|
|
import org.apache.commons.lang.StringUtils; |
|
|
|
|
import org.apache.dolphinscheduler.alert.utils.MailUtils; |
|
|
|
|
import org.apache.dolphinscheduler.common.Constants; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.CommandType; |
|
|
|
@ -35,11 +30,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
|
|
|
|
|
import org.apache.dolphinscheduler.common.task.sql.SqlBinds; |
|
|
|
|
import org.apache.dolphinscheduler.common.task.sql.SqlParameters; |
|
|
|
|
import org.apache.dolphinscheduler.common.task.sql.SqlType; |
|
|
|
|
import org.apache.dolphinscheduler.common.utils.CollectionUtils; |
|
|
|
|
import org.apache.dolphinscheduler.common.utils.CommonUtils; |
|
|
|
|
import org.apache.dolphinscheduler.common.utils.EnumUtils; |
|
|
|
|
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
|
|
|
|
import org.apache.dolphinscheduler.common.utils.ParameterUtils; |
|
|
|
|
import org.apache.dolphinscheduler.common.utils.*; |
|
|
|
|
import org.apache.dolphinscheduler.dao.AlertDao; |
|
|
|
|
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; |
|
|
|
|
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; |
|
|
|
@ -50,30 +41,16 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
|
|
|
|
|
import org.apache.dolphinscheduler.server.utils.UDFUtils; |
|
|
|
|
import org.apache.dolphinscheduler.server.worker.task.AbstractTask; |
|
|
|
|
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
|
|
|
|
import org.slf4j.Logger; |
|
|
|
|
|
|
|
|
|
import org.apache.commons.lang.StringUtils; |
|
|
|
|
|
|
|
|
|
import java.sql.Connection; |
|
|
|
|
import java.sql.DriverManager; |
|
|
|
|
import java.sql.PreparedStatement; |
|
|
|
|
import java.sql.ResultSet; |
|
|
|
|
import java.sql.ResultSetMetaData; |
|
|
|
|
import java.sql.SQLException; |
|
|
|
|
import java.sql.Statement; |
|
|
|
|
import java.util.ArrayList; |
|
|
|
|
import java.util.HashMap; |
|
|
|
|
import java.util.List; |
|
|
|
|
import java.util.Map; |
|
|
|
|
import java.util.Optional; |
|
|
|
|
import java.util.Properties; |
|
|
|
|
import java.sql.*; |
|
|
|
|
import java.util.*; |
|
|
|
|
import java.util.regex.Matcher; |
|
|
|
|
import java.util.regex.Pattern; |
|
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
|
|
import org.slf4j.Logger; |
|
|
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSONArray; |
|
|
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
|
|
import static org.apache.dolphinscheduler.common.Constants.*; |
|
|
|
|
import static org.apache.dolphinscheduler.common.enums.DbType.HIVE; |
|
|
|
|
/** |
|
|
|
|
* sql task |
|
|
|
|
*/ |
|
|
|
@ -97,11 +74,6 @@ public class SqlTask extends AbstractTask {
|
|
|
|
|
*/ |
|
|
|
|
private TaskExecutionContext taskExecutionContext; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* default query sql limit |
|
|
|
|
*/ |
|
|
|
|
private static final int LIMIT = 10000; |
|
|
|
|
|
|
|
|
|
public SqlTask(TaskExecutionContext taskExecutionContext, Logger logger) { |
|
|
|
|
super(taskExecutionContext, logger); |
|
|
|
|
|
|
|
|
@ -124,14 +96,15 @@ public class SqlTask extends AbstractTask {
|
|
|
|
|
Thread.currentThread().setName(threadLoggerInfoName); |
|
|
|
|
|
|
|
|
|
logger.info("Full sql parameters: {}", sqlParameters); |
|
|
|
|
logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}", |
|
|
|
|
logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}, query max result limit : {}", |
|
|
|
|
sqlParameters.getType(), |
|
|
|
|
sqlParameters.getDatasource(), |
|
|
|
|
sqlParameters.getSql(), |
|
|
|
|
sqlParameters.getLocalParams(), |
|
|
|
|
sqlParameters.getUdfs(), |
|
|
|
|
sqlParameters.getShowType(), |
|
|
|
|
sqlParameters.getConnParams()); |
|
|
|
|
sqlParameters.getConnParams(), |
|
|
|
|
sqlParameters.getLimit()); |
|
|
|
|
try { |
|
|
|
|
SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext(); |
|
|
|
|
// load class
|
|
|
|
@ -282,7 +255,7 @@ public class SqlTask extends AbstractTask {
|
|
|
|
|
|
|
|
|
|
int rowCount = 0; |
|
|
|
|
|
|
|
|
|
while (rowCount < LIMIT && resultSet.next()) { |
|
|
|
|
while (rowCount < sqlParameters.getLimit() && resultSet.next()) { |
|
|
|
|
JSONObject mapOfColValues = new JSONObject(true); |
|
|
|
|
for (int i = 1; i <= num; i++) { |
|
|
|
|
mapOfColValues.put(md.getColumnLabel(i), resultSet.getObject(i)); |
|
|
|
|