
[Feature-10219][EMR] EMR supports use <add-Steps> to add steps to an existing cluster (#10657)

* Add the ProgramType parameter to distinguish task types
* EmrAddStepsTask supports Add-Steps
* UI supports Add-Steps
* Rename the EmrTask class to EmrJobFlowTask
* Add the EMR task abstract base class AbstractEmrTask
* Add test cases for EmrAddStepsTask
* Modify the help documents
3.1.0-release
ZhaoGuodong 2 years ago committed by GitHub
commit 8eaf5a2309
  1. docs/docs/en/guide/task/emr.md (48)
  2. docs/docs/zh/guide/task/emr.md (47)
  3. docs/img/tasks/demo/emr_add_job_flow_steps.png (BIN)
  4. docs/img/tasks/demo/emr_jobFlowId.png (BIN)
  5. docs/img/tasks/demo/emr_run_job_flow.png (BIN)
  6. dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java (113)
  7. dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java (177)
  8. dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java (86)
  9. dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrParameters.java (39)
  10. dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskChannel.java (10)
  11. dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/ProgramType.java (34)
  12. dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java (198)
  13. dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java (47)
  14. dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsDefine.json (17)
  15. dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrErrorAddStepsDefine.json (29)
  16. dolphinscheduler-ui/src/locales/en_US/project.ts (2)
  17. dolphinscheduler-ui/src/locales/zh_CN/project.ts (2)
  18. dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts (41)
  19. dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts (2)
  20. dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts (1)
  21. dolphinscheduler-ui/src/views/projects/task/components/node/types.ts (1)

docs/docs/en/guide/task/emr.md (48)

@@ -2,7 +2,11 @@
## Overview
-Amazon EMR task type, for creating EMR clusters on AWS and running computing tasks. Using [aws-java-sdk](https://aws.amazon.com/cn/sdk-for-java/) in the background code, to transfer JSON parameters to [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) object and submit to AWS.
+Amazon EMR task type, for operating EMR clusters on AWS and running computing tasks.
Using [aws-java-sdk](https://aws.amazon.com/cn/sdk-for-java/) in the background code to convert the JSON parameters into a task object and submit it to AWS. Two program types are currently supported:
* `RUN_JOB_FLOW`: uses [API_RunJobFlow](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples) to submit a [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) object
* `ADD_JOB_FLOW_STEPS`: uses [API_AddJobFlowSteps](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples) to submit an [AddJobFlowStepsRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/AddJobFlowStepsRequest.html) object
## Create Task

@@ -19,12 +23,18 @@
| Task priority | When the number of worker threads is insufficient, execute in the order of priority from high to low, and tasks with the same priority will execute in a first-in first-out order. |
| Worker grouping | Assign tasks to the machines of the worker group to execute. If `Default` is selected, randomly select a worker machine for execution. |
| Times of failed retry attempts | The number of times the task failed to resubmit. You can select from drop-down or fill-in a number. |
-| Failed retry interval: The time interval for resubmitting the task after a failed task. You can select from drop-down or fill-in a number. |
+| Failed retry interval | The time interval for resubmitting the task after a failed task. You can select from drop-down or fill-in a number. |
| Timeout alarm | Check the timeout alarm and timeout failure. When the task runs exceed the "timeout", an alarm email will send and the task execution will fail. |
-| JSON | JSON corresponding to the [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) object, for details refer to [API_RunJobFlow_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples). |
+| Program Type | Select the program type. If it is `RUN_JOB_FLOW`, fill in `jobFlowDefineJson`; if it is `ADD_JOB_FLOW_STEPS`, fill in `stepsDefineJson`. |
| jobFlowDefineJson | JSON corresponding to the [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) object, for details refer to [API_RunJobFlow_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples). |
| stepsDefineJson | JSON corresponding to the [AddJobFlowStepsRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/AddJobFlowStepsRequest.html) object, for details refer to [API_AddJobFlowSteps_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples). |
-## JSON example
+## Task Example
### Create an EMR cluster and run Steps
This example shows how to create an `EMR` task node of type `RUN_JOB_FLOW`. Taking the execution of `SparkPi` as an example, the task will create an `EMR` cluster and execute the `SparkPi` sample program.
![RUN_JOB_FLOW](../../../../img/tasks/demo/emr_run_job_flow.png)
jobFlowDefineJson example
```json
{
"Name": "SparkPi",

@@ -65,3 +75,33 @@
}
```
### Add a Step to a Running EMR Cluster
This example shows how to create an `EMR` task node of type `ADD_JOB_FLOW_STEPS`. Taking the execution of `SparkPi` as an example, the task will add a `SparkPi` sample program to the running `EMR` cluster.
![ADD_JOB_FLOW_STEPS](../../../../img/tasks/demo/emr_add_job_flow_steps.png)
![JobFlowId](../../../../img/tasks/demo/emr_jobFlowId.png)
stepsDefineJson example
```json
{
"JobFlowId": "j-3V628TKAERHP8",
"Steps": [
{
"Name": "calculate_pi",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"/usr/lib/spark/bin/run-example",
"SparkPi",
"15"
]
}
}
]
}
```
## Notice
- Failover for the EMR task type has not been implemented. At this time, DolphinScheduler only supports failover for YARN task types; other task types, such as EMR and Kubernetes tasks, are not supported yet.
- `stepsDefineJson`: a task definition supports only a single step, which better guarantees the reliability of the task state.
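
The single-step constraint above is easy to satisfy when migrating an existing multi-step submission: split the request before creating task definitions. A minimal sketch, not part of this change, using only the AWS SDK model classes the plugin already depends on (the class and method names below are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;

public class SingleStepSplitter {

    /**
     * Split a multi-step AddJobFlowStepsRequest into one request per step, so that each
     * DolphinScheduler task definition carries exactly one step as described above.
     */
    public static List<AddJobFlowStepsRequest> splitPerStep(AddJobFlowStepsRequest original) {
        List<AddJobFlowStepsRequest> singleStepRequests = new ArrayList<>();
        for (StepConfig step : original.getSteps()) {
            singleStepRequests.add(new AddJobFlowStepsRequest()
                    .withJobFlowId(original.getJobFlowId())
                    .withSteps(step));
        }
        return singleStepRequests;
    }
}
```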

docs/docs/zh/guide/task/emr.md (47)

@@ -2,7 +2,11 @@
## 综述
-Amazon EMR任务类型,用于在AWS上创建EMR集群并执行计算任务。 后台使用[aws-java-sdk](https://aws.amazon.com/cn/sdk-for-java/) 将json参数转换为[RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) 对象,提交到AWS
+Amazon EMR 任务类型,用于在AWS上操作EMR集群并执行计算任务。
后台使用 [aws-java-sdk](https://aws.amazon.com/cn/sdk-for-java/) 将JSON参数转换为任务对象,提交到AWS,目前支持两种程序类型:
* `RUN_JOB_FLOW` 使用 [API_RunJobFlow](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples) 提交 [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) 对象
* `ADD_JOB_FLOW_STEPS` 使用 [API_AddJobFlowSteps](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples) 提交 [AddJobFlowStepsRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/AddJobFlowStepsRequest.html) 对象
## 任务参数
- 节点名称:一个工作流定义中的节点名称是唯一的。

@@ -13,9 +17,16 @@
- 失败重试次数:任务失败重新提交的次数,支持下拉和手填。
- 失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。
- 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
-- json: [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) 对象对应的json,详细json定义参见 [API_RunJobFlow_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples)
+- 程序类型:选择程序类型,如果是`RUN_JOB_FLOW`,则需要填写`jobFlowDefineJson`,如果是`ADD_JOB_FLOW_STEPS`,则需要填写`stepsDefineJson`。
- jobFlowDefineJson: [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) 对象对应的JSON,详细JSON定义参见 [API_RunJobFlow_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples)
- stepsDefineJson:[AddJobFlowStepsRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/AddJobFlowStepsRequest.html) 对象对应的JSON,详细JSON定义参见 [API_AddJobFlowSteps_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples)
## 任务样例
### 创建EMR集群并运行Steps
该样例展示了如何创建`RUN_JOB_FLOW`类型`EMR`任务节点,以执行`SparkPi`为例,该任务会创建一个`EMR`集群,并且执行`SparkPi`示例程序。
![RUN_JOB_FLOW](../../../../img/tasks/demo/emr_run_job_flow.png)
-## json参数样例
+jobFlowDefineJson 参数样例
```json
{
"Name": "SparkPi",

@@ -56,3 +67,33 @@
}
```
### 向运行中的EMR集群添加Step
该样例展示了如何创建`ADD_JOB_FLOW_STEPS`类型`EMR`任务节点,以执行`SparkPi`为例,该任务会向运行中的`EMR`集群添加一个`SparkPi`示例程序。
![ADD_JOB_FLOW_STEPS](../../../../img/tasks/demo/emr_add_job_flow_steps.png)
![JobFlowId](../../../../img/tasks/demo/emr_jobFlowId.png)
stepsDefineJson 参数样例
```json
{
"JobFlowId": "j-3V628TKAERHP8",
"Steps": [
{
"Name": "calculate_pi",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"/usr/lib/spark/bin/run-example",
"SparkPi",
"15"
]
}
}
]
}
```
## 注意事项:
- EMR 任务类型的故障转移尚未实现。目前,DolphinScheduler 仅支持对 yarn task type 进行故障转移。其他任务类型,如 EMR 任务、k8s 任务尚未准备好。
- `stepsDefineJson` 一个任务定义仅支持关联单个step,这样可以更好的保证任务状态的可靠性。

docs/img/tasks/demo/emr_add_job_flow_steps.png (BIN, new file, 129 KiB)

docs/img/tasks/demo/emr_jobFlowId.png (BIN, new file, 130 KiB)

docs/img/tasks/demo/emr_run_job_flow.png (BIN, new file, 136 KiB)

dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java (113, new file)

@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.emr;
import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import java.util.TimeZone;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
/**
* EMR Task abstract base class
*
* @since v3.1.0
*/
public abstract class AbstractEmrTask extends AbstractTaskExecutor {
final TaskExecutionContext taskExecutionContext;
EmrParameters emrParameters;
AmazonElasticMapReduce emrClient;
String clusterId;
/**
* config ObjectMapper features and propertyNamingStrategy
* use UpperCamelCaseStrategy support capital letters parse
*
* @see PropertyNamingStrategy.UpperCamelCaseStrategy
*/
static final ObjectMapper objectMapper = new ObjectMapper()
.configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
.configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
.configure(REQUIRE_SETTERS_FOR_GETTERS, true)
.setTimeZone(TimeZone.getDefault())
.setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
*/
protected AbstractEmrTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
}
@Override
public void init() {
final String taskParams = taskExecutionContext.getTaskParams();
logger.info("emr task params:{}", taskParams);
emrParameters = JSONUtils.parseObject(taskParams, EmrParameters.class);
if (emrParameters == null || !emrParameters.checkParameters()) {
throw new EmrTaskException("emr task params is not valid");
}
emrClient = createEmrClient();
}
@Override
public AbstractParameters getParameters() {
return emrParameters;
}
/**
* create emr client from BasicAWSCredentials
*
* @return AmazonElasticMapReduce
*/
private AmazonElasticMapReduce createEmrClient() {
final String awsAccessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
final String awsSecretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
final String awsRegion = PropertyUtils.getString(TaskConstants.AWS_REGION);
final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);
final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials);
// create an EMR client
return AmazonElasticMapReduceClientBuilder.standard()
.withCredentials(awsCredentialsProvider)
.withRegion(awsRegion)
.build();
}
}
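
As a side note on the mapper configuration above: the `UpperCamelCaseStrategy` is what lets the AWS-style capitalized keys in `jobFlowDefineJson`/`stepsDefineJson` bind to the SDK request objects, while unknown keys are ignored rather than failing the task. A standalone sketch (not part of the commit; the class name and sample values are illustrative):

```java
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;

import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;

public class UpperCamelCaseParseDemo {

    public static void main(String[] args) throws Exception {
        // Same naming strategy as AbstractEmrTask: "JobFlowId" in JSON maps to the jobFlowId property,
        // and unknown keys are skipped instead of throwing.
        ObjectMapper mapper = new ObjectMapper()
                .configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
                .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());

        String stepsDefineJson = "{\"JobFlowId\":\"j-3V628TKAERHP8\",\"SomeUnknownField\":1,\"Steps\":[]}";
        AddJobFlowStepsRequest request = mapper.readValue(stepsDefineJson, AddJobFlowStepsRequest.class);

        System.out.println(request.getJobFlowId()); // prints j-3V628TKAERHP8
    }
}
```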

dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java (177, new file)

@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.emr;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import com.amazonaws.SdkBaseException;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
import com.amazonaws.services.elasticmapreduce.model.CancelStepsInfo;
import com.amazonaws.services.elasticmapreduce.model.CancelStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.CancelStepsRequestStatus;
import com.amazonaws.services.elasticmapreduce.model.CancelStepsResult;
import com.amazonaws.services.elasticmapreduce.model.DescribeStepRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeStepResult;
import com.amazonaws.services.elasticmapreduce.model.StepState;
import com.amazonaws.services.elasticmapreduce.model.StepStatus;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Sets;
/**
* AddJobFlowSteps task executor
*
* @since v3.1.0
*/
public class EmrAddStepsTask extends AbstractEmrTask {
private String stepId;
private final HashSet<String> waitingStateSet = Sets.newHashSet(
StepState.PENDING.toString(),
StepState.CANCEL_PENDING.toString(),
StepState.RUNNING.toString()
);
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
*/
protected EmrAddStepsTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
}
@Override
public void handle() throws InterruptedException {
StepStatus stepStatus = null;
try {
AddJobFlowStepsRequest addJobFlowStepsRequest = createAddJobFlowStepsRequest();
// submit addJobFlowStepsRequest to aws
AddJobFlowStepsResult result = emrClient.addJobFlowSteps(addJobFlowStepsRequest);
clusterId = addJobFlowStepsRequest.getJobFlowId();
stepId = result.getStepIds().get(0);
// use clusterId-stepId as appIds
setAppIds(clusterId + TaskConstants.SUBTRACT_STRING + stepId);
stepStatus = getStepStatus();
while (waitingStateSet.contains(stepStatus.getState())) {
TimeUnit.SECONDS.sleep(10);
stepStatus = getStepStatus();
}
} catch (EmrTaskException | SdkBaseException e) {
logger.error("emr task submit failed with error", e);
} finally {
final int exitStatusCode = calculateExitStatusCode(stepStatus);
setExitStatusCode(exitStatusCode);
logger.info("emr task finished with step status : {}", stepStatus);
}
}
/**
* parse json string to AddJobFlowStepsRequest
*
* @return AddJobFlowStepsRequest
*/
private AddJobFlowStepsRequest createAddJobFlowStepsRequest() {
final AddJobFlowStepsRequest addJobFlowStepsRequest;
try {
addJobFlowStepsRequest = objectMapper.readValue(emrParameters.getStepsDefineJson(), AddJobFlowStepsRequest.class);
} catch (JsonProcessingException e) {
throw new EmrTaskException("can not parse AddJobFlowStepsRequest from json", e);
}
// When a single task definition is associated with multiple steps, the state tracking will have high complexity.
// Therefore, A task definition only supports the association of a single step, which can better ensure the reliability of the task state.
if (addJobFlowStepsRequest.getSteps().size() > 1) {
throw new EmrTaskException("ds emr addJobFlowStepsTask only support one step");
}
return addJobFlowStepsRequest;
}
/**
* calculate task exitStatusCode
*
* @param stepStatus aws emr execution status details of the cluster step.
* @return exitStatusCode
*/
private int calculateExitStatusCode(StepStatus stepStatus) {
if (stepStatus == null) {
return TaskConstants.EXIT_CODE_FAILURE;
} else {
String state = stepStatus.getState();
StepState stepState = StepState.valueOf(state);
switch (stepState) {
case COMPLETED:
return TaskConstants.EXIT_CODE_SUCCESS;
case CANCELLED:
return TaskConstants.EXIT_CODE_KILL;
default:
return TaskConstants.EXIT_CODE_FAILURE;
}
}
}
private StepStatus getStepStatus() {
DescribeStepRequest describeStepRequest = new DescribeStepRequest().withClusterId(clusterId).withStepId(stepId);
DescribeStepResult result = emrClient.describeStep(describeStepRequest);
if (result == null) {
throw new EmrTaskException("fetch step status failed");
}
StepStatus stepStatus = result.getStep().getStatus();
logger.info("emr step [clusterId:{}, stepId:{}] running with status:{}", clusterId, stepId, stepStatus);
return stepStatus;
}
@Override
public void cancelApplication(boolean status) throws Exception {
super.cancelApplication(status);
logger.info("trying cancel emr step, taskId:{}, clusterId:{}, stepId:{}", this.taskExecutionContext.getTaskInstanceId(), clusterId, stepId);
CancelStepsRequest cancelStepsRequest = new CancelStepsRequest().withClusterId(clusterId).withStepIds(stepId);
CancelStepsResult cancelStepsResult = emrClient.cancelSteps(cancelStepsRequest);
if (cancelStepsResult == null) {
throw new EmrTaskException("cancel emr step failed");
}
CancelStepsInfo cancelEmrStepInfo = cancelStepsResult.getCancelStepsInfoList()
.stream()
.filter(cancelStepsInfo -> cancelStepsInfo.getStepId().equals(stepId))
.findFirst()
.orElseThrow(() -> new EmrTaskException("cancel emr step failed"));
if (CancelStepsRequestStatus.FAILED.toString().equals(cancelEmrStepInfo.getStatus())) {
throw new EmrTaskException("cancel emr step failed, message:" + cancelEmrStepInfo.getReason());
}
logger.info("the result of cancel emr step is:{}", cancelStepsResult);
}
}
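
The core of `handle()` above is a poll-until-terminal loop over the step state. A stripped-down sketch of that pattern (illustrative only; this helper class is not part of the plugin):

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import com.amazonaws.services.elasticmapreduce.model.StepState;

public final class StepPollingSketch {

    // Same waiting set as EmrAddStepsTask: keep polling while the step has not reached a terminal state.
    private static final Set<StepState> WAITING =
            EnumSet.of(StepState.PENDING, StepState.CANCEL_PENDING, StepState.RUNNING);

    /**
     * Poll every 10 seconds until a terminal state is reached; the plugin then maps
     * COMPLETED to EXIT_CODE_SUCCESS, CANCELLED to EXIT_CODE_KILL and anything else
     * to EXIT_CODE_FAILURE (see calculateExitStatusCode above).
     */
    public static StepState waitForTerminalState(Supplier<StepState> stateSupplier) throws InterruptedException {
        StepState state = stateSupplier.get();
        while (WAITING.contains(state)) {
            TimeUnit.SECONDS.sleep(10);
            state = stateSupplier.get();
        }
        return state;
    }

    private StepPollingSketch() {
    }
}
```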

dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTask.java → dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java (86)

@@ -17,28 +17,13 @@
package org.apache.dolphinscheduler.plugin.task.emr;
import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import java.util.HashSet;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import com.amazonaws.SdkBaseException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.ClusterState;
import com.amazonaws.services.elasticmapreduce.model.ClusterStateChangeReason;
import com.amazonaws.services.elasticmapreduce.model.ClusterStateChangeReasonCode;

@@ -50,23 +35,9 @@
import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.google.common.collect.Sets;
-public class EmrTask extends AbstractTaskExecutor {
+public class EmrJobFlowTask extends AbstractEmrTask {
/**
* taskExecutionContext
*/
private final TaskExecutionContext taskExecutionContext;
/**
* emr parameters
*/
private EmrParameters emrParameters;
private AmazonElasticMapReduce emrClient;
private String clusterId;
private final HashSet<String> waitingStateSet = Sets.newHashSet(
ClusterState.STARTING.toString(),

@@ -74,40 +45,13 @@
ClusterState.RUNNING.toString()
);
/**
* config ObjectMapper features and propertyNamingStrategy
* use UpperCamelCaseStrategy support capital letters parse
* @see PropertyNamingStrategy.UpperCamelCaseStrategy
*/
private static final ObjectMapper objectMapper = new ObjectMapper()
.configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
.configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
.configure(REQUIRE_SETTERS_FOR_GETTERS, true)
.setTimeZone(TimeZone.getDefault())
.setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
*/
-protected EmrTask(TaskExecutionContext taskExecutionContext) {
+protected EmrJobFlowTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
}
@Override
public void init() {
final String taskParams = taskExecutionContext.getTaskParams();
logger.info("emr task params:{}", taskParams);
emrParameters = JSONUtils.parseObject(taskParams, EmrParameters.class);
if (emrParameters == null || !emrParameters.checkParameters()) {
throw new EmrTaskException("emr task params is not valid");
}
emrClient = createEmrClient();
}
@Override

@@ -120,7 +64,7 @@
RunJobFlowResult result = emrClient.runJobFlow(runJobFlowRequest);
clusterId = result.getJobFlowId();
-// TODO: Failover on EMR Task type has not been implemented. In this time, DS only supports failover on yarn task type . Other task type, such as EMR task, k8s task not ready yet.
+// Failover on EMR Task type has not been implemented. In this time, DS only supports failover on yarn task type . Other task type, such as EMR task, k8s task not ready yet.
setAppIds(clusterId);
clusterStatus = getClusterStatus();

@@ -199,30 +143,6 @@
}
@Override
public AbstractParameters getParameters() {
return emrParameters;
}
/**
* create emr client from BasicAWSCredentials
*
* @return AmazonElasticMapReduce
*/
private AmazonElasticMapReduce createEmrClient() {
final String awsAccessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
final String awsSecretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
final String awsRegion = PropertyUtils.getString(TaskConstants.AWS_REGION);
final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);
final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials);
// create an EMR client
return AmazonElasticMapReduceClientBuilder.standard()
.withCredentials(awsCredentialsProvider)
.withRegion(awsRegion)
.build();
}
@Override
public void cancelApplication(boolean status) throws Exception {
super.cancelApplication(status);

dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrParameters.java (39)

@@ -24,18 +24,41 @@
import java.util.Collections;
import java.util.List;
import lombok.Data;
import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode(callSuper = true)
public class EmrParameters extends AbstractParameters {
/**
* emr program type
* 0 RUN_JOB_FLOW, 1 ADD_JOB_FLOW_STEPS
*/
private ProgramType programType;
/**
* job flow define in json format
*
* @see <a href="https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples">API_RunJobFlow_Examples</a>
*/
private String jobFlowDefineJson;
/**
* steps define in json format
*
* @see <a href="https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples">API_AddJobFlowSteps_Examples</a>
*/
private String stepsDefineJson;
@Override
public boolean checkParameters() {
/*
-return StringUtils.isNotEmpty(jobFlowDefineJson);
+* When saving a task, the programType cannot be empty and jobFlowDefineJson or stepsDefineJson cannot be empty:
* (1) When ProgramType is RUN_JOB_FLOW, jobFlowDefineJson cannot be empty.
* (2) When ProgramType is ADD_JOB_FLOW_STEPS, stepsDefineJson cannot be empty.
*/
return programType != null && (StringUtils.isNotEmpty(jobFlowDefineJson) || StringUtils.isNotEmpty(stepsDefineJson));
}
@Override

@@ -44,18 +67,12 @@
}
public String getJobFlowDefineJson() {
return jobFlowDefineJson;
}
public void setJobFlowDefineJson(String jobFlowDefineJson) {
this.jobFlowDefineJson = jobFlowDefineJson;
}
@Override
public String toString() {
return "EmrParameters{"
+ "programType=" + programType
+ ", jobFlowDefineJson='" + jobFlowDefineJson + '\''
+ ", stepsDefineJson='" + stepsDefineJson + '\''
+ '}';
}
}
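
A quick way to see the validation semantics: `checkParameters()` only requires a non-null `programType` plus at least one of the two JSON fields (the pairing described in the comment is enforced by which field the UI fills in, not by this method). A hypothetical sanity check using the Lombok-generated setters:

```java
import org.apache.dolphinscheduler.plugin.task.emr.EmrParameters;
import org.apache.dolphinscheduler.plugin.task.emr.ProgramType;

public class EmrParametersCheckDemo {

    public static void main(String[] args) {
        EmrParameters runJobFlow = new EmrParameters();
        runJobFlow.setProgramType(ProgramType.RUN_JOB_FLOW);
        runJobFlow.setJobFlowDefineJson("{\"Name\":\"SparkPi\"}");
        System.out.println(runJobFlow.checkParameters()); // true

        EmrParameters addSteps = new EmrParameters();
        addSteps.setProgramType(ProgramType.ADD_JOB_FLOW_STEPS);
        System.out.println(addSteps.checkParameters()); // false: neither JSON field is set
    }
}
```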

dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskChannel.java (10)

@@ -33,7 +33,15 @@
@Override
public AbstractTask createTask(TaskExecutionContext taskRequest) {
-return new EmrTask(taskRequest);
+EmrParameters emrParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), EmrParameters.class);
assert emrParameters != null;
if (ProgramType.RUN_JOB_FLOW.equals(emrParameters.getProgramType())) {
return new EmrJobFlowTask(taskRequest);
} else if (ProgramType.ADD_JOB_FLOW_STEPS.equals(emrParameters.getProgramType())) {
return new EmrAddStepsTask(taskRequest);
} else {
throw new IllegalArgumentException("Unsupported program type: " + emrParameters.getProgramType());
}
}
@Override
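
The routing above means the `programType` stored in `taskParams` decides which executor a worker builds. A hypothetical snippet in the style of the unit tests below, using Mockito only to fake the `TaskExecutionContext` (no AWS call is made, because `init()` is never invoked):

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.emr.EmrParameters;
import org.apache.dolphinscheduler.plugin.task.emr.EmrTaskChannel;
import org.apache.dolphinscheduler.plugin.task.emr.ProgramType;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;

public class EmrTaskChannelRoutingDemo {

    public static void main(String[] args) {
        EmrParameters parameters = new EmrParameters();
        parameters.setProgramType(ProgramType.ADD_JOB_FLOW_STEPS);
        parameters.setStepsDefineJson("{\"JobFlowId\":\"j-3V628TKAERHP8\",\"Steps\":[]}");

        TaskExecutionContext context = mock(TaskExecutionContext.class);
        when(context.getTaskParams()).thenReturn(JSONUtils.toJsonString(parameters));

        // programType drives the dispatch in EmrTaskChannel#createTask
        System.out.println(new EmrTaskChannel().createTask(context).getClass().getSimpleName()); // EmrAddStepsTask
    }
}
```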

dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/ProgramType.java (34, new file)

@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.emr;
/**
* emr program type
*
* @since v3.1.0
*/
public enum ProgramType {
/**
* RunJobFlow
*/
RUN_JOB_FLOW,
/**
* AddJobFlowSteps
*/
ADD_JOB_FLOW_STEPS
}

dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java (198, new file)

@@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.emr;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import static org.mockito.ArgumentMatchers.any;
import static org.powermock.api.mockito.PowerMockito.doReturn;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.spy;
import static org.powermock.api.mockito.PowerMockito.when;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.io.IOUtils;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
import com.amazonaws.services.elasticmapreduce.model.AmazonElasticMapReduceException;
import com.amazonaws.services.elasticmapreduce.model.DescribeStepResult;
import com.amazonaws.services.elasticmapreduce.model.Step;
import com.amazonaws.services.elasticmapreduce.model.StepState;
import com.amazonaws.services.elasticmapreduce.model.StepStatus;
/**
* EmrAddStepsTask Test
*
* @since v3.1.0
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({
AmazonElasticMapReduceClientBuilder.class,
EmrAddStepsTask.class,
AmazonElasticMapReduce.class,
JSONUtils.class
})
@PowerMockIgnore({"javax.*"})
public class EmrAddStepsTaskTest {
private final StepStatus pendingState =
new StepStatus().withState(StepState.PENDING);
private final StepStatus runningState =
new StepStatus().withState(StepState.RUNNING);
private final StepStatus completedState =
new StepStatus().withState(StepState.COMPLETED);
private final StepStatus cancelledState =
new StepStatus().withState(StepState.CANCELLED);
private final StepStatus failedState =
new StepStatus().withState(StepState.FAILED);
private EmrAddStepsTask emrAddStepsTask;
private AmazonElasticMapReduce emrClient;
private Step step;
@Before
public void before() throws Exception {
// mock EmrParameters and EmrAddStepsTask
EmrParameters emrParameters = buildEmrTaskParameters();
String emrParametersString = JSONUtils.toJsonString(emrParameters);
TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
when(taskExecutionContext.getTaskParams()).thenReturn(emrParametersString);
emrAddStepsTask = spy(new EmrAddStepsTask(taskExecutionContext));
// mock emrClient and behavior
emrClient = mock(AmazonElasticMapReduce.class);
AddJobFlowStepsResult addJobFlowStepsResult = mock(AddJobFlowStepsResult.class);
when(emrClient.addJobFlowSteps(any())).thenReturn(addJobFlowStepsResult);
when(addJobFlowStepsResult.getStepIds()).thenReturn(Collections.singletonList("step-xx"));
doReturn(emrClient).when(emrAddStepsTask, "createEmrClient");
DescribeStepResult describeStepResult = mock(DescribeStepResult.class);
when(emrClient.describeStep(any())).thenReturn(describeStepResult);
// mock step
step = mock(Step.class);
when(describeStepResult.getStep()).thenReturn(step);
emrAddStepsTask.init();
}
@Test
public void testCanNotParseJson() throws Exception {
mockStatic(JSONUtils.class);
when(emrAddStepsTask, "createAddJobFlowStepsRequest").thenThrow(new EmrTaskException("can not parse AddJobFlowStepsRequest from json", new Exception("error")));
emrAddStepsTask.handle();
Assert.assertEquals(EXIT_CODE_FAILURE, emrAddStepsTask.getExitStatusCode());
}
@Test
public void testDefineJsonStepNotOne() throws Exception {
// mock EmrParameters and EmrAddStepsTask
EmrParameters emrParameters = buildErrorEmrTaskParameters();
String emrParametersString = JSONUtils.toJsonString(emrParameters);
TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
when(taskExecutionContext.getTaskParams()).thenReturn(emrParametersString);
emrAddStepsTask = spy(new EmrAddStepsTask(taskExecutionContext));
doReturn(emrClient).when(emrAddStepsTask, "createEmrClient");
emrAddStepsTask.init();
emrAddStepsTask.handle();
Assert.assertEquals(EXIT_CODE_FAILURE, emrAddStepsTask.getExitStatusCode());
}
@Test
public void testHandle() throws Exception {
when(step.getStatus()).thenReturn(pendingState, runningState, completedState);
emrAddStepsTask.handle();
Assert.assertEquals(EXIT_CODE_SUCCESS, emrAddStepsTask.getExitStatusCode());
}
@Test
public void testHandleUserRequestTerminate() throws Exception {
when(step.getStatus()).thenReturn(pendingState, runningState, cancelledState);
emrAddStepsTask.handle();
Assert.assertEquals(EXIT_CODE_KILL, emrAddStepsTask.getExitStatusCode());
}
@Test
public void testHandleError() throws Exception {
when(step.getStatus()).thenReturn(pendingState, runningState, failedState);
emrAddStepsTask.handle();
Assert.assertEquals(EXIT_CODE_FAILURE, emrAddStepsTask.getExitStatusCode());
when(emrClient.addJobFlowSteps(any())).thenThrow(new AmazonElasticMapReduceException("error"), new EmrTaskException());
emrAddStepsTask.handle();
Assert.assertEquals(EXIT_CODE_FAILURE, emrAddStepsTask.getExitStatusCode());
}
private EmrParameters buildEmrTaskParameters() {
EmrParameters emrParameters = new EmrParameters();
String stepsDefineJson;
try (InputStream i = this.getClass().getResourceAsStream("EmrAddStepsDefine.json")) {
assert i != null;
stepsDefineJson = IOUtils.toString(i, StandardCharsets.UTF_8);
} catch (Exception e) {
throw new RuntimeException(e);
}
emrParameters.setProgramType(ProgramType.ADD_JOB_FLOW_STEPS);
emrParameters.setStepsDefineJson(stepsDefineJson);
return emrParameters;
}
private EmrParameters buildErrorEmrTaskParameters() {
EmrParameters emrParameters = new EmrParameters();
String stepsDefineJson;
try (InputStream i = this.getClass().getResourceAsStream("EmrErrorAddStepsDefine.json")) {
assert i != null;
stepsDefineJson = IOUtils.toString(i, StandardCharsets.UTF_8);
} catch (Exception e) {
throw new RuntimeException(e);
}
emrParameters.setProgramType(ProgramType.ADD_JOB_FLOW_STEPS);
emrParameters.setStepsDefineJson(stepsDefineJson);
return emrParameters;
}
}

dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskTest.java → dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java (47)

@@ -59,12 +59,12 @@
@RunWith(PowerMockRunner.class)
@PrepareForTest({
AmazonElasticMapReduceClientBuilder.class,
-EmrTask.class,
+EmrJobFlowTask.class,
AmazonElasticMapReduce.class,
JSONUtils.class
})
@PowerMockIgnore({"javax.*"})
-public class EmrTaskTest {
+public class EmrJobFlowTaskTest {
private final ClusterStatus startingStatus =
new ClusterStatus().withState(ClusterState.STARTING)

@@ -114,7 +114,7 @@
.withCode(ClusterStateChangeReasonCode.STEP_FAILURE)
);
-private EmrTask emrTask;
+private EmrJobFlowTask emrJobFlowTask;
private AmazonElasticMapReduce emrClient;
private Cluster cluster;

@@ -123,14 +123,14 @@
String emrParameters = buildEmrTaskParameters();
TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
when(taskExecutionContext.getTaskParams()).thenReturn(emrParameters);
-emrTask = spy(new EmrTask(taskExecutionContext));
+emrJobFlowTask = spy(new EmrJobFlowTask(taskExecutionContext));
// mock emrClient and behavior
emrClient = mock(AmazonElasticMapReduce.class);
RunJobFlowResult runJobFlowResult = mock(RunJobFlowResult.class);
when(emrClient.runJobFlow(any())).thenReturn(runJobFlowResult);
when(runJobFlowResult.getJobFlowId()).thenReturn("xx");
-doReturn(emrClient).when(emrTask, "createEmrClient");
+doReturn(emrClient).when(emrJobFlowTask, "createEmrClient");
DescribeClusterResult describeClusterResult = mock(DescribeClusterResult.class);
when(emrClient.describeCluster(any())).thenReturn(describeClusterResult);

@@ -138,7 +138,7 @@
cluster = mock(Cluster.class);
when(describeClusterResult.getCluster()).thenReturn(cluster);
-emrTask.init();
+emrJobFlowTask.init();
}
@Test

@@ -146,8 +146,8 @@
when(cluster.getStatus()).thenReturn(startingStatus, softwareConfigStatus, runningStatus, terminatingStatus);
-emrTask.handle();
+emrJobFlowTask.handle();
-Assert.assertEquals(EXIT_CODE_SUCCESS, emrTask.getExitStatusCode());
+Assert.assertEquals(EXIT_CODE_SUCCESS, emrJobFlowTask.getExitStatusCode());
}
@@ -155,32 +155,32 @@
public void testHandleAliveWhenNoSteps() throws Exception {
when(cluster.getStatus()).thenReturn(startingStatus, softwareConfigStatus, runningStatus, waitingStatus);
-emrTask.handle();
+emrJobFlowTask.handle();
-Assert.assertEquals(EXIT_CODE_SUCCESS, emrTask.getExitStatusCode());
+Assert.assertEquals(EXIT_CODE_SUCCESS, emrJobFlowTask.getExitStatusCode());
}
@Test
public void testHandleUserRequestTerminate() throws Exception {
when(cluster.getStatus()).thenReturn(startingStatus, userRequestTerminateStatus);
-emrTask.handle();
+emrJobFlowTask.handle();
-Assert.assertEquals(EXIT_CODE_KILL, emrTask.getExitStatusCode());
+Assert.assertEquals(EXIT_CODE_KILL, emrJobFlowTask.getExitStatusCode());
}
@Test
public void testHandleTerminatedWithError() throws Exception {
when(cluster.getStatus()).thenReturn(startingStatus, softwareConfigStatus, runningStatus, terminatedWithErrorsStatus);
-emrTask.handle();
+emrJobFlowTask.handle();
-Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode());
+Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode());
}
@Test
public void testCanNotParseJson() throws Exception {
mockStatic(JSONUtils.class);
-when(emrTask, "createRunJobFlowRequest").thenThrow(new EmrTaskException("can not parse RunJobFlowRequest from json", new Exception("error")));
+when(emrJobFlowTask, "createRunJobFlowRequest").thenThrow(new EmrTaskException("can not parse RunJobFlowRequest from json", new Exception("error")));
-emrTask.handle();
+emrJobFlowTask.handle();
-Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode());
+Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode());
}
@Test
@@ -188,18 +188,18 @@
when(emrClient.describeCluster(any())).thenReturn(null);
-emrTask.handle();
+emrJobFlowTask.handle();
-Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode());
+Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode());
}
@Test
public void testRunJobFlowError() throws Exception {
when(emrClient.runJobFlow(any())).thenThrow(new AmazonElasticMapReduceException("error"), new EmrTaskException());
-emrTask.handle();
+emrJobFlowTask.handle();
-Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode());
+Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode());
-emrTask.handle();
+emrJobFlowTask.handle();
-Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode());
+Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode());
}
@@ -212,6 +212,7 @@
} catch (Exception e) {
throw new RuntimeException(e);
}
emrParameters.setProgramType(ProgramType.RUN_JOB_FLOW);
emrParameters.setJobFlowDefineJson(jobFlowDefineJson);
return JSONUtils.toJsonString(emrParameters);

dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsDefine.json (17, new file)

@@ -0,0 +1,17 @@
{
"JobFlowId": "j-3V628TKAERHP8",
"Steps": [
{
"Name": "calculate_pi",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"/usr/lib/spark/bin/run-example",
"SparkPi",
"15"
]
}
}
]
}

dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrErrorAddStepsDefine.json (29, new file)

@@ -0,0 +1,29 @@
{
"JobFlowId": "j-3V628TKAERHP8",
"Steps": [
{
"Name": "calculate_pi",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"/usr/lib/spark/bin/run-example",
"SparkPi",
"15"
]
}
},
{
"Name": "calculate_pi",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"/usr/lib/spark/bin/run-example",
"SparkPi",
"15"
]
}
}
]
}

dolphinscheduler-ui/src/locales/en_US/project.ts (2)

@@ -607,6 +607,8 @@
required: 'required',
emr_flow_define_json: 'jobFlowDefineJson',
emr_flow_define_json_tips: 'Please enter the definition of the job flow.',
emr_steps_define_json: 'stepsDefineJson',
emr_steps_define_json_tips: 'Please enter the definition of the emr step.',
segment_separator: 'Segment Execution Separator',
segment_separator_tips: 'Please enter the segment execution separator',
zeppelin_note_id: 'zeppelinNoteId',

dolphinscheduler-ui/src/locales/zh_CN/project.ts (2)

@@ -600,6 +600,8 @@
required: '必填',
emr_flow_define_json: 'jobFlowDefineJson',
emr_flow_define_json_tips: '请输入工作流定义',
emr_steps_define_json: 'stepsDefineJson',
emr_steps_define_json_tips: '请输入EMR步骤定义',
segment_separator: '分段执行符号',
segment_separator_tips: '请输入分段执行符号',
zeppelin_note_id: 'zeppelin_note_id',

dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts (41)

@@ -17,14 +17,30 @@
import { useI18n } from 'vue-i18n'
import { useCustomParams } from '.'
import type { IJsonItem } from '../types'
import {computed} from "vue";
export function useEmr(model: { [field: string]: any }): IJsonItem[] {
const { t } = useI18n()
const jobFlowDefineJsonSpan = computed(() => (model.programType === 'RUN_JOB_FLOW' ? 24 : 0))
const stepsDefineJsonSpan = computed(() => (model.programType === 'ADD_JOB_FLOW_STEPS' ? 24 : 0))
return [
{
type: 'select',
field: 'programType',
span: 24,
name: t('project.node.program_type'),
options: PROGRAM_TYPES,
validate: {
required: true
}
},
{
type: 'editor',
field: 'jobFlowDefineJson',
span: jobFlowDefineJsonSpan,
name: t('project.node.emr_flow_define_json'),
props: {
language: 'json'

@@ -35,6 +51,20 @@
message: t('project.node.emr_flow_define_json_tips')
}
},
{
type: 'editor',
field: 'stepsDefineJson',
span: stepsDefineJsonSpan,
name: t('project.node.emr_steps_define_json'),
props: {
language: 'json'
},
validate: {
trigger: ['input', 'trigger'],
required: true,
message: t('project.node.emr_steps_define_json_tips')
}
},
...useCustomParams({
model,
field: 'localParams',

@@ -42,3 +72,14 @@
})
]
}
export const PROGRAM_TYPES = [
{
label: 'RUN_JOB_FLOW',
value: 'RUN_JOB_FLOW'
},
{
label: 'ADD_JOB_FLOW_STEPS',
value: 'ADD_JOB_FLOW_STEPS'
}
]

dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts (2)

@@ -321,7 +321,9 @@
if (data.taskType === 'EMR') {
taskParams.type = data.type
taskParams.programType = data.programType
taskParams.jobFlowDefineJson = data.jobFlowDefineJson
taskParams.stepsDefineJson = data.stepsDefineJson
}
if (data.taskType === 'ZEPPELIN') {

dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts (1)

@@ -44,6 +44,7 @@
workerGroup: 'default',
delayTime: 0,
timeout: 30,
programType: 'ADD_JOB_FLOW_STEPS',
timeoutNotifyStrategy: ['WARN']
} as INodeData)

dolphinscheduler-ui/src/views/projects/task/components/node/types.ts (1)

@@ -295,6 +295,7 @@
ruleId?: number
ruleInputParameter?: IRuleParameters
jobFlowDefineJson?: string
stepsDefineJson?: string
zeppelinNoteId?: string
zeppelinParagraphId?: string
noteId?: string
