Browse Source

1,encapsulate the parameters required by sqltask 2,SQLTask optimization (#2135)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify

* worker remove db

* ShellTask modify

* master persistence processId and appIds

* master persistence processId and appIds

* master add kill task logic

* master add kill task logic

* master add kill task logic

* javadoc error modify

* remove chinese log

* executeDirectly method add Override

* remote module modify

* TaskKillResponseProcessor command type modify

* create buildKillCommand

* host add host:port format

* host add host:port format

* TaskAckProcessor modify

* TaskAckProcessor modify

* task prioriry refator

* remove ITaskQueue

* task prioriry refator

* remove ITaskQueue

* TaskPriority refactor

* remove logs

* WorkerServer refactor

* MasterSchedulerService modify

* WorkerConfig listen port modify

* modify master and worker listen port

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* Encapsulate the parameters required by sqltask

* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization

* AbstractTask modify

Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
pull/2/head
qiaozhanwei 5 years ago committed by GitHub
parent
commit
fd2c2eeb32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/UdfType.java
  2. 32
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
  3. 14
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
  4. 49
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
  5. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  6. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
  7. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  8. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
  9. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
  10. 307
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  11. 1
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

11
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/UdfType.java

@ -44,4 +44,15 @@ public enum UdfType {
public String getDescp() { public String getDescp() {
return descp; return descp;
} }
public static UdfType of(int type){
for(UdfType ut : values()){
if(ut.getCode() == type){
return ut;
}
}
throw new IllegalArgumentException("invalid type : " + type);
}
} }

32
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java

@ -18,11 +18,13 @@
package org.apache.dolphinscheduler.server.builder; package org.apache.dolphinscheduler.server.builder;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import java.util.List;
/** /**
* TaskExecutionContext builder * TaskExecutionContext builder
*/ */
@ -82,6 +84,30 @@ public class TaskExecutionContextBuilder {
return this; return this;
} }
/**
* build SQLTask related info
*
* @param sqlTaskExecutionContext sqlTaskExecutionContext
* @return TaskExecutionContextBuilder
*/
public TaskExecutionContextBuilder buildSQLTaskRelatedInfo(SQLTaskExecutionContext sqlTaskExecutionContext){
taskExecutionContext.setSqlTaskExecutionContext(sqlTaskExecutionContext);
return this;
}
/**
* build DataxTask related info
* @param dataxTaskExecutionContext dataxTaskExecutionContext
* @return TaskExecutionContextBuilder
*/
public TaskExecutionContextBuilder buildDataxTaskRelatedInfo(DataxTaskExecutionContext dataxTaskExecutionContext){
taskExecutionContext.setDataxTaskExecutionContext(dataxTaskExecutionContext);
return this;
}
/** /**
* create * create
* *

14
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java

@ -32,6 +32,11 @@ public class SQLTaskExecutionContext implements Serializable {
* warningGroupId * warningGroupId
*/ */
private int warningGroupId; private int warningGroupId;
/**
* connectionParams
*/
private String connectionParams;
/** /**
* udf function list * udf function list
*/ */
@ -54,10 +59,19 @@ public class SQLTaskExecutionContext implements Serializable {
this.udfFuncList = udfFuncList; this.udfFuncList = udfFuncList;
} }
public String getConnectionParams() {
return connectionParams;
}
public void setConnectionParams(String connectionParams) {
this.connectionParams = connectionParams;
}
@Override @Override
public String toString() { public String toString() {
return "SQLTaskExecutionContext{" + return "SQLTaskExecutionContext{" +
"warningGroupId=" + warningGroupId + "warningGroupId=" + warningGroupId +
", connectionParams='" + connectionParams + '\'' +
", udfFuncList=" + udfFuncList + ", udfFuncList=" + udfFuncList +
'}'; '}';
} }

49
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java

@ -17,13 +17,23 @@
package org.apache.dolphinscheduler.server.master.consumer; package org.apache.dolphinscheduler.server.master.consumer;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.EnumUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskPriority; import org.apache.dolphinscheduler.server.entity.TaskPriority;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
@ -38,6 +48,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.List;
/** /**
* TaskUpdateQueue consumer * TaskUpdateQueue consumer
@ -136,10 +147,45 @@ public class TaskUpdateQueueConsumer extends Thread{
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode()); taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
taskInstance.setExecutePath(getExecLocalPath(taskInstance)); taskInstance.setExecutePath(getExecLocalPath(taskInstance));
SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
TaskType taskType = TaskType.valueOf(taskInstance.getTaskType());
if (taskType == TaskType.SQL){
TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
SqlParameters sqlParameters = JSONObject.parseObject(taskNode.getParams(), SqlParameters.class);
int datasourceId = sqlParameters.getDatasource();
DataSource datasource = processService.findDataSourceById(datasourceId);
sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
// whether udf type
boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
&& StringUtils.isNotEmpty(sqlParameters.getUdfs());
if (udfTypeFlag){
String[] udfFunIds = sqlParameters.getUdfs().split(",");
int[] udfFunIdsArray = new int[udfFunIds.length];
for(int i = 0 ; i < udfFunIds.length;i++){
udfFunIdsArray[i]=Integer.parseInt(udfFunIds[i]);
}
List<UdfFunc> udfFuncList = processService.queryUdfFunListByids(udfFunIdsArray);
sqlTaskExecutionContext.setUdfFuncList(udfFuncList);
}
}
if (taskType == TaskType.DATAX){
}
return TaskExecutionContextBuilder.get() return TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance) .buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance()) .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine()) .buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
.buildSQLTaskRelatedInfo(sqlTaskExecutionContext)
.buildDataxTaskRelatedInfo(dataxTaskExecutionContext)
.create(); .create();
} }
@ -171,7 +217,4 @@ public class TaskUpdateQueueConsumer extends Thread{
} }
return false; return false;
} }
} }

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -152,7 +152,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
/** /**
* dispatcht task * TODO dispatcht task
* @param taskInstance taskInstance * @param taskInstance taskInstance
* @return whether submit task success * @return whether submit task success
*/ */

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.utils; package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.dao.entity.UdfFunc;
@ -48,6 +49,11 @@ public class UDFUtils {
* @return create function list * @return create function list
*/ */
public static List<String> createFuncs(List<UdfFunc> udfFuncs, String tenantCode,Logger logger){ public static List<String> createFuncs(List<UdfFunc> udfFuncs, String tenantCode,Logger logger){
if (CollectionUtils.isEmpty(udfFuncs)){
logger.info("can't find udf function resource");
return null;
}
// get hive udf jar path // get hive udf jar path
String hiveUdfJarPath = HadoopUtils.getHdfsUdfDir(tenantCode); String hiveUdfJarPath = HadoopUtils.getHdfsUdfDir(tenantCode);
logger.info("hive udf jar path : {}" , hiveUdfJarPath); logger.info("hive udf jar path : {}" , hiveUdfJarPath);

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -97,7 +97,7 @@ public class WorkerServer {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.start(); this.nettyRemotingServer.start();
// // worker registry
this.workerRegistry.registry(); this.workerRegistry.registry();
/** /**

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java

@ -59,7 +59,7 @@ public abstract class AbstractTask {
/** /**
* SHELL process pid * SHELL process pid
*/ */
protected Integer processId; protected int processId;
/** /**
* other resource manager appId , for example : YARN etc * other resource manager appId , for example : YARN etc
@ -139,11 +139,11 @@ public abstract class AbstractTask {
this.appIds = appIds; this.appIds = appIds;
} }
public Integer getProcessId() { public int getProcessId() {
return processId; return processId;
} }
public void setProcessId(Integer processId) { public void setProcessId(int processId) {
this.processId = processId; this.processId = processId;
} }

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java

@ -95,7 +95,7 @@ public class ShellTask extends AbstractTask {
setAppIds(commandExecuteResult.getAppIds()); setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId()); setProcessId(commandExecuteResult.getProcessId());
} catch (Exception e) { } catch (Exception e) {
logger.error("shell task failure", e); logger.error("shell task error", e);
setExitStatusCode(Constants.EXIT_CODE_FAILURE); setExitStatusCode(Constants.EXIT_CODE_FAILURE);
throw e; throw e;
} }
@ -125,8 +125,6 @@ public class ShellTask extends AbstractTask {
} }
String script = shellParameters.getRawScript().replaceAll("\\r\\n", "\n"); String script = shellParameters.getRawScript().replaceAll("\\r\\n", "\n");
/** /**
* combining local and global parameters * combining local and global parameters
*/ */

307
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -105,16 +105,14 @@ public class SqlTask extends AbstractTask {
sqlParameters.getUdfs(), sqlParameters.getUdfs(),
sqlParameters.getShowType(), sqlParameters.getShowType(),
sqlParameters.getConnParams()); sqlParameters.getConnParams());
Connection con = null;
List<String> createFuncs = null;
try { try {
SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext();
// load class // load class
DataSourceFactory.loadClass(DbType.valueOf(sqlParameters.getType())); DataSourceFactory.loadClass(DbType.valueOf(sqlParameters.getType()));
// get datasource // get datasource
baseDataSource = DataSourceFactory.getDatasource(DbType.valueOf(sqlParameters.getType()), baseDataSource = DataSourceFactory.getDatasource(DbType.valueOf(sqlParameters.getType()),
sqlParameters.getConnParams()); sqlTaskExecutionContext.getConnectionParams());
// ready to execute SQL and parameter entity Map // ready to execute SQL and parameter entity Map
SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql()); SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql());
@ -129,32 +127,18 @@ public class SqlTask extends AbstractTask {
.map(this::getSqlAndSqlParamsMap) .map(this::getSqlAndSqlParamsMap)
.collect(Collectors.toList()); .collect(Collectors.toList());
// determine if it is UDF List<String> createFuncs = UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncList(),
boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType()) taskExecutionContext.getTenantCode(),
&& StringUtils.isNotEmpty(sqlParameters.getUdfs()); logger);
if(udfTypeFlag){
String[] ids = sqlParameters.getUdfs().split(",");
int[] idsArray = new int[ids.length];
for(int i=0;i<ids.length;i++){
idsArray[i]=Integer.parseInt(ids[i]);
}
SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext();
createFuncs = UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncList(), taskExecutionContext.getTenantCode(), logger);
}
// execute sql task // execute sql task
con = executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs); executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(), e); setExitStatusCode(Constants.EXIT_CODE_FAILURE);
logger.error("sql task error", e);
throw e; throw e;
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error(e.getMessage(),e);
}
}
} }
} }
@ -193,11 +177,11 @@ public class SqlTask extends AbstractTask {
setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap); setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap);
// replace the ${} of the SQL statement with the Placeholder // replace the ${} of the SQL statement with the Placeholder
String formatSql = sql.replaceAll(rgex,"?"); String formatSql = sql.replaceAll(rgex, "?");
sqlBuilder.append(formatSql); sqlBuilder.append(formatSql);
// print repalce sql // print repalce sql
printReplacedSql(sql,formatSql,rgex,sqlParamsMap); printReplacedSql(sql, formatSql, rgex, sqlParamsMap);
return new SqlBinds(sqlBuilder.toString(), sqlParamsMap); return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
} }
@ -214,105 +198,195 @@ public class SqlTask extends AbstractTask {
* @param createFuncs create functions * @param createFuncs create functions
* @return Connection * @return Connection
*/ */
public Connection executeFuncAndSql(SqlBinds mainSqlBinds, public void executeFuncAndSql(SqlBinds mainSqlBinds,
List<SqlBinds> preStatementsBinds, List<SqlBinds> preStatementsBinds,
List<SqlBinds> postStatementsBinds, List<SqlBinds> postStatementsBinds,
List<String> createFuncs){ List<String> createFuncs){
Connection connection = null; Connection connection = null;
PreparedStatement stmt = null;
ResultSet resultSet = null;
try { try {
// if upload resource is HDFS and kerberos startup // if upload resource is HDFS and kerberos startup
CommonUtils.loadKerberosConf(); CommonUtils.loadKerberosConf();
// if hive , load connection params if exists
if (HIVE == DbType.valueOf(sqlParameters.getType())) { // create connection
Properties paramProp = new Properties(); connection = createConnection();
paramProp.setProperty(USER, baseDataSource.getUser());
paramProp.setProperty(PASSWORD, baseDataSource.getPassword());
Map<String, String> connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(),
SEMICOLON,
HIVE_CONF);
paramProp.putAll(connParamMap);
connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
paramProp);
}else{
connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
baseDataSource.getUser(),
baseDataSource.getPassword());
}
// create temp function // create temp function
if (CollectionUtils.isNotEmpty(createFuncs)) { if (CollectionUtils.isNotEmpty(createFuncs)) {
try (Statement funcStmt = connection.createStatement()) { createTempFunction(connection,createFuncs);
for (String createFunc : createFuncs) {
logger.info("hive create function sql: {}", createFunc);
funcStmt.execute(createFunc);
}
}
} }
for (SqlBinds sqlBind: preStatementsBinds) { // pre sql
try (PreparedStatement stmt = prepareStatementAndBind(connection, sqlBind)) { preSql(connection,preStatementsBinds);
int result = stmt.executeUpdate();
logger.info("pre statement execute result: {}, for sql: {}",result,sqlBind.getSql());
}
}
try (PreparedStatement stmt = prepareStatementAndBind(connection, mainSqlBinds);
ResultSet resultSet = stmt.executeQuery()) {
// decide whether to executeQuery or executeUpdate based on sqlType
if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
// query statements need to be convert to JsonArray and inserted into Alert to send
JSONArray resultJSONArray = new JSONArray();
ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount();
while (resultSet.next()) {
JSONObject mapOfColValues = new JSONObject(true);
for (int i = 1; i <= num; i++) {
mapOfColValues.put(md.getColumnName(i), resultSet.getObject(i));
}
resultJSONArray.add(mapOfColValues);
}
logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
// if there is a result set
if ( !resultJSONArray.isEmpty() ) {
if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
sendAttachment(sqlParameters.getTitle(),
JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
}else{
sendAttachment(taskExecutionContext.getTaskName() + " query resultsets ",
JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
}
}
exitStatusCode = 0;
} else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
// non query statement
stmt.executeUpdate();
exitStatusCode = 0;
}
}
for (SqlBinds sqlBind: postStatementsBinds) { stmt = prepareStatementAndBind(connection, mainSqlBinds);
try (PreparedStatement stmt = prepareStatementAndBind(connection, sqlBind)) { resultSet = stmt.executeQuery();
int result = stmt.executeUpdate(); // decide whether to executeQuery or executeUpdate based on sqlType
logger.info("post statement execute result: {},for sql: {}",result,sqlBind.getSql()); if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
} // query statements need to be convert to JsonArray and inserted into Alert to send
resultProcess(resultSet);
} else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
// non query statement
stmt.executeUpdate();
} }
postSql(connection,postStatementsBinds);
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(),e); logger.error("execute sql error",e);
throw new RuntimeException(e.getMessage()); throw new RuntimeException("execute sql error");
} finally { } finally {
close(resultSet,stmt,connection);
}
}
/**
* result process
*
* @param resultSet resultSet
* @throws Exception
*/
private void resultProcess(ResultSet resultSet) throws Exception{
JSONArray resultJSONArray = new JSONArray();
ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount();
while (resultSet.next()) {
JSONObject mapOfColValues = new JSONObject(true);
for (int i = 1; i <= num; i++) {
mapOfColValues.put(md.getColumnName(i), resultSet.getObject(i));
}
resultJSONArray.add(mapOfColValues);
}
logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
// if there is a result set
if (!resultJSONArray.isEmpty() ) {
if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
sendAttachment(sqlParameters.getTitle(),
JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
}else{
sendAttachment(taskExecutionContext.getTaskName() + " query resultsets ",
JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
}
}
}
/**
* pre sql
*
* @param connection connection
* @param preStatementsBinds preStatementsBinds
*/
private void preSql(Connection connection,
List<SqlBinds> preStatementsBinds) throws Exception{
for (SqlBinds sqlBind: preStatementsBinds) {
try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)){
int result = pstmt.executeUpdate();
logger.info("pre statement execute result: {}, for sql: {}",result,sqlBind.getSql());
}
}
}
/**
* post psql
*
* @param connection connection
* @param postStatementsBinds postStatementsBinds
* @throws Exception
*/
private void postSql(Connection connection,
List<SqlBinds> postStatementsBinds) throws Exception{
for (SqlBinds sqlBind: postStatementsBinds) {
try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)){
int result = pstmt.executeUpdate();
logger.info("post statement execute result: {},for sql: {}",result,sqlBind.getSql());
}
}
}
/**
* create temp function
*
* @param connection connection
* @param createFuncs createFuncs
* @throws Exception
*/
private void createTempFunction(Connection connection,
List<String> createFuncs) throws Exception{
try (Statement funcStmt = connection.createStatement()) {
for (String createFunc : createFuncs) {
logger.info("hive create function sql: {}", createFunc);
funcStmt.execute(createFunc);
}
}
}
/**
* create connection
*
* @return connection
* @throws Exception
*/
private Connection createConnection() throws Exception{
// if hive , load connection params if exists
Connection connection = null;
if (HIVE == DbType.valueOf(sqlParameters.getType())) {
Properties paramProp = new Properties();
paramProp.setProperty(USER, baseDataSource.getUser());
paramProp.setProperty(PASSWORD, baseDataSource.getPassword());
Map<String, String> connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(),
SEMICOLON,
HIVE_CONF);
paramProp.putAll(connParamMap);
connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
paramProp);
}else{
connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
baseDataSource.getUser(),
baseDataSource.getPassword());
}
return connection;
}
/**
* close jdbc resource
*
* @param resultSet resultSet
* @param pstmt pstmt
* @param connection connection
*/
private void close(ResultSet resultSet,
PreparedStatement pstmt,
Connection connection){
if (resultSet != null){
try { try {
connection.close(); connection.close();
} catch (Exception e) { } catch (SQLException e) {
logger.error(e.getMessage(), e);
}
}
if (pstmt != null){
try {
connection.close();
} catch (SQLException e) {
}
}
if (connection != null){
try {
connection.close();
} catch (SQLException e) {
} }
} }
return connection;
} }
/** /**
@ -326,20 +400,19 @@ public class SqlTask extends AbstractTask {
// is the timeout set // is the timeout set
boolean timeoutFlag = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED || boolean timeoutFlag = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED ||
TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED; TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED;
try (PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql())) { PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql());
if(timeoutFlag){ if(timeoutFlag){
stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout()); stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
} }
Map<Integer, Property> params = sqlBinds.getParamsMap(); Map<Integer, Property> params = sqlBinds.getParamsMap();
if(params != null) { if(params != null) {
for (Map.Entry<Integer, Property> entry : params.entrySet()) { for (Map.Entry<Integer, Property> entry : params.entrySet()) {
Property prop = entry.getValue(); Property prop = entry.getValue();
ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue()); ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue());
}
} }
logger.info("prepare statement replace sql : {} ", stmt);
return stmt;
} }
logger.info("prepare statement replace sql : {} ", stmt);
return stmt;
} }
/** /**

1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -1513,7 +1513,6 @@ public class ProcessService {
* @return udf function list * @return udf function list
*/ */
public List<UdfFunc> queryUdfFunListByids(int[] ids){ public List<UdfFunc> queryUdfFunListByids(int[] ids){
return udfFuncMapper.queryUdfByIdStr(ids, null); return udfFuncMapper.queryUdfByIdStr(ids, null);
} }

Loading…
Cancel
Save