Browse Source

[Fix-10392][SqlTask]: Fix list param error when use sql task (#11285)

* FIX: fix list param in.

* FIX: code bug

* FIX: fix one value

* FIX: header

* FIX: ci

* FIX: fix ci

* MOD: for review

* MOD: for review
3.1.0-release
Stalary 2 years ago committed by GitHub
parent
commit
6345a13cdc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 64
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtils.java
  2. 52
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtilsTest.java
  3. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

64
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtils.java

@ -21,15 +21,17 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETE
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_FORMAT_TIME; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_FORMAT_TIME;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_SHECDULE_TIME; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_SHECDULE_TIME;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.spi.utils.DateUtils; import org.apache.dolphinscheduler.spi.utils.DateUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -48,6 +50,8 @@ public class ParameterUtils {
private static final String DATE_START_PATTERN = "^[0-9]"; private static final String DATE_START_PATTERN = "^[0-9]";
private static final char PARAM_REPLACE_CHAR = '?';
private ParameterUtils() { private ParameterUtils() {
throw new UnsupportedOperationException("Construct ParameterUtils"); throw new UnsupportedOperationException("Construct ParameterUtils");
} }
@ -148,6 +152,64 @@ public class ParameterUtils {
} }
} }
public static String expandListParameter(Map<Integer, Property> params, String sql) {
Map<Integer, Property> expandMap = new HashMap<>();
if (params == null || params.isEmpty()) {
return sql;
}
String[] split = sql.split("\\?");
if (split.length == 0) {
return sql;
}
StringBuilder ret = new StringBuilder(split[0]);
int index = 1;
for (int i = 1; i < split.length; i++) {
Property property = params.get(i);
String value = property.getValue();
if (DataType.LIST.equals(property.getType())) {
List<Object> valueList = JSONUtils.toList(value, Object.class);
if (valueList.isEmpty() && StringUtils.isNotBlank(value)) {
valueList.add(value);
}
for (int j = 0; j < valueList.size(); j++) {
ret.append(PARAM_REPLACE_CHAR);
if (j != valueList.size() - 1) {
ret.append(",");
}
}
for (Object v : valueList ) {
Property newProperty = new Property();
if (v instanceof Integer) {
newProperty.setType(DataType.INTEGER);
} else if (v instanceof Long) {
newProperty.setType(DataType.LONG);
} else if (v instanceof Float) {
newProperty.setType(DataType.FLOAT);
} else if (v instanceof Double) {
newProperty.setType(DataType.DOUBLE);
} else {
newProperty.setType(DataType.VARCHAR);
}
newProperty.setValue(v.toString());
newProperty.setProp(property.getProp());
newProperty.setDirect(property.getDirect());
expandMap.put(index++, newProperty);
}
} else {
ret.append(PARAM_REPLACE_CHAR);
expandMap.put(index++, property);
}
ret.append(split[i]);
}
if (PARAM_REPLACE_CHAR == sql.charAt(sql.length() - 1)) {
ret.append(PARAM_REPLACE_CHAR);
expandMap.put(index, params.get(split.length));
}
params.clear();
params.putAll(expandMap);
return ret.toString();
}
/** /**
* $[yyyyMMdd] replace schedule time * $[yyyyMMdd] replace schedule time
*/ */

52
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtilsTest.java

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.parser;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import com.google.common.collect.Lists;
public class ParameterUtilsTest {
@Test
public void expandListParameter() {
Map<Integer, Property> params = new HashMap<>();
params.put(1, new Property(null, null, DataType.LIST, JSONUtils.toJsonString(Lists.newArrayList("c1", "c2", "c3"))));
params.put(2, new Property(null, null, DataType.DATE, "2020-06-30"));
params.put(3, new Property(null, null, DataType.LIST, JSONUtils.toJsonString(Lists.newArrayList(3.1415, 2.44, 3.44))));
String sql = ParameterUtils.expandListParameter(params, "select * from test where col1 in (?) and date=? and col2 in (?)");
Assert.assertEquals("select * from test where col1 in (?,?,?) and date=? and col2 in (?,?,?)", sql);
Assert.assertEquals(7, params.size());
Map<Integer, Property> params2 = new HashMap<>();
params2.put(1, new Property(null, null, DataType.LIST, JSONUtils.toJsonString(Lists.newArrayList("c1"))));
params2.put(2, new Property(null, null, DataType.DATE, "2020-06-30"));
String sql2 = ParameterUtils.expandListParameter(params2, "select * from test where col1 in (?) and date=?");
Assert.assertEquals("select * from test where col1 in (?) and date=?", sql2);
Assert.assertEquals(2, params2.size());
}
}

6
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

@ -370,7 +370,6 @@ public class SqlTask extends AbstractTaskExecutor {
} catch (Exception exception) { } catch (Exception exception) {
throw new TaskException("SQL task prepareStatementAndBind error", exception); throw new TaskException("SQL task prepareStatementAndBind error", exception);
} }
} }
/** /**
@ -430,9 +429,10 @@ public class SqlTask extends AbstractTaskExecutor {
sql = replaceOriginalValue(sql, rgexo, paramsMap); sql = replaceOriginalValue(sql, rgexo, paramsMap);
// replace the ${} of the SQL statement with the Placeholder // replace the ${} of the SQL statement with the Placeholder
String formatSql = sql.replaceAll(rgex, "?"); String formatSql = sql.replaceAll(rgex, "?");
// Convert the list parameter
formatSql = ParameterUtils.expandListParameter(sqlParamsMap, formatSql);
sqlBuilder.append(formatSql); sqlBuilder.append(formatSql);
// print replace sql
// print repalce sql
printReplacedSql(sql, formatSql, rgex, sqlParamsMap); printReplacedSql(sql, formatSql, rgex, sqlParamsMap);
return new SqlBinds(sqlBuilder.toString(), sqlParamsMap); return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
} }

Loading…
Cancel
Save