Browse Source

[cherry-pick][FIX-#6727] Fix procedure params bug (#7680) (#7692)

* [FIX-#6727][worker-server] Fix procedure params bug (#7680)

* fix Procedure param error

* code style

Co-authored-by: wangxj <wangxj31>
# Conflicts:
#	dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
#	dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java

* update

Co-authored-by: wangxj3 <857234426@qq.com>
2.0.7-release
BaoLiang 3 years ago committed by GitHub
parent
commit
bcef5f5588
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 36
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
  2. 35
      dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureParameters.java
  3. 61
      dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
  4. 32
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

36
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java

@ -19,10 +19,14 @@ package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.spi.task.AbstractTask; import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskConstants; import org.apache.dolphinscheduler.spi.task.TaskConstants;
import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import java.util.Map;
import java.util.StringJoiner; import java.util.StringJoiner;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -35,6 +39,7 @@ public abstract class AbstractTaskExecutor extends AbstractTask {
protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass())); protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
public String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
/** /**
* constructor * constructor
* *
@ -61,4 +66,35 @@ public abstract class AbstractTaskExecutor extends AbstractTask {
logger.info(" -> {}", joiner); logger.info(" -> {}", joiner);
} }
} }
/**
* regular expressions match the contents between two specified strings
*
* @param content content
* @param rgex rgex
* @param sqlParamsMap sql params map
* @param paramsPropsMap params props map
*/
public void setSqlParamsMap(String content, String rgex, Map<Integer, Property> sqlParamsMap,
Map<String, Property> paramsPropsMap,int taskInstanceId) {
Pattern pattern = Pattern.compile(rgex);
Matcher m = pattern.matcher(content);
int index = 1;
while (m.find()) {
String paramName = m.group(1);
Property prop = paramsPropsMap.get(paramName);
if (prop == null) {
logger.error("setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance"
+ " with id: {}. So couldn't put Property in sqlParamsMap.", paramName, taskInstanceId);
} else {
sqlParamsMap.put(index, prop);
index++;
logger.info("setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.", paramName, content);
}
}
}
} }

35
dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureParameters.java

@ -17,12 +17,16 @@
package org.apache.dolphinscheduler.plugin.task.procedure; package org.apache.dolphinscheduler.plugin.task.procedure;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.spi.task.AbstractParameters; import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.ResourceInfo; import org.apache.dolphinscheduler.spi.task.ResourceInfo;
import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* procedure parameter * procedure parameter
@ -39,6 +43,8 @@ public class ProcedureParameters extends AbstractParameters {
*/ */
private int datasource; private int datasource;
private Map<String, Property> outProperty;
/** /**
* procedure name * procedure name
*/ */
@ -86,4 +92,33 @@ public class ProcedureParameters extends AbstractParameters {
+ ", method='" + method + '\'' + ", method='" + method + '\''
+ '}'; + '}';
} }
public void dealOutParam4Procedure(Object result, String pop) {
Map<String, Property> properties = getOutProperty();
if (this.outProperty == null) {
return;
}
properties.get(pop).setValue(String.valueOf(result));
varPool.add(properties.get(pop));
}
public Map<String, Property> getOutProperty() {
if (this.outProperty != null) {
return this.outProperty;
}
if (CollectionUtils.isEmpty(localParams)) {
return null;
}
List<Property> outPropertyList = getOutProperty(localParams);
Map<String, Property> outProperty = new HashMap<>();
for (Property info : outPropertyList) {
outProperty.put(info.getProp(), info);
}
this.outProperty = outProperty;
return this.outProperty;
}
public void setOutProperty(Map<String, Property> outProperty) {
this.outProperty = outProperty;
}
} }

61
dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java

@ -36,17 +36,15 @@ import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
import java.sql.CallableStatement; import java.sql.CallableStatement;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Types; import java.sql.Types;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
/** /**
* procedure task * procedure task
*/ */
@ -102,18 +100,17 @@ public class ProcedureTask extends AbstractTaskExecutor {
// get jdbc connection // get jdbc connection
connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam); connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam);
Map<Integer, Property> sqlParamsMap = new HashMap<>();
// combining local and global parameters Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); String proceduerSql = formatSql(sqlParamsMap, paramsMap);
// call method // call method
stmt = connection.prepareCall(procedureParameters.getMethod()); stmt = connection.prepareCall(proceduerSql);
// set timeout // set timeout
setTimeout(stmt); setTimeout(stmt);
// outParameterMap // outParameterMap
Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, paramsMap); Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, sqlParamsMap, paramsMap);
stmt.executeUpdate(); stmt.executeUpdate();
@ -130,6 +127,12 @@ public class ProcedureTask extends AbstractTaskExecutor {
} }
} }
private String formatSql(Map<Integer, Property> sqlParamsMap, Map<String, Property> paramsMap) {
// combining local and global parameters
setSqlParamsMap(procedureParameters.getMethod(), rgex, sqlParamsMap, paramsMap, taskExecutionContext.getTaskInstanceId());
return procedureParameters.getMethod().replaceAll(rgex, "?");
}
/** /**
* print outParameter * print outParameter
* *
@ -145,7 +148,7 @@ public class ProcedureTask extends AbstractTaskExecutor {
String prop = property.getProp(); String prop = property.getProp();
DataType dataType = property.getType(); DataType dataType = property.getType();
// get output parameter // get output parameter
getOutputParameter(stmt, index, prop, dataType); procedureParameters.dealOutParam4Procedure(getOutputParameter(stmt, index, prop, dataType), prop);
} }
} }
@ -157,35 +160,26 @@ public class ProcedureTask extends AbstractTaskExecutor {
* @return outParameterMap * @return outParameterMap
* @throws Exception Exception * @throws Exception Exception
*/ */
private Map<Integer, Property> getOutParameterMap(CallableStatement stmt, Map<String, Property> paramsMap) throws Exception { private Map<Integer, Property> getOutParameterMap(CallableStatement stmt, Map<Integer, Property> paramsMap
, Map<String, Property> totalParamsMap) throws Exception {
Map<Integer, Property> outParameterMap = new HashMap<>(); Map<Integer, Property> outParameterMap = new HashMap<>();
if (procedureParameters.getLocalParametersMap() == null) { if (procedureParameters.getLocalParametersMap() == null) {
return outParameterMap; return outParameterMap;
} }
Collection<Property> userDefParamsList = procedureParameters.getLocalParametersMap().values();
if (CollectionUtils.isEmpty(userDefParamsList)) {
return outParameterMap;
}
int index = 1; int index = 1;
for (Property property : userDefParamsList) { if (paramsMap != null) {
logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}" for (Map.Entry<Integer, Property> entry : paramsMap.entrySet()) {
, property.getProp(), Property property = entry.getValue();
property.getDirect(),
property.getType(),
property.getValue());
// set parameters
if (property.getDirect().equals(Direct.IN)) { if (property.getDirect().equals(Direct.IN)) {
ParameterUtils.setInParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue()); ParameterUtils.setInParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue());
} else if (property.getDirect().equals(Direct.OUT)) { } else if (property.getDirect().equals(Direct.OUT)) {
setOutParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue()); setOutParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue());
property.setValue(paramsMap.get(property.getProp()).getValue());
outParameterMap.put(index, property); outParameterMap.put(index, property);
} }
index++; index++;
} }
}
return outParameterMap; return outParameterMap;
} }
@ -235,38 +229,49 @@ public class ProcedureTask extends AbstractTaskExecutor {
* @param dataType dataType * @param dataType dataType
* @throws SQLException SQLException * @throws SQLException SQLException
*/ */
private void getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException { private Object getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException {
Object value = null;
switch (dataType) { switch (dataType) {
case VARCHAR: case VARCHAR:
logger.info("out prameter varchar key : {} , value : {}", prop, stmt.getString(index)); logger.info("out prameter varchar key : {} , value : {}", prop, stmt.getString(index));
value = stmt.getString(index);
break; break;
case INTEGER: case INTEGER:
logger.info("out prameter integer key : {} , value : {}", prop, stmt.getInt(index)); logger.info("out prameter integer key : {} , value : {}", prop, stmt.getInt(index));
value = stmt.getInt(index);
break; break;
case LONG: case LONG:
logger.info("out prameter long key : {} , value : {}", prop, stmt.getLong(index)); logger.info("out prameter long key : {} , value : {}", prop, stmt.getLong(index));
value = stmt.getLong(index);
break; break;
case FLOAT: case FLOAT:
logger.info("out prameter float key : {} , value : {}", prop, stmt.getFloat(index)); logger.info("out prameter float key : {} , value : {}", prop, stmt.getFloat(index));
value = stmt.getFloat(index);
break; break;
case DOUBLE: case DOUBLE:
logger.info("out prameter double key : {} , value : {}", prop, stmt.getDouble(index)); logger.info("out prameter double key : {} , value : {}", prop, stmt.getDouble(index));
value = stmt.getDouble(index);
break; break;
case DATE: case DATE:
logger.info("out prameter date key : {} , value : {}", prop, stmt.getDate(index)); logger.info("out prameter date key : {} , value : {}", prop, stmt.getDate(index));
value = stmt.getDate(index);
break; break;
case TIME: case TIME:
logger.info("out prameter time key : {} , value : {}", prop, stmt.getTime(index)); logger.info("out prameter time key : {} , value : {}", prop, stmt.getTime(index));
value = stmt.getTime(index);
break; break;
case TIMESTAMP: case TIMESTAMP:
logger.info("out prameter timestamp key : {} , value : {}", prop, stmt.getTimestamp(index)); logger.info("out prameter timestamp key : {} , value : {}", prop, stmt.getTimestamp(index));
value = stmt.getTimestamp(index);
break; break;
case BOOLEAN: case BOOLEAN:
logger.info("out prameter boolean key : {} , value : {}", prop, stmt.getBoolean(index)); logger.info("out prameter boolean key : {} , value : {}", prop, stmt.getBoolean(index));
value = stmt.getBoolean(index);
break; break;
default: default:
break; break;
} }
return value;
} }
@Override @Override

32
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

@ -405,35 +405,6 @@ public class SqlTask extends AbstractTaskExecutor {
} }
/**
* regular expressions match the contents between two specified strings
*
* @param content content
* @param rgex rgex
* @param sqlParamsMap sql params map
* @param paramsPropsMap params props map
*/
public void setSqlParamsMap(String content, String rgex, Map<Integer, Property> sqlParamsMap, Map<String, Property> paramsPropsMap) {
Pattern pattern = Pattern.compile(rgex);
Matcher m = pattern.matcher(content);
int index = 1;
while (m.find()) {
String paramName = m.group(1);
Property prop = paramsPropsMap.get(paramName);
if (prop == null) {
logger.error("setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance"
+ " with id: {}. So couldn't put Property in sqlParamsMap.", paramName, taskExecutionContext.getTaskInstanceId());
} else {
sqlParamsMap.put(index, prop);
index++;
logger.info("setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.", paramName, content);
}
}
}
/** /**
* print replace sql * print replace sql
* *
@ -485,8 +456,7 @@ public class SqlTask extends AbstractTaskExecutor {
//replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job //replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job
sql = ParameterUtils.replaceScheduleTime(sql, taskExecutionContext.getScheduleTime()); sql = ParameterUtils.replaceScheduleTime(sql, taskExecutionContext.getScheduleTime());
// special characters need to be escaped, ${} needs to be escaped // special characters need to be escaped, ${} needs to be escaped
String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*"; setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap,taskExecutionContext.getTaskInstanceId());
setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap);
//Replace the original value in sql !{...} ,Does not participate in precompilation //Replace the original value in sql !{...} ,Does not participate in precompilation
String rgexo = "['\"]*\\!\\{(.*?)\\}['\"]*"; String rgexo = "['\"]*\\!\\{(.*?)\\}['\"]*";
sql = replaceOriginalValue(sql, rgexo, paramsMap); sql = replaceOriginalValue(sql, rgexo, paramsMap);

Loading…
Cancel
Save