Browse Source

[FIX-#6727][worker-server] Fix procedure params bug (#7680)

* fix Procedure param error

* code style

Co-authored-by: wangxj <wangxj31>
3.0.0/version-upgrade
wangxj3 3 years ago committed by GitHub
parent
commit
cc77963522
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 36
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
  2. 35
      dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureParameters.java
  3. 76
      dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
  4. 32
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

36
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java

@ -18,10 +18,14 @@
package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,6 +38,7 @@ public abstract class AbstractTaskExecutor extends AbstractTask {
protected Logger logger;
public String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
/**
* constructor
*
@ -61,4 +66,35 @@ public abstract class AbstractTaskExecutor extends AbstractTask {
logger.info(" -> {}", joiner);
}
}
/**
* regular expressions match the contents between two specified strings
*
* @param content content
* @param rgex rgex
* @param sqlParamsMap sql params map
* @param paramsPropsMap params props map
*/
public void setSqlParamsMap(String content, String rgex, Map<Integer, Property> sqlParamsMap,
Map<String, Property> paramsPropsMap,int taskInstanceId) {
Pattern pattern = Pattern.compile(rgex);
Matcher m = pattern.matcher(content);
int index = 1;
while (m.find()) {
String paramName = m.group(1);
Property prop = paramsPropsMap.get(paramName);
if (prop == null) {
logger.error("setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance"
+ " with id: {}. So couldn't put Property in sqlParamsMap.", paramName, taskInstanceId);
} else {
sqlParamsMap.put(index, prop);
index++;
logger.info("setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.", paramName, content);
}
}
}
}

35
dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureParameters.java

@ -17,12 +17,16 @@
package org.apache.dolphinscheduler.plugin.task.procedure;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.ResourceInfo;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* procedure parameter
@ -39,6 +43,8 @@ public class ProcedureParameters extends AbstractParameters {
*/
private int datasource;
private Map<String, Property> outProperty;
/**
* procedure name
*/
@ -86,4 +92,33 @@ public class ProcedureParameters extends AbstractParameters {
+ ", method='" + method + '\''
+ '}';
}
public void dealOutParam4Procedure(Object result, String pop) {
Map<String, Property> properties = getOutProperty();
if (this.outProperty == null) {
return;
}
properties.get(pop).setValue(String.valueOf(result));
varPool.add(properties.get(pop));
}
public Map<String, Property> getOutProperty() {
if (this.outProperty != null) {
return this.outProperty;
}
if (CollectionUtils.isEmpty(localParams)) {
return null;
}
List<Property> outPropertyList = getOutProperty(localParams);
Map<String, Property> outProperty = new HashMap<>();
for (Property info : outPropertyList) {
outProperty.put(info.getProp(), info);
}
this.outProperty = outProperty;
return this.outProperty;
}
public void setOutProperty(Map<String, Property> outProperty) {
this.outProperty = outProperty;
}
}

76
dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java

@ -17,10 +17,6 @@
package org.apache.dolphinscheduler.plugin.task.procedure;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_SUCCESS;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.TASK_LOG_INFO_FORMAT;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
@ -37,17 +33,18 @@ import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_SUCCESS;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.TASK_LOG_INFO_FORMAT;
/**
* procedure task
*/
@ -106,18 +103,17 @@ public class ProcedureTask extends AbstractTaskExecutor {
// get jdbc connection
connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam);
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
Map<Integer, Property> sqlParamsMap = new HashMap<>();
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
String proceduerSql = formatSql(sqlParamsMap, paramsMap);
// call method
stmt = connection.prepareCall(procedureParameters.getMethod());
stmt = connection.prepareCall(proceduerSql);
// set timeout
setTimeout(stmt);
// outParameterMap
Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, paramsMap);
Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, sqlParamsMap, paramsMap);
stmt.executeUpdate();
@ -134,6 +130,12 @@ public class ProcedureTask extends AbstractTaskExecutor {
}
}
private String formatSql(Map<Integer, Property> sqlParamsMap, Map<String, Property> paramsMap) {
// combining local and global parameters
setSqlParamsMap(procedureParameters.getMethod(), rgex, sqlParamsMap, paramsMap, taskExecutionContext.getTaskInstanceId());
return procedureParameters.getMethod().replaceAll(rgex, "?");
}
/**
* print outParameter
*
@ -149,7 +151,7 @@ public class ProcedureTask extends AbstractTaskExecutor {
String prop = property.getProp();
DataType dataType = property.getType();
// get output parameter
getOutputParameter(stmt, index, prop, dataType);
procedureParameters.dealOutParam4Procedure(getOutputParameter(stmt, index, prop, dataType), prop);
}
}
@ -161,34 +163,25 @@ public class ProcedureTask extends AbstractTaskExecutor {
* @return outParameterMap
* @throws Exception Exception
*/
private Map<Integer, Property> getOutParameterMap(CallableStatement stmt, Map<String, Property> paramsMap) throws Exception {
private Map<Integer, Property> getOutParameterMap(CallableStatement stmt, Map<Integer, Property> paramsMap
, Map<String, Property> totalParamsMap) throws Exception {
Map<Integer, Property> outParameterMap = new HashMap<>();
if (procedureParameters.getLocalParametersMap() == null) {
return outParameterMap;
}
Collection<Property> userDefParamsList = procedureParameters.getLocalParametersMap().values();
if (CollectionUtils.isEmpty(userDefParamsList)) {
return outParameterMap;
}
int index = 1;
for (Property property : userDefParamsList) {
logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}"
, property.getProp(),
property.getDirect(),
property.getType(),
property.getValue());
// set parameters
if (property.getDirect().equals(Direct.IN)) {
ParameterUtils.setInParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue());
} else if (property.getDirect().equals(Direct.OUT)) {
setOutParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue());
property.setValue(paramsMap.get(property.getProp()).getValue());
outParameterMap.put(index, property);
if (paramsMap != null) {
for (Map.Entry<Integer, Property> entry : paramsMap.entrySet()) {
Property property = entry.getValue();
if (property.getDirect().equals(Direct.IN)) {
ParameterUtils.setInParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue());
} else if (property.getDirect().equals(Direct.OUT)) {
setOutParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue());
outParameterMap.put(index, property);
}
index++;
}
index++;
}
return outParameterMap;
@ -239,38 +232,49 @@ public class ProcedureTask extends AbstractTaskExecutor {
* @param dataType dataType
* @throws SQLException SQLException
*/
private void getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException {
private Object getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException {
Object value = null;
switch (dataType) {
case VARCHAR:
logger.info("out prameter varchar key : {} , value : {}", prop, stmt.getString(index));
value = stmt.getString(index);
break;
case INTEGER:
logger.info("out prameter integer key : {} , value : {}", prop, stmt.getInt(index));
value = stmt.getInt(index);
break;
case LONG:
logger.info("out prameter long key : {} , value : {}", prop, stmt.getLong(index));
value = stmt.getLong(index);
break;
case FLOAT:
logger.info("out prameter float key : {} , value : {}", prop, stmt.getFloat(index));
value = stmt.getFloat(index);
break;
case DOUBLE:
logger.info("out prameter double key : {} , value : {}", prop, stmt.getDouble(index));
value = stmt.getDouble(index);
break;
case DATE:
logger.info("out prameter date key : {} , value : {}", prop, stmt.getDate(index));
value = stmt.getDate(index);
break;
case TIME:
logger.info("out prameter time key : {} , value : {}", prop, stmt.getTime(index));
value = stmt.getTime(index);
break;
case TIMESTAMP:
logger.info("out prameter timestamp key : {} , value : {}", prop, stmt.getTimestamp(index));
value = stmt.getTimestamp(index);
break;
case BOOLEAN:
logger.info("out prameter boolean key : {} , value : {}", prop, stmt.getBoolean(index));
value = stmt.getBoolean(index);
break;
default:
break;
}
return value;
}
@Override

32
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

@ -408,35 +408,6 @@ public class SqlTask extends AbstractTaskExecutor {
}
/**
* regular expressions match the contents between two specified strings
*
* @param content content
* @param rgex rgex
* @param sqlParamsMap sql params map
* @param paramsPropsMap params props map
*/
public void setSqlParamsMap(String content, String rgex, Map<Integer, Property> sqlParamsMap, Map<String, Property> paramsPropsMap) {
Pattern pattern = Pattern.compile(rgex);
Matcher m = pattern.matcher(content);
int index = 1;
while (m.find()) {
String paramName = m.group(1);
Property prop = paramsPropsMap.get(paramName);
if (prop == null) {
logger.error("setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance"
+ " with id: {}. So couldn't put Property in sqlParamsMap.", paramName, taskExecutionContext.getTaskInstanceId());
} else {
sqlParamsMap.put(index, prop);
index++;
logger.info("setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.", paramName, content);
}
}
}
/**
* print replace sql
*
@ -488,8 +459,7 @@ public class SqlTask extends AbstractTaskExecutor {
//replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job
sql = ParameterUtils.replaceScheduleTime(sql, taskExecutionContext.getScheduleTime());
// special characters need to be escaped, ${} needs to be escaped
String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap);
setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap,taskExecutionContext.getTaskInstanceId());
//Replace the original value in sql !{...} ,Does not participate in precompilation
String rgexo = "['\"]*\\!\\{(.*?)\\}['\"]*";
sql = replaceOriginalValue(sql, rgexo, paramsMap);

Loading…
Cancel
Save