Browse Source

EMR task support replace params placeholder (#15975)

Co-authored-by: Eric Gao <ericgao.apache@gmail.com>
3.2.2-release-bak
Wenjun Ruan 7 months ago committed by GitHub
parent
commit
3446fd8ab1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 9
      dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
  2. 9
      dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java

9
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.emr;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
@ -126,11 +127,15 @@ public class EmrAddStepsTask extends AbstractEmrTask {
protected AddJobFlowStepsRequest createAddJobFlowStepsRequest() { protected AddJobFlowStepsRequest createAddJobFlowStepsRequest() {
final AddJobFlowStepsRequest addJobFlowStepsRequest; final AddJobFlowStepsRequest addJobFlowStepsRequest;
String jobStepDefineJson = null;
try { try {
jobStepDefineJson = ParameterUtils.convertParameterPlaceholders(
emrParameters.getStepsDefineJson(),
ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()));
addJobFlowStepsRequest = addJobFlowStepsRequest =
objectMapper.readValue(emrParameters.getStepsDefineJson(), AddJobFlowStepsRequest.class); objectMapper.readValue(jobStepDefineJson, AddJobFlowStepsRequest.class);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new EmrTaskException("can not parse AddJobFlowStepsRequest from json", e); throw new EmrTaskException("can not parse AddJobFlowStepsRequest from json: " + jobStepDefineJson, e);
} }
// When a single task definition is associated with multiple steps, the state tracking will have high // When a single task definition is associated with multiple steps, the state tracking will have high

9
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.emr;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
@ -120,10 +121,14 @@ public class EmrJobFlowTask extends AbstractEmrTask {
protected RunJobFlowRequest createRunJobFlowRequest() { protected RunJobFlowRequest createRunJobFlowRequest() {
final RunJobFlowRequest runJobFlowRequest; final RunJobFlowRequest runJobFlowRequest;
String jobFlowDefineJson = null;
try { try {
runJobFlowRequest = objectMapper.readValue(emrParameters.getJobFlowDefineJson(), RunJobFlowRequest.class); jobFlowDefineJson = ParameterUtils.convertParameterPlaceholders(
emrParameters.getJobFlowDefineJson(),
ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()));
runJobFlowRequest = objectMapper.readValue(jobFlowDefineJson, RunJobFlowRequest.class);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new EmrTaskException("can not parse RunJobFlowRequest from json", e); throw new EmrTaskException("can not parse RunJobFlowRequest from json: " + jobFlowDefineJson, e);
} }
return runJobFlowRequest; return runJobFlowRequest;

Loading…
Cancel
Save