diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java index 045495ce05..8c7725ccf8 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java @@ -113,6 +113,15 @@ public class SqlParameters extends AbstractParameters { private int limit; + /** + * segment separator + * + *

The segment separator is used + * when the data source does not support multi-segment SQL execution, + * and the client needs to split the SQL and execute it multiple times.

+ */ + private String segmentSeparator; + public int getLimit() { return limit; } @@ -225,6 +234,14 @@ public class SqlParameters extends AbstractParameters { this.groupId = groupId; } + public String getSegmentSeparator() { + return segmentSeparator; + } + + public void setSegmentSeparator(String segmentSeparator) { + this.segmentSeparator = segmentSeparator; + } + @Override public boolean checkParameters() { return datasource != 0 && StringUtils.isNotEmpty(type) && StringUtils.isNotEmpty(sql); @@ -292,6 +309,7 @@ public class SqlParameters extends AbstractParameters { + ", sendEmail=" + sendEmail + ", displayRows=" + displayRows + ", limit=" + limit + + ", segmentSeparator=" + segmentSeparator + ", udfs='" + udfs + '\'' + ", showType='" + showType + '\'' + ", connParams='" + connParams + '\'' diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlSplitter.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlSplitter.java new file mode 100644 index 0000000000..a461748a93 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlSplitter.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.sql; + +import org.apache.commons.lang.StringUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class SqlSplitter { + + private SqlSplitter() { + } + + private static final String LINE_SEPARATOR = "\n"; + + /** + * split sql by segment separator + *

The segment separator is used + * when the data source does not support multi-segment SQL execution, + * and the client needs to split the SQL and execute it multiple times.

+ * @param sql + * @param segmentSeparator + * @return + */ + public static List split(String sql, String segmentSeparator) { + if (StringUtils.isBlank(segmentSeparator)) { + return Collections.singletonList(sql); + } + + String[] lines = sql.split(LINE_SEPARATOR); + List segments = new ArrayList<>(); + StringBuilder stmt = new StringBuilder(); + for (String line : lines) { + if (line.trim().isEmpty() || line.startsWith("--")) { + continue; + } + stmt.append(LINE_SEPARATOR).append(line); + if (line.trim().endsWith(segmentSeparator)) { + segments.add(stmt.toString()); + stmt.setLength(0); + } + } + if (stmt.length() > 0) { + segments.add(stmt.toString()); + } + return segments; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index ad31202004..3b0fa0e8c5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -136,7 +136,11 @@ public class SqlTask extends AbstractTaskExecutor { sqlTaskExecutionContext.getConnectionParams()); // ready to execute SQL and parameter entity Map - SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql()); + List mainStatementSqlBinds = SqlSplitter.split(sqlParameters.getSql(), sqlParameters.getSegmentSeparator()) + .stream() + .map(this::getSqlAndSqlParamsMap) + .collect(Collectors.toList()); + List preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements()) .orElse(new ArrayList<>()) .stream() @@ -151,7 +155,7 @@ public class SqlTask extends AbstractTaskExecutor { List createFuncs = createFuncs(sqlTaskExecutionContext.getUdfFuncParametersList(), logger); // execute sql task - executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs); + executeFuncAndSql(mainStatementSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs); setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS); @@ -165,17 +169,16 @@ public class SqlTask extends AbstractTaskExecutor { /** * execute function and sql * - * @param mainSqlBinds main sql binds + * @param mainStatementsBinds main statements binds * @param preStatementsBinds pre statements binds * @param postStatementsBinds post statements binds * @param createFuncs create functions */ - public void executeFuncAndSql(SqlBinds mainSqlBinds, + public void executeFuncAndSql(List mainStatementsBinds, List preStatementsBinds, List postStatementsBinds, List createFuncs) throws Exception { Connection connection = null; - PreparedStatement stmt = null; ResultSet resultSet = null; try { @@ -186,30 +189,31 @@ public class SqlTask extends AbstractTaskExecutor { createTempFunction(connection, createFuncs); } - // pre sql - preSql(connection, preStatementsBinds); - stmt = prepareStatementAndBind(connection, mainSqlBinds); + // pre execute + executeUpdate(connection, preStatementsBinds, "pre"); + // main execute String result = null; // decide whether to executeQuery or executeUpdate based on sqlType if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) { // query statements need to be convert to JsonArray and inserted into Alert to send - resultSet = stmt.executeQuery(); + resultSet = executeQuery(connection, mainStatementsBinds.get(0), "main"); result = resultProcess(resultSet); - } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) { // non query statement - String updateResult = String.valueOf(stmt.executeUpdate()); + String updateResult = executeUpdate(connection, mainStatementsBinds, "main"); result = setNonQuerySqlReturn(updateResult, sqlParameters.getLocalParams()); } //deal out params sqlParameters.dealOutParam(result); - postSql(connection, postStatementsBinds); + + // post execute + executeUpdate(connection, postStatementsBinds, "post"); } catch (Exception e) { logger.error("execute sql error: {}", e.getMessage()); throw e; } finally { - close(resultSet, stmt, connection); + close(resultSet, connection); } } @@ -288,37 +292,22 @@ public class SqlTask extends AbstractTaskExecutor { setTaskAlertInfo(taskAlertInfo); } - /** - * pre sql - * - * @param connection connection - * @param preStatementsBinds preStatementsBinds - */ - private void preSql(Connection connection, - List preStatementsBinds) throws Exception { - for (SqlBinds sqlBind : preStatementsBinds) { - try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)) { - int result = pstmt.executeUpdate(); - logger.info("pre statement execute result: {}, for sql: {}", result, sqlBind.getSql()); - - } + private ResultSet executeQuery(Connection connection, SqlBinds sqlBinds, String handlerType) throws Exception { + try (PreparedStatement statement = prepareStatementAndBind(connection, sqlBinds)) { + logger.info("{} statement execute query, for sql: {}", handlerType, sqlBinds.getSql()); + return statement.executeQuery(); } } - /** - * post sql - * - * @param connection connection - * @param postStatementsBinds postStatementsBinds - */ - private void postSql(Connection connection, - List postStatementsBinds) throws Exception { - for (SqlBinds sqlBind : postStatementsBinds) { - try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)) { - int result = pstmt.executeUpdate(); - logger.info("post statement execute result: {},for sql: {}", result, sqlBind.getSql()); + private String executeUpdate(Connection connection, List statementsBinds, String handlerType) throws Exception { + int result = 0; + for (SqlBinds sqlBind : statementsBinds) { + try (PreparedStatement statement = prepareStatementAndBind(connection, sqlBind)) { + result = statement.executeUpdate(); + logger.info("{} statement execute update result: {}, for sql: {}", handlerType, result, sqlBind.getSql()); } } + return String.valueOf(result); } /** @@ -341,12 +330,9 @@ public class SqlTask extends AbstractTaskExecutor { * close jdbc resource * * @param resultSet resultSet - * @param pstmt pstmt * @param connection connection */ - private void close(ResultSet resultSet, - PreparedStatement pstmt, - Connection connection) { + private void close(ResultSet resultSet, Connection connection) { if (resultSet != null) { try { resultSet.close(); @@ -355,14 +341,6 @@ public class SqlTask extends AbstractTaskExecutor { } } - if (pstmt != null) { - try { - pstmt.close(); - } catch (SQLException e) { - logger.error("close prepared statement error : {}", e.getMessage(), e); - } - } - if (connection != null) { try { connection.close(); diff --git a/dolphinscheduler-ui/src/locales/modules/en_US.ts b/dolphinscheduler-ui/src/locales/modules/en_US.ts index 4a391d0301..67b26f1ae6 100644 --- a/dolphinscheduler-ui/src/locales/modules/en_US.ts +++ b/dolphinscheduler-ui/src/locales/modules/en_US.ts @@ -924,6 +924,8 @@ const project = { required: 'required', emr_flow_define_json: 'jobFlowDefineJson', emr_flow_define_json_tips: 'Please enter the definition of the job flow.', + segment_separator: 'Segment Execution Separator', + segment_separator_tips: 'Please enter the segment execution separator', zeppelin_note_id: 'zeppelinNoteId', zeppelin_note_id_tips: 'Please enter the note id of your zeppelin note', zeppelin_paragraph_id: 'zeppelinParagraphId', diff --git a/dolphinscheduler-ui/src/locales/modules/zh_CN.ts b/dolphinscheduler-ui/src/locales/modules/zh_CN.ts index 88d0aab6c3..ea8a12809f 100644 --- a/dolphinscheduler-ui/src/locales/modules/zh_CN.ts +++ b/dolphinscheduler-ui/src/locales/modules/zh_CN.ts @@ -913,6 +913,8 @@ const project = { required: '必填', emr_flow_define_json: 'jobFlowDefineJson', emr_flow_define_json_tips: '请输入工作流定义', + segment_separator: '分段执行符号', + segment_separator_tips: '请输入分段执行符号', zeppelin_note_id: 'zeppelin_note_id', zeppelin_note_id_tips: '请输入zeppelin note id', zeppelin_paragraph_id: 'zeppelin_paragraph_id', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sql-type.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sql-type.ts index 1e93c9e208..3ad01cf846 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sql-type.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sql-type.ts @@ -24,6 +24,7 @@ import type { IJsonItem } from '../types' export function useSqlType(model: { [field: string]: any }): IJsonItem[] { const { t } = useI18n() const querySpan = computed(() => (model.sqlType === '0' ? 6 : 0)) + const nonQuerySpan = computed(() => (model.sqlType === '1' ? 6 : 0)) const emailSpan = computed(() => model.sqlType === '0' && model.sendEmail ? 24 : 0 ) @@ -67,6 +68,15 @@ export function useSqlType(model: { [field: string]: any }): IJsonItem[] { required: true } }, + { + type: 'input', + field: 'segmentSeparator', + name: t('project.node.segment_separator'), + props: { + placeholder: t('project.node.segment_separator_tips') + }, + span: nonQuerySpan + }, { type: 'switch', field: 'sendEmail', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index d391203e47..dc583e7c0c 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -179,6 +179,7 @@ export function formatParams(data: INodeData): { taskParams.sqlType = data.sqlType taskParams.preStatements = data.preStatements taskParams.postStatements = data.postStatements + taskParams.segmentSeparator = data.segmentSeparator taskParams.sendEmail = data.sendEmail taskParams.displayRows = data.displayRows if (data.sqlType === '0' && data.sendEmail) { diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts index de02be448e..63111d54fe 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts @@ -46,6 +46,7 @@ export function useSql({ timeout: 30, type: 'MYSQL', displayRows: 10, + segmentSeparator: '', sql: '', sqlType: '0', preStatements: [], diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts index d334de299f..9d9e413db1 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts @@ -241,6 +241,7 @@ interface ITaskParams { datasource?: string sql?: string sqlType?: string + segmentSeparator?: string sendEmail?: boolean displayRows?: number title?: string