diff --git a/docs/docs/en/guide/task/aliyun-serverless-spark.md b/docs/docs/en/guide/task/aliyun-serverless-spark.md
new file mode 100644
index 0000000000..6980401bbd
--- /dev/null
+++ b/docs/docs/en/guide/task/aliyun-serverless-spark.md
@@ -0,0 +1,111 @@
+# Aliyun EMR Serverless Spark
+
+## Introduction
+
+The `Aliyun EMR Serverless Spark` task plugin submits Spark jobs to the
+[`Aliyun EMR Serverless Spark`](https://help.aliyun.com/zh/emr/emr-serverless-spark/product-overview/what-is-emr-serverless-spark) service.
+
+## Create Connections
+
+- Click `Datasource -> Create Datasource -> ALIYUN_SERVERLESS_SPARK` to create a connection.
+
+![demo-aliyun-serverless-spark-create-datasource-1](../../../../img/tasks/demo/aliyun_serverless_spark_1.png)
+
+- Fill in `Datasource Name`, `Access Key Id`, `Access Key Secret`, `Region Id` and click `Confirm`.
+
+![demo-aliyun-serverless-spark-create-datasource-2](../../../../img/tasks/demo/aliyun_serverless_spark_2.png)
+
+## Create Tasks
+
+- Click `Project -> Workflow Definition -> Create Workflow` and drag the `ALIYUN_SERVERLESS_SPARK` task to the canvas.
+
+![demo-aliyun-serverless-spark-create-task-1](../../../../img/tasks/demo/aliyun_serverless_spark_3.png)
+
+- Fill in the task parameters and click `Confirm` to create the task node.
+
+![demo-aliyun-serverless-spark-create-task-2](../../../../img/tasks/demo/aliyun_serverless_spark_4.png)
+
+## Task Parameters
+
+- Please refer to the `Default Task Parameters` section of the [DolphinScheduler Task Parameters Appendix](appendix.md) for the default parameters.
+
+| **Parameters** | **Description** |
+|---|---|
+| Datasource types | The type of datasource the task uses; it should be `ALIYUN_SERVERLESS_SPARK`. |
+| Datasource instances | The instance of the `ALIYUN_SERVERLESS_SPARK` datasource. |
+| workspace id | `Aliyun Serverless Spark` workspace id. |
+| resource queue id | `Aliyun Serverless Spark` resource queue to which the Spark job is submitted. |
+| code type | `Aliyun Serverless Spark` code type; it can be `JAR`, `PYTHON` or `SQL`. |
+| job name | `Aliyun Serverless Spark` job name. |
+| entry point | The location of the job code, such as a JAR package, Python file, or SQL file. OSS locations are supported. |
+| entry point arguments | Arguments of the job's main program, separated by `#` (see the sketch after this table). |
+| spark submit parameters | Spark-submit related parameters. |
+| engine release version | Spark engine release version. |
+| is production | Whether the Spark job runs in a production or a development environment. |
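+
+The `entry point arguments` value is passed as a single string and split on the `#` delimiter before it is handed to the Spark job driver, and when no endpoint is configured the service endpoint is derived from the region id. The snippet below is only an illustrative sketch of that behavior; it mirrors the `ENTRY_POINT_ARGUMENTS_DELIMITER` and `ENDPOINT_TEMPLATE` constants introduced later in this change, and the class name and sample values are made up for the example.
+
+```java
+import java.util.Arrays;
+import java.util.List;
+
+// Illustrative sketch only; not part of the plugin code.
+public class EntryPointArgumentsSketch {
+
+    // Mirrors AliyunServerlessSparkConstants.ENTRY_POINT_ARGUMENTS_DELIMITER
+    private static final String DELIMITER = "#";
+    // Mirrors AliyunServerlessSparkConstants.ENDPOINT_TEMPLATE
+    private static final String ENDPOINT_TEMPLATE = "emr-serverless-spark.%s.aliyuncs.com";
+
+    public static void main(String[] args) {
+        // "-e#show tables;show tables;" is split into ["-e", "show tables;show tables;"]
+        List<String> sqlArguments = Arrays.asList("-e#show tables;show tables;".split(DELIMITER));
+        System.out.println(sqlArguments);
+
+        // With region id cn-hangzhou the derived endpoint is emr-serverless-spark.cn-hangzhou.aliyuncs.com
+        System.out.println(String.format(ENDPOINT_TEMPLATE, "cn-hangzhou"));
+    }
+}
+```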
+
+## Examples
+
+### Submit JAR tasks
+
+| **Parameters** | **Example Values / Operations** |
+|---|---|
+| region id | cn-hangzhou |
+| access key id | |
+| access key secret | |
+| resource queue id | root_queue |
+| code type | JAR |
+| job name | ds-emr-spark-jar |
+| entry point | oss://datadev-oss-hdfs-test/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar |
+| entry point arguments | 100 |
+| spark submit parameters | --class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1 |
+| engine release version | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime) |
+| is production | Turn the switch on |
+
+### Submit SQL tasks
+
+| **Parameters** | **Example Values / Operations** |
+|---|---|
+| region id | cn-hangzhou |
+| access key id | |
+| access key secret | |
+| resource queue id | root_queue |
+| code type | SQL |
+| job name | ds-emr-spark-sql-1 |
+| entry point | Any non-empty string |
+| entry point arguments | -e#show tables;show tables; |
+| spark submit parameters | --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1 |
+| engine release version | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime) |
+| is production | Turn the switch on |
+
+### Submit SQL tasks located in OSS
+
+| **Parameters** | **Example Values / Operations** |
+|---|---|
+| region id | cn-hangzhou |
+| access key id | |
+| access key secret | |
+| resource queue id | root_queue |
+| code type | SQL |
+| job name | ds-emr-spark-sql-2 |
+| entry point | Any non-empty string |
+| entry point arguments | -f#oss://datadev-oss-hdfs-test/spark-resource/examples/sql/show_db.sql |
+| spark submit parameters | --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1 |
+| engine release version | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime) |
+| is production | Turn the switch on |
+
+### Submit PySpark tasks
+
+| **Parameters** | **Example Values / Operations** |
+|---|---|
+| region id | cn-hangzhou |
+| access key id | |
+| access key secret | |
+| resource queue id | root_queue |
+| code type | PYTHON |
+| job name | ds-emr-spark-python |
+| entry point | oss://datadev-oss-hdfs-test/spark-resource/examples/src/main/python/pi.py |
+| entry point arguments | 100 |
+| spark submit parameters | --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1 |
+| engine release version | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime) |
+| is production | Turn the switch on |
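+
+The `is production` switch in the examples above controls the `environment` tag attached to the job run, and an empty `engine release version` falls back to a default engine. The sketch below is illustrative only; it simply mirrors the constants and request-building logic added later in this change, and the class and method names are made up for the example.
+
+```java
+// Illustrative sketch only; not part of the plugin code.
+public class EnvironmentTagSketch {
+
+    // Mirror AliyunServerlessSparkConstants.ENV_PROD / ENV_DEV / DEFAULT_ENGINE
+    private static final String ENV_PROD = "production";
+    private static final String ENV_DEV = "dev";
+    private static final String DEFAULT_ENGINE = "esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)";
+
+    // The job run is tagged with environment=production when the switch is on, environment=dev otherwise.
+    static String environmentTagValue(boolean isProduction) {
+        return isProduction ? ENV_PROD : ENV_DEV;
+    }
+
+    // An empty engine release version falls back to the default engine.
+    static String engineReleaseVersionOrDefault(String engineReleaseVersion) {
+        return engineReleaseVersion == null || engineReleaseVersion.isEmpty() ? DEFAULT_ENGINE : engineReleaseVersion;
+    }
+
+    public static void main(String[] args) {
+        System.out.println(environmentTagValue(true));         // production
+        System.out.println(engineReleaseVersionOrDefault("")); // esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)
+    }
+}
+```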
+
diff --git a/docs/docs/zh/guide/task/aliyun-serverless-spark.md b/docs/docs/zh/guide/task/aliyun-serverless-spark.md
new file mode 100644
index 0000000000..2a10ecf6f2
--- /dev/null
+++ b/docs/docs/zh/guide/task/aliyun-serverless-spark.md
@@ -0,0 +1,111 @@
+# Aliyun EMR Serverless Spark
+
+## 简介
+
+`Aliyun EMR Serverless Spark` 任务插件用于向
+[`阿里云EMR Serverless Spark`](https://help.aliyun.com/zh/emr/emr-serverless-spark/product-overview/what-is-emr-serverless-spark) 服务提交作业。
+
+## 创建连接
+
+- 点击 `数据源 -> 创建数据源 -> ALIYUN_SERVERLESS_SPARK` 创建连接。
+
+![demo-aliyun-serverless-spark-create-datasource-1](../../../../img/tasks/demo/aliyun_serverless_spark_1.png)
+
+- 填入 `Datasource Name`、`Access Key Id`、`Access Key Secret`、`Region Id` 参数并点击 `确认`。
+
+![demo-aliyun-serverless-spark-create-datasource-2](../../../../img/tasks/demo/aliyun_serverless_spark_2.png)
+
+## 创建任务节点
+
+- 点击 `项目 -> 工作流定义 -> 创建工作流` 并将 `ALIYUN_SERVERLESS_SPARK` 任务拖到画板中。
+
+![demo-aliyun-serverless-spark-create-task-1](../../../../img/tasks/demo/aliyun_serverless_spark_3.png)
+
+- 填入相关任务参数并点击 `确认` 创建任务节点。
+
+![demo-aliyun-serverless-spark-create-task-2](../../../../img/tasks/demo/aliyun_serverless_spark_4.png)
+
+## 任务参数
+
+- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。
+
+| **任务参数** | **描述** |
+|---|---|
+| Datasource types | 连接类型,应该选择 `ALIYUN_SERVERLESS_SPARK`。 |
+| Datasource instances | `ALIYUN_SERVERLESS_SPARK` 连接实例。 |
+| workspace id | `Aliyun Serverless Spark` 工作空间id。 |
+| resource queue id | `Aliyun Serverless Spark` 任务队列id。 |
+| code type | `Aliyun Serverless Spark` 任务类型,可以是`JAR`、`PYTHON`或者`SQL`。 |
+| job name | `Aliyun Serverless Spark` 任务名。 |
+| entry point | 任务代码(JAR包、PYTHON / SQL脚本)的位置,支持OSS中的文件。 |
+| entry point arguments | 主程序入口参数,使用 `#` 分隔。 |
+| spark submit parameters | Spark-submit相关参数。 |
+| engine release version | Spark引擎版本。 |
+| is production | Spark任务是否运行在生产环境中。 |
+
+## 示例
+
+### 提交jar类型任务
+
+| **参数名** | **参数值 / 按钮操作** |
+|---|---|
+| region id | cn-hangzhou |
+| access key id | |
+| access key secret | |
+| resource queue id | root_queue |
+| code type | JAR |
+| job name | ds-emr-spark-jar |
+| entry point | oss://datadev-oss-hdfs-test/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar |
+| entry point arguments | 100 |
+| spark submit parameters | --class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1 |
+| engine release version | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime) |
+| is production | 请您将按钮打开 |
+
+### 提交sql类型任务
+
+| **参数名** | **参数值 / 按钮操作** |
+|---|---|
+| region id | cn-hangzhou |
+| access key id | |
+| access key secret | |
+| resource queue id | root_queue |
+| code type | SQL |
+| job name | ds-emr-spark-sql-1 |
+| entry point | 任意非空值 |
+| entry point arguments | -e#show tables;show tables; |
+| spark submit parameters | --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1 |
+| engine release version | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime) |
+| is production | 请您将按钮打开 |
+
+### 提交oss中的sql脚本任务
+
+| **参数名** | **参数值 / 按钮操作** |
+|---|---|
+| region id | cn-hangzhou |
+| access key id | |
+| access key secret | |
+| resource queue id | root_queue |
+| code type | SQL |
+| job name | ds-emr-spark-sql-2 |
+| entry point | 任意非空值 |
+| entry point arguments | -f#oss://datadev-oss-hdfs-test/spark-resource/examples/sql/show_db.sql |
+| spark submit parameters | --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1 |
+| engine release version | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime) |
+| is production | 请您将按钮打开 |
+
+### 提交pyspark任务
+
+| **参数名** | **参数值 / 按钮操作** |
+|---|---|
+| region id | cn-hangzhou |
+| access key id | |
+| access key secret | |
+| resource queue id | root_queue |
+| code type | PYTHON |
+| job name | ds-emr-spark-python |
+| entry point | oss://datadev-oss-hdfs-test/spark-resource/examples/src/main/python/pi.py |
+| entry point arguments | 100 |
+| spark submit parameters | --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1 |
+| engine release version | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime) |
+| is production | 请您将按钮打开 |
+
diff --git a/docs/img/tasks/demo/aliyun_serverless_spark_1.png b/docs/img/tasks/demo/aliyun_serverless_spark_1.png
new file mode 100644
index 0000000000..04a793d6ba
Binary files /dev/null and b/docs/img/tasks/demo/aliyun_serverless_spark_1.png differ
diff --git a/docs/img/tasks/demo/aliyun_serverless_spark_2.png b/docs/img/tasks/demo/aliyun_serverless_spark_2.png
new file mode 100644
index 0000000000..3d096a89fe
Binary files /dev/null and b/docs/img/tasks/demo/aliyun_serverless_spark_2.png differ
diff --git a/docs/img/tasks/demo/aliyun_serverless_spark_3.png b/docs/img/tasks/demo/aliyun_serverless_spark_3.png
new file mode 100644
index 0000000000..b7c96133f6
Binary files /dev/null and b/docs/img/tasks/demo/aliyun_serverless_spark_3.png differ
diff --git a/docs/img/tasks/demo/aliyun_serverless_spark_4.png b/docs/img/tasks/demo/aliyun_serverless_spark_4.png
new file mode 100644
index 0000000000..61d8370a22
Binary files /dev/null and b/docs/img/tasks/demo/aliyun_serverless_spark_4.png differ
diff --git a/dolphinscheduler-api/src/main/resources/task-type-config.yaml b/dolphinscheduler-api/src/main/resources/task-type-config.yaml
index 
05d1e6290a..d92f41f3c5 100644 --- a/dolphinscheduler-api/src/main/resources/task-type-config.yaml +++ b/dolphinscheduler-api/src/main/resources/task-type-config.yaml @@ -35,6 +35,7 @@ task: - 'K8S' - 'DMS' - 'DATA_FACTORY' + - 'ALIYUN_SERVERLESS_SPARK' logic: - 'SUB_PROCESS' - 'DEPENDENT' diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/pom.xml b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/pom.xml new file mode 100644 index 0000000000..80226fe367 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/pom.xml @@ -0,0 +1,57 @@ + + + + 4.0.0 + + org.apache.dolphinscheduler + dolphinscheduler-datasource-plugin + dev-SNAPSHOT + + + dolphinscheduler-datasource-aliyunserverlessspark + jar + ${project.artifactId} + + + + org.apache.dolphinscheduler + dolphinscheduler-spi + provided + + + + org.apache.dolphinscheduler + dolphinscheduler-datasource-api + ${project.version} + + + + com.aliyun + emr_serverless_spark20230808 + 1.0.0 + + + + com.aliyun + credentials-java + 0.3.0 + + + + diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkClientWrapper.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkClientWrapper.java new file mode 100644 index 0000000000..55c21972ef --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkClientWrapper.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.commons.lang3.StringUtils; + +import lombok.extern.slf4j.Slf4j; + +import com.aliyun.emr_serverless_spark20230808.Client; +import com.aliyun.teaopenapi.models.Config; + +@Slf4j +public class AliyunServerlessSparkClientWrapper implements AutoCloseable { + + private Client aliyunServerlessSparkClient; + + public AliyunServerlessSparkClientWrapper( + String accessKeyId, + String accessKeySecret, + String regionId, + String endpoint) + throws Exception { + + checkNotNull(accessKeyId, accessKeySecret, regionId); + + if (StringUtils.isEmpty(endpoint)) { + endpoint = String.format(AliyunServerlessSparkConstants.ENDPOINT_TEMPLATE, regionId); + } + + Config config = new Config() + .setEndpoint(endpoint) + .setAccessKeyId(accessKeyId) + .setAccessKeySecret(accessKeySecret); + aliyunServerlessSparkClient = new Client(config); + } + + // TODO: update checkConnect when aliyun serverless spark service support the service connection check + public boolean checkConnect(String accessKeyId, String accessKeySecret, String regionId) { + try { + // If the login fails, an exception will be thrown directly + return true; + } catch (Exception e) { + log.info("spark client failed to connect to the server", e); + return false; + } + } + + @Override + public void close() throws Exception { + + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkConstants.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkConstants.java new file mode 100644 index 0000000000..d28bd3e7ee --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkConstants.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark; + +import lombok.experimental.UtilityClass; + +@UtilityClass +public class AliyunServerlessSparkConstants { + + public String ENDPOINT_TEMPLATE = "emr-serverless-spark.%s.aliyuncs.com"; + + public String DEFAULT_ENGINE = "esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)"; + + public String ENV_PROD = "production"; + + public String ENV_DEV = "dev"; + + public String ENTRY_POINT_ARGUMENTS_DELIMITER = "#"; + + public String ENV_KEY = "environment"; + + public String WORKFLOW_KEY = "workflow"; + + public String WORKFLOW_VALUE = "true"; + +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkDataSourceChannel.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkDataSourceChannel.java new file mode 100644 index 0000000000..3821a1fdbe --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkDataSourceChannel.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark; + +import org.apache.dolphinscheduler.spi.datasource.AdHocDataSourceClient; +import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; +import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel; +import org.apache.dolphinscheduler.spi.datasource.PooledDataSourceClient; +import org.apache.dolphinscheduler.spi.enums.DbType; + +public class AliyunServerlessSparkDataSourceChannel implements DataSourceChannel { + + @Override + public AdHocDataSourceClient createAdHocDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) { + throw new UnsupportedOperationException("Aliyun Serverless Spark AdHocDataSourceClient is not supported"); + } + + @Override + public PooledDataSourceClient createPooledDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) { + throw new UnsupportedOperationException("Aliyun Serverless Spark AdHocDataSourceClient is not supported"); + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkDataSourceChannelFactory.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkDataSourceChannelFactory.java new file mode 100644 index 0000000000..851110aeab --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkDataSourceChannelFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark; + +import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel; +import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory; +import org.apache.dolphinscheduler.spi.enums.DbType; + +import com.google.auto.service.AutoService; + +@AutoService(DataSourceChannelFactory.class) +public class AliyunServerlessSparkDataSourceChannelFactory implements DataSourceChannelFactory { + + @Override + public DataSourceChannel create() { + return new AliyunServerlessSparkDataSourceChannel(); + } + + @Override + public String getName() { + return DbType.ALIYUN_SERVERLESS_SPARK.getName(); + } + +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkUtils.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkUtils.java new file mode 100644 index 0000000000..bb53ff8b88 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkUtils.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark; + +import org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param.AliyunServerlessSparkConnectionParam; + +import com.aliyun.emr_serverless_spark20230808.Client; +import com.aliyun.teaopenapi.models.Config; + +public class AliyunServerlessSparkUtils { + + private AliyunServerlessSparkUtils() { + throw new IllegalStateException("Utility class"); + } + + public static Client getAliyunServerlessSparkClient(AliyunServerlessSparkConnectionParam connectionParam) throws Exception { + String endpoint = + String.format(AliyunServerlessSparkConstants.ENDPOINT_TEMPLATE, connectionParam.getRegionId()); + Config config = new Config() + .setEndpoint(endpoint) + .setAccessKeyId(connectionParam.getAccessKeyId()) + .setAccessKeySecret(connectionParam.getAccessKeySecret()); + return new Client(config); + } + +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/param/AliyunServerlessSparkConnectionParam.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/param/AliyunServerlessSparkConnectionParam.java new file mode 100644 index 0000000000..5570465650 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/param/AliyunServerlessSparkConnectionParam.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param; + +import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; + +import lombok.Data; + +import com.fasterxml.jackson.annotation.JsonInclude; + +@Data +@JsonInclude(JsonInclude.Include.NON_NULL) +public class AliyunServerlessSparkConnectionParam implements ConnectionParam { + + protected String accessKeyId; + + protected String accessKeySecret; + + protected String regionId; + + protected String endpoint; +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/param/AliyunServerlessSparkDataSourceParamDTO.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/param/AliyunServerlessSparkDataSourceParamDTO.java new file mode 100644 index 0000000000..703dc4c879 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/param/AliyunServerlessSparkDataSourceParamDTO.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param; + +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.spi.enums.DbType; + +import lombok.Data; + +@Data +public class AliyunServerlessSparkDataSourceParamDTO extends BaseDataSourceParamDTO { + + protected String accessKeyId; + + protected String accessKeySecret; + + protected String regionId; + + protected String endpoint; + + @Override + public DbType getType() { + return DbType.ALIYUN_SERVERLESS_SPARK; + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/param/AliyunServerlessSparkDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/param/AliyunServerlessSparkDataSourceProcessor.java new file mode 100644 index 0000000000..ac1905e237 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/param/AliyunServerlessSparkDataSourceProcessor.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.AliyunServerlessSparkClientWrapper; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; +import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; +import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; +import org.apache.dolphinscheduler.spi.enums.DbType; + +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.text.MessageFormat; + +import lombok.extern.slf4j.Slf4j; + +import com.google.auto.service.AutoService; + +@AutoService(DataSourceProcessor.class) +@Slf4j +public class AliyunServerlessSparkDataSourceProcessor extends AbstractDataSourceProcessor { + + @Override + public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) { + return JSONUtils.parseObject(paramJson, AliyunServerlessSparkDataSourceParamDTO.class); + } + + @Override + public void checkDatasourceParam(BaseDataSourceParamDTO datasourceParamDTO) { + AliyunServerlessSparkDataSourceParamDTO aliyunServerlessSparkDataSourceParamDTO = + (AliyunServerlessSparkDataSourceParamDTO) datasourceParamDTO; + if (StringUtils.isEmpty(aliyunServerlessSparkDataSourceParamDTO.getRegionId()) || + StringUtils.isEmpty(aliyunServerlessSparkDataSourceParamDTO.getAccessKeyId()) || + StringUtils.isEmpty(aliyunServerlessSparkDataSourceParamDTO.getRegionId())) { + throw new IllegalArgumentException("spark datasource param is not valid"); + } + } + + @Override + public String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType) { + AliyunServerlessSparkConnectionParam baseConnectionParam = + (AliyunServerlessSparkConnectionParam) connectionParam; + return MessageFormat.format( + "{0}@{1}@{2}@{3}", + dbType.getName(), + baseConnectionParam.getRegionId(), + PasswordUtils.encodePassword(baseConnectionParam.getAccessKeyId()), + PasswordUtils.encodePassword(baseConnectionParam.getAccessKeySecret())); + } + + @Override + public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) { + AliyunServerlessSparkConnectionParam connectionParams = + (AliyunServerlessSparkConnectionParam) createConnectionParams(connectionJson); + AliyunServerlessSparkDataSourceParamDTO aliyunServerlessSparkDataSourceParamDTO = + new AliyunServerlessSparkDataSourceParamDTO(); + + aliyunServerlessSparkDataSourceParamDTO.setAccessKeyId(connectionParams.getAccessKeyId()); + 
aliyunServerlessSparkDataSourceParamDTO.setAccessKeySecret(connectionParams.getAccessKeySecret()); + aliyunServerlessSparkDataSourceParamDTO.setRegionId(connectionParams.getRegionId()); + aliyunServerlessSparkDataSourceParamDTO.setEndpoint(connectionParams.getEndpoint()); + return aliyunServerlessSparkDataSourceParamDTO; + } + + @Override + public AliyunServerlessSparkConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam) { + AliyunServerlessSparkDataSourceParamDTO aliyunServerlessSparkDataSourceParamDTO = + (AliyunServerlessSparkDataSourceParamDTO) datasourceParam; + AliyunServerlessSparkConnectionParam aliyunServerlessSparkConnectionParam = + new AliyunServerlessSparkConnectionParam(); + aliyunServerlessSparkConnectionParam.setAccessKeyId(aliyunServerlessSparkDataSourceParamDTO.getAccessKeyId()); + aliyunServerlessSparkConnectionParam + .setAccessKeySecret(aliyunServerlessSparkDataSourceParamDTO.getAccessKeySecret()); + aliyunServerlessSparkConnectionParam.setRegionId(aliyunServerlessSparkDataSourceParamDTO.getRegionId()); + aliyunServerlessSparkConnectionParam.setEndpoint(aliyunServerlessSparkDataSourceParamDTO.getEndpoint()); + + return aliyunServerlessSparkConnectionParam; + } + + @Override + public ConnectionParam createConnectionParams(String connectionJson) { + return JSONUtils.parseObject(connectionJson, AliyunServerlessSparkConnectionParam.class); + } + + @Override + public String getDatasourceDriver() { + return ""; + } + + @Override + public String getValidationQuery() { + return ""; + } + + @Override + public String getJdbcUrl(ConnectionParam connectionParam) { + return ""; + } + + @Override + public Connection getConnection(ConnectionParam connectionParam) { + return null; + } + + @Override + public boolean checkDataSourceConnectivity(ConnectionParam connectionParam) { + AliyunServerlessSparkConnectionParam baseConnectionParam = + (AliyunServerlessSparkConnectionParam) connectionParam; + try ( + AliyunServerlessSparkClientWrapper aliyunServerlessSparkClientWrapper = + new AliyunServerlessSparkClientWrapper( + baseConnectionParam.getAccessKeyId(), + baseConnectionParam.getAccessKeySecret(), + baseConnectionParam.getRegionId(), + baseConnectionParam.getEndpoint())) { + return aliyunServerlessSparkClientWrapper.checkConnect( + baseConnectionParam.getAccessKeyId(), + baseConnectionParam.getAccessKeySecret(), + baseConnectionParam.getRegionId()); + } catch (Exception e) { + log.error("spark client failed to connect to the server", e); + return false; + } + } + + @Override + public DbType getDbType() { + return DbType.ALIYUN_SERVERLESS_SPARK; + } + + @Override + public DataSourceProcessor create() { + return new AliyunServerlessSparkDataSourceProcessor(); + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkDataSourceProcessorTest.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkDataSourceProcessorTest.java new file mode 100644 index 0000000000..9973ef10f5 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkDataSourceProcessorTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark; + +import org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param.AliyunServerlessSparkConnectionParam; +import org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param.AliyunServerlessSparkDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param.AliyunServerlessSparkDataSourceProcessor; +import org.apache.dolphinscheduler.spi.enums.DbType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.MockedConstruction; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class AliyunServerlessSparkDataSourceProcessorTest { + + private AliyunServerlessSparkDataSourceProcessor aliyunServerlessSparkDataSourceProcessor; + + private String connectJson = + "{\"accessKeyId\":\"mockAccessKeyId\",\"accessKeySecret\":\"mockAccessKeySecret\",\"regionId\":\"cn-hangzhou\"}"; + + @BeforeEach + public void init() { + aliyunServerlessSparkDataSourceProcessor = new AliyunServerlessSparkDataSourceProcessor(); + } + + @Test + void testCheckDatasourceParam() { + AliyunServerlessSparkDataSourceParamDTO aliyunServerlessSparkDataSourceParamDTO = + new AliyunServerlessSparkDataSourceParamDTO(); + aliyunServerlessSparkDataSourceParamDTO.setRegionId("cn-hangzhou"); + Assertions.assertThrows(IllegalArgumentException.class, + () -> aliyunServerlessSparkDataSourceProcessor + .checkDatasourceParam(aliyunServerlessSparkDataSourceParamDTO)); + aliyunServerlessSparkDataSourceParamDTO.setAccessKeyId("mockAccessKeyId"); + aliyunServerlessSparkDataSourceParamDTO.setAccessKeySecret("mockAccessKeySecret"); + Assertions + .assertDoesNotThrow(() -> aliyunServerlessSparkDataSourceProcessor + .checkDatasourceParam(aliyunServerlessSparkDataSourceParamDTO)); + } + + @Test + void testGetDatasourceUniqueId() { + AliyunServerlessSparkConnectionParam aliyunServerlessSparkConnectionParam = + new AliyunServerlessSparkConnectionParam(); + aliyunServerlessSparkConnectionParam.setRegionId("cn-hangzhou"); + aliyunServerlessSparkConnectionParam.setAccessKeyId("mockAccessKeyId"); + aliyunServerlessSparkConnectionParam.setAccessKeySecret("mockAccessKeySecret"); + Assertions.assertEquals("aliyun_serverless_spark@cn-hangzhou@mockAccessKeyId@mockAccessKeySecret", + aliyunServerlessSparkDataSourceProcessor.getDatasourceUniqueId(aliyunServerlessSparkConnectionParam, + DbType.ALIYUN_SERVERLESS_SPARK)); + } + + @Test + void testCreateDatasourceParamDTO() { + AliyunServerlessSparkDataSourceParamDTO aliyunServerlessSparkDataSourceParamDTO = + 
(AliyunServerlessSparkDataSourceParamDTO) aliyunServerlessSparkDataSourceProcessor + .createDatasourceParamDTO(connectJson); + Assertions.assertEquals("cn-hangzhou", aliyunServerlessSparkDataSourceParamDTO.getRegionId()); + Assertions.assertEquals("mockAccessKeyId", aliyunServerlessSparkDataSourceParamDTO.getAccessKeyId()); + Assertions.assertEquals("mockAccessKeySecret", aliyunServerlessSparkDataSourceParamDTO.getAccessKeySecret()); + } + + @Test + void testCreateConnectionParams() { + AliyunServerlessSparkDataSourceParamDTO aliyunServerlessSparkDataSourceParamDTO = + (AliyunServerlessSparkDataSourceParamDTO) aliyunServerlessSparkDataSourceProcessor + .createDatasourceParamDTO(connectJson); + AliyunServerlessSparkConnectionParam aliyunServerlessSparkConnectionParam = + aliyunServerlessSparkDataSourceProcessor + .createConnectionParams(aliyunServerlessSparkDataSourceParamDTO); + Assertions.assertEquals("cn-hangzhou", aliyunServerlessSparkConnectionParam.getRegionId()); + Assertions.assertEquals("mockAccessKeyId", aliyunServerlessSparkConnectionParam.getAccessKeyId()); + Assertions.assertEquals("mockAccessKeySecret", aliyunServerlessSparkConnectionParam.getAccessKeySecret()); + } + + @Test + void testTestConnection() { + AliyunServerlessSparkDataSourceParamDTO aliyunServerlessSparkDataSourceParamDTO = + (AliyunServerlessSparkDataSourceParamDTO) aliyunServerlessSparkDataSourceProcessor + .createDatasourceParamDTO(connectJson); + AliyunServerlessSparkConnectionParam connectionParam = + aliyunServerlessSparkDataSourceProcessor + .createConnectionParams(aliyunServerlessSparkDataSourceParamDTO); + Assertions.assertTrue(aliyunServerlessSparkDataSourceProcessor.checkDataSourceConnectivity(connectionParam)); + try ( + MockedConstruction AliyunServerlessSparkClientWrapper = + Mockito.mockConstruction(AliyunServerlessSparkClientWrapper.class, (mock, context) -> { + Mockito.when( + mock.checkConnect(connectionParam.getAccessKeyId(), + connectionParam.getAccessKeySecret(), connectionParam.getRegionId())) + .thenReturn(true); + })) { + Assertions + .assertTrue(aliyunServerlessSparkDataSourceProcessor.checkDataSourceConnectivity(connectionParam)); + } + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml index effe3c9abb..b62a3ce414 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml @@ -158,5 +158,10 @@ dolphinscheduler-datasource-hana ${project.version} + + org.apache.dolphinscheduler + dolphinscheduler-datasource-aliyunserverlessspark + ${project.version} + diff --git a/dolphinscheduler-datasource-plugin/pom.xml b/dolphinscheduler-datasource-plugin/pom.xml index c30a6b4258..1f712364d9 100644 --- a/dolphinscheduler-datasource-plugin/pom.xml +++ b/dolphinscheduler-datasource-plugin/pom.xml @@ -56,6 +56,7 @@ dolphinscheduler-datasource-sagemaker dolphinscheduler-datasource-k8s dolphinscheduler-datasource-hana + dolphinscheduler-datasource-aliyunserverlessspark diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java index 882b170e11..360e788cb3 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java @@ -55,7 +55,10 @@ 
public enum DbType { ZEPPELIN(24, "zeppelin", "zeppelin"), SAGEMAKER(25, "sagemaker", "sagemaker"), - K8S(26, "k8s", "k8s"); + K8S(26, "k8s", "k8s"), + + ALIYUN_SERVERLESS_SPARK(27, "aliyun_serverless_spark", "aliyun serverless spark"); + private static final Map DB_TYPE_MAP = Arrays.stream(DbType.values()).collect(toMap(DbType::getCode, Functions.identity())); @EnumValue diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/pom.xml new file mode 100644 index 0000000000..7a61f116c8 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/pom.xml @@ -0,0 +1,68 @@ + + + + 4.0.0 + + org.apache.dolphinscheduler + dolphinscheduler-task-plugin + dev-SNAPSHOT + + dolphinscheduler-task-aliyunserverlessspark + jar + + + + org.apache.dolphinscheduler + dolphinscheduler-spi + provided + + + com.aliyun.oss + aliyun-sdk-oss + + + + + org.apache.dolphinscheduler + dolphinscheduler-task-api + ${project.version} + + + com.aliyun.oss + aliyun-sdk-oss + + + + + org.apache.dolphinscheduler + dolphinscheduler-datasource-all + ${project.version} + + + com.aliyun + emr_serverless_spark20230808 + 1.0.0 + + + com.aliyun + credentials-java + 0.3.0 + + + diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkParameters.java new file mode 100644 index 0000000000..a4cdf07ef5 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkParameters.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark; + +import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import com.fasterxml.jackson.annotation.JsonProperty; + +@Data +@Slf4j +public class AliyunServerlessSparkParameters extends AbstractParameters { + + // spark job configurations + private String workspaceId; + + private String resourceQueueId; + + private String codeType; + + private String jobName; + + private String engineReleaseVersion; + + private String entryPoint; + + private String entryPointArguments; + + private String sparkSubmitParameters; + + @JsonProperty("isProduction") + boolean isProduction; + + private int datasource; + + private String type; + + @Override + public boolean checkParameters() { + return true; + } + + @Override + public ResourceParametersHelper getResources() { + ResourceParametersHelper resources = super.getResources(); + resources.put(ResourceType.DATASOURCE, datasource); + return resources; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java new file mode 100644 index 0000000000..e2fc7e9842 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.AliyunServerlessSparkConstants; +import org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param.AliyunServerlessSparkConnectionParam; +import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; +import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; +import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters; +import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; +import org.apache.dolphinscheduler.spi.enums.DbType; + +import org.apache.commons.lang3.StringUtils; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import lombok.extern.slf4j.Slf4j; + +import com.aliyun.emr_serverless_spark20230808.Client; +import com.aliyun.emr_serverless_spark20230808.models.CancelJobRunRequest; +import com.aliyun.emr_serverless_spark20230808.models.GetJobRunRequest; +import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponse; +import com.aliyun.emr_serverless_spark20230808.models.JobDriver; +import com.aliyun.emr_serverless_spark20230808.models.StartJobRunRequest; +import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse; +import com.aliyun.emr_serverless_spark20230808.models.Tag; +import com.aliyun.teaopenapi.models.Config; +import com.aliyun.teautil.models.RuntimeOptions; + +@Slf4j +public class AliyunServerlessSparkTask extends AbstractRemoteTask { + + private final TaskExecutionContext taskExecutionContext; + + private Client aliyunServerlessSparkClient; + + private AliyunServerlessSparkParameters aliyunServerlessSparkParameters; + + private AliyunServerlessSparkConnectionParam aliyunServerlessSparkConnectionParam; + + private String jobRunId; + + private RunState currentState; + + private String accessKeyId; + + private String accessKeySecret; + + private String regionId; + + private String endpoint; + + protected AliyunServerlessSparkTask(TaskExecutionContext taskExecutionContext) { + super(taskExecutionContext); + this.taskExecutionContext = taskExecutionContext; + } + + @Override + public void init() { + final String taskParams = taskExecutionContext.getTaskParams(); + aliyunServerlessSparkParameters = JSONUtils.parseObject(taskParams, AliyunServerlessSparkParameters.class); + log.info("aliyunServerlessSparkParameters - {}", aliyunServerlessSparkParameters); + if (this.aliyunServerlessSparkParameters == null || !this.aliyunServerlessSparkParameters.checkParameters()) { + throw new AliyunServerlessSparkTaskException("Aliyun-Serverless-Spark task parameters are not valid!"); + } + + ResourceParametersHelper resourceParametersHelper = taskExecutionContext.getResourceParametersHelper(); + DataSourceParameters dataSourceParameters = (DataSourceParameters) resourceParametersHelper + .getResourceParameters(ResourceType.DATASOURCE, 
aliyunServerlessSparkParameters.getDatasource()); + aliyunServerlessSparkConnectionParam = (AliyunServerlessSparkConnectionParam) DataSourceUtils + .buildConnectionParams( + DbType.valueOf(aliyunServerlessSparkParameters.getType()), + dataSourceParameters.getConnectionParams()); + + accessKeyId = aliyunServerlessSparkConnectionParam.getAccessKeyId(); + accessKeySecret = aliyunServerlessSparkConnectionParam.getAccessKeySecret(); + regionId = aliyunServerlessSparkConnectionParam.getRegionId(); + endpoint = aliyunServerlessSparkConnectionParam.getEndpoint(); + + try { + aliyunServerlessSparkClient = + buildAliyunServerlessSparkClient(accessKeyId, accessKeySecret, regionId, endpoint); + } catch (Exception e) { + log.error("Failed to build Aliyun-Serverless-Spark client!", e); + throw new AliyunServerlessSparkTaskException("Failed to build Aliyun-Serverless-Spark client!"); + } + + currentState = RunState.Submitted; + } + + @Override + public void handle(TaskCallBack taskCallBack) throws TaskException { + try { + StartJobRunRequest startJobRunRequest = buildStartJobRunRequest(aliyunServerlessSparkParameters); + RuntimeOptions runtime = new RuntimeOptions(); + Map headers = new HashMap<>(); + StartJobRunResponse startJobRunResponse = aliyunServerlessSparkClient.startJobRunWithOptions( + aliyunServerlessSparkParameters.getWorkspaceId(), startJobRunRequest, headers, runtime); + jobRunId = startJobRunResponse.getBody().getJobRunId(); + setAppIds(jobRunId); + log.info("Successfully submitted serverless spark job, jobRunId - {}", jobRunId); + + while (!RunState.isFinal(currentState)) { + GetJobRunRequest getJobRunRequest = buildGetJobRunRequest(); + GetJobRunResponse getJobRunResponse = aliyunServerlessSparkClient + .getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId, getJobRunRequest); + currentState = RunState.valueOf(getJobRunResponse.getBody().getJobRun().getState()); + log.info("job - {} state - {}", jobRunId, currentState); + Thread.sleep(10 * 1000L); + } + + setExitStatusCode(mapFinalStateToExitCode(currentState)); + + } catch (Exception e) { + log.error("Serverless spark job failed!", e); + throw new AliyunServerlessSparkTaskException("Serverless spark job failed!"); + } + } + + @Override + public void submitApplication() throws TaskException { + + } + + @Override + public void trackApplicationStatus() throws TaskException { + + } + + protected int mapFinalStateToExitCode(RunState state) { + switch (state) { + case Success: + return TaskConstants.EXIT_CODE_SUCCESS; + case Failed: + return TaskConstants.EXIT_CODE_KILL; + default: + return TaskConstants.EXIT_CODE_FAILURE; + } + } + + @Override + public AbstractParameters getParameters() { + return aliyunServerlessSparkParameters; + } + + @Override + public void cancelApplication() throws TaskException { + CancelJobRunRequest cancelJobRunRequest = buildCancelJobRunRequest(); + try { + aliyunServerlessSparkClient.cancelJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId, + cancelJobRunRequest); + } catch (Exception e) { + log.error("Failed to cancel serverless spark job run", e); + } + } + + @Override + public List getApplicationIds() throws TaskException { + return Collections.emptyList(); + } + + protected Client buildAliyunServerlessSparkClient(String accessKeyId, String accessKeySecret, + String regionId, String endpoint) throws Exception { + if (StringUtils.isEmpty(endpoint)) { + endpoint = String.format(AliyunServerlessSparkConstants.ENDPOINT_TEMPLATE, regionId); + } + + Config config = new Config() + 
.setEndpoint(endpoint) + .setAccessKeyId(accessKeyId) + .setAccessKeySecret(accessKeySecret); + return new Client(config); + } + + protected StartJobRunRequest buildStartJobRunRequest(AliyunServerlessSparkParameters aliyunServerlessSparkParameters) { + StartJobRunRequest startJobRunRequest = new StartJobRunRequest(); + startJobRunRequest.setRegionId(regionId); + startJobRunRequest.setResourceQueueId(aliyunServerlessSparkParameters.getResourceQueueId()); + startJobRunRequest.setCodeType(aliyunServerlessSparkParameters.getCodeType()); + startJobRunRequest.setName(aliyunServerlessSparkParameters.getJobName()); + String engineReleaseVersion = aliyunServerlessSparkParameters.getEngineReleaseVersion(); + engineReleaseVersion = + StringUtils.isEmpty(engineReleaseVersion) ? AliyunServerlessSparkConstants.DEFAULT_ENGINE + : engineReleaseVersion; + startJobRunRequest.setReleaseVersion(engineReleaseVersion); + Tag envTag = new Tag(); + envTag.setKey(AliyunServerlessSparkConstants.ENV_KEY); + String envType = aliyunServerlessSparkParameters.isProduction() ? AliyunServerlessSparkConstants.ENV_PROD + : AliyunServerlessSparkConstants.ENV_DEV; + envTag.setValue(envType); + Tag workflowTag = new Tag(); + workflowTag.setKey(AliyunServerlessSparkConstants.WORKFLOW_KEY); + workflowTag.setValue(AliyunServerlessSparkConstants.WORKFLOW_VALUE); + startJobRunRequest.setTags(Arrays.asList(envTag, workflowTag)); + List<String> entryPointArguments = + StringUtils.isEmpty(aliyunServerlessSparkParameters.getEntryPointArguments()) ? Collections.emptyList() + : Arrays.asList(aliyunServerlessSparkParameters.getEntryPointArguments() + .split(AliyunServerlessSparkConstants.ENTRY_POINT_ARGUMENTS_DELIMITER)); + JobDriver.JobDriverSparkSubmit jobDriverSparkSubmit = new JobDriver.JobDriverSparkSubmit() + .setEntryPoint(aliyunServerlessSparkParameters.getEntryPoint()) + .setEntryPointArguments(entryPointArguments) + .setSparkSubmitParameters(aliyunServerlessSparkParameters.getSparkSubmitParameters()); + JobDriver jobDriver = new com.aliyun.emr_serverless_spark20230808.models.JobDriver() + .setSparkSubmit(jobDriverSparkSubmit); + startJobRunRequest.setJobDriver(jobDriver); + return startJobRunRequest; + } + + protected GetJobRunRequest buildGetJobRunRequest() { + GetJobRunRequest getJobRunRequest = new GetJobRunRequest(); + getJobRunRequest.setRegionId(regionId); + return getJobRunRequest; + } + + protected CancelJobRunRequest buildCancelJobRunRequest() { + CancelJobRunRequest cancelJobRunRequest = new CancelJobRunRequest(); + cancelJobRunRequest.setRegionId(regionId); + return cancelJobRunRequest; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskChannel.java new file mode 100644 index 0000000000..1a1205f2c6 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskChannel.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; +import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; + +public class AliyunServerlessSparkTaskChannel implements TaskChannel { + + @Override + public AbstractTask createTask(TaskExecutionContext taskRequest) { + return new AliyunServerlessSparkTask(taskRequest); + } + + @Override + public AbstractParameters parseParameters(String taskParams) { + return JSONUtils.parseObject(taskParams, AliyunServerlessSparkParameters.class); + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskChannelFactory.java new file mode 100644 index 0000000000..eaa797d2d5 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskChannelFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark; + +import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; +import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; + +import com.google.auto.service.AutoService; + +@AutoService(TaskChannelFactory.class) +public class AliyunServerlessSparkTaskChannelFactory implements TaskChannelFactory { + + @Override + public String getName() { + return "ALIYUN_SERVERLESS_SPARK"; + } + + @Override + public TaskChannel create() { + return new AliyunServerlessSparkTaskChannel(); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskException.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskException.java new file mode 100644 index 0000000000..5b54bd4d04 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark; + +public class AliyunServerlessSparkTaskException extends RuntimeException { + + public AliyunServerlessSparkTaskException() { + super(); + } + + public AliyunServerlessSparkTaskException(String message) { + super(message); + } + + public AliyunServerlessSparkTaskException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/RunState.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/RunState.java new file mode 100644 index 0000000000..0d617170d3 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/RunState.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark; + +import java.util.Objects; + +public enum RunState { + + Submitted, + + Pending, + + Running, + + Success, + + Failed, + + Cancelling, + + Cancelled, + + CancelFailed; + + public static boolean isFinal(RunState runState) { + return Success == runState || Failed == runState || Cancelled == runState; + } + + public static boolean hasLaunched(RunState runState) { + return Objects.nonNull(runState) && runState != Submitted && runState != Pending; + } + + public static boolean isCancelled(RunState runState) { + return Cancelled == runState; + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java new file mode 100644 index 0000000000..c9cd7ef37b --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType; +import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters; +import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; +import org.apache.dolphinscheduler.spi.enums.DbType; + +import lombok.extern.slf4j.Slf4j; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import com.aliyun.emr_serverless_spark20230808.Client; +import com.aliyun.emr_serverless_spark20230808.models.CancelJobRunRequest; +import com.aliyun.emr_serverless_spark20230808.models.CancelJobRunResponse; +import com.aliyun.emr_serverless_spark20230808.models.GetJobRunRequest; +import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponse; +import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponseBody; +import com.aliyun.emr_serverless_spark20230808.models.StartJobRunRequest; +import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse; +import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponseBody; + +@Slf4j +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +public class AliyunServerlessSparkTaskTest { + + @Mock + private TaskExecutionContext mockTaskExecutionContext; + + @Mock + private Client mockAliyunServerlessSparkClient; + + @Mock + private ResourceParametersHelper mockResourceParametersHelper; + + @Mock + private TaskCallBack mockTaskCallBack; + + @Mock + private StartJobRunRequest mockStartJobRunRequest; + + @Mock + private StartJobRunResponse mockStartJobRunResponse; + + @Mock + private GetJobRunRequest mockGetJobRunRequest; + + @Mock + private GetJobRunResponse mockGetJobRunResponse; + + @Mock + private CancelJobRunRequest mockCancelJobRunRequest; + + @Mock + private CancelJobRunResponse mockCancelJobRunResponse; + + @InjectMocks + @Spy + private AliyunServerlessSparkTask aliyunServerlessSparkTask; + + private static final String mockAccessKeyId = "mockAccessKeyId"; + + private static final String mockAccessKeySecret = "mockAccessKeySecret"; + + private static final String mockRegionId = "cn-hangzhou"; + + private static final String mockEndpoint = "emr-serverless-spark-vpc.cn-hangzhou.aliyuncs.com"; + + private static final int mockDatasourceId = 1; + + private static final String taskParamsString = + 
"{\"localParams\":[],\"resourceList\":[],\"workspaceId\":\"w-ae42e9c929275cc5\",\"resourceQueueId\":\"root_queue\",\"codeType\":\"JAR\",\"jobName\":\"spark\",\"entryPoint\":\"oss://datadev-oss-hdfs-test/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar\",\"entryPointArguments\":\"10\",\"sparkSubmitParameters\":\"--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1\",\"isProduction\":true,\"type\":\"ALIYUN_SERVERLESS_SPARK\",\"datasource\":1}"; + + private static final String connectionParamsString = + "{\"accessKeyId\":\"mockAccessKeyId\",\"accessKeySecret\":\"mockAccessKeySecret\",\"regionId\":\"cn-hangzhou\",\"endpoint\":\"emr-serverless-spark-vpc.cn-hangzhou.aliyuncs.com\",\"password\":\"\"}"; + + private static final String mockJobRunId = "jr-f6a1d0dd17d6b8a3"; + + private static final String mockWorkspaceId = "w-ae42e9c929275cc5"; + + private static final String mockResourceQueueId = "root_queue"; + + private static final String mockSparkSubmitParameters = + "--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"; + + private static final String mockEntryPoint = + "oss://datadev-oss-hdfs-test/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar"; + + private static final String mockEntryPointArguments = "10"; + + @BeforeEach + public void before() { + when(mockTaskExecutionContext.getTaskParams()).thenReturn(taskParamsString); + DataSourceParameters dataSourceParameters = new DataSourceParameters(); + dataSourceParameters.setConnectionParams(connectionParamsString); + dataSourceParameters.setType(DbType.ALIYUN_SERVERLESS_SPARK); + when(mockResourceParametersHelper.getResourceParameters(any(), any())).thenReturn(dataSourceParameters); + when(mockTaskExecutionContext.getResourceParametersHelper()).thenReturn(mockResourceParametersHelper); + Assertions.assertDoesNotThrow( + () -> when(aliyunServerlessSparkTask.buildAliyunServerlessSparkClient(any(), any(), any(), any())) + .thenReturn(mockAliyunServerlessSparkClient)); + } + + @Test + public void testInit() throws Exception { + aliyunServerlessSparkTask.init(); + verify(mockTaskExecutionContext).getTaskParams(); + verify(mockResourceParametersHelper).getResourceParameters(ResourceType.DATASOURCE, mockDatasourceId); + verify(aliyunServerlessSparkTask).buildAliyunServerlessSparkClient(mockAccessKeyId, mockAccessKeySecret, + mockRegionId, mockEndpoint); + } + + @Test + public void testHandle() { + doReturn(mockStartJobRunRequest).when(aliyunServerlessSparkTask).buildStartJobRunRequest(any()); + StartJobRunResponseBody startJobRunResponseBody = new StartJobRunResponseBody(); + startJobRunResponseBody.setJobRunId(mockJobRunId); + doReturn(startJobRunResponseBody).when(mockStartJobRunResponse).getBody(); + Assertions.assertDoesNotThrow( + () -> doReturn(mockStartJobRunResponse).when(mockAliyunServerlessSparkClient) + .startJobRunWithOptions(any(), any(), any(), any())); + + doReturn(mockGetJobRunRequest).when(aliyunServerlessSparkTask).buildGetJobRunRequest(); + GetJobRunResponseBody getJobRunResponseBody = new GetJobRunResponseBody(); + GetJobRunResponseBody.GetJobRunResponseBodyJobRun jobRun = + new GetJobRunResponseBody.GetJobRunResponseBodyJobRun(); + jobRun.setState(RunState.Success.name()); + getJobRunResponseBody.setJobRun(jobRun); + 
doReturn(getJobRunResponseBody).when(mockGetJobRunResponse).getBody(); + Assertions.assertDoesNotThrow( + () -> doReturn(mockGetJobRunResponse).when(mockAliyunServerlessSparkClient).getJobRun(any(), any(), + any())); + + aliyunServerlessSparkTask.init(); + aliyunServerlessSparkTask.handle(mockTaskCallBack); + verify(aliyunServerlessSparkTask).setAppIds(mockJobRunId); + verify(aliyunServerlessSparkTask).setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS); + } + + @Test + public void testCancelApplication() throws Exception { + doReturn(mockCancelJobRunRequest).when(aliyunServerlessSparkTask).buildCancelJobRunRequest(); + Assertions.assertDoesNotThrow( + () -> doReturn(mockCancelJobRunResponse).when(mockAliyunServerlessSparkClient).cancelJobRun(any(), + any(), any())); + + aliyunServerlessSparkTask.init(); + aliyunServerlessSparkTask.cancelApplication(); + verify(aliyunServerlessSparkTask).buildCancelJobRunRequest(); + verify(mockAliyunServerlessSparkClient).cancelJobRun(eq(mockWorkspaceId), any(), eq(mockCancelJobRunRequest)); + } + + @Test + public void testBuildStartJobRunRequest() { + AliyunServerlessSparkParameters mockAliyunServerlessSparkParameters = + mock(AliyunServerlessSparkParameters.class); + doReturn(mockResourceQueueId).when(mockAliyunServerlessSparkParameters).getResourceQueueId(); + doReturn("JAR").when(mockAliyunServerlessSparkParameters).getCodeType(); + doReturn("ds-test").when(mockAliyunServerlessSparkParameters).getJobName(); + doReturn(mockSparkSubmitParameters).when(mockAliyunServerlessSparkParameters).getSparkSubmitParameters(); + doReturn(mockEntryPoint).when(mockAliyunServerlessSparkParameters).getEntryPoint(); + doReturn(mockEntryPointArguments).when(mockAliyunServerlessSparkParameters).getEntryPointArguments(); + + aliyunServerlessSparkTask.buildStartJobRunRequest(mockAliyunServerlessSparkParameters); + + verify(mockAliyunServerlessSparkParameters).getResourceQueueId(); + verify(mockAliyunServerlessSparkParameters).getCodeType(); + verify(mockAliyunServerlessSparkParameters).getJobName(); + verify(mockAliyunServerlessSparkParameters).getEngineReleaseVersion(); + verify(mockAliyunServerlessSparkParameters).isProduction(); + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml index fbaea0dddc..3d107ba14c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml @@ -216,6 +216,12 @@ dolphinscheduler-task-remoteshell ${project.version} + + + org.apache.dolphinscheduler + dolphinscheduler-task-aliyunserverlessspark + ${project.version} + diff --git a/dolphinscheduler-task-plugin/pom.xml b/dolphinscheduler-task-plugin/pom.xml index 9036e88b67..e1446ec426 100644 --- a/dolphinscheduler-task-plugin/pom.xml +++ b/dolphinscheduler-task-plugin/pom.xml @@ -62,6 +62,7 @@ dolphinscheduler-task-linkis dolphinscheduler-task-datafactory dolphinscheduler-task-remoteshell + dolphinscheduler-task-aliyunserverlessspark diff --git a/dolphinscheduler-ui/public/images/task-icons/aliyun_serverless_spark.png b/dolphinscheduler-ui/public/images/task-icons/aliyun_serverless_spark.png new file mode 100644 index 0000000000..c620bb1f1e Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/aliyun_serverless_spark.png differ diff --git a/dolphinscheduler-ui/public/images/task-icons/aliyun_serverless_spark_hover.png b/dolphinscheduler-ui/public/images/task-icons/aliyun_serverless_spark_hover.png new 
file mode 100644 index 0000000000..1927e0eb24 Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/aliyun_serverless_spark_hover.png differ diff --git a/dolphinscheduler-ui/src/locales/en_US/datasource.ts b/dolphinscheduler-ui/src/locales/en_US/datasource.ts index e9b799b16e..645edc7f7d 100644 --- a/dolphinscheduler-ui/src/locales/en_US/datasource.ts +++ b/dolphinscheduler-ui/src/locales/en_US/datasource.ts @@ -85,7 +85,7 @@ export default { clientId: 'ClientId', clientSecret: 'ClientSecret', OAuth_token_endpoint: 'OAuth 2.0 token endpoint', - endpoint_tips: 'Please enter OAuth Token', + OAuth_token_endpoint_tips: 'Please enter OAuth Token', AccessKeyID: 'AccessKeyID', AccessKeyID_tips: 'Please input AccessKeyID', SecretAccessKey: 'SecretAccessKey', @@ -97,5 +97,13 @@ export default { kubeConfig: 'kubeConfig', kubeConfig_tips: 'Please input KubeConfig', namespace: 'namespace', - namespace_tips: 'Please input namespace' + namespace_tips: 'Please input namespace', + access_key_id: 'Access Key Id', + access_key_id_tips: 'Please enter access key id', + access_key_secret: 'Access Key Secret', + access_key_secret_tips: 'Please enter access key secret', + region_id: 'Region Id', + region_id_tips: 'Please enter Region Id', + endpoint: 'Endpoint', + endpoint_tips: 'Please enter endpoint' } diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index a42b652eff..dce409c797 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -909,7 +909,33 @@ export default { yarn_queue_tips: 'Please input yarn queue(optional)', dependent_type: 'Dependency Type', dependent_on_workflow: 'Dependent on workflow', - dependent_on_task: 'Dependent on task' + dependent_on_task: 'Dependent on task', + region_id: 'region id', + region_id_tips: 'region id', + endpoint: 'endpoint', + endpoint_tips: 'restful endpoint', + access_key_id: 'access key id', + access_key_id_tips: 'access key id', + access_key_secret: 'access key secret', + access_key_secret_tips: 'access key secret', + workspace_id: 'workspace id', + workspace_id_tips: 'workspace id', + resource_queue_id: 'resource queue id', + resource_queue_id_tips: 'resource queue id', + code_type: 'code type', + code_type_tips: 'code type', + job_name: 'job name', + job_name_tips: 'job name', + engine_release_version: 'engine release version', + engine_release_version_tips: 'engine release version', + entry_point: 'entry point', + entry_point_tips: 'entry point', + entry_point_arguments: 'entry point arguments', + entry_point_arguments_tips: 'entry point arguments', + spark_submit_parameters: 'spark submit parameters', + spark_submit_parameters_tips: 'spark submit parameters', + is_production: 'is production', + is_production_tips: 'is production' }, menu: { fav: 'Favorites', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts b/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts index 7aa797a591..b3a68ec3d8 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts @@ -82,7 +82,7 @@ export default { clientId: 'ClientId', clientSecret: 'ClientSecret', OAuth_token_endpoint: 'OAuth 2.0 token endpoint', - endpoint_tips: '请输入OAuth', + OAuth_token_endpoint_tips: '请输入OAuth', AccessKeyID: 'AccessKeyID', AccessKeyID_tips: '请输入AccessKeyID', SecretAccessKey: 'SecretAccessKey', @@ -94,5 +94,13 @@ export default { kubeConfig: 'kubeConfig', kubeConfig_tips: '请输入KubeConfig', 
namespace: 'namespace', - namespace_tips: '请输入namespace' + namespace_tips: '请输入namespace', + access_key_id: 'Access Key Id', + access_key_id_tips: '请输入access key id', + access_key_secret: 'Access Key Secret', + access_key_secret_tips: '请输入access key secret', + region_id: 'Region Id', + region_id_tips: '请输入Region Id', + endpoint: 'endpoint', + endpoint_tips: '请输入endpoint' } diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index cb7e320118..f3b50c93b8 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -878,7 +878,33 @@ export default { yarn_queue_tips: '请输入Yarn队列(选填)', dependent_type: '依赖类型', dependent_on_workflow: '依赖于工作流', - dependent_on_task: '依赖于任务' + dependent_on_task: '依赖于任务', + region_id: 'region id', + region_id_tips: 'region id', + endpoint: 'endpoint', + endpoint_tips: '请输入endpoint', + access_key_id: 'access key id', + access_key_id_tips: 'access key id', + access_key_secret: 'access key secret', + access_key_secret_tips: 'access key secret', + workspace_id: 'workspace id', + workspace_id_tips: 'workspace id', + resource_queue_id: 'resource queue id', + resource_queue_id_tips: 'resource queue id', + code_type: 'code type', + code_type_tips: 'code type', + job_name: 'job name', + job_name_tips: 'job name', + engine_release_version: 'engine release version', + engine_release_version_tips: 'engine release version', + entry_point: 'entry point', + entry_point_tips: 'entry point', + entry_point_arguments: 'entry point arguments', + entry_point_arguments_tips: 'entry point arguments', + spark_submit_parameters: 'spark submit parameters', + spark_submit_parameters_tips: 'spark submit parameters', + is_production: 'is production', + is_production_tips: 'is production' }, menu: { fav: '收藏组件', diff --git a/dolphinscheduler-ui/src/service/modules/data-source/types.ts b/dolphinscheduler-ui/src/service/modules/data-source/types.ts index 444f5293dd..135a8cc553 100644 --- a/dolphinscheduler-ui/src/service/modules/data-source/types.ts +++ b/dolphinscheduler-ui/src/service/modules/data-source/types.ts @@ -42,6 +42,7 @@ type IDataBase = | 'ZEPPELIN' | 'SAGEMAKER' | 'K8S' + | 'ALIYUN_SERVERLESS_SPARK' type IDataBaseLabel = | 'MYSQL' @@ -65,6 +66,7 @@ type IDataBaseLabel = | 'ZEPPELIN' | 'SAGEMAKER' | 'K8S' + | 'ALIYUN_SERVERLESS_SPARK' interface IDataSource { id?: number @@ -85,7 +87,6 @@ interface IDataSource { database?: string connectType?: string other?: object - endpoint?: string restEndpoint?: string kubeConfig?: string namespace?: string @@ -94,6 +95,10 @@ interface IDataSource { compatibleMode?: string publicKey?: string datawarehouse?: string + accessKeyId?: string + accessKeySecret?: string + regionId?: string + endpoint?: string } interface ListReq { diff --git a/dolphinscheduler-ui/src/store/project/task-type.ts b/dolphinscheduler-ui/src/store/project/task-type.ts index 826f0b13c3..9eb1d8cd41 100644 --- a/dolphinscheduler-ui/src/store/project/task-type.ts +++ b/dolphinscheduler-ui/src/store/project/task-type.ts @@ -89,6 +89,10 @@ export const TASK_TYPES_MAP = { alias: 'ZEPPELIN', helperLinkDisable: true }, + ALIYUN_SERVERLESS_SPARK: { + alias: 'ALIYUN_SERVERLESS_SPARK', + helperLinkDisable: true + }, JUPYTER: { alias: 'JUPYTER', helperLinkDisable: true diff --git a/dolphinscheduler-ui/src/store/project/types.ts b/dolphinscheduler-ui/src/store/project/types.ts index cb48ba8654..bc8dbd0b2b 100644 --- a/dolphinscheduler-ui/src/store/project/types.ts +++ 
b/dolphinscheduler-ui/src/store/project/types.ts @@ -58,6 +58,7 @@ type TaskType = | 'LINKIS' | 'DATA_FACTORY' | 'REMOTESHELL' + | 'ALIYUN_SERVERLESS_SPARK' type ProgramType = 'JAVA' | 'SCALA' | 'PYTHON' diff --git a/dolphinscheduler-ui/src/views/datasource/list/detail.tsx b/dolphinscheduler-ui/src/views/datasource/list/detail.tsx index 4842651290..d8b1cb1f9f 100644 --- a/dolphinscheduler-ui/src/views/datasource/list/detail.tsx +++ b/dolphinscheduler-ui/src/views/datasource/list/detail.tsx @@ -155,6 +155,10 @@ const DetailModal = defineComponent({ showHost, showPort, showRestEndpoint, + showAccessKeyId, + showAccessKeySecret, + showRegionId, + showEndpoint, showAwsRegion, showCompatibleMode, showConnectType, @@ -270,6 +274,65 @@ const DetailModal = defineComponent({ placeholder={t('datasource.zeppelin_rest_endpoint_tips')} /> + + + + + + + + + + + +
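For readers who want to exercise the underlying SDK flow outside of DolphinScheduler, the sketch below chains together the same `Config`, `startJobRunWithOptions`, and `getJobRun` calls that `AliyunServerlessSparkTask` uses to submit and poll a job run. It is only an illustrative sketch: the workspace id, queue, OSS path, regional endpoint, and the `ALIYUN_ACCESS_KEY_ID`/`ALIYUN_ACCESS_KEY_SECRET` environment variables are placeholder assumptions, not part of this change.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import com.aliyun.emr_serverless_spark20230808.Client;
import com.aliyun.emr_serverless_spark20230808.models.GetJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.JobDriver;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse;
import com.aliyun.teaopenapi.models.Config;
import com.aliyun.teautil.models.RuntimeOptions;

public class ServerlessSparkSubmitSketch {

    public static void main(String[] args) throws Exception {
        // Placeholder ids and endpoint -- replace with your own workspace, queue, and region.
        String regionId = "cn-hangzhou";
        String workspaceId = "w-xxxxxxxxxxxx";

        // Credentials are read from environment variables here purely for illustration.
        Client client = new Client(new Config()
                .setEndpoint("emr-serverless-spark." + regionId + ".aliyuncs.com")
                .setAccessKeyId(System.getenv("ALIYUN_ACCESS_KEY_ID"))
                .setAccessKeySecret(System.getenv("ALIYUN_ACCESS_KEY_SECRET")));

        // Describe the Spark job the same way the plugin's buildStartJobRunRequest() does.
        JobDriver jobDriver = new JobDriver().setSparkSubmit(new JobDriver.JobDriverSparkSubmit()
                .setEntryPoint("oss://your-bucket/spark-examples_2.12-3.3.1.jar")
                .setEntryPointArguments(Arrays.asList("100"))
                .setSparkSubmitParameters("--class org.apache.spark.examples.SparkPi"));

        StartJobRunRequest startRequest = new StartJobRunRequest();
        startRequest.setRegionId(regionId);
        startRequest.setResourceQueueId("root_queue");
        startRequest.setCodeType("JAR");
        startRequest.setName("sdk-submit-sketch");
        startRequest.setReleaseVersion("esr-2.1-native");
        startRequest.setJobDriver(jobDriver);

        // Submit the run and remember its id, as handle() does before it starts polling.
        Map<String, String> headers = new HashMap<>();
        StartJobRunResponse startResponse =
                client.startJobRunWithOptions(workspaceId, startRequest, headers, new RuntimeOptions());
        String jobRunId = startResponse.getBody().getJobRunId();

        // Poll until the run reaches a terminal state, mirroring the plugin's status loop.
        GetJobRunRequest getRequest = new GetJobRunRequest();
        getRequest.setRegionId(regionId);
        String state;
        do {
            Thread.sleep(10_000L);
            state = client.getJobRun(workspaceId, jobRunId, getRequest).getBody().getJobRun().getState();
            System.out.println("jobRunId=" + jobRunId + " state=" + state);
        } while (!("Success".equals(state) || "Failed".equals(state) || "Cancelled".equals(state)));
    }
}
```

The polling loop mirrors `handle()`: it sleeps ten seconds between `getJobRun` calls and stops once the run reaches one of the terminal states tracked by `RunState` (Success, Failed, or Cancelled).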