
[improve] Remove the spark version of spark task (#11860)

Branch: 3.2.0-release
Author: rickchengx, 2 years ago, committed by GitHub
Parent commit: 08a4c7981f
42 changed files (number of changed lines in parentheses):

  1. .github/workflows/cluster-test/mysql/dolphinscheduler_env.sh (5)
  2. .github/workflows/cluster-test/postgresql/dolphinscheduler_env.sh (5)
  3. deploy/kubernetes/dolphinscheduler/values.yaml (3)
  4. docs/docs/en/architecture/configuration.md (5)
  5. docs/docs/en/architecture/task-structure.md (32)
  6. docs/docs/en/faq.md (2)
  7. docs/docs/en/guide/expansion-reduction.md (5)
  8. docs/docs/en/guide/installation/kubernetes.md (51)
  9. docs/docs/en/guide/installation/pseudo-cluster.md (5)
  10. docs/docs/en/guide/task/spark.md (1)
  11. docs/docs/en/guide/upgrade/incompatible.md (2)
  12. docs/docs/zh/architecture/configuration.md (5)
  13. docs/docs/zh/architecture/task-structure.md (32)
  14. docs/docs/zh/faq.md (2)
  15. docs/docs/zh/guide/expansion-reduction.md (5)
  16. docs/docs/zh/guide/installation/kubernetes.md (51)
  17. docs/docs/zh/guide/installation/pseudo-cluster.md (5)
  18. docs/docs/zh/guide/task/spark.md (1)
  19. docs/docs/zh/guide/upgrade/incompatible.md (2)
  20. dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/EnvironmentServiceTest.java (5)
  21. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java (49)
  22. dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ClusterMapperTest.java (5)
  23. dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/EnvironmentMapperTest.java (5)
  24. dolphinscheduler-python/pydolphinscheduler/UPDATING.md (1)
  25. dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Spark.yaml (1)
  26. dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/spark.py (10)
  27. dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py (3)
  28. dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/dataquality/spark/SparkParameters.java (13)
  29. dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java (26)
  30. dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkCommand.java (65)
  31. dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java (10)
  32. dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java (13)
  33. dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java (29)
  34. dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java (22)
  35. dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java (75)
  36. dolphinscheduler-ui/src/locales/en_US/project.ts (1)
  37. dolphinscheduler-ui/src/locales/zh_CN/project.ts (1)
  38. dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts (18)
  39. dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts (1)
  40. dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts (1)
  41. dolphinscheduler-ui/src/views/projects/task/components/node/types.ts (1)
  42. script/env/dolphinscheduler_env.sh (5)

.github/workflows/cluster-test/mysql/dolphinscheduler_env.sh (5 changed lines)

@ -37,11 +37,10 @@ export REGISTRY_ZOOKEEPER_CONNECT_STRING=${REGISTRY_ZOOKEEPER_CONNECT_STRING:-zo
# Tasks related configurations, need to change the configuration if you use the related tasks.
export HADOOP_HOME=${HADOOP_HOME:-/opt/soft/hadoop}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME1=${SPARK_HOME1:-/opt/soft/spark1}
export SPARK_HOME2=${SPARK_HOME2:-/opt/soft/spark2}
export SPARK_HOME=${SPARK_HOME:-/opt/soft/spark}
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
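With `SPARK_HOME1`/`SPARK_HOME2` collapsed into a single `SPARK_HOME`, a quick sanity check after editing `dolphinscheduler_env.sh` could look like the sketch below — a minimal, hedged example; the file path and the default `/opt/soft/spark` location come from this diff and may differ in your setup:

```bash
# Source the task environment the worker would use, then confirm that
# spark-submit now resolves through the single SPARK_HOME entry.
source .github/workflows/cluster-test/mysql/dolphinscheduler_env.sh
echo "SPARK_HOME=${SPARK_HOME}"
# Prints the Spark version only if a real Spark install (or a symlink)
# actually exists at ${SPARK_HOME}.
"${SPARK_HOME}/bin/spark-submit" --version
```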

.github/workflows/cluster-test/postgresql/dolphinscheduler_env.sh (5 changed lines)

@ -37,11 +37,10 @@ export REGISTRY_ZOOKEEPER_CONNECT_STRING=${REGISTRY_ZOOKEEPER_CONNECT_STRING:-zo
# Tasks related configurations, need to change the configuration if you use the related tasks.
export HADOOP_HOME=${HADOOP_HOME:-/opt/soft/hadoop}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME1=${SPARK_HOME1:-/opt/soft/spark1}
export SPARK_HOME2=${SPARK_HOME2:-/opt/soft/spark2}
export SPARK_HOME=${SPARK_HOME:-/opt/soft/spark}
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH

deploy/kubernetes/dolphinscheduler/values.yaml (3 changed lines)

@ -160,8 +160,7 @@ common:
# dolphinscheduler env
HADOOP_HOME: "/opt/soft/hadoop"
HADOOP_CONF_DIR: "/opt/soft/hadoop/etc/hadoop"
SPARK_HOME1: "/opt/soft/spark1"
SPARK_HOME2: "/opt/soft/spark2"
SPARK_HOME: "/opt/soft/spark"
PYTHON_HOME: "/usr/bin/python"
JAVA_HOME: "/usr/local/openjdk-8"
HIVE_HOME: "/opt/soft/hive"
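For Kubernetes deployments the same consolidation applies to the chart value; if Spark lives somewhere else inside your worker image, the new single value can be overridden at install time. A hedged sketch — the release name `dolphinscheduler` and chart directory `.` are assumptions, not part of this diff:

```bash
# Override the consolidated SPARK_HOME used for the task-environment ConfigMap.
helm install dolphinscheduler . \
  --set common.configmap.SPARK_HOME=/opt/soft/spark
```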

docs/docs/en/architecture/configuration.md (5 changed lines)

@ -347,14 +347,13 @@ export JAVA_HOME=${JAVA_HOME:-/opt/soft/java}
# Tasks related configurations, need to change the configuration if you use the related tasks.
export HADOOP_HOME=${HADOOP_HOME:-/opt/soft/hadoop}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME1=${SPARK_HOME1:-/opt/soft/spark1}
export SPARK_HOME2=${SPARK_HOME2:-/opt/soft/spark2}
export SPARK_HOME=${SPARK_HOME:-/opt/soft/spark}
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
```
### Log related configuration

docs/docs/en/architecture/task-structure.md (32 changed lines)

@ -256,21 +256,20 @@ No.|parameter name||type|description |note
| 13 | | executorMemory | String | executor memory |
| 14 | | executorCores | String | executor cores |
| 15 | | programType | String | program type | JAVA,SCALA,PYTHON |
| 16 | | sparkVersion | String | Spark version | SPARK1 , SPARK2 |
| 17 | | localParams | Array | customized local parameters |
| 18 | | resourceList | Array | resource files |
| 19 | description | | String | description | |
| 20 | runFlag | | String | execution flag | |
| 21 | conditionResult | | Object | condition branch | |
| 22 | | successNode | Array | jump to node if success | |
| 23 | | failedNode | Array | jump to node if failure |
| 24 | dependence | | Object | task dependency | mutual exclusion with params |
| 25 | maxRetryTimes | | String | max retry times | |
| 26 | retryInterval | | String | retry interval | |
| 27 | timeout | | Object | timeout | |
| 28 | taskInstancePriority | | String | task priority | |
| 29 | workerGroup | | String | Worker group | |
| 30 | preTasks | | Array | preposition tasks | |
| 16 | | localParams | Array | customized local parameters |
| 17 | | resourceList | Array | resource files |
| 18 | description | | String | description | |
| 19 | runFlag | | String | execution flag | |
| 20 | conditionResult | | Object | condition branch | |
| 21 | | successNode | Array | jump to node if success | |
| 22 | | failedNode | Array | jump to node if failure |
| 23 | dependence | | Object | task dependency | mutual exclusion with params |
| 24 | maxRetryTimes | | String | max retry times | |
| 25 | retryInterval | | String | retry interval | |
| 26 | timeout | | Object | timeout | |
| 27 | taskInstancePriority | | String | task priority | |
| 28 | workerGroup | | String | Worker group | |
| 29 | preTasks | | Array | preposition tasks | |
**Node data example:**
@ -302,8 +301,7 @@ No.|parameter name||type|description |note
"executorCores":2,
"mainArgs":"10",
"others":"",
"programType":"SCALA",
"sparkVersion":"SPARK2"
"programType":"SCALA"
},
"description":"",
"runFlag":"NORMAL",

docs/docs/en/faq.md (2 changed lines)

@ -229,7 +229,7 @@ export PYTHON_HOME=/bin/python
Note: This is **PYTHON_HOME** , which is the absolute path of the python command, not the simple PYTHON_HOME. Also note that when exporting the PATH, you need to directly
```
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH
export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH
```
2,For versions prior to 1.0.3, the Python task only supports the Python version of the system. It does not support specifying the Python version.

docs/docs/en/guide/expansion-reduction.md (5 changed lines)

@ -79,14 +79,13 @@ Attention:
```shell
export HADOOP_HOME=/opt/soft/hadoop
export HADOOP_CONF_DIR=/opt/soft/hadoop/etc/hadoop
# export SPARK_HOME1=/opt/soft/spark1
export SPARK_HOME2=/opt/soft/spark2
export SPARK_HOME=/opt/soft/spark
export PYTHON_HOME=/opt/soft/python
export JAVA_HOME=/opt/soft/jav
export HIVE_HOME=/opt/soft/hive
export FLINK_HOME=/opt/soft/flink
export DATAX_HOME=/opt/soft/datax/bin/datax.py
export PATH=$HADOOP_HOME/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$DATAX_HOME:$PATH
export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$DATAX_HOME:$PATH
```

docs/docs/en/guide/installation/kubernetes.md (51 changed lines)

@ -360,7 +360,7 @@ kubectl cp -n test spark-2.4.7-bin-hadoop2.7.tgz dolphinscheduler-worker-0:/opt/
Because the volume `sharedStoragePersistence` is mounted on `/opt/soft`, all files in `/opt/soft` will not be lost.
5. Attach the container and ensure that `SPARK_HOME2` exists.
5. Attach the container and ensure that `SPARK_HOME` exists.
```bash
kubectl exec -it dolphinscheduler-worker-0 bash
@ -369,7 +369,7 @@ cd /opt/soft
tar zxf spark-2.4.7-bin-hadoop2.7.tgz
rm -f spark-2.4.7-bin-hadoop2.7.tgz
ln -s spark-2.4.7-bin-hadoop2.7 spark2 # or just mv
$SPARK_HOME2/bin/spark-submit --version
$SPARK_HOME/bin/spark-submit --version
```
The last command will print the Spark version if everything goes well.
@ -377,7 +377,7 @@ The last command will print the Spark version if everything goes well.
6. Verify Spark under a Shell task.
```
$SPARK_HOME2/bin/spark-submit --class org.apache.spark.examples.SparkPi $SPARK_HOME2/examples/jars/spark-examples_2.11-2.4.7.jar
$SPARK_HOME/bin/spark-submit --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.11-2.4.7.jar
```
Check whether the task log contains the output like `Pi is roughly 3.146015`.
@ -386,7 +386,6 @@ Check whether the task log contains the output like `Pi is roughly 3.146015`.
The file `spark-examples_2.11-2.4.7.jar` needs to be uploaded to the resources first, and then create a Spark task with:
- Spark Version: `SPARK2`
- Main Class: `org.apache.spark.examples.SparkPi`
- Main Package: `spark-examples_2.11-2.4.7.jar`
- Deploy Mode: `local`
@ -399,47 +398,6 @@ Spark on YARN (Deploy Mode is `cluster` or `client`) requires Hadoop support. Si
Ensure that `$HADOOP_HOME` and `$HADOOP_CONF_DIR` exists.
### How to Support Spark 3?
In fact, the way to submit applications with `spark-submit` is the same, regardless of Spark 1, 2 or 3. In other words, the semantics of `SPARK_HOME2` is the second `SPARK_HOME` instead of `SPARK2`'s `HOME`, so just set `SPARK_HOME2=/path/to/spark3`.
Take Spark 3.1.1 as an example:
1. Download the Spark 3.1.1 release binary `spark-3.1.1-bin-hadoop2.7.tgz`.
2. Ensure that `common.sharedStoragePersistence.enabled` is turned on.
3. Run a DolphinScheduler release in Kubernetes (See **Install DolphinScheduler**).
4. Copy the Spark 3.1.1 release binary into the Docker container.
```bash
kubectl cp spark-3.1.1-bin-hadoop2.7.tgz dolphinscheduler-worker-0:/opt/soft
kubectl cp -n test spark-3.1.1-bin-hadoop2.7.tgz dolphinscheduler-worker-0:/opt/soft # with test namespace
```
5. Attach the container and ensure that `SPARK_HOME2` exists.
```bash
kubectl exec -it dolphinscheduler-worker-0 bash
kubectl exec -n test -it dolphinscheduler-worker-0 bash # with test namespace
cd /opt/soft
tar zxf spark-3.1.1-bin-hadoop2.7.tgz
rm -f spark-3.1.1-bin-hadoop2.7.tgz
ln -s spark-3.1.1-bin-hadoop2.7 spark2 # or just mv
$SPARK_HOME2/bin/spark-submit --version
```
The last command will print the Spark version if everything goes well.
6. Verify Spark under a Shell task.
```
$SPARK_HOME2/bin/spark-submit --class org.apache.spark.examples.SparkPi $SPARK_HOME2/examples/jars/spark-examples_2.12-3.1.1.jar
```
Check whether the task log contains the output like `Pi is roughly 3.146015`.
### How to Support Shared Storage Between Master, Worker and Api Server?
For example, Master, Worker and API server may use Hadoop at the same time.
@ -579,8 +537,7 @@ common:
| `common.configmap.SW_GRPC_LOG_SERVER_PORT` | Set grpc log server port for skywalking | `11800` |
| `common.configmap.HADOOP_HOME` | Set `HADOOP_HOME` for DolphinScheduler's task environment | `/opt/soft/hadoop` |
| `common.configmap.HADOOP_CONF_DIR` | Set `HADOOP_CONF_DIR` for DolphinScheduler's task environment | `/opt/soft/hadoop/etc/hadoop` |
| `common.configmap.SPARK_HOME1` | Set `SPARK_HOME1` for DolphinScheduler's task environment | `/opt/soft/spark1` |
| `common.configmap.SPARK_HOME2` | Set `SPARK_HOME2` for DolphinScheduler's task environment | `/opt/soft/spark2` |
| `common.configmap.SPARK_HOME` | Set `SPARK_HOME` for DolphinScheduler's task environment | `/opt/soft/spark` |
| `common.configmap.PYTHON_HOME` | Set `PYTHON_HOME` for DolphinScheduler's task environment | `/usr/bin/python` |
| `common.configmap.JAVA_HOME` | Set `JAVA_HOME` for DolphinScheduler's task environment | `/usr/local/openjdk-8` |
| `common.configmap.HIVE_HOME` | Set `HIVE_HOME` for DolphinScheduler's task environment | `/opt/soft/hive` |
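The removed "How to Support Spark 3?" walkthrough is no longer needed: with a single generic `SPARK_HOME` there is no `SPARK_HOME2`-as-second-home trick, you simply point the directory (or a symlink) at whichever Spark release you want. A sketch assembled from the removed steps, assuming the shared volume is mounted at `/opt/soft` and `SPARK_HOME` keeps its `/opt/soft/spark` default:

```bash
# Inside the worker container: unpack Spark 3 and let the generic
# SPARK_HOME point at it through a symlink.
cd /opt/soft
tar zxf spark-3.1.1-bin-hadoop2.7.tgz
rm -f spark-3.1.1-bin-hadoop2.7.tgz
ln -s spark-3.1.1-bin-hadoop2.7 spark   # or just mv
$SPARK_HOME/bin/spark-submit --version  # should report 3.1.1
```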

docs/docs/en/guide/installation/pseudo-cluster.md (5 changed lines)

@ -131,14 +131,13 @@ export REGISTRY_ZOOKEEPER_CONNECT_STRING=${REGISTRY_ZOOKEEPER_CONNECT_STRING:-lo
# Tasks related configurations, need to change the configuration if you use the related tasks.
export HADOOP_HOME=${HADOOP_HOME:-/opt/soft/hadoop}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME1=${SPARK_HOME1:-/opt/soft/spark1}
export SPARK_HOME2=${SPARK_HOME2:-/opt/soft/spark2}
export SPARK_HOME=${SPARK_HOME:-/opt/soft/spark}
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
```
## Initialize the Database

docs/docs/en/guide/task/spark.md (1 changed line)

@ -20,7 +20,6 @@ Spark task type for executing Spark application. When executing the Spark task,
| **Parameter** | **Description** |
|----------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------|
| Program type | Supports Java, Scala, Python, and SQL. |
| Spark version | Support Spark1 and Spark2. |
| The class of main function | The **full path** of Main Class, the entry point of the Spark program. |
| Main jar package | The Spark jar package (upload by Resource Center). |
| SQL scripts | SQL statements in .sql files that Spark sql runs. |
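For reference, after this change a non-SQL Spark task is always launched through the single `SPARK_HOME`. The command shape below mirrors the updated `SparkTaskTest` assertions later in this diff, with the SparkPi class and example jar used elsewhere in these docs substituted in; resource sizes are illustrative:

```bash
# Shape of the spark-submit command the worker builds for a JAVA/SCALA task.
${SPARK_HOME}/bin/spark-submit \
  --master yarn --deploy-mode client \
  --class org.apache.spark.examples.SparkPi \
  --driver-cores 1 --driver-memory 512M \
  --num-executors 2 --executor-cores 2 --executor-memory 1G \
  --name spark \
  spark-examples_2.11-2.4.7.jar
```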

docs/docs/en/guide/upgrade/incompatible.md (2 changed lines)

@ -4,6 +4,8 @@ This document records the incompatible updates between each version. You need to
## dev
* Remove the spark version of spark task ([#11860](https://github.com/apache/dolphinscheduler/pull/11860)).
## 3.0.0
* Copy and import workflow without 'copy' suffix [#10607](https://github.com/apache/dolphinscheduler/pull/10607)

docs/docs/zh/architecture/configuration.md (5 changed lines)

@ -339,14 +339,13 @@ export JAVA_HOME=${JAVA_HOME:-/opt/soft/java}
# Tasks related configurations, need to change the configuration if you use the related tasks.
export HADOOP_HOME=${HADOOP_HOME:-/opt/soft/hadoop}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME1=${SPARK_HOME1:-/opt/soft/spark1}
export SPARK_HOME2=${SPARK_HOME2:-/opt/soft/spark2}
export SPARK_HOME=${SPARK_HOME:-/opt/soft/spark}
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
```
## 日志相关配置

docs/docs/zh/architecture/task-structure.md (32 changed lines)

@ -255,21 +255,20 @@
| 13 | | executorMemory | String | executor内存 |
| 14 | | executorCores | String | executor核数 |
| 15 | | programType | String | 程序类型 | JAVA,SCALA,PYTHON |
| 16 | | sparkVersion | String | Spark 版本 | SPARK1 , SPARK2 |
| 17 | | localParams | Array | 自定义参数 |
| 18 | | resourceList | Array | 资源文件 |
| 19 | description | | String | 描述 | |
| 20 | runFlag | | String | 运行标识 | |
| 21 | conditionResult | | Object | 条件分支 | |
| 22 | | successNode | Array | 成功跳转节点 | |
| 23 | | failedNode | Array | 失败跳转节点 |
| 24 | dependence | | Object | 任务依赖 | 与params互斥 |
| 25 | maxRetryTimes | | String | 最大重试次数 | |
| 26 | retryInterval | | String | 重试间隔 | |
| 27 | timeout | | Object | 超时控制 | |
| 28 | taskInstancePriority | | String | 任务优先级 | |
| 29 | workerGroup | | String | Worker 分组 | |
| 30 | preTasks | | Array | 前置任务 | |
| 16 | | localParams | Array | 自定义参数 |
| 17 | | resourceList | Array | 资源文件 |
| 18 | description | | String | 描述 | |
| 19 | runFlag | | String | 运行标识 | |
| 20 | conditionResult | | Object | 条件分支 | |
| 21 | | successNode | Array | 成功跳转节点 | |
| 22 | | failedNode | Array | 失败跳转节点 |
| 23 | dependence | | Object | 任务依赖 | 与params互斥 |
| 24 | maxRetryTimes | | String | 最大重试次数 | |
| 25 | retryInterval | | String | 重试间隔 | |
| 26 | timeout | | Object | 超时控制 | |
| 27 | taskInstancePriority | | String | 任务优先级 | |
| 28 | workerGroup | | String | Worker 分组 | |
| 29 | preTasks | | Array | 前置任务 | |
**节点数据样例:**
@ -301,8 +300,7 @@
"executorCores":2,
"mainArgs":"10",
"others":"",
"programType":"SCALA",
"sparkVersion":"SPARK2"
"programType":"SCALA"
},
"description":"",
"runFlag":"NORMAL",

docs/docs/zh/faq.md (2 changed lines)

@ -208,7 +208,7 @@ export PYTHON_HOME=/bin/python
注意:这了 **PYTHON_HOME** ,是 python 命令的绝对路径,而不是单纯的 PYTHON_HOME,还需要注意的是 export PATH 的时候,需要直接
```
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH
export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH
```
## Q:Worker Task 通过 sudo -u 租户 sh xxx.command 会产生子进程,在 kill 的时候,是否会杀掉

docs/docs/zh/guide/expansion-reduction.md (5 changed lines)

@ -79,14 +79,13 @@ sed -i 's/Defaults requirett/#Defaults requirett/g' /etc/sudoers
```shell
export HADOOP_HOME=/opt/soft/hadoop
export HADOOP_CONF_DIR=/opt/soft/hadoop/etc/hadoop
#export SPARK_HOME1=/opt/soft/spark1
export SPARK_HOME2=/opt/soft/spark2
export SPARK_HOME=/opt/soft/spark
export PYTHON_HOME=/opt/soft/python
export JAVA_HOME=/opt/soft/java
export HIVE_HOME=/opt/soft/hive
export FLINK_HOME=/opt/soft/flink
export DATAX_HOME=/opt/soft/datax/bin/datax.py
export PATH=$HADOOP_HOME/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$DATAX_HOME:$PATH
export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$DATAX_HOME:$PATH
```

docs/docs/zh/guide/installation/kubernetes.md (51 changed lines)

@ -360,7 +360,7 @@ kubectl cp -n test spark-2.4.7-bin-hadoop2.7.tgz dolphinscheduler-worker-0:/opt/
因为存储卷 `sharedStoragePersistence` 被挂载到 `/opt/soft`, 因此 `/opt/soft` 中的所有文件都不会丢失
5. 登录到容器并确保 `SPARK_HOME2` 存在
5. 登录到容器并确保 `SPARK_HOME` 存在
```bash
kubectl exec -it dolphinscheduler-worker-0 bash
@ -369,7 +369,7 @@ cd /opt/soft
tar zxf spark-2.4.7-bin-hadoop2.7.tgz
rm -f spark-2.4.7-bin-hadoop2.7.tgz
ln -s spark-2.4.7-bin-hadoop2.7 spark2 # or just mv
$SPARK_HOME2/bin/spark-submit --version
$SPARK_HOME/bin/spark-submit --version
```
如果一切执行正常,最后一条命令将会打印 Spark 版本信息
@ -377,7 +377,7 @@ $SPARK_HOME2/bin/spark-submit --version
6. 在一个 Shell 任务下验证 Spark
```
$SPARK_HOME2/bin/spark-submit --class org.apache.spark.examples.SparkPi $SPARK_HOME2/examples/jars/spark-examples_2.11-2.4.7.jar
$SPARK_HOME/bin/spark-submit --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.11-2.4.7.jar
```
检查任务日志是否包含输出 `Pi is roughly 3.146015`
@ -386,7 +386,6 @@ $SPARK_HOME2/bin/spark-submit --class org.apache.spark.examples.SparkPi $SPARK_H
文件 `spark-examples_2.11-2.4.7.jar` 需要先被上传到资源中心,然后创建一个 Spark 任务并设置:
- Spark版本: `SPARK2`
- 主函数的Class: `org.apache.spark.examples.SparkPi`
- 主程序包: `spark-examples_2.11-2.4.7.jar`
- 部署方式: `local`
@ -399,47 +398,6 @@ Spark on YARN (部署方式为 `cluster` 或 `client`) 需要 Hadoop 支持. 类
确保 `$HADOOP_HOME` 和 `$HADOOP_CONF_DIR` 存在
### 如何支持 Spark 3?
事实上,使用 `spark-submit` 提交应用的方式是相同的, 无论是 Spark 1, 2 或 3. 换句话说,`SPARK_HOME2` 的语义是第二个 `SPARK_HOME`, 而非 `SPARK2` 的 `HOME`, 因此只需设置 `SPARK_HOME2=/path/to/spark3` 即可
以 Spark 3.1.1 为例:
1. 下载 Spark 3.1.1 发布的二进制包 `spark-3.1.1-bin-hadoop2.7.tgz`
2. 确保 `common.sharedStoragePersistence.enabled` 开启
3. 部署 dolphinscheduler (详见**安装 dolphinscheduler**)
4. 复制 Spark 3.1.1 二进制包到 Docker 容器中
```bash
kubectl cp spark-3.1.1-bin-hadoop2.7.tgz dolphinscheduler-worker-0:/opt/soft
kubectl cp -n test spark-3.1.1-bin-hadoop2.7.tgz dolphinscheduler-worker-0:/opt/soft # with test namespace
```
5. 登录到容器并确保 `SPARK_HOME2` 存在
```bash
kubectl exec -it dolphinscheduler-worker-0 bash
kubectl exec -n test -it dolphinscheduler-worker-0 bash # with test namespace
cd /opt/soft
tar zxf spark-3.1.1-bin-hadoop2.7.tgz
rm -f spark-3.1.1-bin-hadoop2.7.tgz
ln -s spark-3.1.1-bin-hadoop2.7 spark2 # or just mv
$SPARK_HOME2/bin/spark-submit --version
```
如果一切执行正常,最后一条命令将会打印 Spark 版本信息
6. 在一个 Shell 任务下验证 Spark
```
$SPARK_HOME2/bin/spark-submit --class org.apache.spark.examples.SparkPi $SPARK_HOME2/examples/jars/spark-examples_2.12-3.1.1.jar
```
检查任务日志是否包含输出 `Pi is roughly 3.146015`
### 如何在 Master、Worker 和 Api 服务之间支持共享存储?
例如, Master、Worker 和 Api 服务可能同时使用 Hadoop
@ -579,8 +537,7 @@ common:
| `common.configmap.SW_GRPC_LOG_SERVER_PORT` | Set grpc log server port for skywalking | `11800` |
| `common.configmap.HADOOP_HOME` | Set `HADOOP_HOME` for DolphinScheduler's task environment | `/opt/soft/hadoop` |
| `common.configmap.HADOOP_CONF_DIR` | Set `HADOOP_CONF_DIR` for DolphinScheduler's task environment | `/opt/soft/hadoop/etc/hadoop` |
| `common.configmap.SPARK_HOME1` | Set `SPARK_HOME1` for DolphinScheduler's task environment | `/opt/soft/spark1` |
| `common.configmap.SPARK_HOME2` | Set `SPARK_HOME2` for DolphinScheduler's task environment | `/opt/soft/spark2` |
| `common.configmap.SPARK_HOME` | Set `SPARK_HOME` for DolphinScheduler's task environment | `/opt/soft/spark` |
| `common.configmap.PYTHON_HOME` | Set `PYTHON_HOME` for DolphinScheduler's task environment | `/usr/bin/python` |
| `common.configmap.JAVA_HOME` | Set `JAVA_HOME` for DolphinScheduler's task environment | `/usr/local/openjdk-8` |
| `common.configmap.HIVE_HOME` | Set `HIVE_HOME` for DolphinScheduler's task environment | `/opt/soft/hive` |

docs/docs/zh/guide/installation/pseudo-cluster.md (5 changed lines)

@ -126,14 +126,13 @@ export REGISTRY_ZOOKEEPER_CONNECT_STRING=${REGISTRY_ZOOKEEPER_CONNECT_STRING:-lo
# Tasks related configurations, need to change the configuration if you use the related tasks.
export HADOOP_HOME=${HADOOP_HOME:-/opt/soft/hadoop}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME1=${SPARK_HOME1:-/opt/soft/spark1}
export SPARK_HOME2=${SPARK_HOME2:-/opt/soft/spark2}
export SPARK_HOME=${SPARK_HOME:-/opt/soft/spark}
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
```
## 初始化数据库

docs/docs/zh/guide/task/spark.md (1 changed line)

@ -18,7 +18,6 @@ Spark 任务类型用于执行 Spark 应用。对于 Spark 节点,worker 支
- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md#默认任务参数)。
- 程序类型:支持 Java、Scala、Python 和 SQL 四种语言。
- Spark 版本:支持 Spark1 和 Spark2。
- 主函数的 Class:Spark 程序的入口 Main class 的全路径。
- 主程序包:执行 Spark 程序的 jar 包(通过资源中心上传)。
- SQL脚本:Spark sql 运行的 .sql 文件中的 SQL 语句。

docs/docs/zh/guide/upgrade/incompatible.md (2 changed lines)

@ -4,6 +4,8 @@
## dev
* Remove the spark version of spark task ([#11860](https://github.com/apache/dolphinscheduler/pull/11860)).
## 3.0.0
* Copy and import workflow without 'copy' suffix [#10607](https://github.com/apache/dolphinscheduler/pull/10607)

dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/EnvironmentServiceTest.java (5 changed lines)

@ -292,8 +292,7 @@ public class EnvironmentServiceTest {
private String getConfig() {
return "export HADOOP_HOME=/opt/hadoop-2.6.5\n"
+ "export HADOOP_CONF_DIR=/etc/hadoop/conf\n"
+ "export SPARK_HOME1=/opt/soft/spark1\n"
+ "export SPARK_HOME2=/opt/soft/spark2\n"
+ "export SPARK_HOME=/opt/soft/spark\n"
+ "export PYTHON_HOME=/opt/soft/python\n"
+ "export JAVA_HOME=/opt/java/jdk1.8.0_181-amd64\n"
+ "export HIVE_HOME=/opt/soft/hive\n"
@ -301,7 +300,7 @@ public class EnvironmentServiceTest {
+ "export DATAX_HOME=/opt/soft/datax\n"
+ "export YARN_CONF_DIR=\"/etc/hadoop/conf\"\n"
+ "\n"
+ "export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH\n"
+ "export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH\n"
+ "\n"
+ "export HADOOP_CLASSPATH=`hadoop classpath`\n"
+ "\n"

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java (49 changed lines)

@ -1,49 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
public enum SparkVersion {
/**
* 0 SPARK1
* 1 SPARK2
* 2 SPARKSQL
*/
SPARK1(0, "SPARK1"),
SPARK2(1, "SPARK2"),
SPARKSQL(2, "SPARKSQL");
SparkVersion(int code, String descp) {
this.code = code;
this.descp = descp;
}
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ClusterMapperTest.java (5 changed lines)

@ -173,8 +173,7 @@ public class ClusterMapperTest extends BaseDaoTest {
private String getConfig() {
return "export HADOOP_HOME=/opt/hadoop-2.6.5\n"
+ "export HADOOP_CONF_DIR=/etc/hadoop/conf\n"
+ "export SPARK_HOME1=/opt/soft/spark1\n"
+ "export SPARK_HOME2=/opt/soft/spark2\n"
+ "export SPARK_HOME=/opt/soft/spark\n"
+ "export PYTHON_HOME=/opt/soft/python\n"
+ "export JAVA_HOME=/opt/java/jdk1.8.0_181-amd64\n"
+ "export HIVE_HOME=/opt/soft/hive\n"
@ -182,7 +181,7 @@ public class ClusterMapperTest extends BaseDaoTest {
+ "export DATAX_HOME=/opt/soft/datax\n"
+ "export YARN_CONF_DIR=\"/etc/hadoop/conf\"\n"
+ "\n"
+ "export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH\n"
+ "export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH\n"
+ "\n"
+ "export HADOOP_CLASSPATH=`hadoop classpath`\n"
+ "\n"

dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/EnvironmentMapperTest.java (5 changed lines)

@ -173,8 +173,7 @@ public class EnvironmentMapperTest extends BaseDaoTest {
private String getConfig() {
return "export HADOOP_HOME=/opt/hadoop-2.6.5\n"
+ "export HADOOP_CONF_DIR=/etc/hadoop/conf\n"
+ "export SPARK_HOME1=/opt/soft/spark1\n"
+ "export SPARK_HOME2=/opt/soft/spark2\n"
+ "export SPARK_HOME=/opt/soft/spark\n"
+ "export PYTHON_HOME=/opt/soft/python\n"
+ "export JAVA_HOME=/opt/java/jdk1.8.0_181-amd64\n"
+ "export HIVE_HOME=/opt/soft/hive\n"
@ -182,7 +181,7 @@ public class EnvironmentMapperTest extends BaseDaoTest {
+ "export DATAX_HOME=/opt/soft/datax\n"
+ "export YARN_CONF_DIR=\"/etc/hadoop/conf\"\n"
+ "\n"
+ "export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH\n"
+ "export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH\n"
+ "\n"
+ "export HADOOP_CLASSPATH=`hadoop classpath`\n"
+ "\n"

dolphinscheduler-python/pydolphinscheduler/UPDATING.md (1 changed line)

@ -25,6 +25,7 @@ It started after version 2.0.5 released
## dev
* Remove parameter ``task_location`` in process definition and Java Gateway service ([#11681](https://github.com/apache/dolphinscheduler/pull/11681))
* Remove the spark version of spark task ([#11860](https://github.com/apache/dolphinscheduler/pull/11860)).
## 3.0.0

dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Spark.yaml (1 changed line)

@ -27,4 +27,3 @@ tasks:
main_package: test_java.jar
program_type: SCALA
deploy_mode: local
spark_version: SPARK1

dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/spark.py (10 changed lines)

@ -23,13 +23,6 @@ from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.engine import Engine, ProgramType
class SparkVersion(str):
"""Spark version, for now it just contain `SPARK1` and `SPARK2`."""
SPARK1 = "SPARK1"
SPARK2 = "SPARK2"
class DeployMode(str):
"""SPARK deploy mode, for now it just contain `LOCAL`, `CLIENT` and `CLUSTER`."""
@ -43,7 +36,6 @@ class Spark(Engine):
_task_custom_attr = {
"deploy_mode",
"spark_version",
"driver_cores",
"driver_memory",
"num_executors",
@ -61,7 +53,6 @@ class Spark(Engine):
main_package: str,
program_type: Optional[ProgramType] = ProgramType.SCALA,
deploy_mode: Optional[DeployMode] = DeployMode.CLUSTER,
spark_version: Optional[SparkVersion] = SparkVersion.SPARK2,
app_name: Optional[str] = None,
driver_cores: Optional[int] = 1,
driver_memory: Optional[str] = "512M",
@ -83,7 +74,6 @@ class Spark(Engine):
**kwargs
)
self.deploy_mode = deploy_mode
self.spark_version = spark_version
self.app_name = app_name
self.driver_cores = driver_cores
self.driver_memory = driver_memory

dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py (3 changed lines)

@ -19,7 +19,7 @@
from unittest.mock import patch
from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark, SparkVersion
from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark
@patch(
@ -50,7 +50,6 @@ def test_spark_get_define(mock_resource):
},
"programType": program_type,
"deployMode": deploy_mode,
"sparkVersion": SparkVersion.SPARK2,
"driverCores": 1,
"driverMemory": "512M",
"numExecutors": 2,

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/dataquality/spark/SparkParameters.java (13 changed lines)

@ -95,11 +95,6 @@ public class SparkParameters extends AbstractParameters {
*/
private ProgramType programType;
/**
* spark version
*/
private String sparkVersion;
/**
* resource list
*/
@ -217,14 +212,6 @@ public class SparkParameters extends AbstractParameters {
this.programType = programType;
}
public String getSparkVersion() {
return sparkVersion;
}
public void setSparkVersion(String sparkVersion) {
this.sparkVersion = sparkVersion;
}
@Override
public boolean checkParameters() {
return mainJar != null && programType != null;

dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java (26 changed lines)

@ -64,9 +64,9 @@ import java.util.Map;
public class DataQualityTask extends AbstractYarnTask {
/**
* spark2 command
* spark command
*/
private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";
private static final String SPARK_COMMAND = "${SPARK_HOME}/bin/spark-submit";
private DataQualityParameters dataQualityParameters;
@ -81,7 +81,8 @@ public class DataQualityTask extends AbstractYarnTask {
public void init() {
logger.info("data quality task params {}", dqTaskExecutionContext.getTaskParams());
dataQualityParameters = JSONUtils.parseObject(dqTaskExecutionContext.getTaskParams(), DataQualityParameters.class);
dataQualityParameters =
JSONUtils.parseObject(dqTaskExecutionContext.getTaskParams(), DataQualityParameters.class);
if (null == dataQualityParameters) {
logger.error("data quality params is null");
@ -99,8 +100,8 @@ public class DataQualityTask extends AbstractYarnTask {
}
}
DataQualityTaskExecutionContext dataQualityTaskExecutionContext
= dqTaskExecutionContext.getDataQualityTaskExecutionContext();
DataQualityTaskExecutionContext dataQualityTaskExecutionContext =
dqTaskExecutionContext.getDataQualityTaskExecutionContext();
operateInputParameter(inputParameter, dataQualityTaskExecutionContext);
@ -114,7 +115,9 @@ public class DataQualityTask extends AbstractYarnTask {
dataQualityParameters
.getSparkParameters()
.setMainArgs("\""
+ StringUtils.replaceDoubleBrackets(StringUtils.escapeJava(JSONUtils.toJsonString(dataQualityConfiguration))) + "\"");
+ StringUtils.replaceDoubleBrackets(
StringUtils.escapeJava(JSONUtils.toJsonString(dataQualityConfiguration)))
+ "\"");
dataQualityParameters
.getSparkParameters()
@ -123,7 +126,8 @@ public class DataQualityTask extends AbstractYarnTask {
setMainJarName();
}
private void operateInputParameter(Map<String, String> inputParameter, DataQualityTaskExecutionContext dataQualityTaskExecutionContext) {
private void operateInputParameter(Map<String, String> inputParameter,
DataQualityTaskExecutionContext dataQualityTaskExecutionContext) {
DateTimeFormatter df = DateTimeFormatter.ofPattern(YYYY_MM_DD_HH_MM_SS);
LocalDateTime time = LocalDateTime.now();
String now = df.format(time);
@ -142,7 +146,8 @@ public class DataQualityTask extends AbstractYarnTask {
}
if (StringUtils.isNotEmpty(inputParameter.get(REGEXP_PATTERN))) {
inputParameter.put(REGEXP_PATTERN,StringUtils.escapeJava(StringUtils.escapeJava(inputParameter.get(REGEXP_PATTERN))));
inputParameter.put(REGEXP_PATTERN,
StringUtils.escapeJava(StringUtils.escapeJava(inputParameter.get(REGEXP_PATTERN))));
}
if (StringUtils.isNotEmpty(dataQualityTaskExecutionContext.getHdfsPath())) {
@ -160,12 +165,13 @@ public class DataQualityTask extends AbstractYarnTask {
protected String buildCommand() {
List<String> args = new ArrayList<>();
args.add(SPARK2_COMMAND);
args.add(SPARK_COMMAND);
args.addAll(SparkArgsUtils.buildArgs(dataQualityParameters.getSparkParameters()));
// replace placeholder
Map<String, Property> paramsMap = dqTaskExecutionContext.getPrepareParamsMap();
String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
String command =
ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
logger.info("data quality task command: {}", command);
return command;

dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkCommand.java (65 changed lines)

@ -1,65 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.spark;
public enum SparkCommand {
/**
* 0 SPARK1SUBMIT
* 1 SPARK2SUBMIT
* 2 SPARK1SQL
* 3 SPARK2SQL
*/
SPARK1SUBMIT(0, "SPARK1SUBMIT", "${SPARK_HOME1}/bin/spark-submit", SparkVersion.SPARK1),
SPARK2SUBMIT(1, "SPARK2SUBMIT", "${SPARK_HOME2}/bin/spark-submit", SparkVersion.SPARK2),
SPARK1SQL(2, "SPARK1SQL", "${SPARK_HOME1}/bin/spark-sql", SparkVersion.SPARK1),
SPARK2SQL(3, "SPARK2SQL", "${SPARK_HOME2}/bin/spark-sql", SparkVersion.SPARK2);
private final int code;
private final String descp;
/**
* usage: spark-submit [options] <app jar | python file> [app arguments]
*/
private final String command;
private final SparkVersion sparkVersion;
SparkCommand(int code, String descp, String command, SparkVersion sparkVersion) {
this.code = code;
this.descp = descp;
this.command = command;
this.sparkVersion = sparkVersion;
}
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
public String getCommand() {
return command;
}
public SparkVersion getSparkVersion() {
return sparkVersion;
}
}
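With this enum gone, only two fixed launchers remain (declared in `SparkConstants` below), both resolved through the generic `SPARK_HOME`. Expressed as shell, and using file names that appear elsewhere in this diff purely as illustration:

```bash
# programType == SQL: run the generated script through spark-sql
${SPARK_HOME}/bin/spark-sql -f /tmp/5536_node.sql
# programType == JAVA / SCALA / PYTHON: submit the main package
${SPARK_HOME}/bin/spark-submit --class org.apache.spark.examples.SparkPi \
    spark-examples_2.11-2.4.7.jar
```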

dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java (10 changed lines)

@ -79,4 +79,14 @@ public class SparkConstants {
*/
public static final String SQL_FROM_FILE = "-f";
/**
* spark submit command for sql
*/
public static final String SPARK_SQL_COMMAND = "${SPARK_HOME}/bin/spark-sql";
/**
* spark submit command
*/
public static final String SPARK_SUBMIT_COMMAND = "${SPARK_HOME}/bin/spark-submit";
}

dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java (13 changed lines)

@ -95,11 +95,6 @@ public class SparkParameters extends AbstractParameters {
*/
private ProgramType programType;
/**
* spark version
*/
private String sparkVersion;
/**
* spark sql script
*/
@ -222,14 +217,6 @@ public class SparkParameters extends AbstractParameters {
this.programType = programType;
}
public String getSparkVersion() {
return sparkVersion;
}
public void setSparkVersion(String sparkVersion) {
this.sparkVersion = sparkVersion;
}
public String getRawScript() {
return rawScript;
}

dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java (29 changed lines)

@ -97,20 +97,13 @@ public class SparkTask extends AbstractYarnTask {
*/
List<String> args = new ArrayList<>();
// spark version
String sparkCommand = SparkCommand.SPARK2SUBMIT.getCommand();
// If the programType is non-SQL, execute bin/spark-submit
if (SparkCommand.SPARK1SUBMIT.getSparkVersion().name().equals(sparkParameters.getSparkVersion())) {
sparkCommand = SparkCommand.SPARK1SUBMIT.getCommand();
}
String sparkCommand;
// If the programType is SQL, execute bin/spark-sql
if (sparkParameters.getProgramType() == ProgramType.SQL) {
sparkCommand = SparkCommand.SPARK2SQL.getCommand();
if (SparkCommand.SPARK1SQL.getSparkVersion().name().equals(sparkParameters.getSparkVersion())) {
sparkCommand = SparkCommand.SPARK1SQL.getCommand();
}
sparkCommand = SparkConstants.SPARK_SQL_COMMAND;
} else {
// If the programType is non-SQL, execute bin/spark-submit
sparkCommand = SparkConstants.SPARK_SUBMIT_COMMAND;
}
args.add(sparkCommand);
@ -121,7 +114,8 @@ public class SparkTask extends AbstractYarnTask {
// replace placeholder, and combining local and global parameters
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
String command =
ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
logger.info("spark task command: {}", command);
@ -137,7 +131,8 @@ public class SparkTask extends AbstractYarnTask {
List<String> args = new ArrayList<>();
args.add(SparkConstants.MASTER);
String deployMode = StringUtils.isNotEmpty(sparkParameters.getDeployMode()) ? sparkParameters.getDeployMode() : SparkConstants.DEPLOY_MODE_LOCAL;
String deployMode = StringUtils.isNotEmpty(sparkParameters.getDeployMode()) ? sparkParameters.getDeployMode()
: SparkConstants.DEPLOY_MODE_LOCAL;
if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
args.add(SparkConstants.SPARK_ON_YARN);
args.add(SparkConstants.DEPLOY_MODE);
@ -160,7 +155,8 @@ public class SparkTask extends AbstractYarnTask {
}
String others = sparkParameters.getOthers();
if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode) && (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_QUEUE))) {
if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)
&& (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_QUEUE))) {
String queue = sparkParameters.getQueue();
if (StringUtils.isNotEmpty(queue)) {
args.add(SparkConstants.SPARK_QUEUE);
@ -224,7 +220,8 @@ public class SparkTask extends AbstractYarnTask {
}
private String generateScriptFile() {
String scriptFileName = String.format("%s/%s_node.sql", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
String scriptFileName = String.format("%s/%s_node.sql", taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId());
File file = new File(scriptFileName);
Path path = file.toPath();

dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java (22 changed lines)

@ -1,22 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.spark;
public enum SparkVersion {
SPARK1, SPARK2
}

dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java (75 changed lines)

@ -17,12 +17,15 @@
package org.apache.dolphinscheduler.plugin.task.spark;
import java.util.Collections;
import static org.powermock.api.mockito.PowerMockito.spy;
import static org.powermock.api.mockito.PowerMockito.when;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.Collections;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -31,9 +34,6 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.powermock.api.mockito.PowerMockito.spy;
import static org.powermock.api.mockito.PowerMockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest({
JSONUtils.class
@ -43,8 +43,8 @@ import static org.powermock.api.mockito.PowerMockito.when;
public class SparkTaskTest {
@Test
public void testBuildCommandWithSpark2Sql() throws Exception {
String parameters = buildSparkParametersWithSparkSql(ProgramType.SQL, "SPARK2");
public void testBuildCommandWithSparkSql() throws Exception {
String parameters = buildSparkParametersWithSparkSql();
TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
@ -53,28 +53,7 @@ public class SparkTaskTest {
SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
sparkTask.init();
Assert.assertEquals(sparkTask.buildCommand(),
"${SPARK_HOME2}/bin/spark-sql " +
"--master yarn " +
"--deploy-mode client " +
"--driver-cores 1 " +
"--driver-memory 512M " +
"--num-executors 2 " +
"--executor-cores 2 " +
"--executor-memory 1G " +
"--name sparksql " +
"-f /tmp/5536_node.sql");
}
@Test
public void testBuildCommandWithSpark1Sql() throws Exception {
String parameters = buildSparkParametersWithSparkSql(ProgramType.SQL, "SPARK1");
TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
sparkTask.init();
Assert.assertEquals(sparkTask.buildCommand(),
"${SPARK_HOME1}/bin/spark-sql " +
"${SPARK_HOME}/bin/spark-sql " +
"--master yarn " +
"--deploy-mode client " +
"--driver-cores 1 " +
@ -87,8 +66,8 @@ public class SparkTaskTest {
}
@Test
public void testBuildCommandWithSpark2Submit() throws Exception {
String parameters = buildSparkParametersWithSparkSubmit(ProgramType.SCALA, "SPARK2");
public void testBuildCommandWithSparkSubmit() {
String parameters = buildSparkParametersWithSparkSubmit();
TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
@ -96,7 +75,7 @@ public class SparkTaskTest {
SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
sparkTask.init();
Assert.assertEquals(sparkTask.buildCommand(),
"${SPARK_HOME2}/bin/spark-submit " +
"${SPARK_HOME}/bin/spark-submit " +
"--master yarn " +
"--deploy-mode client " +
"--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " +
@ -108,38 +87,16 @@ public class SparkTaskTest {
"--name spark " +
"lib/dolphinscheduler-task-spark.jar");
}
@Test
public void testBuildCommandWithSpark1Submit() throws Exception {
String parameters = buildSparkParametersWithSparkSubmit(ProgramType.SCALA, "SPARK1");
TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
sparkTask.init();
Assert.assertEquals(sparkTask.buildCommand(),
"${SPARK_HOME1}/bin/spark-submit " +
"--master yarn " +
"--deploy-mode client " +
"--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " +
"--driver-cores 1 " +
"--driver-memory 512M " +
"--num-executors 2 " +
"--executor-cores 2 " +
"--executor-memory 1G " +
"--name spark " +
"lib/dolphinscheduler-task-spark.jar");
}
private String buildSparkParametersWithSparkSql(ProgramType programType, String sparkVersion) {
private String buildSparkParametersWithSparkSql() {
SparkParameters sparkParameters = new SparkParameters();
sparkParameters.setLocalParams(Collections.emptyList());
sparkParameters.setRawScript("selcet 11111;");
sparkParameters.setProgramType(programType);
sparkParameters.setProgramType(ProgramType.SQL);
sparkParameters.setMainClass("");
sparkParameters.setDeployMode("client");
sparkParameters.setAppName("sparksql");
sparkParameters.setOthers("");
sparkParameters.setSparkVersion(sparkVersion);
sparkParameters.setDriverCores(1);
sparkParameters.setDriverMemory("512M");
sparkParameters.setNumExecutors(2);
@ -147,15 +104,15 @@ public class SparkTaskTest {
sparkParameters.setExecutorCores(2);
return JSONUtils.toJsonString(sparkParameters);
}
private String buildSparkParametersWithSparkSubmit(ProgramType programType, String sparkVersion) {
private String buildSparkParametersWithSparkSubmit() {
SparkParameters sparkParameters = new SparkParameters();
sparkParameters.setLocalParams(Collections.emptyList());
sparkParameters.setProgramType(programType);
sparkParameters.setProgramType(ProgramType.SCALA);
sparkParameters.setMainClass("org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest");
sparkParameters.setDeployMode("client");
sparkParameters.setAppName("spark");
sparkParameters.setOthers("");
sparkParameters.setSparkVersion(sparkVersion);
sparkParameters.setDriverCores(1);
sparkParameters.setDriverMemory("512M");
sparkParameters.setNumExecutors(2);

dolphinscheduler-ui/src/locales/en_US/project.ts (1 changed line)

@ -400,7 +400,6 @@ export default {
value_required_tips: 'value(required)',
pre_tasks: 'Pre tasks',
program_type: 'Program Type',
spark_version: 'Spark Version',
main_class: 'Main Class',
main_class_tips: 'Please enter main class',
main_package: 'Main Package',

dolphinscheduler-ui/src/locales/zh_CN/project.ts (1 changed line)

@ -396,7 +396,6 @@ export default {
value_required_tips: 'value(必填)',
pre_tasks: '前置任务',
program_type: '程序类型',
spark_version: 'Spark版本',
main_class: '主函数的Class',
main_class_tips: '请填写主函数的Class',
main_package: '主程序包',

dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts (18 changed lines)

@ -55,13 +55,6 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] {
}
}
},
{
type: 'select',
field: 'sparkVersion',
span: 12,
name: t('project.node.spark_version'),
options: SPARK_VERSIONS
},
{
type: 'input',
field: 'mainClass',
@ -152,14 +145,3 @@ export const PROGRAM_TYPES = [
value: 'SQL'
}
]
export const SPARK_VERSIONS = [
{
label: 'SPARK2',
value: 'SPARK2'
},
{
label: 'SPARK1',
value: 'SPARK1'
}
]

dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts (1 changed line)

@ -62,7 +62,6 @@ export function formatParams(data: INodeData): {
}
if (data.taskType === 'SPARK') {
taskParams.sparkVersion = data.sparkVersion
taskParams.driverCores = data.driverCores
taskParams.driverMemory = data.driverMemory
taskParams.numExecutors = data.numExecutors

dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts (1 changed line)

@ -44,7 +44,6 @@ export function useSpark({
delayTime: 0,
timeout: 30,
programType: 'SCALA',
sparkVersion: 'SPARK2',
rawScript: '',
deployMode: 'local',
driverCores: 1,

dolphinscheduler-ui/src/views/projects/task/components/node/types.ts (1 changed line)

@ -229,7 +229,6 @@ interface ITaskParams {
rawScript?: string
initScript?: string
programType?: string
sparkVersion?: string
flinkVersion?: string
jobManagerMemory?: string
taskManagerMemory?: string

script/env/dolphinscheduler_env.sh (5 changed lines)

@ -24,8 +24,7 @@ export JAVA_HOME=${JAVA_HOME:-/opt/soft/java}
# Tasks related configurations, need to change the configuration if you use the related tasks.
export HADOOP_HOME=${HADOOP_HOME:-/opt/soft/hadoop}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME1=${SPARK_HOME1:-/opt/soft/spark1}
export SPARK_HOME2=${SPARK_HOME2:-/opt/soft/spark2}
export SPARK_HOME=${SPARK_HOME:-/opt/soft/spark}
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
@ -33,4 +32,4 @@ export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/soft/seatunnel}
export CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$CHUNJUN_HOME/bin:$PATH
export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$CHUNJUN_HOME/bin:$PATH
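Because every entry keeps the `${VAR:-default}` form, a `SPARK_HOME` exported before the daemons start still takes precedence over the `/opt/soft/spark` fallback. A small illustration of that shell behaviour (the paths are arbitrary examples):

```bash
# ${SPARK_HOME:-/opt/soft/spark} keeps a pre-set value and only falls
# back to the default when the variable is unset or empty.
unset SPARK_HOME
export SPARK_HOME=${SPARK_HOME:-/opt/soft/spark}
echo "$SPARK_HOME"   # -> /opt/soft/spark

export SPARK_HOME=/usr/lib/spark
export SPARK_HOME=${SPARK_HOME:-/opt/soft/spark}
echo "$SPARK_HOME"   # -> /usr/lib/spark
```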
