diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java index 5b5661cba7..22fbc6b00f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java @@ -19,10 +19,14 @@ package org.apache.dolphinscheduler.plugin.task.api; import org.apache.dolphinscheduler.spi.task.AbstractTask; import org.apache.dolphinscheduler.spi.task.TaskConstants; +import org.apache.dolphinscheduler.spi.task.Property; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; +import java.util.Map; import java.util.StringJoiner; import java.util.concurrent.LinkedBlockingQueue; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +39,7 @@ public abstract class AbstractTaskExecutor extends AbstractTask { protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass())); + public String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*"; /** * constructor * @@ -61,4 +66,35 @@ public abstract class AbstractTaskExecutor extends AbstractTask { logger.info(" -> {}", joiner); } } + + /** + * regular expressions match the contents between two specified strings + * + * @param content content + * @param rgex rgex + * @param sqlParamsMap sql params map + * @param paramsPropsMap params props map + */ + public void setSqlParamsMap(String content, String rgex, Map sqlParamsMap, + Map paramsPropsMap,int taskInstanceId) { + Pattern pattern = Pattern.compile(rgex); + Matcher m = pattern.matcher(content); + int index = 1; + while (m.find()) { + + String paramName = m.group(1); + Property prop = paramsPropsMap.get(paramName); + + if (prop == null) { + logger.error("setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance" + + " with id: {}. So couldn't put Property in sqlParamsMap.", paramName, taskInstanceId); + } else { + sqlParamsMap.put(index, prop); + index++; + logger.info("setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.", paramName, content); + } + + } + } + } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureParameters.java index bfb56c2f5f..d0e35b9642 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureParameters.java @@ -17,12 +17,16 @@ package org.apache.dolphinscheduler.plugin.task.procedure; +import org.apache.commons.collections.CollectionUtils; import org.apache.dolphinscheduler.spi.task.AbstractParameters; +import org.apache.dolphinscheduler.spi.task.Property; import org.apache.dolphinscheduler.spi.task.ResourceInfo; import org.apache.dolphinscheduler.spi.utils.StringUtils; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * procedure parameter @@ -39,6 +43,8 @@ public class ProcedureParameters extends AbstractParameters { */ private int datasource; + private Map outProperty; + /** * procedure name */ @@ -86,4 +92,33 @@ public class ProcedureParameters extends AbstractParameters { + ", method='" + method + '\'' + '}'; } + + public void dealOutParam4Procedure(Object result, String pop) { + Map properties = getOutProperty(); + if (this.outProperty == null) { + return; + } + properties.get(pop).setValue(String.valueOf(result)); + varPool.add(properties.get(pop)); + } + + public Map getOutProperty() { + if (this.outProperty != null) { + return this.outProperty; + } + if (CollectionUtils.isEmpty(localParams)) { + return null; + } + List outPropertyList = getOutProperty(localParams); + Map outProperty = new HashMap<>(); + for (Property info : outPropertyList) { + outProperty.put(info.getProp(), info); + } + this.outProperty = outProperty; + return this.outProperty; + } + + public void setOutProperty(Map outProperty) { + this.outProperty = outProperty; + } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java index 202b75dab2..35a95f1381 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java @@ -36,17 +36,15 @@ import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; -import org.apache.commons.collections4.CollectionUtils; - import java.sql.CallableStatement; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Types; -import java.util.Collection; import java.util.HashMap; import java.util.Map; + /** * procedure task */ @@ -102,18 +100,17 @@ public class ProcedureTask extends AbstractTaskExecutor { // get jdbc connection connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam); - - // combining local and global parameters - Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); - + Map sqlParamsMap = new HashMap<>(); + Map paramsMap = ParamUtils.convert(taskExecutionContext, getParameters()); + String proceduerSql = formatSql(sqlParamsMap, paramsMap); // call method - stmt = connection.prepareCall(procedureParameters.getMethod()); + stmt = connection.prepareCall(proceduerSql); // set timeout setTimeout(stmt); // outParameterMap - Map outParameterMap = getOutParameterMap(stmt, paramsMap); + Map outParameterMap = getOutParameterMap(stmt, sqlParamsMap, paramsMap); stmt.executeUpdate(); @@ -130,6 +127,12 @@ public class ProcedureTask extends AbstractTaskExecutor { } } + private String formatSql(Map sqlParamsMap, Map paramsMap) { + // combining local and global parameters + setSqlParamsMap(procedureParameters.getMethod(), rgex, sqlParamsMap, paramsMap, taskExecutionContext.getTaskInstanceId()); + return procedureParameters.getMethod().replaceAll(rgex, "?"); + } + /** * print outParameter * @@ -145,7 +148,7 @@ public class ProcedureTask extends AbstractTaskExecutor { String prop = property.getProp(); DataType dataType = property.getType(); // get output parameter - getOutputParameter(stmt, index, prop, dataType); + procedureParameters.dealOutParam4Procedure(getOutputParameter(stmt, index, prop, dataType), prop); } } @@ -157,34 +160,25 @@ public class ProcedureTask extends AbstractTaskExecutor { * @return outParameterMap * @throws Exception Exception */ - private Map getOutParameterMap(CallableStatement stmt, Map paramsMap) throws Exception { + private Map getOutParameterMap(CallableStatement stmt, Map paramsMap + , Map totalParamsMap) throws Exception { Map outParameterMap = new HashMap<>(); if (procedureParameters.getLocalParametersMap() == null) { return outParameterMap; } - Collection userDefParamsList = procedureParameters.getLocalParametersMap().values(); - - if (CollectionUtils.isEmpty(userDefParamsList)) { - return outParameterMap; - } - int index = 1; - for (Property property : userDefParamsList) { - logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}" - , property.getProp(), - property.getDirect(), - property.getType(), - property.getValue()); - // set parameters - if (property.getDirect().equals(Direct.IN)) { - ParameterUtils.setInParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue()); - } else if (property.getDirect().equals(Direct.OUT)) { - setOutParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue()); - property.setValue(paramsMap.get(property.getProp()).getValue()); - outParameterMap.put(index, property); + if (paramsMap != null) { + for (Map.Entry entry : paramsMap.entrySet()) { + Property property = entry.getValue(); + if (property.getDirect().equals(Direct.IN)) { + ParameterUtils.setInParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue()); + } else if (property.getDirect().equals(Direct.OUT)) { + setOutParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue()); + outParameterMap.put(index, property); + } + index++; } - index++; } return outParameterMap; @@ -235,38 +229,49 @@ public class ProcedureTask extends AbstractTaskExecutor { * @param dataType dataType * @throws SQLException SQLException */ - private void getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException { + private Object getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException { + Object value = null; switch (dataType) { case VARCHAR: logger.info("out prameter varchar key : {} , value : {}", prop, stmt.getString(index)); + value = stmt.getString(index); break; case INTEGER: logger.info("out prameter integer key : {} , value : {}", prop, stmt.getInt(index)); + value = stmt.getInt(index); break; case LONG: logger.info("out prameter long key : {} , value : {}", prop, stmt.getLong(index)); + value = stmt.getLong(index); break; case FLOAT: logger.info("out prameter float key : {} , value : {}", prop, stmt.getFloat(index)); + value = stmt.getFloat(index); break; case DOUBLE: logger.info("out prameter double key : {} , value : {}", prop, stmt.getDouble(index)); + value = stmt.getDouble(index); break; case DATE: logger.info("out prameter date key : {} , value : {}", prop, stmt.getDate(index)); + value = stmt.getDate(index); break; case TIME: logger.info("out prameter time key : {} , value : {}", prop, stmt.getTime(index)); + value = stmt.getTime(index); break; case TIMESTAMP: logger.info("out prameter timestamp key : {} , value : {}", prop, stmt.getTimestamp(index)); + value = stmt.getTimestamp(index); break; case BOOLEAN: logger.info("out prameter boolean key : {} , value : {}", prop, stmt.getBoolean(index)); + value = stmt.getBoolean(index); break; default: break; } + return value; } @Override diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index d8bae7ad24..a260130423 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -405,35 +405,6 @@ public class SqlTask extends AbstractTaskExecutor { } - /** - * regular expressions match the contents between two specified strings - * - * @param content content - * @param rgex rgex - * @param sqlParamsMap sql params map - * @param paramsPropsMap params props map - */ - public void setSqlParamsMap(String content, String rgex, Map sqlParamsMap, Map paramsPropsMap) { - Pattern pattern = Pattern.compile(rgex); - Matcher m = pattern.matcher(content); - int index = 1; - while (m.find()) { - - String paramName = m.group(1); - Property prop = paramsPropsMap.get(paramName); - - if (prop == null) { - logger.error("setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance" - + " with id: {}. So couldn't put Property in sqlParamsMap.", paramName, taskExecutionContext.getTaskInstanceId()); - } else { - sqlParamsMap.put(index, prop); - index++; - logger.info("setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.", paramName, content); - } - - } - } - /** * print replace sql * @@ -485,8 +456,7 @@ public class SqlTask extends AbstractTaskExecutor { //replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job sql = ParameterUtils.replaceScheduleTime(sql, taskExecutionContext.getScheduleTime()); // special characters need to be escaped, ${} needs to be escaped - String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*"; - setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap); + setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap,taskExecutionContext.getTaskInstanceId()); //Replace the original value in sql !{...} ,Does not participate in precompilation String rgexo = "['\"]*\\!\\{(.*?)\\}['\"]*"; sql = replaceOriginalValue(sql, rgexo, paramsMap);