diff --git a/docs/docs/en/guide/task/emr.md b/docs/docs/en/guide/task/emr.md index 050d7c2397..ebcaa885aa 100644 --- a/docs/docs/en/guide/task/emr.md +++ b/docs/docs/en/guide/task/emr.md @@ -2,7 +2,11 @@ ## Overview -Amazon EMR task type, for creating EMR clusters on AWS and running computing tasks. Using [aws-java-sdk](https://aws.amazon.com/cn/sdk-for-java/) in the background code, to transfer JSON parameters to [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) object and submit to AWS. +Amazon EMR task type, for operating EMR clusters on AWS and running computing tasks. +The backend uses [aws-java-sdk](https://aws.amazon.com/cn/sdk-for-java/) to convert the JSON parameters into a task object and submit it to AWS. Two program types are currently supported: + +* `RUN_JOB_FLOW`: uses [API_RunJobFlow](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples) to submit a [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) object +* `ADD_JOB_FLOW_STEPS`: uses [API_AddJobFlowSteps](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples) to submit an [AddJobFlowStepsRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/AddJobFlowStepsRequest.html) object ## Create Task @@ -19,12 +23,18 @@ Amazon EMR task type, for creating EMR clusters on AWS and running computing tas | Task priority | When the number of worker threads is insufficient, execute in the order of priority from high to low, and tasks with the same priority will execute in a first-in first-out order. | | Worker grouping | Assign tasks to the machines of the worker group to execute. If `Default` is selected, randomly select a worker machine for execution. 
| | Times of failed retry attempts | The number of times the task failed to resubmit. You can select from drop-down or fill-in a number. | -| Failed retry interval: The time interval for resubmitting the task after a failed task. You can select from drop-down or fill-in a number. | +| Failed retry interval | The time interval for resubmitting the task after a failed task. You can select from drop-down or fill-in a number. | | Timeout alarm | Check the timeout alarm and timeout failure. When the task runs exceed the "timeout", an alarm email will send and the task execution will fail. | -| JSON | JSON corresponding to the [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) object, for details refer to [API_RunJobFlow_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples). | +| Program Type | Select the program type. If it is `RUN_JOB_FLOW`, you need to fill in `jobFlowDefineJson`, if it is `ADD_JOB_FLOW_STEPS`, you need to fill in `stepsDefineJson`. | +| jobFlowDefineJson | JSON corresponding to the [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) object, for details refer to [API_RunJobFlow_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples). | +| stepsDefineJson | JSON corresponding to the [AddJobFlowStepsRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/AddJobFlowStepsRequest.html) object, for details refer to [API_AddJobFlowSteps_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples). | -## JSON example +## Task Example +### Create an EMR cluster and run Steps +This example shows how to create an `EMR` task node of type `RUN_JOB_FLOW`. 
Taking the execution of `SparkPi` as an example, the task will create an `EMR` cluster and execute the `SparkPi` sample program. +![RUN_JOB_FLOW](../../../../img/tasks/demo/emr_run_job_flow.png) +jobFlowDefineJson example ```json { "Name": "SparkPi", @@ -65,3 +75,33 @@ Amazon EMR task type, for creating EMR clusters on AWS and running computing tas } ``` +### Add a Step to a Running EMR Cluster +This example shows how to create an `EMR` task node of type `ADD_JOB_FLOW_STEPS`. Taking the execution of `SparkPi` as an example, the task will add a `SparkPi` sample program to the running `EMR` cluster. +![ADD_JOB_FLOW_STEPS](../../../../img/tasks/demo/emr_add_job_flow_steps.png) +![JobFlowId](../../../../img/tasks/demo/emr_jobFlowId.png) + +stepsDefineJson example +```json +{ + "JobFlowId": "j-3V628TKAERHP8", + "Steps": [ + { + "Name": "calculate_pi", + "ActionOnFailure": "CONTINUE", + "HadoopJarStep": { + "Jar": "command-runner.jar", + "Args": [ + "/usr/lib/spark/bin/run-example", + "SparkPi", + "15" + ] + } + } + ] +} +``` + +## Notice + +- Failover for the EMR task type has not been implemented yet. Currently, DolphinScheduler only supports failover for YARN tasks; other task types, such as EMR and k8s tasks, are not supported yet. +- `stepsDefineJson`: a task definition supports only a single step, which makes it easier to keep the task state reliable. 
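As a minimal sketch of the Program Type rule from the parameter table above (`RUN_JOB_FLOW` needs `jobFlowDefineJson`, `ADD_JOB_FLOW_STEPS` needs `stepsDefineJson`), a per-type check could look like the following. `EmrParamCheck` and `isValid` are hypothetical names used only for illustration, and the nested `ProgramType` enum is a stand-in for the plugin's own enum. This is not the plugin's `checkParameters` implementation, which in this change accepts either JSON field being non-empty.

```java
// Hypothetical helper for illustration only; not part of the DolphinScheduler code base.
final class EmrParamCheck {

    // Stand-in for the plugin's ProgramType enum (assumption: same two constants).
    enum ProgramType { RUN_JOB_FLOW, ADD_JOB_FLOW_STEPS }

    // Returns true only when the JSON field that matches the program type is filled in.
    static boolean isValid(ProgramType programType, String jobFlowDefineJson, String stepsDefineJson) {
        if (programType == null) {
            return false;
        }
        switch (programType) {
            case RUN_JOB_FLOW:
                return isNotBlank(jobFlowDefineJson);
            case ADD_JOB_FLOW_STEPS:
                return isNotBlank(stepsDefineJson);
            default:
                return false;
        }
    }

    // Null-safe check for a string that contains at least one non-whitespace character.
    private static boolean isNotBlank(String s) {
        return s != null && !s.trim().isEmpty();
    }

    public static void main(String[] args) {
        String jobFlowJson = "{\"Name\":\"SparkPi\"}";
        // Matching field is present: prints "true"
        System.out.println(isValid(ProgramType.RUN_JOB_FLOW, jobFlowJson, null));
        // Wrong field for the selected type: prints "false"
        System.out.println(isValid(ProgramType.ADD_JOB_FLOW_STEPS, jobFlowJson, null));
    }
}
```

Keying the check on the program type would reject a task saved with the wrong JSON field before any request is sent to AWS.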
diff --git a/docs/docs/zh/guide/task/emr.md b/docs/docs/zh/guide/task/emr.md index dfa17f6658..59dab0046e 100644 --- a/docs/docs/zh/guide/task/emr.md +++ b/docs/docs/zh/guide/task/emr.md @@ -2,7 +2,11 @@ ## 综述 -Amazon EMR任务类型,用于在AWS上创建EMR集群并执行计算任务。 后台使用[aws-java-sdk](https://aws.amazon.com/cn/sdk-for-java/) 将json参数转换为[RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) 对象,提交到AWS +Amazon EMR 任务类型,用于在AWS上操作EMR集群并执行计算任务。 +后台使用 [aws-java-sdk](https://aws.amazon.com/cn/sdk-for-java/) 将JSON参数转换为任务对象,提交到AWS,目前支持两种程序类型: + +* `RUN_JOB_FLOW` 使用 [API_RunJobFlow](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples) 提交 [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) 对象 +* `ADD_JOB_FLOW_STEPS` 使用 [API_AddJobFlowSteps](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples) 提交 [AddJobFlowStepsRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/AddJobFlowStepsRequest.html) 对象 ## 任务参数 - 节点名称:一个工作流定义中的节点名称是唯一的。 @@ -13,9 +17,16 @@ Amazon EMR任务类型,用于在AWS上创建EMR集群并执行计算任务。 - 失败重试次数:任务失败重新提交的次数,支持下拉和手填。 - 失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。 - 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败. 
-- json: [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) 对象对应的json,详细json定义参见 [API_RunJobFlow_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples) +- 程序类型:选择程序类型,如果是`RUN_JOB_FLOW`,则需要填写`jobFlowDefineJson`,如果是`ADD_JOB_FLOW_STEPS`,则需要填写`stepsDefineJson`。 + - jobFlowDefineJson: [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) 对象对应的JSON,详细JSON定义参见 [API_RunJobFlow_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples) + - stepsDefineJson:[AddJobFlowStepsRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/AddJobFlowStepsRequest.html) 对象对应的JSON,详细JSON定义参见 [API_AddJobFlowSteps_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples) + +## 任务样例 +### 创建EMR集群并运行Steps +该样例展示了如何创建`RUN_JOB_FLOW`类型`EMR`任务节点,以执行`SparkPi`为例,该任务会创建一个`EMR`集群,并且执行`SparkPi`示例程序。 +![RUN_JOB_FLOW](../../../../img/tasks/demo/emr_run_job_flow.png) -## json参数样例 +jobFlowDefineJson 参数样例 ```json { "Name": "SparkPi", @@ -56,3 +67,33 @@ Amazon EMR任务类型,用于在AWS上创建EMR集群并执行计算任务。 } ``` +### 向运行中的EMR集群添加Step +该样例展示了如何创建`ADD_JOB_FLOW_STEPS`类型`EMR`任务节点,以执行`SparkPi`为例,该任务会向运行中的`EMR`集群添加一个`SparkPi`示例程序。 +![ADD_JOB_FLOW_STEPS](../../../../img/tasks/demo/emr_add_job_flow_steps.png) +![JobFlowId](../../../../img/tasks/demo/emr_jobFlowId.png) + +stepsDefineJson 参数样例 +```json +{ + "JobFlowId": "j-3V628TKAERHP8", + "Steps": [ + { + "Name": "calculate_pi", + "ActionOnFailure": "CONTINUE", + "HadoopJarStep": { + "Jar": "command-runner.jar", + "Args": [ + "/usr/lib/spark/bin/run-example", + "SparkPi", + "15" + ] + } + } + ] +} +``` + +## 注意事项: + +- EMR 任务类型的故障转移尚未实现。目前,DolphinScheduler 仅支持对 yarn task type 进行故障转移。其他任务类型,如 EMR 
任务、k8s 任务尚未准备好。 +- `stepsDefineJson` 一个任务定义仅支持关联单个step,这样可以更好的保证任务状态的可靠性。 \ No newline at end of file diff --git a/docs/img/tasks/demo/emr_add_job_flow_steps.png b/docs/img/tasks/demo/emr_add_job_flow_steps.png new file mode 100644 index 0000000000..bc8de30f72 Binary files /dev/null and b/docs/img/tasks/demo/emr_add_job_flow_steps.png differ diff --git a/docs/img/tasks/demo/emr_jobFlowId.png b/docs/img/tasks/demo/emr_jobFlowId.png new file mode 100644 index 0000000000..b79097afd5 Binary files /dev/null and b/docs/img/tasks/demo/emr_jobFlowId.png differ diff --git a/docs/img/tasks/demo/emr_run_job_flow.png b/docs/img/tasks/demo/emr_run_job_flow.png new file mode 100644 index 0000000000..ad09c5f7c9 Binary files /dev/null and b/docs/img/tasks/demo/emr_run_job_flow.png differ diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java new file mode 100644 index 0000000000..329c3bc3d5 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.emr; + +import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT; +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; +import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL; +import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS; + +import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; +import org.apache.dolphinscheduler.spi.utils.PropertyUtils; + +import java.util.TimeZone; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; +import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategy; + +/** + * EMR Task abstract base class + * + * @since v3.1.0 + */ +public abstract class AbstractEmrTask extends AbstractTaskExecutor { + + final TaskExecutionContext taskExecutionContext; + EmrParameters emrParameters; + AmazonElasticMapReduce emrClient; + String clusterId; + + + /** + * configure ObjectMapper features and the propertyNamingStrategy + * use UpperCamelCaseStrategy to support parsing keys that start with a capital letter + * + * @see PropertyNamingStrategy.UpperCamelCaseStrategy + */ + static final ObjectMapper objectMapper = new ObjectMapper() + 
.configure(FAIL_ON_UNKNOWN_PROPERTIES, false) + .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true) + .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true) + .configure(REQUIRE_SETTERS_FOR_GETTERS, true) + .setTimeZone(TimeZone.getDefault()) + .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy()); + + /** + * constructor + * + * @param taskExecutionContext taskExecutionContext + */ + protected AbstractEmrTask(TaskExecutionContext taskExecutionContext) { + super(taskExecutionContext); + this.taskExecutionContext = taskExecutionContext; + } + + @Override + public void init() { + final String taskParams = taskExecutionContext.getTaskParams(); + logger.info("emr task params:{}", taskParams); + emrParameters = JSONUtils.parseObject(taskParams, EmrParameters.class); + if (emrParameters == null || !emrParameters.checkParameters()) { + throw new EmrTaskException("emr task params is not valid"); + } + emrClient = createEmrClient(); + } + + @Override + public AbstractParameters getParameters() { + return emrParameters; + } + + /** + * create emr client from BasicAWSCredentials + * + * @return AmazonElasticMapReduce + */ + private AmazonElasticMapReduce createEmrClient() { + + final String awsAccessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID); + final String awsSecretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY); + final String awsRegion = PropertyUtils.getString(TaskConstants.AWS_REGION); + final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey); + final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials); + // create an EMR client + return AmazonElasticMapReduceClientBuilder.standard() + .withCredentials(awsCredentialsProvider) + .withRegion(awsRegion) + .build(); + } +} diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java new file mode 100644 index 0000000000..d747577b71 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.task.emr; + +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; + +import java.util.HashSet; +import java.util.concurrent.TimeUnit; + +import com.amazonaws.SdkBaseException; +import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest; +import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult; +import com.amazonaws.services.elasticmapreduce.model.CancelStepsInfo; +import com.amazonaws.services.elasticmapreduce.model.CancelStepsRequest; +import com.amazonaws.services.elasticmapreduce.model.CancelStepsRequestStatus; +import com.amazonaws.services.elasticmapreduce.model.CancelStepsResult; +import com.amazonaws.services.elasticmapreduce.model.DescribeStepRequest; +import com.amazonaws.services.elasticmapreduce.model.DescribeStepResult; +import com.amazonaws.services.elasticmapreduce.model.StepState; +import com.amazonaws.services.elasticmapreduce.model.StepStatus; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.collect.Sets; + +/** + * AddJobFlowSteps task executor + * + * @since v3.1.0 + */ +public class EmrAddStepsTask extends AbstractEmrTask { + + private String stepId; + + private final HashSet<String> waitingStateSet = Sets.newHashSet( + StepState.PENDING.toString(), + StepState.CANCEL_PENDING.toString(), + StepState.RUNNING.toString() + ); + + /** + * constructor + * + * @param taskExecutionContext taskExecutionContext + */ + protected EmrAddStepsTask(TaskExecutionContext taskExecutionContext) { + super(taskExecutionContext); + } + + @Override + public void handle() throws InterruptedException { + StepStatus stepStatus = null; + try { + AddJobFlowStepsRequest addJobFlowStepsRequest = createAddJobFlowStepsRequest(); + + // submit addJobFlowStepsRequest to aws + AddJobFlowStepsResult result = emrClient.addJobFlowSteps(addJobFlowStepsRequest); + + clusterId = 
addJobFlowStepsRequest.getJobFlowId(); + stepId = result.getStepIds().get(0); + // use clusterId-stepId as appIds + setAppIds(clusterId + TaskConstants.SUBTRACT_STRING + stepId); + + stepStatus = getStepStatus(); + + while (waitingStateSet.contains(stepStatus.getState())) { + TimeUnit.SECONDS.sleep(10); + stepStatus = getStepStatus(); + } + + } catch (EmrTaskException | SdkBaseException e) { + logger.error("emr task submit failed with error", e); + } finally { + final int exitStatusCode = calculateExitStatusCode(stepStatus); + setExitStatusCode(exitStatusCode); + logger.info("emr task finished with step status : {}", stepStatus); + } + } + + /** + * parse json string to AddJobFlowStepsRequest + * + * @return AddJobFlowStepsRequest + */ + private AddJobFlowStepsRequest createAddJobFlowStepsRequest() { + + final AddJobFlowStepsRequest addJobFlowStepsRequest; + try { + addJobFlowStepsRequest = objectMapper.readValue(emrParameters.getStepsDefineJson(), AddJobFlowStepsRequest.class); + } catch (JsonProcessingException e) { + throw new EmrTaskException("can not parse AddJobFlowStepsRequest from json", e); + } + + // When a single task definition is associated with multiple steps, the state tracking will have high complexity. + // Therefore, A task definition only supports the association of a single step, which can better ensure the reliability of the task state. + if (addJobFlowStepsRequest.getSteps().size() > 1) { + throw new EmrTaskException("ds emr addJobFlowStepsTask only support one step"); + } + + return addJobFlowStepsRequest; + } + + /** + * calculate task exitStatusCode + * + * @param stepStatus aws emr execution status details of the cluster step. 
+ * @return exitStatusCode + */ + private int calculateExitStatusCode(StepStatus stepStatus) { + if (stepStatus == null) { + return TaskConstants.EXIT_CODE_FAILURE; + } else { + String state = stepStatus.getState(); + StepState stepState = StepState.valueOf(state); + switch (stepState) { + case COMPLETED: + return TaskConstants.EXIT_CODE_SUCCESS; + case CANCELLED: + return TaskConstants.EXIT_CODE_KILL; + default: + return TaskConstants.EXIT_CODE_FAILURE; + } + } + + } + + private StepStatus getStepStatus() { + DescribeStepRequest describeStepRequest = new DescribeStepRequest().withClusterId(clusterId).withStepId(stepId); + DescribeStepResult result = emrClient.describeStep(describeStepRequest); + if (result == null) { + throw new EmrTaskException("fetch step status failed"); + } + StepStatus stepStatus = result.getStep().getStatus(); + logger.info("emr step [clusterId:{}, stepId:{}] running with status:{}", clusterId, stepId, stepStatus); + return stepStatus; + + } + + @Override + public void cancelApplication(boolean status) throws Exception { + super.cancelApplication(status); + logger.info("trying cancel emr step, taskId:{}, clusterId:{}, stepId:{}", this.taskExecutionContext.getTaskInstanceId(), clusterId, stepId); + CancelStepsRequest cancelStepsRequest = new CancelStepsRequest().withClusterId(clusterId).withStepIds(stepId); + CancelStepsResult cancelStepsResult = emrClient.cancelSteps(cancelStepsRequest); + + if (cancelStepsResult == null) { + throw new EmrTaskException("cancel emr step failed"); + } + + CancelStepsInfo cancelEmrStepInfo = cancelStepsResult.getCancelStepsInfoList() + .stream() + .filter(cancelStepsInfo -> cancelStepsInfo.getStepId().equals(stepId)) + .findFirst() + .orElseThrow(() -> new EmrTaskException("cancel emr step failed")); + + if (CancelStepsRequestStatus.FAILED.toString().equals(cancelEmrStepInfo.getStatus())) { + throw new EmrTaskException("cancel emr step failed, message:" + cancelEmrStepInfo.getReason()); + } + + logger.info("the 
result of cancel emr step is:{}", cancelStepsResult); + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java similarity index 61% rename from dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTask.java rename to dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java index ada0041bf9..ed42de2831 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java @@ -17,28 +17,13 @@ package org.apache.dolphinscheduler.plugin.task.emr; -import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT; -import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; -import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL; -import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS; - -import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; -import org.apache.dolphinscheduler.spi.utils.JSONUtils; -import org.apache.dolphinscheduler.spi.utils.PropertyUtils; import java.util.HashSet; -import java.util.TimeZone; import java.util.concurrent.TimeUnit; import com.amazonaws.SdkBaseException; -import 
com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; -import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; import com.amazonaws.services.elasticmapreduce.model.ClusterState; import com.amazonaws.services.elasticmapreduce.model.ClusterStateChangeReason; import com.amazonaws.services.elasticmapreduce.model.ClusterStateChangeReasonCode; @@ -50,23 +35,9 @@ import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult; import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest; import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsResult; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategy; import com.google.common.collect.Sets; -public class EmrTask extends AbstractTaskExecutor { - - /** - * taskExecutionContext - */ - private final TaskExecutionContext taskExecutionContext; - /** - * emr parameters - */ - private EmrParameters emrParameters; - private AmazonElasticMapReduce emrClient; - - private String clusterId; +public class EmrJobFlowTask extends AbstractEmrTask { private final HashSet<String> waitingStateSet = Sets.newHashSet( ClusterState.STARTING.toString(), @@ -74,40 +45,13 @@ public class EmrTask extends AbstractTaskExecutor { ClusterState.RUNNING.toString() ); - /** - * config ObjectMapper features and propertyNamingStrategy - * use UpperCamelCaseStrategy support capital letters parse - * @see PropertyNamingStrategy.UpperCamelCaseStrategy - */ - private static final ObjectMapper objectMapper = new ObjectMapper() - .configure(FAIL_ON_UNKNOWN_PROPERTIES, false) - .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true) - .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true) - .configure(REQUIRE_SETTERS_FOR_GETTERS, 
true) - .setTimeZone(TimeZone.getDefault()) - .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy()); - /** * constructor * * @param taskExecutionContext taskExecutionContext */ - protected EmrTask(TaskExecutionContext taskExecutionContext) { - + protected EmrJobFlowTask(TaskExecutionContext taskExecutionContext) { super(taskExecutionContext); - this.taskExecutionContext = taskExecutionContext; - } - - @Override - public void init() { - - final String taskParams = taskExecutionContext.getTaskParams(); - logger.info("emr task params:{}", taskParams); - emrParameters = JSONUtils.parseObject(taskParams, EmrParameters.class); - if (emrParameters == null || !emrParameters.checkParameters()) { - throw new EmrTaskException("emr task params is not valid"); - } - emrClient = createEmrClient(); } @Override @@ -120,7 +64,7 @@ public class EmrTask extends AbstractTaskExecutor { RunJobFlowResult result = emrClient.runJobFlow(runJobFlowRequest); clusterId = result.getJobFlowId(); - // TODO: Failover on EMR Task type has not been implemented. In this time, DS only supports failover on yarn task type . Other task type, such as EMR task, k8s task not ready yet. + // Failover on EMR Task type has not been implemented. In this time, DS only supports failover on yarn task type . Other task type, such as EMR task, k8s task not ready yet. 
setAppIds(clusterId); clusterStatus = getClusterStatus(); @@ -199,30 +143,6 @@ public class EmrTask extends AbstractTaskExecutor { } - @Override - public AbstractParameters getParameters() { - return emrParameters; - } - - /** - * create emr client from BasicAWSCredentials - * - * @return AmazonElasticMapReduce - */ - private AmazonElasticMapReduce createEmrClient() { - - final String awsAccessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID); - final String awsSecretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY); - final String awsRegion = PropertyUtils.getString(TaskConstants.AWS_REGION); - final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey); - final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials); - // create an EMR client - return AmazonElasticMapReduceClientBuilder.standard() - .withCredentials(awsCredentialsProvider) - .withRegion(awsRegion) - .build(); - } - @Override public void cancelApplication(boolean status) throws Exception { super.cancelApplication(status); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrParameters.java index 14396fcebc..1e38a66d81 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrParameters.java @@ -24,18 +24,41 @@ import org.apache.dolphinscheduler.spi.utils.StringUtils; import java.util.Collections; import java.util.List; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) public class EmrParameters extends 
AbstractParameters { + /** + * emr program type + * 0 RUN_JOB_FLOW, 1 ADD_JOB_FLOW_STEPS + */ + private ProgramType programType; + /** * job flow define in json format + * * @see API_RunJobFlow_Examples */ private String jobFlowDefineJson; + /** + * steps define in json format + * + * @see API_AddJobFlowSteps_Examples + */ + private String stepsDefineJson; + @Override public boolean checkParameters() { - - return StringUtils.isNotEmpty(jobFlowDefineJson); + /* + * When saving a task, the programType cannot be empty and jobFlowDefineJson or stepsDefineJson cannot be empty: + * (1) When ProgramType is RUN_JOB_FLOW, jobFlowDefineJson cannot be empty. + * (2) When ProgramType is ADD_JOB_FLOW_STEPS, stepsDefineJson cannot be empty. + */ + return programType != null && (StringUtils.isNotEmpty(jobFlowDefineJson) || StringUtils.isNotEmpty(stepsDefineJson)); } @Override @@ -44,18 +67,12 @@ public class EmrParameters extends AbstractParameters { } - public String getJobFlowDefineJson() { - return jobFlowDefineJson; - } - - public void setJobFlowDefineJson(String jobFlowDefineJson) { - this.jobFlowDefineJson = jobFlowDefineJson; - } - @Override public String toString() { return "EmrParameters{" - + "jobFlowDefineJson='" + jobFlowDefineJson + '\'' + + "programType=" + programType + + ", jobFlowDefineJson='" + jobFlowDefineJson + '\'' + + ", stepsDefineJson='" + stepsDefineJson + '\'' + '}'; } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskChannel.java index 898362f2cc..e59b4e3613 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskChannel.java +++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskChannel.java @@ -33,7 +33,15 @@ public class EmrTaskChannel implements TaskChannel { @Override public AbstractTask createTask(TaskExecutionContext taskRequest) { - return new EmrTask(taskRequest); + EmrParameters emrParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), EmrParameters.class); + assert emrParameters != null; + if (ProgramType.RUN_JOB_FLOW.equals(emrParameters.getProgramType())) { + return new EmrJobFlowTask(taskRequest); + } else if (ProgramType.ADD_JOB_FLOW_STEPS.equals(emrParameters.getProgramType())) { + return new EmrAddStepsTask(taskRequest); + } else { + throw new IllegalArgumentException("Unsupported program type: " + emrParameters.getProgramType()); + } } @Override diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/ProgramType.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/ProgramType.java new file mode 100644 index 0000000000..0f0870c119 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/ProgramType.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.emr; + +/** + * emr program type + * + * @since v3.1.0 + */ +public enum ProgramType { + /** + * RunJobFlow + */ + RUN_JOB_FLOW, + /** + * AddJobFlowSteps + */ + ADD_JOB_FLOW_STEPS +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java new file mode 100644 index 0000000000..d74d36fa82 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.task.emr; + +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; + +import static org.mockito.ArgumentMatchers.any; +import static org.powermock.api.mockito.PowerMockito.doReturn; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.powermock.api.mockito.PowerMockito.spy; +import static org.powermock.api.mockito.PowerMockito.when; + +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; + +import org.apache.commons.io.IOUtils; + +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collections; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; +import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; +import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult; +import com.amazonaws.services.elasticmapreduce.model.AmazonElasticMapReduceException; +import com.amazonaws.services.elasticmapreduce.model.DescribeStepResult; +import com.amazonaws.services.elasticmapreduce.model.Step; +import com.amazonaws.services.elasticmapreduce.model.StepState; +import com.amazonaws.services.elasticmapreduce.model.StepStatus; + +/** + * EmrAddStepsTask Test + * + * @since v3.1.0 + */ 
+@RunWith(PowerMockRunner.class) +@PrepareForTest({ + AmazonElasticMapReduceClientBuilder.class, + EmrAddStepsTask.class, + AmazonElasticMapReduce.class, + JSONUtils.class +}) +@PowerMockIgnore({"javax.*"}) +public class EmrAddStepsTaskTest { + + private final StepStatus pendingState = + new StepStatus().withState(StepState.PENDING); + + private final StepStatus runningState = + new StepStatus().withState(StepState.RUNNING); + + private final StepStatus completedState = + new StepStatus().withState(StepState.COMPLETED); + + private final StepStatus cancelledState = + new StepStatus().withState(StepState.CANCELLED); + + private final StepStatus failedState = + new StepStatus().withState(StepState.FAILED); + + private EmrAddStepsTask emrAddStepsTask; + private AmazonElasticMapReduce emrClient; + private Step step; + + @Before + public void before() throws Exception { + // mock EmrParameters and EmrAddStepsTask + EmrParameters emrParameters = buildEmrTaskParameters(); + String emrParametersString = JSONUtils.toJsonString(emrParameters); + TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class); + when(taskExecutionContext.getTaskParams()).thenReturn(emrParametersString); + emrAddStepsTask = spy(new EmrAddStepsTask(taskExecutionContext)); + + // mock emrClient and behavior + emrClient = mock(AmazonElasticMapReduce.class); + + AddJobFlowStepsResult addJobFlowStepsResult = mock(AddJobFlowStepsResult.class); + when(emrClient.addJobFlowSteps(any())).thenReturn(addJobFlowStepsResult); + when(addJobFlowStepsResult.getStepIds()).thenReturn(Collections.singletonList("step-xx")); + + doReturn(emrClient).when(emrAddStepsTask, "createEmrClient"); + DescribeStepResult describeStepResult = mock(DescribeStepResult.class); + when(emrClient.describeStep(any())).thenReturn(describeStepResult); + + // mock step + step = mock(Step.class); + when(describeStepResult.getStep()).thenReturn(step); + + emrAddStepsTask.init(); + } + + @Test + public void 
testCanNotParseJson() throws Exception { + mockStatic(JSONUtils.class); + when(emrAddStepsTask, "createAddJobFlowStepsRequest").thenThrow(new EmrTaskException("can not parse AddJobFlowStepsRequest from json", new Exception("error"))); + emrAddStepsTask.handle(); + Assert.assertEquals(EXIT_CODE_FAILURE, emrAddStepsTask.getExitStatusCode()); + } + + @Test + public void testDefineJsonStepNotOne() throws Exception { + // mock EmrParameters and EmrAddStepsTask + EmrParameters emrParameters = buildErrorEmrTaskParameters(); + String emrParametersString = JSONUtils.toJsonString(emrParameters); + TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class); + when(taskExecutionContext.getTaskParams()).thenReturn(emrParametersString); + emrAddStepsTask = spy(new EmrAddStepsTask(taskExecutionContext)); + doReturn(emrClient).when(emrAddStepsTask, "createEmrClient"); + emrAddStepsTask.init(); + emrAddStepsTask.handle(); + + Assert.assertEquals(EXIT_CODE_FAILURE, emrAddStepsTask.getExitStatusCode()); + } + + @Test + public void testHandle() throws Exception { + when(step.getStatus()).thenReturn(pendingState, runningState, completedState); + + emrAddStepsTask.handle(); + Assert.assertEquals(EXIT_CODE_SUCCESS, emrAddStepsTask.getExitStatusCode()); + } + + @Test + public void testHandleUserRequestTerminate() throws Exception { + when(step.getStatus()).thenReturn(pendingState, runningState, cancelledState); + + emrAddStepsTask.handle(); + Assert.assertEquals(EXIT_CODE_KILL, emrAddStepsTask.getExitStatusCode()); + } + + @Test + public void testHandleError() throws Exception { + when(step.getStatus()).thenReturn(pendingState, runningState, failedState); + emrAddStepsTask.handle(); + Assert.assertEquals(EXIT_CODE_FAILURE, emrAddStepsTask.getExitStatusCode()); + + when(emrClient.addJobFlowSteps(any())).thenThrow(new AmazonElasticMapReduceException("error"), new EmrTaskException()); + emrAddStepsTask.handle(); + Assert.assertEquals(EXIT_CODE_FAILURE, 
emrAddStepsTask.getExitStatusCode()); + } + + private EmrParameters buildEmrTaskParameters() { + EmrParameters emrParameters = new EmrParameters(); + String stepsDefineJson; + try (InputStream i = this.getClass().getResourceAsStream("EmrAddStepsDefine.json")) { + assert i != null; + stepsDefineJson = IOUtils.toString(i, StandardCharsets.UTF_8); + } catch (Exception e) { + throw new RuntimeException(e); + } + emrParameters.setProgramType(ProgramType.ADD_JOB_FLOW_STEPS); + emrParameters.setStepsDefineJson(stepsDefineJson); + + return emrParameters; + } + + private EmrParameters buildErrorEmrTaskParameters() { + EmrParameters emrParameters = new EmrParameters(); + String stepsDefineJson; + try (InputStream i = this.getClass().getResourceAsStream("EmrErrorAddStepsDefine.json")) { + assert i != null; + stepsDefineJson = IOUtils.toString(i, StandardCharsets.UTF_8); + } catch (Exception e) { + throw new RuntimeException(e); + } + emrParameters.setProgramType(ProgramType.ADD_JOB_FLOW_STEPS); + emrParameters.setStepsDefineJson(stepsDefineJson); + + return emrParameters; + } +} \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java similarity index 84% rename from dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskTest.java rename to dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java index 285078f21f..65c6c0c239 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskTest.java +++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java @@ -59,12 +59,12 @@ import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult; @RunWith(PowerMockRunner.class) @PrepareForTest({ AmazonElasticMapReduceClientBuilder.class, - EmrTask.class, + EmrJobFlowTask.class, AmazonElasticMapReduce.class, JSONUtils.class }) @PowerMockIgnore({"javax.*"}) -public class EmrTaskTest { +public class EmrJobFlowTaskTest { private final ClusterStatus startingStatus = new ClusterStatus().withState(ClusterState.STARTING) @@ -114,7 +114,7 @@ public class EmrTaskTest { .withCode(ClusterStateChangeReasonCode.STEP_FAILURE) ); - private EmrTask emrTask; + private EmrJobFlowTask emrJobFlowTask; private AmazonElasticMapReduce emrClient; private Cluster cluster; @@ -123,14 +123,14 @@ public class EmrTaskTest { String emrParameters = buildEmrTaskParameters(); TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class); when(taskExecutionContext.getTaskParams()).thenReturn(emrParameters); - emrTask = spy(new EmrTask(taskExecutionContext)); + emrJobFlowTask = spy(new EmrJobFlowTask(taskExecutionContext)); // mock emrClient and behavior emrClient = mock(AmazonElasticMapReduce.class); RunJobFlowResult runJobFlowResult = mock(RunJobFlowResult.class); when(emrClient.runJobFlow(any())).thenReturn(runJobFlowResult); when(runJobFlowResult.getJobFlowId()).thenReturn("xx"); - doReturn(emrClient).when(emrTask, "createEmrClient"); + doReturn(emrClient).when(emrJobFlowTask, "createEmrClient"); DescribeClusterResult describeClusterResult = mock(DescribeClusterResult.class); when(emrClient.describeCluster(any())).thenReturn(describeClusterResult); @@ -138,7 +138,7 @@ public class EmrTaskTest { cluster = mock(Cluster.class); when(describeClusterResult.getCluster()).thenReturn(cluster); - emrTask.init(); + emrJobFlowTask.init(); } @Test @@ -146,8 +146,8 @@ public class EmrTaskTest { 
when(cluster.getStatus()).thenReturn(startingStatus, softwareConfigStatus, runningStatus, terminatingStatus); - emrTask.handle(); - Assert.assertEquals(EXIT_CODE_SUCCESS, emrTask.getExitStatusCode()); + emrJobFlowTask.handle(); + Assert.assertEquals(EXIT_CODE_SUCCESS, emrJobFlowTask.getExitStatusCode()); } @@ -155,32 +155,32 @@ public class EmrTaskTest { public void testHandleAliveWhenNoSteps() throws Exception { when(cluster.getStatus()).thenReturn(startingStatus, softwareConfigStatus, runningStatus, waitingStatus); - emrTask.handle(); - Assert.assertEquals(EXIT_CODE_SUCCESS, emrTask.getExitStatusCode()); + emrJobFlowTask.handle(); + Assert.assertEquals(EXIT_CODE_SUCCESS, emrJobFlowTask.getExitStatusCode()); } @Test public void testHandleUserRequestTerminate() throws Exception { when(cluster.getStatus()).thenReturn(startingStatus, userRequestTerminateStatus); - emrTask.handle(); - Assert.assertEquals(EXIT_CODE_KILL, emrTask.getExitStatusCode()); + emrJobFlowTask.handle(); + Assert.assertEquals(EXIT_CODE_KILL, emrJobFlowTask.getExitStatusCode()); } @Test public void testHandleTerminatedWithError() throws Exception { when(cluster.getStatus()).thenReturn(startingStatus, softwareConfigStatus, runningStatus, terminatedWithErrorsStatus); - emrTask.handle(); - Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode()); + emrJobFlowTask.handle(); + Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode()); } @Test public void testCanNotParseJson() throws Exception { mockStatic(JSONUtils.class); - when(emrTask, "createRunJobFlowRequest").thenThrow(new EmrTaskException("can not parse RunJobFlowRequest from json", new Exception("error"))); - emrTask.handle(); - Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode()); + when(emrJobFlowTask, "createRunJobFlowRequest").thenThrow(new EmrTaskException("can not parse RunJobFlowRequest from json", new Exception("error"))); + emrJobFlowTask.handle(); + Assert.assertEquals(EXIT_CODE_FAILURE, 
emrJobFlowTask.getExitStatusCode()); } @Test @@ -188,18 +188,18 @@ public class EmrTaskTest { when(emrClient.describeCluster(any())).thenReturn(null); - emrTask.handle(); - Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode()); + emrJobFlowTask.handle(); + Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode()); } @Test public void testRunJobFlowError() throws Exception { when(emrClient.runJobFlow(any())).thenThrow(new AmazonElasticMapReduceException("error"), new EmrTaskException()); - emrTask.handle(); - Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode()); - emrTask.handle(); - Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode()); + emrJobFlowTask.handle(); + Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode()); + emrJobFlowTask.handle(); + Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode()); } @@ -212,6 +212,7 @@ public class EmrTaskTest { } catch (Exception e) { throw new RuntimeException(e); } + emrParameters.setProgramType(ProgramType.RUN_JOB_FLOW); emrParameters.setJobFlowDefineJson(jobFlowDefineJson); return JSONUtils.toJsonString(emrParameters); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsDefine.json b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsDefine.json new file mode 100644 index 0000000000..a14a259a88 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsDefine.json @@ -0,0 +1,17 @@ +{ + "JobFlowId": "j-3V628TKAERHP8", + "Steps": [ + { + "Name": "calculate_pi", + "ActionOnFailure": "CONTINUE", + "HadoopJarStep": { + "Jar": "command-runner.jar", + "Args": [ + "/usr/lib/spark/bin/run-example", + "SparkPi", + "15" + ] + } + } + ] +} \ No newline at end of file diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrErrorAddStepsDefine.json b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrErrorAddStepsDefine.json new file mode 100644 index 0000000000..46e81b13ba --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrErrorAddStepsDefine.json @@ -0,0 +1,29 @@ +{ + "JobFlowId": "j-3V628TKAERHP8", + "Steps": [ + { + "Name": "calculate_pi", + "ActionOnFailure": "CONTINUE", + "HadoopJarStep": { + "Jar": "command-runner.jar", + "Args": [ + "/usr/lib/spark/bin/run-example", + "SparkPi", + "15" + ] + } + }, + { + "Name": "calculate_pi", + "ActionOnFailure": "CONTINUE", + "HadoopJarStep": { + "Jar": "command-runner.jar", + "Args": [ + "/usr/lib/spark/bin/run-example", + "SparkPi", + "15" + ] + } + } + ] +} \ No newline at end of file diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index cf2cef1271..43cba0d3d6 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -607,6 +607,8 @@ export default { required: 'required', emr_flow_define_json: 'jobFlowDefineJson', emr_flow_define_json_tips: 'Please enter the definition of the job flow.', + emr_steps_define_json: 'stepsDefineJson', + emr_steps_define_json_tips: 'Please enter the definition of the emr step.', segment_separator: 'Segment Execution Separator', segment_separator_tips: 'Please enter the segment execution separator', zeppelin_note_id: 'zeppelinNoteId', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index ab2c9c71f4..3706771854 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -600,6 +600,8 @@ 
export default { required: '必填', emr_flow_define_json: 'jobFlowDefineJson', emr_flow_define_json_tips: '请输入工作流定义', + emr_steps_define_json: 'stepsDefineJson', + emr_steps_define_json_tips: '请输入EMR步骤定义', segment_separator: '分段执行符号', segment_separator_tips: '请输入分段执行符号', zeppelin_note_id: 'zeppelin_note_id', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts index a67b370d18..1446fc866a 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts @@ -17,14 +17,30 @@ import { useI18n } from 'vue-i18n' import { useCustomParams } from '.' import type { IJsonItem } from '../types' +import {computed} from "vue"; export function useEmr(model: { [field: string]: any }): IJsonItem[] { const { t } = useI18n() + const jobFlowDefineJsonSpan = computed(() => (model.programType === 'RUN_JOB_FLOW' ? 24 : 0)) + + const stepsDefineJsonSpan = computed(() => (model.programType === 'ADD_JOB_FLOW_STEPS' ? 
24 : 0)) + return [ + { + type: 'select', + field: 'programType', + span: 24, + name: t('project.node.program_type'), + options: PROGRAM_TYPES, + validate: { + required: true + } + }, { type: 'editor', field: 'jobFlowDefineJson', + span: jobFlowDefineJsonSpan, name: t('project.node.emr_flow_define_json'), props: { language: 'json' @@ -35,6 +51,20 @@ export function useEmr(model: { [field: string]: any }): IJsonItem[] { message: t('project.node.emr_flow_define_json_tips') } }, + { + type: 'editor', + field: 'stepsDefineJson', + span: stepsDefineJsonSpan, + name: t('project.node.emr_steps_define_json'), + props: { + language: 'json' + }, + validate: { + trigger: ['input', 'trigger'], + required: true, + message: t('project.node.emr_steps_define_json_tips') + } + }, ...useCustomParams({ model, field: 'localParams', @@ -42,3 +72,14 @@ export function useEmr(model: { [field: string]: any }): IJsonItem[] { }) ] } + +export const PROGRAM_TYPES = [ + { + label: 'RUN_JOB_FLOW', + value: 'RUN_JOB_FLOW' + }, + { + label: 'ADD_JOB_FLOW_STEPS', + value: 'ADD_JOB_FLOW_STEPS' + } +] \ No newline at end of file diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index c90d1660db..4ab18ccd05 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -321,7 +321,9 @@ export function formatParams(data: INodeData): { if (data.taskType === 'EMR') { taskParams.type = data.type + taskParams.programType = data.programType taskParams.jobFlowDefineJson = data.jobFlowDefineJson + taskParams.stepsDefineJson = data.stepsDefineJson } if (data.taskType === 'ZEPPELIN') { diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts index 1face687fb..36dbbefa79 100644 
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts @@ -44,6 +44,7 @@ export function useEmr({ workerGroup: 'default', delayTime: 0, timeout: 30, + programType: 'ADD_JOB_FLOW_STEPS', timeoutNotifyStrategy: ['WARN'] } as INodeData) diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts index 5019f29ce1..187ff7532f 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts @@ -295,6 +295,7 @@ interface ITaskParams { ruleId?: number ruleInputParameter?: IRuleParameters jobFlowDefineJson?: string + stepsDefineJson?: string zeppelinNoteId?: string zeppelinParagraphId?: string noteId?: string
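
Review note on `EmrParameters.checkParameters()`: the comment in the patch says that `jobFlowDefineJson` is required for `RUN_JOB_FLOW` and `stepsDefineJson` for `ADD_JOB_FLOW_STEPS`, but the returned expression accepts either field for either program type. A minimal sketch of a per-type check follows, assuming that the comment describes the intended behaviour. The class name `EmrParametersCheck` and the local `isNotEmpty` helper are hypothetical stand-ins so the example compiles without the plugin's dependencies; they are not part of this change.

```java
// Sketch only: a stricter variant of the validation described in the patch comment.
// EmrParametersCheck and isNotEmpty are hypothetical names used for illustration.
public class EmrParametersCheck {

    /** Mirrors the ProgramType enum introduced by this change. */
    enum ProgramType { RUN_JOB_FLOW, ADD_JOB_FLOW_STEPS }

    /** Local stand-in for StringUtils.isNotEmpty, so the sketch has no dependencies. */
    static boolean isNotEmpty(String s) {
        return s != null && !s.isEmpty();
    }

    /** Requires the JSON field that matches the selected program type. */
    static boolean checkParameters(ProgramType programType,
                                   String jobFlowDefineJson,
                                   String stepsDefineJson) {
        if (programType == null) {
            return false;
        }
        switch (programType) {
            case RUN_JOB_FLOW:
                return isNotEmpty(jobFlowDefineJson);
            case ADD_JOB_FLOW_STEPS:
                return isNotEmpty(stepsDefineJson);
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        // RUN_JOB_FLOW with only stepsDefineJson set: rejected here,
        // whereas the OR-based expression in the patch would accept it.
        System.out.println(checkParameters(ProgramType.RUN_JOB_FLOW, null, "{}"));
        System.out.println(checkParameters(ProgramType.ADD_JOB_FLOW_STEPS, null, "{}"));
    }
}
```

With a check of this shape, a task saved as `RUN_JOB_FLOW` with an empty `jobFlowDefineJson` would be rejected at save time rather than failing later when the request is parsed.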