support emr serverless spark task

Signed-off-by: EricGao888 <ericgao.apache@gmail.com>

Committed by Eric Gao via GitHub, 3 months ago, to branch `dev`.
51 changed files with 2094 additions and 14 deletions
@ -0,0 +1,111 @@
# Aliyun EMR Serverless Spark

## Introduction

The `Aliyun EMR Serverless Spark` task plugin submits Spark jobs to the
[`Aliyun EMR Serverless Spark`](https://help.aliyun.com/zh/emr/emr-serverless-spark/product-overview/what-is-emr-serverless-spark) service.

## Create Connections

- Click `Datasource -> Create Datasource -> ALIYUN_SERVERLESS_SPARK` to create a connection.

![demo-aliyun-serverless-spark-create-datasource-1](../../../../img/tasks/demo/aliyun_serverless_spark_1.png)

- Fill in `Datasource Name`, `Access Key Id`, `Access Key Secret`, and `Region Id`, then click `Confirm`.

![demo-aliyun-serverless-spark-create-datasource-2](../../../../img/tasks/demo/aliyun_serverless_spark_2.png)

## Create Tasks

- Click `Project -> Workflow Definition -> Create Workflow` and drag the `ALIYUN_SERVERLESS_SPARK` task to the canvas.

![demo-aliyun-serverless-spark-create-task-1](../../../../img/tasks/demo/aliyun_serverless_spark_3.png)

- Fill in the task parameters and click `Confirm` to create the task node.

![demo-aliyun-serverless-spark-create-task-2](../../../../img/tasks/demo/aliyun_serverless_spark_4.png)

## Task Parameters

- Please refer to the `Default Task Parameters` section of the [DolphinScheduler Task Parameters Appendix](appendix.md) for default parameters.

| **Parameters**           | **Description**                                                                                             |
|--------------------------|-------------------------------------------------------------------------------------------------------------|
| Datasource types         | The type of datasource the task uses; should be `ALIYUN_SERVERLESS_SPARK`.                                  |
| Datasource instances     | The instance of the `ALIYUN_SERVERLESS_SPARK` datasource.                                                   |
| workspace id             | `Aliyun Serverless Spark` workspace id.                                                                     |
| resource queue id        | `Aliyun Serverless Spark` resource queue the task uses to submit the Spark job.                             |
| code type                | `Aliyun Serverless Spark` code type; can be `JAR`, `PYTHON`, or `SQL`.                                      |
| job name                 | `Aliyun Serverless Spark` job name.                                                                         |
| entry point              | The location of the job code, such as a jar package, Python file, or SQL file. OSS locations are supported. |
| entry point arguments    | Arguments of the job's main program.                                                                        |
| spark submit parameters  | Spark-submit related parameters.                                                                            |
| engine release version   | Spark engine release version.                                                                               |
| is production            | Whether the Spark job runs in a production or development environment.                                      |

## Examples

### Submit Jar tasks

| **Parameters**           | **Example Values / Operations**                                                                                                                                                                      |
|--------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| region id                | cn-hangzhou                                                                                                                                                                                          |
| access key id            | <your-access-key-id>                                                                                                                                                                                 |
| access key secret        | <your-access-key-secret>                                                                                                                                                                             |
| resource queue id        | root_queue                                                                                                                                                                                           |
| code type                | JAR                                                                                                                                                                                                  |
| job name                 | ds-emr-spark-jar                                                                                                                                                                                     |
| entry point              | oss://datadev-oss-hdfs-test/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar                                                                                                               |
| entry point arguments    | 100                                                                                                                                                                                                  |
| spark submit parameters  | --class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1 |
| engine release version   | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)                                                                                                                                             |
| is production            | Please turn on the switch                                                                                                                                                                            |

### Submit SQL tasks

| **Parameters**           | **Example Values / Operations**                                                                                                                                                                                             |
|--------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| region id                | cn-hangzhou                                                                                                                                                                                                                 |
| access key id            | <your-access-key-id>                                                                                                                                                                                                        |
| access key secret        | <your-access-key-secret>                                                                                                                                                                                                    |
| resource queue id        | root_queue                                                                                                                                                                                                                  |
| code type                | SQL                                                                                                                                                                                                                         |
| job name                 | ds-emr-spark-sql-1                                                                                                                                                                                                          |
| entry point              | Any non-empty string                                                                                                                                                                                                        |
| entry point arguments    | -e#show tables;show tables;                                                                                                                                                                                                 |
| spark submit parameters  | --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1 |
| engine release version   | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)                                                                                                                                                                    |
| is production            | Please turn on the switch                                                                                                                                                                                                   |

### Submit SQL tasks located in OSS

| **Parameters**           | **Example Values / Operations**                                                                                                                                                                                             |
|--------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| region id                | cn-hangzhou                                                                                                                                                                                                                 |
| access key id            | <your-access-key-id>                                                                                                                                                                                                        |
| access key secret        | <your-access-key-secret>                                                                                                                                                                                                    |
| resource queue id        | root_queue                                                                                                                                                                                                                  |
| code type                | SQL                                                                                                                                                                                                                         |
| job name                 | ds-emr-spark-sql-2                                                                                                                                                                                                          |
| entry point              | Any non-empty string                                                                                                                                                                                                        |
| entry point arguments    | -f#oss://datadev-oss-hdfs-test/spark-resource/examples/sql/show_db.sql                                                                                                                                                      |
| spark submit parameters  | --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1 |
| engine release version   | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)                                                                                                                                                                    |
| is production            | Please turn on the switch                                                                                                                                                                                                   |

### Submit PySpark Tasks

| **Parameters**           | **Example Values / Operations**                                                                                                                            |
|--------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------|
| region id                | cn-hangzhou                                                                                                                                                |
| access key id            | <your-access-key-id>                                                                                                                                       |
| access key secret        | <your-access-key-secret>                                                                                                                                   |
| resource queue id        | root_queue                                                                                                                                                 |
| code type                | PYTHON                                                                                                                                                     |
| job name                 | ds-emr-spark-python                                                                                                                                        |
| entry point              | oss://datadev-oss-hdfs-test/spark-resource/examples/src/main/python/pi.py                                                                                  |
| entry point arguments    | 100                                                                                                                                                        |
| spark submit parameters  | --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1 |
| engine release version   | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)                                                                                                   |
| is production            | Please turn on the switch                                                                                                                                  |
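In the SQL examples above, `entry point arguments` packs CLI-style options (`-e` for inline SQL, `-f` for a SQL file) into a single string joined by a `#` delimiter, which matches the `ENTRY_POINT_ARGUMENTS_DELIMITER` constant defined later in this change. A minimal sketch of how such a string decomposes; the `splitArgs` helper is illustrative only and not part of the plugin:

```java
import java.util.Arrays;
import java.util.List;

public class EntryPointArgsDemo {

    // Mirrors AliyunServerlessSparkConstants.ENTRY_POINT_ARGUMENTS_DELIMITER
    static final String DELIMITER = "#";

    // Hypothetical helper: splits the packed argument string on the delimiter
    static List<String> splitArgs(String raw) {
        return Arrays.asList(raw.split(DELIMITER));
    }

    public static void main(String[] args) {
        // "-e#show tables;show tables;" -> ["-e", "show tables;show tables;"]
        System.out.println(splitArgs("-e#show tables;show tables;"));
    }
}
```

Note that the SQL text itself may contain semicolons freely; only the `#` separates the option flag from its value.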
@ -0,0 +1,111 @@
# Aliyun EMR Serverless Spark

## Introduction

The `Aliyun EMR Serverless Spark` task plugin submits jobs to the
[`Aliyun EMR Serverless Spark`](https://help.aliyun.com/zh/emr/emr-serverless-spark/product-overview/what-is-emr-serverless-spark) service.

## Create Connections

- Click `Datasource -> Create Datasource -> ALIYUN_SERVERLESS_SPARK` to create a connection.

![demo-aliyun-serverless-spark-create-datasource-1](../../../../img/tasks/demo/aliyun_serverless_spark_1.png)

- Fill in the `Datasource Name`, `Access Key Id`, `Access Key Secret`, and `Region Id` parameters, then click `Confirm`.

![demo-aliyun-serverless-spark-create-datasource-2](../../../../img/tasks/demo/aliyun_serverless_spark_2.png)

## Create Task Nodes

- Click `Project -> Workflow Definition -> Create Workflow` and drag the `ALIYUN_SERVERLESS_SPARK` task to the canvas.

![demo-aliyun-serverless-spark-create-task-1](../../../../img/tasks/demo/aliyun_serverless_spark_3.png)

- Fill in the task parameters and click `Confirm` to create the task node.

![demo-aliyun-serverless-spark-create-task-2](../../../../img/tasks/demo/aliyun_serverless_spark_4.png)

## Task Parameters

- For default parameters, please refer to the `Default Task Parameters` section of the [DolphinScheduler Task Parameters Appendix](appendix.md).

| **Parameters**           | **Description**                                                                          |
|--------------------------|------------------------------------------------------------------------------------------|
| Datasource types         | Connection type; `ALIYUN_SERVERLESS_SPARK` should be selected.                           |
| Datasource instances     | The `ALIYUN_SERVERLESS_SPARK` connection instance.                                       |
| workspace id             | `Aliyun Serverless Spark` workspace id.                                                  |
| resource queue id        | `Aliyun Serverless Spark` resource queue id.                                             |
| code type                | `Aliyun Serverless Spark` job type; can be `JAR`, `PYTHON`, or `SQL`.                    |
| job name                 | `Aliyun Serverless Spark` job name.                                                      |
| entry point              | Location of the job code (JAR package, Python / SQL script); files in OSS are supported. |
| entry point arguments    | Entry arguments of the main program.                                                     |
| spark submit parameters  | Spark-submit related parameters.                                                         |
| engine release version   | Spark engine release version.                                                            |
| is production            | Whether the Spark job runs in a production environment.                                  |

## Examples

### Submit JAR tasks

| **Parameters**           | **Example Values / Operations**                                                                                                                                                                      |
|--------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| region id                | cn-hangzhou                                                                                                                                                                                          |
| access key id            | <your-access-key-id>                                                                                                                                                                                 |
| access key secret        | <your-access-key-secret>                                                                                                                                                                             |
| resource queue id        | root_queue                                                                                                                                                                                           |
| code type                | JAR                                                                                                                                                                                                  |
| job name                 | ds-emr-spark-jar                                                                                                                                                                                     |
| entry point              | oss://datadev-oss-hdfs-test/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar                                                                                                               |
| entry point arguments    | 100                                                                                                                                                                                                  |
| spark submit parameters  | --class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1 |
| engine release version   | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)                                                                                                                                             |
| is production            | Please turn on the switch                                                                                                                                                                            |

### Submit SQL tasks

| **Parameters**           | **Example Values / Operations**                                                                                                                                                                                             |
|--------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| region id                | cn-hangzhou                                                                                                                                                                                                                 |
| access key id            | <your-access-key-id>                                                                                                                                                                                                        |
| access key secret        | <your-access-key-secret>                                                                                                                                                                                                    |
| resource queue id        | root_queue                                                                                                                                                                                                                  |
| code type                | SQL                                                                                                                                                                                                                         |
| job name                 | ds-emr-spark-sql-1                                                                                                                                                                                                          |
| entry point              | Any non-empty string                                                                                                                                                                                                        |
| entry point arguments    | -e#show tables;show tables;                                                                                                                                                                                                 |
| spark submit parameters  | --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1 |
| engine release version   | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)                                                                                                                                                                    |
| is production            | Please turn on the switch                                                                                                                                                                                                   |

### Submit SQL script tasks located in OSS

| **Parameters**           | **Example Values / Operations**                                                                                                                                                                                             |
|--------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| region id                | cn-hangzhou                                                                                                                                                                                                                 |
| access key id            | <your-access-key-id>                                                                                                                                                                                                        |
| access key secret        | <your-access-key-secret>                                                                                                                                                                                                    |
| resource queue id        | root_queue                                                                                                                                                                                                                  |
| code type                | SQL                                                                                                                                                                                                                         |
| job name                 | ds-emr-spark-sql-2                                                                                                                                                                                                          |
| entry point              | Any non-empty string                                                                                                                                                                                                        |
| entry point arguments    | -f#oss://datadev-oss-hdfs-test/spark-resource/examples/sql/show_db.sql                                                                                                                                                      |
| spark submit parameters  | --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1 |
| engine release version   | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)                                                                                                                                                                    |
| is production            | Please turn on the switch                                                                                                                                                                                                   |

### Submit PySpark tasks

| **Parameters**           | **Example Values / Operations**                                                                                                                            |
|--------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------|
| region id                | cn-hangzhou                                                                                                                                                |
| access key id            | <your-access-key-id>                                                                                                                                       |
| access key secret        | <your-access-key-secret>                                                                                                                                   |
| resource queue id        | root_queue                                                                                                                                                 |
| code type                | PYTHON                                                                                                                                                     |
| job name                 | ds-emr-spark-python                                                                                                                                        |
| entry point              | oss://datadev-oss-hdfs-test/spark-resource/examples/src/main/python/pi.py                                                                                  |
| entry point arguments    | 100                                                                                                                                                        |
| spark submit parameters  | --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1 |
| engine release version   | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)                                                                                                   |
| is production            | Please turn on the switch                                                                                                                                  |
@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements.  See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License.  You may obtain a copy of the License at
  ~
  ~     http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.dolphinscheduler</groupId>
        <artifactId>dolphinscheduler-datasource-plugin</artifactId>
        <version>dev-SNAPSHOT</version>
    </parent>

    <artifactId>dolphinscheduler-datasource-aliyunserverlessspark</artifactId>
    <packaging>jar</packaging>
    <name>${project.artifactId}</name>

    <dependencies>
        <dependency>
            <groupId>org.apache.dolphinscheduler</groupId>
            <artifactId>dolphinscheduler-spi</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.dolphinscheduler</groupId>
            <artifactId>dolphinscheduler-datasource-api</artifactId>
            <version>${project.version}</version>
        </dependency>

        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>emr_serverless_spark20230808</artifactId>
            <version>1.0.0</version>
        </dependency>

        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>credentials-java</artifactId>
            <version>0.3.0</version>
        </dependency>
    </dependencies>

</project>
@ -0,0 +1,69 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark;

import static com.google.common.base.Preconditions.checkNotNull;

import org.apache.commons.lang3.StringUtils;

import lombok.extern.slf4j.Slf4j;

import com.aliyun.emr_serverless_spark20230808.Client;
import com.aliyun.teaopenapi.models.Config;

@Slf4j
public class AliyunServerlessSparkClientWrapper implements AutoCloseable {

    private Client aliyunServerlessSparkClient;

    public AliyunServerlessSparkClientWrapper(
                                              String accessKeyId,
                                              String accessKeySecret,
                                              String regionId,
                                              String endpoint)
                                                               throws Exception {

        // checkNotNull validates a single reference per call, so check each credential explicitly
        checkNotNull(accessKeyId);
        checkNotNull(accessKeySecret);
        checkNotNull(regionId);

        // Fall back to the region-based default endpoint when none is configured
        if (StringUtils.isEmpty(endpoint)) {
            endpoint = String.format(AliyunServerlessSparkConstants.ENDPOINT_TEMPLATE, regionId);
        }

        Config config = new Config()
                .setEndpoint(endpoint)
                .setAccessKeyId(accessKeyId)
                .setAccessKeySecret(accessKeySecret);
        aliyunServerlessSparkClient = new Client(config);
    }

    // TODO: update checkConnect when the Aliyun Serverless Spark service supports a connection check
    public boolean checkConnect(String accessKeyId, String accessKeySecret, String regionId) {
        try {
            // If the login fails, an exception will be thrown directly
            return true;
        } catch (Exception e) {
            log.info("spark client failed to connect to the server", e);
            return false;
        }
    }

    @Override
    public void close() throws Exception {

    }
}
@ -0,0 +1,41 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark;

import lombok.experimental.UtilityClass;

@UtilityClass
public class AliyunServerlessSparkConstants {

    public String ENDPOINT_TEMPLATE = "emr-serverless-spark.%s.aliyuncs.com";

    public String DEFAULT_ENGINE = "esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)";

    public String ENV_PROD = "production";

    public String ENV_DEV = "dev";

    public String ENTRY_POINT_ARGUMENTS_DELIMITER = "#";

    public String ENV_KEY = "environment";

    public String WORKFLOW_KEY = "workflow";

    public String WORKFLOW_VALUE = "true";

}
@ -0,0 +1,37 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark;

import org.apache.dolphinscheduler.spi.datasource.AdHocDataSourceClient;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
import org.apache.dolphinscheduler.spi.datasource.PooledDataSourceClient;
import org.apache.dolphinscheduler.spi.enums.DbType;

public class AliyunServerlessSparkDataSourceChannel implements DataSourceChannel {

    @Override
    public AdHocDataSourceClient createAdHocDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
        throw new UnsupportedOperationException("Aliyun Serverless Spark AdHocDataSourceClient is not supported");
    }

    @Override
    public PooledDataSourceClient createPooledDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
        throw new UnsupportedOperationException("Aliyun Serverless Spark PooledDataSourceClient is not supported");
    }
}
@ -0,0 +1,39 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark;

import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;
import org.apache.dolphinscheduler.spi.enums.DbType;

import com.google.auto.service.AutoService;

@AutoService(DataSourceChannelFactory.class)
public class AliyunServerlessSparkDataSourceChannelFactory implements DataSourceChannelFactory {

    @Override
    public DataSourceChannel create() {
        return new AliyunServerlessSparkDataSourceChannel();
    }

    @Override
    public String getName() {
        return DbType.ALIYUN_SERVERLESS_SPARK.getName();
    }

}
@ -0,0 +1,41 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark;

import org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param.AliyunServerlessSparkConnectionParam;

import com.aliyun.emr_serverless_spark20230808.Client;
import com.aliyun.teaopenapi.models.Config;

public class AliyunServerlessSparkUtils {

    private AliyunServerlessSparkUtils() {
        throw new IllegalStateException("Utility class");
    }

    public static Client getAliyunServerlessSparkClient(AliyunServerlessSparkConnectionParam connectionParam) throws Exception {
        String endpoint =
                String.format(AliyunServerlessSparkConstants.ENDPOINT_TEMPLATE, connectionParam.getRegionId());
        Config config = new Config()
                .setEndpoint(endpoint)
                .setAccessKeyId(connectionParam.getAccessKeyId())
                .setAccessKeySecret(connectionParam.getAccessKeySecret());
        return new Client(config);
    }

}
@ -0,0 +1,37 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param;

import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;

import lombok.Data;

import com.fasterxml.jackson.annotation.JsonInclude;

@Data
@JsonInclude(JsonInclude.Include.NON_NULL)
public class AliyunServerlessSparkConnectionParam implements ConnectionParam {

    protected String accessKeyId;

    protected String accessKeySecret;

    protected String regionId;

    protected String endpoint;
}
@ -0,0 +1,40 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param;

import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.spi.enums.DbType;

import lombok.Data;

@Data
public class AliyunServerlessSparkDataSourceParamDTO extends BaseDataSourceParamDTO {

    protected String accessKeyId;

    protected String accessKeySecret;

    protected String regionId;

    protected String endpoint;

    @Override
    public DbType getType() {
        return DbType.ALIYUN_SERVERLESS_SPARK;
    }
}
@ -0,0 +1,154 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.AliyunServerlessSparkClientWrapper; |
||||
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor; |
||||
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; |
||||
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; |
||||
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; |
||||
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; |
||||
import org.apache.dolphinscheduler.spi.enums.DbType; |
||||
|
||||
import org.apache.commons.lang3.StringUtils; |
||||
|
||||
import java.sql.Connection; |
||||
import java.text.MessageFormat; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import com.google.auto.service.AutoService; |
||||
|
||||
@AutoService(DataSourceProcessor.class) |
||||
@Slf4j |
||||
public class AliyunServerlessSparkDataSourceProcessor extends AbstractDataSourceProcessor { |
||||
|
||||
@Override |
||||
public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) { |
||||
return JSONUtils.parseObject(paramJson, AliyunServerlessSparkDataSourceParamDTO.class); |
||||
} |
||||
|
||||
    @Override
    public void checkDatasourceParam(BaseDataSourceParamDTO datasourceParamDTO) {
        AliyunServerlessSparkDataSourceParamDTO aliyunServerlessSparkDataSourceParamDTO =
                (AliyunServerlessSparkDataSourceParamDTO) datasourceParamDTO;
        if (StringUtils.isEmpty(aliyunServerlessSparkDataSourceParamDTO.getRegionId()) ||
                StringUtils.isEmpty(aliyunServerlessSparkDataSourceParamDTO.getAccessKeyId()) ||
                StringUtils.isEmpty(aliyunServerlessSparkDataSourceParamDTO.getAccessKeySecret())) {
            throw new IllegalArgumentException("aliyun serverless spark datasource param is not valid");
        }
    }

    @Override
    public String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType) {
        AliyunServerlessSparkConnectionParam baseConnectionParam =
                (AliyunServerlessSparkConnectionParam) connectionParam;
        return MessageFormat.format(
                "{0}@{1}@{2}@{3}",
                dbType.getName(),
                baseConnectionParam.getRegionId(),
                PasswordUtils.encodePassword(baseConnectionParam.getAccessKeyId()),
                PasswordUtils.encodePassword(baseConnectionParam.getAccessKeySecret()));
    }

    @Override
    public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
        AliyunServerlessSparkConnectionParam connectionParams =
                (AliyunServerlessSparkConnectionParam) createConnectionParams(connectionJson);
        AliyunServerlessSparkDataSourceParamDTO aliyunServerlessSparkDataSourceParamDTO =
                new AliyunServerlessSparkDataSourceParamDTO();

        aliyunServerlessSparkDataSourceParamDTO.setAccessKeyId(connectionParams.getAccessKeyId());
        aliyunServerlessSparkDataSourceParamDTO.setAccessKeySecret(connectionParams.getAccessKeySecret());
        aliyunServerlessSparkDataSourceParamDTO.setRegionId(connectionParams.getRegionId());
        aliyunServerlessSparkDataSourceParamDTO.setEndpoint(connectionParams.getEndpoint());
        return aliyunServerlessSparkDataSourceParamDTO;
    }

    @Override
    public AliyunServerlessSparkConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam) {
        AliyunServerlessSparkDataSourceParamDTO aliyunServerlessSparkDataSourceParamDTO =
                (AliyunServerlessSparkDataSourceParamDTO) datasourceParam;
        AliyunServerlessSparkConnectionParam aliyunServerlessSparkConnectionParam =
                new AliyunServerlessSparkConnectionParam();
        aliyunServerlessSparkConnectionParam.setAccessKeyId(aliyunServerlessSparkDataSourceParamDTO.getAccessKeyId());
        aliyunServerlessSparkConnectionParam
                .setAccessKeySecret(aliyunServerlessSparkDataSourceParamDTO.getAccessKeySecret());
        aliyunServerlessSparkConnectionParam.setRegionId(aliyunServerlessSparkDataSourceParamDTO.getRegionId());
        aliyunServerlessSparkConnectionParam.setEndpoint(aliyunServerlessSparkDataSourceParamDTO.getEndpoint());

        return aliyunServerlessSparkConnectionParam;
    }

    @Override
    public ConnectionParam createConnectionParams(String connectionJson) {
        return JSONUtils.parseObject(connectionJson, AliyunServerlessSparkConnectionParam.class);
    }

    @Override
    public String getDatasourceDriver() {
        return "";
    }

    @Override
    public String getValidationQuery() {
        return "";
    }

    @Override
    public String getJdbcUrl(ConnectionParam connectionParam) {
        return "";
    }

    @Override
    public Connection getConnection(ConnectionParam connectionParam) {
        return null;
    }

    @Override
    public boolean checkDataSourceConnectivity(ConnectionParam connectionParam) {
        AliyunServerlessSparkConnectionParam baseConnectionParam =
                (AliyunServerlessSparkConnectionParam) connectionParam;
        try (
                AliyunServerlessSparkClientWrapper aliyunServerlessSparkClientWrapper =
                        new AliyunServerlessSparkClientWrapper(
                                baseConnectionParam.getAccessKeyId(),
                                baseConnectionParam.getAccessKeySecret(),
                                baseConnectionParam.getRegionId(),
                                baseConnectionParam.getEndpoint())) {
            return aliyunServerlessSparkClientWrapper.checkConnect(
                    baseConnectionParam.getAccessKeyId(),
                    baseConnectionParam.getAccessKeySecret(),
                    baseConnectionParam.getRegionId());
        } catch (Exception e) {
            log.error("spark client failed to connect to the server", e);
            return false;
        }
    }

    @Override
    public DbType getDbType() {
        return DbType.ALIYUN_SERVERLESS_SPARK;
    }

    @Override
    public DataSourceProcessor create() {
        return new AliyunServerlessSparkDataSourceProcessor();
    }
}
@ -0,0 +1,117 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark;

import org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param.AliyunServerlessSparkConnectionParam;
import org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param.AliyunServerlessSparkDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param.AliyunServerlessSparkDataSourceProcessor;
import org.apache.dolphinscheduler.spi.enums.DbType;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
public class AliyunServerlessSparkDataSourceProcessorTest {

    private AliyunServerlessSparkDataSourceProcessor aliyunServerlessSparkDataSourceProcessor;

    private String connectJson =
            "{\"accessKeyId\":\"mockAccessKeyId\",\"accessKeySecret\":\"mockAccessKeySecret\",\"regionId\":\"cn-hangzhou\"}";

    @BeforeEach
    public void init() {
        aliyunServerlessSparkDataSourceProcessor = new AliyunServerlessSparkDataSourceProcessor();
    }

    @Test
    void testCheckDatasourceParam() {
        AliyunServerlessSparkDataSourceParamDTO aliyunServerlessSparkDataSourceParamDTO =
                new AliyunServerlessSparkDataSourceParamDTO();
        aliyunServerlessSparkDataSourceParamDTO.setRegionId("cn-hangzhou");
        Assertions.assertThrows(IllegalArgumentException.class,
                () -> aliyunServerlessSparkDataSourceProcessor
                        .checkDatasourceParam(aliyunServerlessSparkDataSourceParamDTO));
        aliyunServerlessSparkDataSourceParamDTO.setAccessKeyId("mockAccessKeyId");
        aliyunServerlessSparkDataSourceParamDTO.setAccessKeySecret("mockAccessKeySecret");
        Assertions
                .assertDoesNotThrow(() -> aliyunServerlessSparkDataSourceProcessor
                        .checkDatasourceParam(aliyunServerlessSparkDataSourceParamDTO));
    }

    @Test
    void testGetDatasourceUniqueId() {
        AliyunServerlessSparkConnectionParam aliyunServerlessSparkConnectionParam =
                new AliyunServerlessSparkConnectionParam();
        aliyunServerlessSparkConnectionParam.setRegionId("cn-hangzhou");
        aliyunServerlessSparkConnectionParam.setAccessKeyId("mockAccessKeyId");
        aliyunServerlessSparkConnectionParam.setAccessKeySecret("mockAccessKeySecret");
        Assertions.assertEquals("aliyun_serverless_spark@cn-hangzhou@mockAccessKeyId@mockAccessKeySecret",
                aliyunServerlessSparkDataSourceProcessor.getDatasourceUniqueId(aliyunServerlessSparkConnectionParam,
                        DbType.ALIYUN_SERVERLESS_SPARK));
    }

    @Test
    void testCreateDatasourceParamDTO() {
        AliyunServerlessSparkDataSourceParamDTO aliyunServerlessSparkDataSourceParamDTO =
                (AliyunServerlessSparkDataSourceParamDTO) aliyunServerlessSparkDataSourceProcessor
                        .createDatasourceParamDTO(connectJson);
        Assertions.assertEquals("cn-hangzhou", aliyunServerlessSparkDataSourceParamDTO.getRegionId());
        Assertions.assertEquals("mockAccessKeyId", aliyunServerlessSparkDataSourceParamDTO.getAccessKeyId());
        Assertions.assertEquals("mockAccessKeySecret", aliyunServerlessSparkDataSourceParamDTO.getAccessKeySecret());
    }

    @Test
    void testCreateConnectionParams() {
        AliyunServerlessSparkDataSourceParamDTO aliyunServerlessSparkDataSourceParamDTO =
                (AliyunServerlessSparkDataSourceParamDTO) aliyunServerlessSparkDataSourceProcessor
                        .createDatasourceParamDTO(connectJson);
        AliyunServerlessSparkConnectionParam aliyunServerlessSparkConnectionParam =
                aliyunServerlessSparkDataSourceProcessor
                        .createConnectionParams(aliyunServerlessSparkDataSourceParamDTO);
        Assertions.assertEquals("cn-hangzhou", aliyunServerlessSparkConnectionParam.getRegionId());
        Assertions.assertEquals("mockAccessKeyId", aliyunServerlessSparkConnectionParam.getAccessKeyId());
        Assertions.assertEquals("mockAccessKeySecret", aliyunServerlessSparkConnectionParam.getAccessKeySecret());
    }

    @Test
    void testTestConnection() {
        AliyunServerlessSparkDataSourceParamDTO aliyunServerlessSparkDataSourceParamDTO =
                (AliyunServerlessSparkDataSourceParamDTO) aliyunServerlessSparkDataSourceProcessor
                        .createDatasourceParamDTO(connectJson);
        AliyunServerlessSparkConnectionParam connectionParam =
                aliyunServerlessSparkDataSourceProcessor
                        .createConnectionParams(aliyunServerlessSparkDataSourceParamDTO);
        // mock the client wrapper so the connectivity check does not hit the real service
        try (
                MockedConstruction<AliyunServerlessSparkClientWrapper> mockedClientWrapper =
                        Mockito.mockConstruction(AliyunServerlessSparkClientWrapper.class, (mock, context) -> {
                            Mockito.when(
                                    mock.checkConnect(connectionParam.getAccessKeyId(),
                                            connectionParam.getAccessKeySecret(), connectionParam.getRegionId()))
                                    .thenReturn(true);
                        })) {
            Assertions
                    .assertTrue(aliyunServerlessSparkDataSourceProcessor.checkDataSourceConnectivity(connectionParam));
        }
    }
}
@ -0,0 +1,68 @@

<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements. See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License. You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.dolphinscheduler</groupId>
        <artifactId>dolphinscheduler-task-plugin</artifactId>
        <version>dev-SNAPSHOT</version>
    </parent>
    <artifactId>dolphinscheduler-task-aliyunserverlessspark</artifactId>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.dolphinscheduler</groupId>
            <artifactId>dolphinscheduler-spi</artifactId>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>com.aliyun.oss</groupId>
                    <artifactId>aliyun-sdk-oss</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.dolphinscheduler</groupId>
            <artifactId>dolphinscheduler-task-api</artifactId>
            <version>${project.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.aliyun.oss</groupId>
                    <artifactId>aliyun-sdk-oss</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.dolphinscheduler</groupId>
            <artifactId>dolphinscheduler-datasource-all</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>emr_serverless_spark20230808</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>credentials-java</artifactId>
            <version>0.3.0</version>
        </dependency>
    </dependencies>
</project>
@ -0,0 +1,67 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark;

import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import com.fasterxml.jackson.annotation.JsonProperty;

@Data
@Slf4j
public class AliyunServerlessSparkParameters extends AbstractParameters {

    // spark job configurations
    private String workspaceId;

    private String resourceQueueId;

    private String codeType;

    private String jobName;

    private String engineReleaseVersion;

    private String entryPoint;

    private String entryPointArguments;

    private String sparkSubmitParameters;

    @JsonProperty("isProduction")
    private boolean isProduction;

    // id and type of the datasource bound to the task
    private int datasource;

    private String type;

    @Override
    public boolean checkParameters() {
        return true;
    }

    @Override
    public ResourceParametersHelper getResources() {
        ResourceParametersHelper resources = super.getResources();
        resources.put(ResourceType.DATASOURCE, datasource);
        return resources;
    }
}
@ -0,0 +1,246 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.AliyunServerlessSparkConstants;
import org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param.AliyunServerlessSparkConnectionParam;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.spi.enums.DbType;

import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;

import com.aliyun.emr_serverless_spark20230808.Client;
import com.aliyun.emr_serverless_spark20230808.models.CancelJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.GetJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponse;
import com.aliyun.emr_serverless_spark20230808.models.JobDriver;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse;
import com.aliyun.emr_serverless_spark20230808.models.Tag;
import com.aliyun.teaopenapi.models.Config;
import com.aliyun.teautil.models.RuntimeOptions;

@Slf4j
public class AliyunServerlessSparkTask extends AbstractRemoteTask {

    private final TaskExecutionContext taskExecutionContext;

    private Client aliyunServerlessSparkClient;

    private AliyunServerlessSparkParameters aliyunServerlessSparkParameters;

    private AliyunServerlessSparkConnectionParam aliyunServerlessSparkConnectionParam;

    private String jobRunId;

    private RunState currentState;

    private String accessKeyId;

    private String accessKeySecret;

    private String regionId;

    private String endpoint;

    protected AliyunServerlessSparkTask(TaskExecutionContext taskExecutionContext) {
        super(taskExecutionContext);
        this.taskExecutionContext = taskExecutionContext;
    }

    @Override
    public void init() {
        final String taskParams = taskExecutionContext.getTaskParams();
        aliyunServerlessSparkParameters = JSONUtils.parseObject(taskParams, AliyunServerlessSparkParameters.class);
        log.info("aliyunServerlessSparkParameters - {}", aliyunServerlessSparkParameters);
        if (this.aliyunServerlessSparkParameters == null || !this.aliyunServerlessSparkParameters.checkParameters()) {
            throw new AliyunServerlessSparkTaskException("Aliyun-Serverless-Spark task parameters are not valid!");
        }

        ResourceParametersHelper resourceParametersHelper = taskExecutionContext.getResourceParametersHelper();
        DataSourceParameters dataSourceParameters = (DataSourceParameters) resourceParametersHelper
                .getResourceParameters(ResourceType.DATASOURCE, aliyunServerlessSparkParameters.getDatasource());
        aliyunServerlessSparkConnectionParam = (AliyunServerlessSparkConnectionParam) DataSourceUtils
                .buildConnectionParams(
                        DbType.valueOf(aliyunServerlessSparkParameters.getType()),
                        dataSourceParameters.getConnectionParams());

        accessKeyId = aliyunServerlessSparkConnectionParam.getAccessKeyId();
        accessKeySecret = aliyunServerlessSparkConnectionParam.getAccessKeySecret();
        regionId = aliyunServerlessSparkConnectionParam.getRegionId();
        endpoint = aliyunServerlessSparkConnectionParam.getEndpoint();

        try {
            aliyunServerlessSparkClient =
                    buildAliyunServerlessSparkClient(accessKeyId, accessKeySecret, regionId, endpoint);
        } catch (Exception e) {
            log.error("Failed to build Aliyun-Serverless-Spark client!", e);
            throw new AliyunServerlessSparkTaskException("Failed to build Aliyun-Serverless-Spark client!", e);
        }

        currentState = RunState.Submitted;
    }

    @Override
    public void handle(TaskCallBack taskCallBack) throws TaskException {
        try {
            StartJobRunRequest startJobRunRequest = buildStartJobRunRequest(aliyunServerlessSparkParameters);
            RuntimeOptions runtime = new RuntimeOptions();
            Map<String, String> headers = new HashMap<>();
            StartJobRunResponse startJobRunResponse = aliyunServerlessSparkClient.startJobRunWithOptions(
                    aliyunServerlessSparkParameters.getWorkspaceId(), startJobRunRequest, headers, runtime);
            jobRunId = startJobRunResponse.getBody().getJobRunId();
            setAppIds(jobRunId);
            log.info("Successfully submitted serverless spark job, jobRunId - {}", jobRunId);

            // poll the job run state every 10 seconds until it reaches a terminal state
            while (!RunState.isFinal(currentState)) {
                GetJobRunRequest getJobRunRequest = buildGetJobRunRequest();
                GetJobRunResponse getJobRunResponse = aliyunServerlessSparkClient
                        .getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId, getJobRunRequest);
                currentState = RunState.valueOf(getJobRunResponse.getBody().getJobRun().getState());
                log.info("job - {} state - {}", jobRunId, currentState);
                Thread.sleep(10 * 1000L);
            }

            setExitStatusCode(mapFinalStateToExitCode(currentState));

        } catch (Exception e) {
            log.error("Serverless spark job failed!", e);
            throw new AliyunServerlessSparkTaskException("Serverless spark job failed!", e);
        }
    }

    @Override
    public void submitApplication() throws TaskException {

    }

    @Override
    public void trackApplicationStatus() throws TaskException {

    }

    protected int mapFinalStateToExitCode(RunState state) {
        switch (state) {
            case Success:
                return TaskConstants.EXIT_CODE_SUCCESS;
            case Failed:
                return TaskConstants.EXIT_CODE_KILL;
            default:
                return TaskConstants.EXIT_CODE_FAILURE;
        }
    }

    @Override
    public AbstractParameters getParameters() {
        return aliyunServerlessSparkParameters;
    }

    @Override
    public void cancelApplication() throws TaskException {
        CancelJobRunRequest cancelJobRunRequest = buildCancelJobRunRequest();
        try {
            aliyunServerlessSparkClient.cancelJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId,
                    cancelJobRunRequest);
        } catch (Exception e) {
            log.error("Failed to cancel serverless spark job run", e);
        }
    }

    @Override
    public List<String> getApplicationIds() throws TaskException {
        return Collections.emptyList();
    }

    protected Client buildAliyunServerlessSparkClient(String accessKeyId, String accessKeySecret,
                                                      String regionId, String endpoint) throws Exception {
        if (StringUtils.isEmpty(endpoint)) {
            endpoint = String.format(AliyunServerlessSparkConstants.ENDPOINT_TEMPLATE, regionId);
        }

        Config config = new Config()
                .setEndpoint(endpoint)
                .setAccessKeyId(accessKeyId)
                .setAccessKeySecret(accessKeySecret);
        return new Client(config);
    }

    protected StartJobRunRequest buildStartJobRunRequest(AliyunServerlessSparkParameters aliyunServerlessSparkParameters) {
        StartJobRunRequest startJobRunRequest = new StartJobRunRequest();
        startJobRunRequest.setRegionId(regionId);
        startJobRunRequest.setResourceQueueId(aliyunServerlessSparkParameters.getResourceQueueId());
        startJobRunRequest.setCodeType(aliyunServerlessSparkParameters.getCodeType());
        startJobRunRequest.setName(aliyunServerlessSparkParameters.getJobName());
        String engineReleaseVersion = aliyunServerlessSparkParameters.getEngineReleaseVersion();
        engineReleaseVersion =
                StringUtils.isEmpty(engineReleaseVersion) ? AliyunServerlessSparkConstants.DEFAULT_ENGINE
                        : engineReleaseVersion;
        startJobRunRequest.setReleaseVersion(engineReleaseVersion);
        Tag envTag = new Tag();
        envTag.setKey(AliyunServerlessSparkConstants.ENV_KEY);
        String envType = aliyunServerlessSparkParameters.isProduction() ? AliyunServerlessSparkConstants.ENV_PROD
                : AliyunServerlessSparkConstants.ENV_DEV;
        envTag.setValue(envType);
        Tag workflowTag = new Tag();
        workflowTag.setKey(AliyunServerlessSparkConstants.WORKFLOW_KEY);
        workflowTag.setValue(AliyunServerlessSparkConstants.WORKFLOW_VALUE);
        startJobRunRequest.setTags(Arrays.asList(envTag, workflowTag));
        List<String> entryPointArguments =
                StringUtils.isEmpty(aliyunServerlessSparkParameters.getEntryPointArguments()) ? Collections.emptyList()
                        : Arrays.asList(aliyunServerlessSparkParameters.getEntryPointArguments()
                                .split(AliyunServerlessSparkConstants.ENTRY_POINT_ARGUMENTS_DELIMITER));
        JobDriver.JobDriverSparkSubmit jobDriverSparkSubmit = new JobDriver.JobDriverSparkSubmit()
                .setEntryPoint(aliyunServerlessSparkParameters.getEntryPoint())
                .setEntryPointArguments(entryPointArguments)
                .setSparkSubmitParameters(aliyunServerlessSparkParameters.getSparkSubmitParameters());
        JobDriver jobDriver = new JobDriver()
                .setSparkSubmit(jobDriverSparkSubmit);
        startJobRunRequest.setJobDriver(jobDriver);
        return startJobRunRequest;
    }

    protected GetJobRunRequest buildGetJobRunRequest() {
        GetJobRunRequest getJobRunRequest = new GetJobRunRequest();
        getJobRunRequest.setRegionId(regionId);
        return getJobRunRequest;
    }

    protected CancelJobRunRequest buildCancelJobRunRequest() {
        CancelJobRunRequest cancelJobRunRequest = new CancelJobRunRequest();
        cancelJobRunRequest.setRegionId(regionId);
        return cancelJobRunRequest;
    }
}
@ -0,0 +1,37 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;

public class AliyunServerlessSparkTaskChannel implements TaskChannel {

    @Override
    public AbstractTask createTask(TaskExecutionContext taskRequest) {
        return new AliyunServerlessSparkTask(taskRequest);
    }

    @Override
    public AbstractParameters parseParameters(String taskParams) {
        return JSONUtils.parseObject(taskParams, AliyunServerlessSparkParameters.class);
    }

}
@ -0,0 +1,36 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark;

import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;

import com.google.auto.service.AutoService;

@AutoService(TaskChannelFactory.class)
public class AliyunServerlessSparkTaskChannelFactory implements TaskChannelFactory {

    @Override
    public String getName() {
        return "ALIYUN_SERVERLESS_SPARK";
    }

    @Override
    public TaskChannel create() {
        return new AliyunServerlessSparkTaskChannel();
    }
}
@ -0,0 +1,33 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark;

public class AliyunServerlessSparkTaskException extends RuntimeException {

    public AliyunServerlessSparkTaskException() {
        super();
    }

    public AliyunServerlessSparkTaskException(String message) {
        super(message);
    }

    public AliyunServerlessSparkTaskException(String message, Throwable cause) {
        super(message, cause);
    }
}
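The `(message, cause)` constructor is the one that matters in practice: it lets the task wrap a low-level Aliyun SDK failure in a single plugin-specific exception type while preserving the original stack trace. A small self-contained sketch of that wrapping pattern (the `SparkTaskException` mirror and `submit` helper are illustrative, not the plugin's real code):

```java
public class TaskExceptionDemo {

    // Local mirror of AliyunServerlessSparkTaskException for a
    // self-contained demo.
    public static class SparkTaskException extends RuntimeException {
        public SparkTaskException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Wrap any low-level failure so callers handle one exception type
    // but can still inspect getCause() for the SDK error.
    public static String submit(boolean simulateFailure) {
        try {
            if (simulateFailure) {
                throw new IllegalStateException("SDK connection refused");
            }
            return "jr-mock-id";
        } catch (Exception e) {
            throw new SparkTaskException("failed to submit spark job", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(submit(false));
    }
}
```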
@ -0,0 +1,51 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark;

import java.util.Objects;

public enum RunState {

    Submitted,

    Pending,

    Running,

    Success,

    Failed,

    Cancelling,

    Cancelled,

    CancelFailed;

    public static boolean isFinal(RunState runState) {
        return Success == runState || Failed == runState || Cancelled == runState;
    }

    public static boolean hasLaunched(RunState runState) {
        return Objects.nonNull(runState) && runState != Submitted && runState != Pending;
    }

    public static boolean isCancelled(RunState runState) {
        return Cancelled == runState;
    }

}
@ -0,0 +1,208 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.spi.enums.DbType;

import lombok.extern.slf4j.Slf4j;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

import com.aliyun.emr_serverless_spark20230808.Client;
import com.aliyun.emr_serverless_spark20230808.models.CancelJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.CancelJobRunResponse;
import com.aliyun.emr_serverless_spark20230808.models.GetJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponse;
import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponseBody;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponseBody;

@Slf4j
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class AliyunServerlessSparkTaskTest {

    @Mock
    private TaskExecutionContext mockTaskExecutionContext;

    @Mock
    private Client mockAliyunServerlessSparkClient;

    @Mock
    private ResourceParametersHelper mockResourceParametersHelper;

    @Mock
    private TaskCallBack mockTaskCallBack;

    @Mock
    private StartJobRunRequest mockStartJobRunRequest;

    @Mock
    private StartJobRunResponse mockStartJobRunResponse;

    @Mock
    private GetJobRunRequest mockGetJobRunRequest;

    @Mock
    private GetJobRunResponse mockGetJobRunResponse;

    @Mock
    private CancelJobRunRequest mockCancelJobRunRequest;

    @Mock
    private CancelJobRunResponse mockCancelJobRunResponse;

    @InjectMocks
    @Spy
    private AliyunServerlessSparkTask aliyunServerlessSparkTask;

    private static final String mockAccessKeyId = "mockAccessKeyId";

    private static final String mockAccessKeySecret = "mockAccessKeySecret";

    private static final String mockRegionId = "cn-hangzhou";

    private static final String mockEndpoint = "emr-serverless-spark-vpc.cn-hangzhou.aliyuncs.com";

    private static final int mockDatasourceId = 1;

    private static final String taskParamsString =
            "{\"localParams\":[],\"resourceList\":[],\"workspaceId\":\"w-ae42e9c929275cc5\",\"resourceQueueId\":\"root_queue\",\"codeType\":\"JAR\",\"jobName\":\"spark\",\"entryPoint\":\"oss://datadev-oss-hdfs-test/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar\",\"entryPointArguments\":\"10\",\"sparkSubmitParameters\":\"--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1\",\"isProduction\":true,\"type\":\"ALIYUN_SERVERLESS_SPARK\",\"datasource\":1}";

    private static final String connectionParamsString =
            "{\"accessKeyId\":\"mockAccessKeyId\",\"accessKeySecret\":\"mockAccessKeySecret\",\"regionId\":\"cn-hangzhou\",\"endpoint\":\"emr-serverless-spark-vpc.cn-hangzhou.aliyuncs.com\",\"password\":\"\"}";

    private static final String mockJobRunId = "jr-f6a1d0dd17d6b8a3";

    private static final String mockWorkspaceId = "w-ae42e9c929275cc5";

    private static final String mockResourceQueueId = "root_queue";

    private static final String mockSparkSubmitParameters =
            "--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1";

    private static final String mockEntryPoint =
            "oss://datadev-oss-hdfs-test/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar";

    private static final String mockEntryPointArguments = "10";

    @BeforeEach
    public void before() {
        when(mockTaskExecutionContext.getTaskParams()).thenReturn(taskParamsString);
        DataSourceParameters dataSourceParameters = new DataSourceParameters();
        dataSourceParameters.setConnectionParams(connectionParamsString);
        dataSourceParameters.setType(DbType.ALIYUN_SERVERLESS_SPARK);
        when(mockResourceParametersHelper.getResourceParameters(any(), any())).thenReturn(dataSourceParameters);
        when(mockTaskExecutionContext.getResourceParametersHelper()).thenReturn(mockResourceParametersHelper);
        Assertions.assertDoesNotThrow(
                () -> when(aliyunServerlessSparkTask.buildAliyunServerlessSparkClient(any(), any(), any(), any()))
                        .thenReturn(mockAliyunServerlessSparkClient));
    }

    @Test
    public void testInit() throws Exception {
        aliyunServerlessSparkTask.init();
        verify(mockTaskExecutionContext).getTaskParams();
        verify(mockResourceParametersHelper).getResourceParameters(ResourceType.DATASOURCE, mockDatasourceId);
        verify(aliyunServerlessSparkTask).buildAliyunServerlessSparkClient(mockAccessKeyId, mockAccessKeySecret,
                mockRegionId, mockEndpoint);
    }

    @Test
    public void testHandle() {
        doReturn(mockStartJobRunRequest).when(aliyunServerlessSparkTask).buildStartJobRunRequest(any());
        StartJobRunResponseBody startJobRunResponseBody = new StartJobRunResponseBody();
        startJobRunResponseBody.setJobRunId(mockJobRunId);
        doReturn(startJobRunResponseBody).when(mockStartJobRunResponse).getBody();
        Assertions.assertDoesNotThrow(
                () -> doReturn(mockStartJobRunResponse).when(mockAliyunServerlessSparkClient)
                        .startJobRunWithOptions(any(), any(), any(), any()));

        doReturn(mockGetJobRunRequest).when(aliyunServerlessSparkTask).buildGetJobRunRequest();
        GetJobRunResponseBody getJobRunResponseBody = new GetJobRunResponseBody();
        GetJobRunResponseBody.GetJobRunResponseBodyJobRun jobRun =
                new GetJobRunResponseBody.GetJobRunResponseBodyJobRun();
        jobRun.setState(RunState.Success.name());
        getJobRunResponseBody.setJobRun(jobRun);
        doReturn(getJobRunResponseBody).when(mockGetJobRunResponse).getBody();
        Assertions.assertDoesNotThrow(
                () -> doReturn(mockGetJobRunResponse).when(mockAliyunServerlessSparkClient).getJobRun(any(), any(),
                        any()));

        aliyunServerlessSparkTask.init();
        aliyunServerlessSparkTask.handle(mockTaskCallBack);
        verify(aliyunServerlessSparkTask).setAppIds(mockJobRunId);
        verify(aliyunServerlessSparkTask).setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS);
    }

    @Test
    public void testCancelApplication() throws Exception {
        doReturn(mockCancelJobRunRequest).when(aliyunServerlessSparkTask).buildCancelJobRunRequest();
        Assertions.assertDoesNotThrow(
                () -> doReturn(mockCancelJobRunResponse).when(mockAliyunServerlessSparkClient).cancelJobRun(any(),
                        any(), any()));

        aliyunServerlessSparkTask.init();
        aliyunServerlessSparkTask.cancelApplication();
        verify(aliyunServerlessSparkTask).buildCancelJobRunRequest();
        verify(mockAliyunServerlessSparkClient).cancelJobRun(eq(mockWorkspaceId), any(), eq(mockCancelJobRunRequest));
    }

    @Test
    public void testBuildStartJobRunRequest() {
        AliyunServerlessSparkParameters mockAliyunServerlessSparkParameters =
                mock(AliyunServerlessSparkParameters.class);
        doReturn(mockResourceQueueId).when(mockAliyunServerlessSparkParameters).getResourceQueueId();
        doReturn("JAR").when(mockAliyunServerlessSparkParameters).getCodeType();
        doReturn("ds-test").when(mockAliyunServerlessSparkParameters).getJobName();
        doReturn(mockSparkSubmitParameters).when(mockAliyunServerlessSparkParameters).getSparkSubmitParameters();
        doReturn(mockEntryPoint).when(mockAliyunServerlessSparkParameters).getEntryPoint();
        doReturn(mockEntryPointArguments).when(mockAliyunServerlessSparkParameters).getEntryPointArguments();

        aliyunServerlessSparkTask.buildStartJobRunRequest(mockAliyunServerlessSparkParameters);

        verify(mockAliyunServerlessSparkParameters).getResourceQueueId();
        verify(mockAliyunServerlessSparkParameters).getCodeType();
        verify(mockAliyunServerlessSparkParameters).getJobName();
        verify(mockAliyunServerlessSparkParameters).getEngineReleaseVersion();
        verify(mockAliyunServerlessSparkParameters).isProduction();
    }

}
(Two new PNG images added, 18 KiB each.)
@ -0,0 +1,173 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import { useI18n } from 'vue-i18n'
import { useCustomParams } from '.'
import type { IJsonItem } from '../types'

export function useAliyunServerlessSpark(model: {
  [field: string]: any
}): IJsonItem[] {
  const { t } = useI18n()

  return [
    // mandatory fields
    {
      type: 'input',
      field: 'workspaceId',
      name: t('project.node.workspace_id'),
      props: {
        placeholder: t('project.node.workspace_id_tips')
      },
      validate: {
        trigger: ['input', 'blur'],
        required: true,
        validator(validate: any, value: string) {
          if (!value) {
            return new Error(t('project.node.workspace_id_tips'))
          }
        }
      }
    },
    {
      type: 'input',
      field: 'resourceQueueId',
      name: t('project.node.resource_queue_id'),
      props: {
        placeholder: t('project.node.resource_queue_id_tips')
      },
      validate: {
        trigger: ['input', 'blur'],
        required: true,
        validator(validate: any, value: string) {
          if (!value) {
            return new Error(t('project.node.resource_queue_id_tips'))
          }
        }
      }
    },
    {
      type: 'input',
      field: 'codeType',
      name: t('project.node.code_type'),
      props: {
        placeholder: t('project.node.code_type_tips')
      },
      validate: {
        trigger: ['input', 'blur'],
        required: true,
        validator(validate: any, value: string) {
          if (!value) {
            return new Error(t('project.node.code_type_tips'))
          }
        }
      }
    },
    {
      type: 'input',
      field: 'jobName',
      name: t('project.node.job_name'),
      props: {
        placeholder: t('project.node.job_name_tips')
      },
      validate: {
        trigger: ['input', 'blur'],
        required: true,
        validator(validate: any, value: string) {
          if (!value) {
            return new Error(t('project.node.job_name_tips'))
          }
        }
      }
    },
    {
      type: 'input',
      field: 'entryPoint',
      name: t('project.node.entry_point'),
      props: {
        placeholder: t('project.node.entry_point_tips')
      },
      validate: {
        trigger: ['input', 'blur'],
        required: true,
        validator(validate: any, value: string) {
          if (!value) {
            return new Error(t('project.node.entry_point_tips'))
          }
        }
      }
    },
    {
      type: 'input',
      field: 'entryPointArguments',
      name: t('project.node.entry_point_arguments'),
      props: {
        placeholder: t('project.node.entry_point_arguments_tips')
      },
      validate: {
        trigger: ['input', 'blur'],
        required: true,
        validator(validate: any, value: string) {
          if (!value) {
            return new Error(t('project.node.entry_point_arguments_tips'))
          }
        }
      }
    },
    {
      type: 'input',
      field: 'sparkSubmitParameters',
      name: t('project.node.spark_submit_parameters'),
      props: {
        placeholder: t('project.node.spark_submit_parameters_tips')
      },
      validate: {
        trigger: ['input', 'blur'],
        required: true,
        validator(validate: any, value: string) {
          if (!value) {
            return new Error(t('project.node.spark_submit_parameters_tips'))
          }
        }
      }
    },
    // optional fields
    {
      type: 'input',
      field: 'engineReleaseVersion',
      name: t('project.node.engine_release_version'),
      props: {
        placeholder: t('project.node.engine_release_version_tips')
      }
    },
    {
      type: 'switch',
      field: 'isProduction',
      name: t('project.node.is_production'),
      span: 12
    },
    ...useCustomParams({ model, field: 'localParams', isSimple: false })
  ]
}
@ -0,0 +1,74 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import { reactive } from 'vue'
import * as Fields from '../fields/index'
import type { IJsonItem, INodeData, ITaskData } from '../types'

export function useAliyunServerlessSpark({
  projectCode,
  from = 0,
  readonly,
  data
}: {
  projectCode: number
  from?: number
  readonly?: boolean
  data?: ITaskData
}) {
  const model = reactive({
    name: '',
    taskType: 'ALIYUN_SERVERLESS_SPARK',
    flag: 'YES',
    description: '',
    timeoutFlag: false,
    localParams: [],
    environmentCode: null,
    failRetryInterval: 1,
    failRetryTimes: 0,
    workerGroup: 'default',
    delayTime: 0,
    timeout: 30,
    type: 'ALIYUN_SERVERLESS_SPARK',
    displayRows: 10,
    timeoutNotifyStrategy: ['WARN'],
    restEndpoint: '',
    username: '',
    password: ''
  } as INodeData)

  return {
    json: [
      Fields.useName(from),
      ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
      Fields.useRunFlag(),
      Fields.useCache(),
      Fields.useDescription(),
      Fields.useTaskPriority(),
      Fields.useWorkerGroup(projectCode),
      Fields.useEnvironmentName(model, !data?.id),
      ...Fields.useTaskGroup(model, projectCode),
      ...Fields.useFailed(),
      Fields.useDelayTime(model),
      ...Fields.useTimeoutAlarm(model),
      ...Fields.useDatasource(model),
      ...Fields.useAliyunServerlessSpark(model),
      Fields.usePreTasks()
    ] as IJsonItem[],
    model
  }
}