Browse Source

[Bug][Worker] Fix sql/shell task doesn't replace parameter (#14876)

Fix parameter assign error, fix #14869 which including
* shell task param not work, botch local and global
* others task chain assign not work
3.2.1-prepare
Jay Chung 1 year ago committed by GitHub
parent
commit
3fa69d2db8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      .github/ISSUE_TEMPLATE/bug-report.yml
  2. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsService.java
  3. 14
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java
  4. 6
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceTest.java
  5. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java

2
.github/ISSUE_TEMPLATE/bug-report.yml

@ -100,7 +100,7 @@ body:
options:
- dev
- 3.1.x
- 3.2.0-prepare
- 3.2.x
validations:
required: true

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsService.java

@ -53,7 +53,7 @@ public interface CuringParamsService {
* @param allParamMap
* @return
*/
String convertParameterPlaceholders(String val, Map<String, String> allParamMap);
String convertParameterPlaceholders(String val, Map<String, Property> allParamMap);
/**
* curing global params

14
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.service.expand;
import static java.util.Objects.nonNull;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_PROJECT_CODE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_PROJECT_NAME;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_DEFINITION_CODE;
@ -69,8 +70,13 @@ public class CuringParamsServiceImpl implements CuringParamsService {
private ProjectParameterMapper projectParameterMapper;
@Override
public String convertParameterPlaceholders(String val, Map<String, String> allParamMap) {
return ParameterUtils.convertParameterPlaceholders(val, allParamMap);
public String convertParameterPlaceholders(String val, Map<String, Property> allParamMap) {
Map<String, String> paramMap = allParamMap
.entrySet()
.stream()
.filter(entry -> nonNull(entry.getValue().getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getValue()));
return ParameterUtils.convertParameterPlaceholders(val, paramMap);
}
@Override
@ -212,6 +218,10 @@ public class CuringParamsServiceImpl implements CuringParamsService {
// whether external scaling calculation is required
if (timeFunctionNeedExpand(val)) {
val = timeFunctionExtension(taskInstance.getProcessInstanceId(), timeZone, val);
} else {
// handle some chain parameter assign, such as `{"var1": "${var2}", "var2": 1}` should be convert to
// `{"var1": 1, "var2": 1}`
val = convertParameterPlaceholders(val, prepareParamsMap);
}
property.setValue(val);
}

6
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceTest.java

@ -72,17 +72,19 @@ public class CuringParamsServiceTest {
private TimePlaceholderResolverExpandServiceImpl timePlaceholderResolverExpandServiceImpl;
private final Map<String, String> globalParamMap = new HashMap<>();
private final Map<String, Property> paramMap = new HashMap<>();
@BeforeEach
public void init() {
globalParamMap.put("globalParams1", "Params1");
paramMap.put("globalParams1", new Property("globalParams1", Direct.IN, DataType.VARCHAR, "Params1"));
}
@Test
public void testConvertParameterPlaceholders() {
Mockito.when(curingGlobalParamsService.convertParameterPlaceholders(placeHolderName, globalParamMap))
Mockito.when(curingGlobalParamsService.convertParameterPlaceholders(placeHolderName, paramMap))
.thenReturn("2022-06-26");
String result = curingGlobalParamsService.convertParameterPlaceholders(placeHolderName, globalParamMap);
String result = curingGlobalParamsService.convertParameterPlaceholders(placeHolderName, paramMap);
Assertions.assertNotNull(result);
}

2
dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java

@ -81,7 +81,7 @@ public class ShellTask extends AbstractTask {
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
.properties(ParameterUtils.convert(shellParameters.getLocalParametersMap()))
.properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
.appendScript(shellParameters.getRawScript());
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);

Loading…
Cancel
Save