diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java index 0b4994116a..6e21117c9f 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java @@ -18,9 +18,12 @@ package org.apache.dolphinscheduler.plugin.datasource.api.plugin; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; +import org.apache.dolphinscheduler.spi.enums.DbType; import java.util.Map; +import lombok.NonNull; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +46,10 @@ public class DataSourceProcessorProvider { return DataSourceClientProviderHolder.INSTANCE; } + public DataSourceProcessor getDataSourceProcessor(@NonNull DbType dbType) { + return dataSourcePluginManager.getDataSourceProcessorMap().get(dbType.name()); + } + public Map getDataSourceProcessorMap() { return dataSourcePluginManager.getDataSourceProcessorMap(); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java index 68b6c29e09..688374ba62 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java @@ -21,8 +21,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider; -import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; +import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskException; @@ -40,7 +40,6 @@ import org.apache.commons.lang3.StringUtils; import java.sql.CallableStatement; import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Types; import java.util.HashMap; @@ -96,19 +95,12 @@ public class ProcedureTask extends AbstractTask { procedureParameters.getMethod(), procedureParameters.getLocalParams()); - Connection connection = null; - CallableStatement stmt = null; - try { - // load class - DbType dbType = DbType.valueOf(procedureParameters.getType()); - // get datasource - ConnectionParam connectionParam = - DataSourceUtils.buildConnectionParams(DbType.valueOf(procedureParameters.getType()), - procedureTaskExecutionContext.getConnectionParams()); - - // get jdbc connection - connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam); - + DbType dbType = DbType.valueOf(procedureParameters.getType()); + DataSourceProcessor dataSourceProcessor = + DataSourceProcessorProvider.getInstance().getDataSourceProcessor(dbType); + ConnectionParam connectionParams = + dataSourceProcessor.createConnectionParams(procedureTaskExecutionContext.getConnectionParams()); + try (Connection connection = dataSourceProcessor.getConnection(connectionParams)) { Map sqlParamsMap = new HashMap<>(); Map paramsMap = taskExecutionContext.getPrepareParamsMap() == null ? Maps.newHashMap() : taskExecutionContext.getPrepareParamsMap(); @@ -116,30 +108,26 @@ public class ProcedureTask extends AbstractTask { // set out params before format sql paramsMap.putAll(procedureParameters.getOutProperty()); } - - // format sql String proceduerSql = formatSql(sqlParamsMap, paramsMap); // call method - stmt = connection.prepareCall(proceduerSql); - - // set timeout - setTimeout(stmt); + try (CallableStatement stmt = connection.prepareCall(proceduerSql)) { + // set timeout + setTimeout(stmt); - // outParameterMap - Map outParameterMap = getOutParameterMap(stmt, sqlParamsMap, paramsMap); + // outParameterMap + Map outParameterMap = getOutParameterMap(stmt, sqlParamsMap, paramsMap); - stmt.executeUpdate(); + stmt.executeUpdate(); - // print the output parameters to the log - printOutParameter(stmt, outParameterMap); + // print the output parameters to the log + printOutParameter(stmt, outParameterMap); - setExitStatusCode(EXIT_CODE_SUCCESS); + setExitStatusCode(EXIT_CODE_SUCCESS); + } } catch (Exception e) { setExitStatusCode(EXIT_CODE_FAILURE); logger.error("procedure task error", e); throw new TaskException("Execute procedure task failed", e); - } finally { - close(stmt, connection); } } @@ -220,29 +208,6 @@ public class ProcedureTask extends AbstractTask { } } - /** - * close jdbc resource - * - * @param stmt stmt - * @param connection connection - */ - private void close(PreparedStatement stmt, Connection connection) { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - logger.error("close prepared statement error : {}", e.getMessage(), e); - } - } - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - logger.error("close connection error : {}", e.getMessage(), e); - } - } - } - /** * get output parameter *