From 3446fd8ab157974e31226635e277b6c56b6b7cb5 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Fri, 10 May 2024 17:34:26 +0800 Subject: [PATCH] EMR task support replace params placeholder (#15975) Co-authored-by: Eric Gao --- .../plugin/task/emr/EmrAddStepsTask.java | 9 +++++++-- .../dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java | 9 +++++++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java index 753b206e21..13dc35c30a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.emr; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import java.util.Collections; import java.util.HashSet; @@ -126,11 +127,15 @@ public class EmrAddStepsTask extends AbstractEmrTask { protected AddJobFlowStepsRequest createAddJobFlowStepsRequest() { final AddJobFlowStepsRequest addJobFlowStepsRequest; + String jobStepDefineJson = null; try { + jobStepDefineJson = ParameterUtils.convertParameterPlaceholders( + emrParameters.getStepsDefineJson(), + ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap())); addJobFlowStepsRequest = - objectMapper.readValue(emrParameters.getStepsDefineJson(), AddJobFlowStepsRequest.class); + objectMapper.readValue(jobStepDefineJson, AddJobFlowStepsRequest.class); } catch (JsonProcessingException e) { - throw new EmrTaskException("can not parse AddJobFlowStepsRequest from json", e); + throw new EmrTaskException("can not parse AddJobFlowStepsRequest from json: " + jobStepDefineJson, e); } // When a single task definition is associated with multiple steps, the state tracking will have high diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java index f4b0534065..8b772a1118 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.emr; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import java.util.Collections; import java.util.HashSet; @@ -120,10 +121,14 @@ public class EmrJobFlowTask extends AbstractEmrTask { protected RunJobFlowRequest createRunJobFlowRequest() { final RunJobFlowRequest runJobFlowRequest; + String jobFlowDefineJson = null; try { - runJobFlowRequest = objectMapper.readValue(emrParameters.getJobFlowDefineJson(), RunJobFlowRequest.class); + jobFlowDefineJson = ParameterUtils.convertParameterPlaceholders( + emrParameters.getJobFlowDefineJson(), + ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap())); + runJobFlowRequest = objectMapper.readValue(jobFlowDefineJson, RunJobFlowRequest.class); } catch (JsonProcessingException e) { - throw new EmrTaskException("can not parse RunJobFlowRequest from json", e); + throw new EmrTaskException("can not parse RunJobFlowRequest from json: " + jobFlowDefineJson, e); } return runJobFlowRequest;