Browse Source

[improve] Using create or replace function in sql task (#10170)

3.1.0-release
He Zhao 2 years ago committed by GitHub
parent
commit
da3c25dc67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      docs/docs/en/guide/task/sql.md
  2. 3
      docs/docs/zh/guide/task/sql.md
  3. 5
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

2
docs/docs/en/guide/task/sql.md

@ -46,3 +46,5 @@ Log in to the bigdata cluster and use 'hive' command or 'beeline' or 'JDBC' and
## Notice ## Notice
Pay attention to the selection of SQL type. If it is an insert operation, need to change to "Non-Query" type. Pay attention to the selection of SQL type. If it is an insert operation, need to change to "Non-Query" type.
To compatible with long session,UDF function are created by the syntax(CREATE OR REPLACE)

3
docs/docs/zh/guide/task/sql.md

@ -45,4 +45,5 @@ SQL任务类型,用于连接数据库并执行相应SQL。
## 注意事项 ## 注意事项
注意SQL类型的选择,如果是INSERT等操作需要选择非查询类型。 * 注意SQL类型的选择,如果是INSERT等操作需要选择非查询类型。
* 为了兼容长会话情况,UDF函数的创建是通过CREATE OR REPLACE语句

5
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

@ -82,8 +82,9 @@ public class SqlTask extends AbstractTaskExecutor {
/** /**
* create function format * create function format
* include replace here which can be compatible with more cases, for example a long-running Spark session in Kyuubi will keep its own temp functions instead of destroying them right away
*/ */
private static final String CREATE_FUNCTION_FORMAT = "create temporary function {0} as ''{1}''"; private static final String CREATE_OR_REPLACE_FUNCTION_FORMAT = "create or replace temporary function {0} as ''{1}''";
/** /**
* default query sql limit * default query sql limit
@ -479,7 +480,7 @@ public class SqlTask extends AbstractTaskExecutor {
*/ */
private List<String> buildTempFuncSql(List<UdfFuncParameters> udfFuncParameters) { private List<String> buildTempFuncSql(List<UdfFuncParameters> udfFuncParameters) {
return udfFuncParameters.stream().map(value -> MessageFormat return udfFuncParameters.stream().map(value -> MessageFormat
.format(CREATE_FUNCTION_FORMAT, value.getFuncName(), value.getClassName())).collect(Collectors.toList()); .format(CREATE_OR_REPLACE_FUNCTION_FORMAT, value.getFuncName(), value.getClassName())).collect(Collectors.toList());
} }
/** /**

Loading…
Cancel
Save