Browse Source

Merge remote-tracking branch 'upstream/dev' into spilit

pull/3/MERGE
lenboo 4 years ago
parent
commit
81dcd6873d
  1. 7
      docker/build/README.md
  2. 7
      docker/build/README_zh_CN.md
  3. 5
      docker/build/hooks/build
  4. 11
      docker/build/hooks/build.bat
  5. 2
      docker/build/hooks/push
  6. 6
      docker/docker-swarm/docker-compose.yml
  7. 6
      docker/docker-swarm/docker-stack.yml
  8. 14
      docker/kubernetes/dolphinscheduler/README.md
  9. 80
      docker/kubernetes/dolphinscheduler/templates/_helpers.tpl
  10. 10
      docker/kubernetes/dolphinscheduler/templates/deployment-dolphinscheduler-alert.yaml
  11. 12
      docker/kubernetes/dolphinscheduler/templates/deployment-dolphinscheduler-api.yaml
  12. 2
      docker/kubernetes/dolphinscheduler/templates/ingress.yaml
  13. 6
      docker/kubernetes/dolphinscheduler/templates/secret-external-database.yaml
  14. 2
      docker/kubernetes/dolphinscheduler/templates/secret-external-fs-s3a.yaml
  15. 10
      docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-master.yaml
  16. 12
      docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml
  17. 2
      docker/kubernetes/dolphinscheduler/templates/svc-dolphinscheduler-master-headless.yaml
  18. 2
      docker/kubernetes/dolphinscheduler/templates/svc-dolphinscheduler-worker-headless.yaml
  19. 7
      docker/kubernetes/dolphinscheduler/values.yaml
  20. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  21. 369
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java
  22. 2
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml
  23. 2
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
  24. 4
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml
  25. 8
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml
  26. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
  27. 21
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
  28. 43
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java
  29. 40
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
  30. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
  31. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  32. 24
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
  33. 15
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java
  34. 65
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java
  35. 17
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue

7
docker/build/README.md

@ -11,6 +11,11 @@ Official Website: https://dolphinscheduler.apache.org
[![EN doc](https://img.shields.io/badge/document-English-blue.svg)](README.md)
[![CN doc](https://img.shields.io/badge/文档-中文版-blue.svg)](README_zh_CN.md)
## Prerequisites
- [Docker](https://docs.docker.com/engine/) 1.13.1+
- [Docker Compose](https://docs.docker.com/compose/) 1.11.0+
## How to use this docker image
#### You can start a dolphinscheduler by docker-compose (recommended)
@ -27,6 +32,8 @@ Access the Web UI: http://192.168.xx.xx:12345/dolphinscheduler
The default username is `admin` and the default password is `dolphinscheduler123`
> **Tip**: For quick start in docker, you can create a tenant named `ds` and associate the user `admin` with the tenant `ds`
#### Or via Environment Variables **`DATABASE_HOST`** **`DATABASE_PORT`** **`DATABASE_DATABASE`** **`ZOOKEEPER_QUORUM`**
You can specify **existing postgres and zookeeper services**. Example:

7
docker/build/README_zh_CN.md

@ -11,6 +11,11 @@ Official Website: https://dolphinscheduler.apache.org
[![EN doc](https://img.shields.io/badge/document-English-blue.svg)](README.md)
[![CN doc](https://img.shields.io/badge/文档-中文版-blue.svg)](README_zh_CN.md)
## 先决条件
- [Docker](https://docs.docker.com/engine/) 1.13.1+
- [Docker Compose](https://docs.docker.com/compose/) 1.11.0+
## 如何使用docker镜像
#### 以 docker-compose 的方式启动dolphinscheduler(推荐)
@ -27,6 +32,8 @@ $ docker-compose -f ./docker/docker-swarm/docker-compose.yml up -d
默认的用户是`admin`,默认的密码是`dolphinscheduler123`
> **提示**: 为了在docker中快速开始,你可以创建一个名为`ds`的租户,并将这个租户`ds`关联到用户`admin`
#### 或者通过环境变量 **`DATABASE_HOST`** **`DATABASE_PORT`** **`ZOOKEEPER_QUORUM`** 使用已存在的服务
你可以指定已经存在的 **`Postgres`** 和 **`Zookeeper`** 服务. 如下:

5
docker/build/hooks/build

@ -24,8 +24,7 @@ printenv
if [ -z "${VERSION}" ]
then
echo "set default environment variable [VERSION]"
VERSION=$(grep '<revision>' -m 1 "$(pwd)"/pom.xml | awk '{print $1}' | sed 's/<revision>//' | sed 's/<\/revision>//')
export VERSION
export VERSION=$(cat $(pwd)/pom.xml | grep '<revision>' -m 1 | awk '{print $1}' | sed 's/<revision>//' | sed 's/<\/revision>//')
fi
if [ "${DOCKER_REPO}x" = "x" ]
@ -45,7 +44,7 @@ mvn -B clean compile package -Prelease -Dmaven.test.skip=true
# mv dolphinscheduler-bin.tar.gz file to docker/build directory
echo -e "mv $(pwd)/dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-${VERSION}-dolphinscheduler-bin.tar.gz $(pwd)/docker/build/\n"
mv "$(pwd)"/dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-"${VERSION}"-dolphinscheduler-bin.tar.gz $(pwd)/docker/build/
mv $(pwd)/dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-${VERSION}-dolphinscheduler-bin.tar.gz $(pwd)/docker/build/
# docker build
BUILD_COMMAND="docker build --build-arg VERSION=${VERSION} -t $DOCKER_REPO:${VERSION} $(pwd)/docker/build/"

11
docker/build/hooks/build.bat

@ -17,15 +17,20 @@
echo "------ dolphinscheduler start - build -------"
set
setlocal enableextensions enabledelayedexpansion
if not defined VERSION (
echo "set environment variable [VERSION]"
for /f %%l in (%cd%\sql\soft_version) do (set VERSION=%%l)
set first=1
for /f "tokens=3 delims=<>" %%a in ('findstr "<revision>[0-9].*</revision>" %cd%\pom.xml') do (
if !first! EQU 1 (set VERSION=%%a)
set first=0
)
)
if not defined DOCKER_REPO (
echo "set environment variable [DOCKER_REPO]"
set DOCKER_REPO='dolphinscheduler'
set DOCKER_REPO=dolphinscheduler
)
echo "Version: %VERSION%"
@ -40,7 +45,7 @@ if "%errorlevel%"=="1" goto :mvnFailed
:: move dolphinscheduler-bin.tar.gz file to docker/build directory
echo "move %cd%\dolphinscheduler-dist\target\apache-dolphinscheduler-incubating-%VERSION%-SNAPSHOT-dolphinscheduler-bin.tar.gz %cd%\docker\build\"
move %cd%\dolphinscheduler-dist\target\apache-dolphinscheduler-incubating-%VERSION%-SNAPSHOT-dolphinscheduler-bin.tar.gz %cd%\docker\build\
move %cd%\dolphinscheduler-dist\target\apache-dolphinscheduler-incubating-%VERSION%-dolphinscheduler-bin.tar.gz %cd%\docker\build\
:: docker build
echo "docker build --build-arg VERSION=%VERSION% -t %DOCKER_REPO%:%VERSION% %cd%\docker\build\"

2
docker/build/hooks/push

@ -19,6 +19,6 @@
echo "------ push start -------"
printenv
docker push "$DOCKER_REPO:${VERSION}"
docker push $DOCKER_REPO:${VERSION}
echo "------ push end -------"

6
docker/docker-swarm/docker-compose.yml

@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
version: "3.4"
version: "3.1"
services:
@ -76,7 +76,6 @@ services:
interval: 30s
timeout: 5s
retries: 3
start_period: 30s
depends_on:
- dolphinscheduler-postgresql
- dolphinscheduler-zookeeper
@ -110,7 +109,6 @@ services:
interval: 30s
timeout: 5s
retries: 3
start_period: 30s
depends_on:
- dolphinscheduler-postgresql
volumes:
@ -150,7 +148,6 @@ services:
interval: 30s
timeout: 5s
retries: 3
start_period: 30s
depends_on:
- dolphinscheduler-postgresql
- dolphinscheduler-zookeeper
@ -204,7 +201,6 @@ services:
interval: 30s
timeout: 5s
retries: 3
start_period: 30s
depends_on:
- dolphinscheduler-postgresql
- dolphinscheduler-zookeeper

6
docker/docker-swarm/docker-stack.yml

@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
version: "3.4"
version: "3.1"
services:
@ -76,7 +76,6 @@ services:
interval: 30s
timeout: 5s
retries: 3
start_period: 30s
volumes:
- dolphinscheduler-logs:/opt/dolphinscheduler/logs
networks:
@ -107,7 +106,6 @@ services:
interval: 30s
timeout: 5s
retries: 3
start_period: 30s
volumes:
- dolphinscheduler-logs:/opt/dolphinscheduler/logs
networks:
@ -146,7 +144,6 @@ services:
interval: 30s
timeout: 5s
retries: 3
start_period: 30s
volumes:
- dolphinscheduler-logs:/opt/dolphinscheduler/logs
networks:
@ -198,7 +195,6 @@ services:
interval: 30s
timeout: 5s
retries: 3
start_period: 30s
volumes:
- dolphinscheduler-worker-data:/tmp/dolphinscheduler
- dolphinscheduler-logs:/opt/dolphinscheduler/logs

14
docker/kubernetes/dolphinscheduler/README.md

@ -54,6 +54,8 @@ And then access the web: http://192.168.xx.xx:12345/dolphinscheduler
The default username is `admin` and the default password is `dolphinscheduler123`
> **Tip**: For quick start in docker, you can create a tenant named `ds` and associate the user `admin` with the tenant `ds`
## Uninstalling the Chart
To uninstall/delete the `dolphinscheduler` deployment:
@ -78,14 +80,12 @@ The Configuration file is `values.yaml`, and the following tables lists the conf
| Parameter | Description | Default |
| --------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------ | ----------------------------------------------------- |
| `nameOverride` | String to partially override common.names.fullname | `nil` |
| `fullnameOverride` | String to fully override common.names.fullname | `nil` |
| `timezone` | World time and date for cities in all time zones | `Asia/Shanghai` |
| `image.registry` | Docker image registry for the DolphinScheduler | `docker.io` |
| | | |
| `image.repository` | Docker image repository for the DolphinScheduler | `apache/dolphinscheduler` |
| `image.tag` | Docker image version for the DolphinScheduler | `latest` |
| `image.pullPolicy` | Image pull policy. One of Always, Never, IfNotPresent | `IfNotPresent` |
| `image.pullSecrets` | Image pull secrets. An optional list of references to secrets in the same namespace to use for pulling any of the images | `[]` |
| `image.pullSecret` | Image pull secret. An optional reference to secret in the same namespace to use for pulling any of the images | `nil` |
| | | |
| `postgresql.enabled` | If no external PostgreSQL exists, DolphinScheduler uses an internal PostgreSQL by default | `true` |
| `postgresql.postgresqlUsername` | The username for internal PostgreSQL | `root` |
@ -283,7 +283,7 @@ docker build -t apache/dolphinscheduler:mysql .
4. Push the docker image `apache/dolphinscheduler:mysql` to a docker registry
5. Modify image `registry` and `repository`, and update `tag` to `mysql` in `values.yaml`
5. Modify image `repository` and update `tag` to `mysql` in `values.yaml`
6. Modify postgresql `enabled` to `false`
@ -326,7 +326,7 @@ docker build -t apache/dolphinscheduler:mysql-driver .
4. Push the docker image `apache/dolphinscheduler:mysql-driver` to a docker registry
5. Modify image `registry` and `repository`, and update `tag` to `mysql-driver` in `values.yaml`
5. Modify image `repository` and update `tag` to `mysql-driver` in `values.yaml`
6. Run a DolphinScheduler release in Kubernetes (See **Installing the Chart**)
@ -355,7 +355,7 @@ docker build -t apache/dolphinscheduler:oracle-driver .
4. Push the docker image `apache/dolphinscheduler:oracle-driver` to a docker registry
5. Modify image `registry` and `repository`, and update `tag` to `oracle-driver` in `values.yaml`
5. Modify image `repository` and update `tag` to `oracle-driver` in `values.yaml`
6. Run a DolphinScheduler release in Kubernetes (See **Installing the Chart**)

80
docker/kubernetes/dolphinscheduler/templates/_helpers.tpl

@ -16,12 +16,6 @@
#
{{/* vim: set filetype=mustache: */}}
{{/*
Expand the name of the chart.
*/}}
{{- define "dolphinscheduler.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" -}}
{{- end -}}
{{/*
Create a default fully qualified app name.
@ -29,79 +23,14 @@ We truncate at 63 chars because some Kubernetes name fields are limited to this
If release name contains chart name it will be used as a full name.
*/}}
{{- define "dolphinscheduler.fullname" -}}
{{- if .Values.fullnameOverride -}}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" -}}
{{- else -}}
{{- $name := default .Chart.Name .Values.nameOverride -}}
{{- if contains $name .Release.Name -}}
{{- .Release.Name | trunc 63 | trimSuffix "-" -}}
{{- else -}}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" -}}
{{- end -}}
{{- end -}}
{{- end -}}
{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "dolphinscheduler.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" -}}
{{- end -}}
{{/*
Common labels
*/}}
{{- define "dolphinscheduler.labels" -}}
helm.sh/chart: {{ include "dolphinscheduler.chart" . }}
{{ include "dolphinscheduler.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end -}}
{{/*
Selector labels
Create a default docker image fullname.
*/}}
{{- define "dolphinscheduler.selectorLabels" -}}
app.kubernetes.io/name: {{ include "dolphinscheduler.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end -}}
{{/*
Create the name of the service account to use
*/}}
{{- define "dolphinscheduler.serviceAccountName" -}}
{{- if .Values.serviceAccount.create -}}
{{ default (include "dolphinscheduler.fullname" .) .Values.serviceAccount.name }}
{{- else -}}
{{ default "default" .Values.serviceAccount.name }}
{{- end -}}
{{- end -}}
{{/*
Create a default docker image registry.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
*/}}
{{- define "dolphinscheduler.image.registry" -}}
{{- $registry := default "docker.io" .Values.image.registry -}}
{{- printf "%s" $registry | trunc 63 | trimSuffix "/" -}}
{{- end -}}
{{/*
Create a default docker image repository.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
*/}}
{{- define "dolphinscheduler.image.repository" -}}
{{- printf "%s/%s:%s" (include "dolphinscheduler.image.registry" .) .Values.image.repository .Values.image.tag -}}
{{- end -}}
{{/*
Create the default image pull secrets.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
*/}}
{{- define "dolphinscheduler.image.pullSecrets" -}}
{{- default nil .Values.image.pullSecrets -}}
{{- define "dolphinscheduler.image.fullname" -}}
{{- printf "%s:%s" .Values.image.repository .Values.image.tag -}}
{{- end -}}
{{/*
@ -124,9 +53,8 @@ We truncate at 63 chars because some Kubernetes name fields are limited to this
{{/*
Create a default fully qualified zookeeper quorum.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
*/}}
{{- define "dolphinscheduler.zookeeper.quorum" -}}
{{- $port := default "2181" (.Values.zookeeper.service.port | toString) -}}
{{- printf "%s:%s" (include "dolphinscheduler.zookeeper.fullname" .) $port | trunc 63 | trimSuffix "-" -}}
{{- printf "%s:%s" (include "dolphinscheduler.zookeeper.fullname" .) $port -}}
{{- end -}}

10
docker/kubernetes/dolphinscheduler/templates/deployment-dolphinscheduler-alert.yaml

@ -57,13 +57,13 @@ spec:
{{- if .Values.alert.tolerations }}
tolerations: {{- toYaml . | nindent 8 }}
{{- end }}
{{- if .Values.image.pullSecrets }}
{{- if .Values.image.pullSecret }}
imagePullSecrets:
- name: {{ include "dolphinscheduler.image.pullSecrets" . }}
- name: {{ .Values.image.pullSecret }}
{{- end }}
containers:
- name: {{ include "dolphinscheduler.fullname" . }}-alert
image: {{ include "dolphinscheduler.image.repository" . | quote }}
image: {{ include "dolphinscheduler.image.fullname" . }}
imagePullPolicy: {{ .Values.image.pullPolicy }}
args:
- "alert-server"
@ -120,8 +120,8 @@ spec:
name: {{ template "dolphinscheduler.postgresql.fullname" . }}
key: postgresql-password
{{- else }}
name: {{ printf "%s-%s" .Release.Name "externaldb" }}
key: db-password
name: {{ include "dolphinscheduler.fullname" . }}-externaldb
key: database-password
{{- end }}
- name: DATABASE_DATABASE
{{- if .Values.postgresql.enabled }}

12
docker/kubernetes/dolphinscheduler/templates/deployment-dolphinscheduler-api.yaml

@ -57,13 +57,13 @@ spec:
{{- if .Values.api.tolerations }}
tolerations: {{- toYaml . | nindent 8 }}
{{- end }}
{{- if .Values.image.pullSecrets }}
{{- if .Values.image.pullSecret }}
imagePullSecrets:
- name: {{ include "dolphinscheduler.image.pullSecrets" . }}
- name: {{ .Values.image.pullSecret }}
{{- end }}
containers:
- name: {{ include "dolphinscheduler.fullname" . }}-api
image: {{ include "dolphinscheduler.image.repository" . | quote }}
image: {{ include "dolphinscheduler.image.fullname" . }}
imagePullPolicy: {{ .Values.image.pullPolicy }}
args:
- "api-server"
@ -115,8 +115,8 @@ spec:
name: {{ template "dolphinscheduler.postgresql.fullname" . }}
key: postgresql-password
{{- else }}
name: {{ printf "%s-%s" .Release.Name "externaldb" }}
key: db-password
name: {{ include "dolphinscheduler.fullname" . }}-externaldb
key: database-password
{{- end }}
- name: DATABASE_DATABASE
{{- if .Values.postgresql.enabled }}
@ -172,7 +172,7 @@ spec:
valueFrom:
secretKeyRef:
key: fs-s3a-secret-key
name: {{ printf "%s-%s" .Release.Name "fs-s3a" }}
name: {{ include "dolphinscheduler.fullname" . }}-fs-s3a
{{- end }}
{{- if .Values.api.resources }}
resources:

2
docker/kubernetes/dolphinscheduler/templates/ingress.yaml

@ -26,7 +26,7 @@ kind: Ingress
metadata:
name: {{ include "dolphinscheduler.fullname" . }}
labels:
app.kubernetes.io/name: {{ include "dolphinscheduler.name" . }}
app.kubernetes.io/name: {{ include "dolphinscheduler.fullname" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
spec:

6
docker/kubernetes/dolphinscheduler/templates/secret-external-postgresql.yaml → docker/kubernetes/dolphinscheduler/templates/secret-external-database.yaml

@ -18,12 +18,12 @@
apiVersion: v1
kind: Secret
metadata:
name: {{ printf "%s-%s" .Release.Name "externaldb" }}
name: {{ include "dolphinscheduler.fullname" . }}-externaldb
labels:
app.kubernetes.io/name: {{ include "dolphinscheduler.fullname" . }}-postgresql
app.kubernetes.io/name: {{ include "dolphinscheduler.fullname" . }}-externaldb
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
type: Opaque
data:
db-password: {{ .Values.externalDatabase.password | b64enc | quote }}
database-password: {{ .Values.externalDatabase.password | b64enc | quote }}
{{- end }}

2
docker/kubernetes/dolphinscheduler/templates/secret-external-fs-s3a.yaml

@ -18,7 +18,7 @@
apiVersion: v1
kind: Secret
metadata:
name: {{ printf "%s-%s" .Release.Name "fs-s3a" }}
name: {{ include "dolphinscheduler.fullname" . }}-fs-s3a
labels:
app.kubernetes.io/name: {{ include "dolphinscheduler.fullname" . }}-fs-s3a
app.kubernetes.io/instance: {{ .Release.Name }}

10
docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-master.yaml

@ -54,13 +54,13 @@ spec:
{{- if .Values.master.tolerations }}
tolerations: {{- toYaml . | nindent 8 }}
{{- end }}
{{- if .Values.image.pullSecrets }}
{{- if .Values.image.pullSecret }}
imagePullSecrets:
- name: {{ include "dolphinscheduler.image.pullSecrets" . }}
- name: {{ .Values.image.pullSecret }}
{{- end }}
containers:
- name: {{ include "dolphinscheduler.fullname" . }}-master
image: {{ include "dolphinscheduler.image.repository" . | quote }}
image: {{ include "dolphinscheduler.image.fullname" . }}
imagePullPolicy: {{ .Values.image.pullPolicy }}
args:
- "master-server"
@ -157,8 +157,8 @@ spec:
name: {{ template "dolphinscheduler.postgresql.fullname" . }}
key: postgresql-password
{{- else }}
name: {{ printf "%s-%s" .Release.Name "externaldb" }}
key: db-password
name: {{ include "dolphinscheduler.fullname" . }}-externaldb
key: database-password
{{- end }}
- name: DATABASE_DATABASE
{{- if .Values.postgresql.enabled }}

12
docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml

@ -54,13 +54,13 @@ spec:
{{- if .Values.worker.tolerations }}
tolerations: {{- toYaml . | nindent 8 }}
{{- end }}
{{- if .Values.image.pullSecrets }}
{{- if .Values.image.pullSecret }}
imagePullSecrets:
- name: {{ include "dolphinscheduler.image.pullSecrets" . }}
- name: {{ .Values.image.pullSecret }}
{{- end }}
containers:
- name: {{ include "dolphinscheduler.fullname" . }}-worker
image: {{ include "dolphinscheduler.image.repository" . | quote }}
image: {{ include "dolphinscheduler.image.fullname" . }}
imagePullPolicy: {{ .Values.image.pullPolicy }}
args:
- "worker-server"
@ -156,8 +156,8 @@ spec:
name: {{ template "dolphinscheduler.postgresql.fullname" . }}
key: postgresql-password
{{- else }}
name: {{ printf "%s-%s" .Release.Name "externaldb" }}
key: db-password
name: {{ include "dolphinscheduler.fullname" . }}-externaldb
key: database-password
{{- end }}
- name: DATABASE_DATABASE
{{- if .Values.postgresql.enabled }}
@ -213,7 +213,7 @@ spec:
valueFrom:
secretKeyRef:
key: fs-s3a-secret-key
name: {{ printf "%s-%s" .Release.Name "fs-s3a" }}
name: {{ include "dolphinscheduler.fullname" . }}-fs-s3a
{{- end }}
{{- if .Values.worker.resources }}
resources:

2
docker/kubernetes/dolphinscheduler/templates/svc-dolphinscheduler-master-headless.yaml

@ -20,7 +20,7 @@ metadata:
name: {{ include "dolphinscheduler.fullname" . }}-master-headless
labels:
app.kubernetes.io/name: {{ include "dolphinscheduler.fullname" . }}-master-headless
app.kubernetes.io/instance: {{ .Release.Name }}-master-headless
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
spec:
clusterIP: "None"

2
docker/kubernetes/dolphinscheduler/templates/svc-dolphinscheduler-worker-headless.yaml

@ -20,7 +20,7 @@ metadata:
name: {{ include "dolphinscheduler.fullname" . }}-worker-headless
labels:
app.kubernetes.io/name: {{ include "dolphinscheduler.fullname" . }}-worker-headless
app.kubernetes.io/instance: {{ .Release.Name }}-worker-headless
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
spec:
clusterIP: "None"

7
docker/kubernetes/dolphinscheduler/values.yaml

@ -19,17 +19,13 @@
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
nameOverride: ""
fullnameOverride: ""
timezone: "Asia/Shanghai"
image:
registry: "docker.io"
repository: "apache/dolphinscheduler"
tag: "latest"
pullPolicy: "IfNotPresent"
pullSecrets: []
pullSecret: ""
## If no external database is configured, DolphinScheduler uses this internal PostgreSQL by default.
postgresql:
@ -52,7 +48,6 @@ externalDatabase:
username: "root"
password: "root"
database: "dolphinscheduler"
## multi params should join with & char
params: "characterEncoding=utf8"
## If no external ZooKeeper is configured, DolphinScheduler uses this internal ZooKeeper by default.

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -604,6 +604,10 @@ public final class Constants {
*/
public static final String EXECUTOR_MEMORY = "--executor-memory";
/**
* --name NAME
*/
public static final String SPARK_NAME = "--name";
/**
* --queue QUEUE

369
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java

@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.spark;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
@ -29,203 +29,214 @@ import java.util.List;
*/
public class SparkParameters extends AbstractParameters {
/**
* major jar
*/
private ResourceInfo mainJar;
/**
* major class
*/
private String mainClass;
/**
* deploy mode
*/
private String deployMode;
/**
* arguments
*/
private String mainArgs;
/**
* driver-cores Number of cores used by the driver, only in cluster mode
*/
private int driverCores;
/**
* driver-memory Memory for driver
*/
private String driverMemory;
/**
* num-executors Number of executors to launch
*/
private int numExecutors;
/**
* executor-cores Number of cores per executor
*/
private int executorCores;
/**
* Memory per executor
*/
private String executorMemory;
/**
* resource list
*/
private List<ResourceInfo> resourceList = new ArrayList<>();
/**
* The YARN queue to submit to
*/
private String queue;
/**
* other arguments
*/
private String others;
/**
* program type
* 0 JAVA,1 SCALA,2 PYTHON
*/
private ProgramType programType;
/**
* spark version
*/
private String sparkVersion;
public ResourceInfo getMainJar() {
return mainJar;
}
public void setMainJar(ResourceInfo mainJar) {
this.mainJar = mainJar;
}
public String getMainClass() {
return mainClass;
}
public void setMainClass(String mainClass) {
this.mainClass = mainClass;
}
public String getDeployMode() {
return deployMode;
}
public void setDeployMode(String deployMode) {
this.deployMode = deployMode;
}
public String getMainArgs() {
return mainArgs;
}
public void setMainArgs(String mainArgs) {
this.mainArgs = mainArgs;
}
public int getDriverCores() {
return driverCores;
}
public void setDriverCores(int driverCores) {
this.driverCores = driverCores;
}
public String getDriverMemory() {
return driverMemory;
}
public void setDriverMemory(String driverMemory) {
this.driverMemory = driverMemory;
}
/**
* main jar
*/
private ResourceInfo mainJar;
/**
* main class
*/
private String mainClass;
/**
* deploy mode
*/
private String deployMode;
/**
* arguments
*/
private String mainArgs;
/**
* driver-cores Number of cores used by the driver, only in cluster mode
*/
private int driverCores;
/**
* driver-memory Memory for driver
*/
private String driverMemory;
/**
* num-executors Number of executors to launch
*/
private int numExecutors;
/**
* executor-cores Number of cores per executor
*/
private int executorCores;
/**
* Memory per executor
*/
private String executorMemory;
/**
* app name
*/
private String appName;
/**
* The YARN queue to submit to
*/
private String queue;
/**
* other arguments
*/
private String others;
/**
* program type
* 0 JAVA,1 SCALA,2 PYTHON
*/
private ProgramType programType;
/**
* spark version
*/
private String sparkVersion;
/**
* resource list
*/
private List<ResourceInfo> resourceList = new ArrayList<>();
public ResourceInfo getMainJar() {
return mainJar;
}
public void setMainJar(ResourceInfo mainJar) {
this.mainJar = mainJar;
}
public String getMainClass() {
return mainClass;
}
public void setMainClass(String mainClass) {
this.mainClass = mainClass;
}
public String getDeployMode() {
return deployMode;
}
public void setDeployMode(String deployMode) {
this.deployMode = deployMode;
}
public String getMainArgs() {
return mainArgs;
}
public void setMainArgs(String mainArgs) {
this.mainArgs = mainArgs;
}
public int getDriverCores() {
return driverCores;
}
public void setDriverCores(int driverCores) {
this.driverCores = driverCores;
}
public String getDriverMemory() {
return driverMemory;
}
public void setDriverMemory(String driverMemory) {
this.driverMemory = driverMemory;
}
public int getNumExecutors() {
return numExecutors;
}
public int getNumExecutors() {
return numExecutors;
}
public void setNumExecutors(int numExecutors) {
this.numExecutors = numExecutors;
}
public void setNumExecutors(int numExecutors) {
this.numExecutors = numExecutors;
}
public int getExecutorCores() {
return executorCores;
}
public void setExecutorCores(int executorCores) {
this.executorCores = executorCores;
}
public int getExecutorCores() {
return executorCores;
}
public String getExecutorMemory() {
return executorMemory;
}
public void setExecutorCores(int executorCores) {
this.executorCores = executorCores;
}
public void setExecutorMemory(String executorMemory) {
this.executorMemory = executorMemory;
}
public String getExecutorMemory() {
return executorMemory;
}
public void setExecutorMemory(String executorMemory) {
this.executorMemory = executorMemory;
}
public String getQueue() {
return queue;
}
public String getAppName() {
return appName;
}
public void setAppName(String appName) {
this.appName = appName;
}
public void setQueue(String queue) {
this.queue = queue;
}
public String getQueue() {
return queue;
}
public List<ResourceInfo> getResourceList() {
return resourceList;
}
public void setQueue(String queue) {
this.queue = queue;
}
public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}
public String getOthers() {
return others;
}
public String getOthers() {
return others;
}
public void setOthers(String others) {
this.others = others;
}
public void setOthers(String others) {
this.others = others;
}
public List<ResourceInfo> getResourceList() {
return resourceList;
}
public ProgramType getProgramType() {
return programType;
}
public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}
public void setProgramType(ProgramType programType) {
this.programType = programType;
}
public ProgramType getProgramType() {
return programType;
}
public String getSparkVersion() {
return sparkVersion;
}
public void setProgramType(ProgramType programType) {
this.programType = programType;
}
public void setSparkVersion(String sparkVersion) {
this.sparkVersion = sparkVersion;
}
public String getSparkVersion() {
return sparkVersion;
}
@Override
public boolean checkParameters() {
return mainJar != null && programType != null;
}
public void setSparkVersion(String sparkVersion) {
this.sparkVersion = sparkVersion;
}
@Override
public List<ResourceInfo> getResourceFilesList() {
if (mainJar != null && !resourceList.contains(mainJar)) {
resourceList.add(mainJar);
@Override
public boolean checkParameters() {
return mainJar != null && programType != null;
}
return resourceList;
}
@Override
public List<ResourceInfo> getResourceFilesList() {
if (mainJar != null && !resourceList.contains(mainJar)) {
resourceList.add(mainJar);
}
return resourceList;
}
}

2
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml

@ -91,7 +91,7 @@
where
id in (select datasource_id from t_ds_relation_datasource_user where user_id=#{userId}
union select id as datasource_id from t_ds_datasource where user_id=#{userId})
<if test="dataSourceIds != null and dataSourceIds != ''">
<if test="dataSourceIds != null and dataSourceIds.length > 0">
and id in
<foreach collection="dataSourceIds" item="i" open="(" close=")" separator=",">
#{i}

2
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -104,7 +104,7 @@
<if test="startTime != null ">
and instance.start_time > #{startTime} and instance.start_time <![CDATA[ <=]]> #{endTime}
</if>
<if test="states != null and states != ''">
<if test="states != null and states.length > 0">
and instance.state in
<foreach collection="states" index="index" item="i" open="(" separator="," close=")">
#{i}

4
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml

@ -102,7 +102,7 @@
where type=0
and id in (select resources_id from t_ds_relation_resources_user where user_id=#{userId} and perm=7
union select id as resources_id from t_ds_resources where user_id=#{userId})
<if test="resNames != null and resNames != ''">
<if test="resNames != null and resNames.length > 0">
and full_name in
<foreach collection="resNames" item="i" open="(" close=")" separator=",">
#{i}
@ -115,7 +115,7 @@
from t_ds_resources
where id in (select resources_id from t_ds_relation_resources_user where user_id=#{userId} and perm=7
union select id as resources_id from t_ds_resources where user_id=#{userId})
<if test="resIds != null and resIds != ''">
<if test="resIds != null and resIds.length > 0">
and id in
<foreach collection="resIds" item="i" open="(" close=")" separator=",">
#{i}

8
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml

@ -40,7 +40,7 @@
</include>
from t_ds_udfs udf
where 1 = 1
<if test="ids != null and ids != ''">
<if test="ids != null and ids.length > 0">
and udf.id in
<foreach collection="ids" item="i" open="(" close=")" separator=",">
#{i}
@ -107,7 +107,7 @@
where
udf.id in (select udf_id from t_ds_relation_udfs_user where user_id=#{userId}
union select id as udf_id from t_ds_udfs where user_id=#{userId})
<if test="udfIds != null and udfIds != ''">
<if test="udfIds != null and udfIds.length > 0">
and udf.id in
<foreach collection="udfIds" item="i" open="(" close=")" separator=",">
#{i}
@ -121,7 +121,7 @@
</include>
from t_ds_udfs udf
where 1=1
<if test="resourceIds != null and resourceIds != ''">
<if test="resourceIds != null and resourceIds.length > 0">
and udf.resource_id in
<foreach collection="resourceIds" item="i" open="(" close=")" separator=",">
#{i}
@ -137,7 +137,7 @@
where
udf.id in (select udf_id from t_ds_relation_udfs_user where user_id=#{userId}
union select id as udf_id from t_ds_udfs where user_id=#{userId})
<if test="resourceIds != null and resourceIds != ''">
<if test="resourceIds != null and resourceIds.length > 0">
and udf.resource_id in
<foreach collection="resourceIds" item="i" open="(" close=")" separator=",">
#{i}

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java

@ -129,5 +129,4 @@ public class FlinkArgsUtils {
return args;
}
}

21
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
@ -306,7 +307,7 @@ public class ProcessUtils {
if (!applicationStatus.typeIsFinished()) {
String commandFile = String
.format("%s/%s.kill", executePath, appId);
String cmd = "yarn application -kill " + appId;
String cmd = getKerberosInitCommand() + "yarn application -kill " + appId;
execYarnKillCommand(logger, tenantCode, appId, commandFile, cmd);
}
} catch (Exception e) {
@ -316,6 +317,24 @@ public class ProcessUtils {
}
}
/**
 * Assemble the shell snippet that initialises Kerberos credentials, intended to be
 * placed in front of another shell command (for example "yarn application -kill").
 *
 * @return an "export KRB5_CONFIG=..." line followed by a "kinit -k -t ..." line, each
 *         terminated by a blank line, when the hadoop kerberos startup-state property
 *         is true; an empty string otherwise
 */
public static String getKerberosInitCommand() {
    logger.info("get kerberos init command");
    boolean kerberosEnabled = PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
    if (!kerberosEnabled) {
        return "";
    }

    String krb5ConfPath = PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH);
    String keytabPath = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH);
    String keytabUser = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME);

    // "|| true" lets the surrounding script continue even when kinit exits non-zero
    String kinitLine = String.format("kinit -k -t %s %s || true", keytabPath, keytabUser);
    String kerberosCommand = "export KRB5_CONFIG=" + krb5ConfPath + "\n\n" + kinitLine + "\n\n";
    logger.info("kerberos init command: {}", kerberosCommand);
    return kerberosCommand;
}
/**
* build kill command for yarn application
*

43
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java

@ -45,19 +45,14 @@ public class SparkArgsUtils {
*/
public static List<String> buildArgs(SparkParameters param) {
List<String> args = new ArrayList<>();
String deployMode = SPARK_CLUSTER;
args.add(Constants.MASTER);
if (StringUtils.isNotEmpty(param.getDeployMode())) {
deployMode = param.getDeployMode();
}
String deployMode = StringUtils.isNotEmpty(param.getDeployMode()) ? param.getDeployMode() : SPARK_CLUSTER;
if (!SPARK_LOCAL.equals(deployMode)) {
args.add(SPARK_ON_YARN);
args.add(Constants.DEPLOY_MODE);
}
args.add(param.getDeployMode());
args.add(deployMode);
ProgramType type = param.getProgramType();
String mainClass = param.getMainClass();
@ -67,7 +62,7 @@ public class SparkArgsUtils {
}
int driverCores = param.getDriverCores();
if (driverCores != 0) {
if (driverCores > 0) {
args.add(Constants.DRIVER_CORES);
args.add(String.format("%d", driverCores));
}
@ -79,13 +74,13 @@ public class SparkArgsUtils {
}
int numExecutors = param.getNumExecutors();
if (numExecutors != 0) {
if (numExecutors > 0) {
args.add(Constants.NUM_EXECUTORS);
args.add(String.format("%d", numExecutors));
}
int executorCores = param.getExecutorCores();
if (executorCores != 0) {
if (executorCores > 0) {
args.add(Constants.EXECUTOR_CORES);
args.add(String.format("%d", executorCores));
}
@ -96,22 +91,26 @@ public class SparkArgsUtils {
args.add(executorMemory);
}
// --files --conf --libjar ...
String others = param.getOthers();
String queue = param.getQueue();
if (StringUtils.isNotEmpty(others)) {
String appName = param.getAppName();
if (StringUtils.isNotEmpty(appName)) {
args.add(Constants.SPARK_NAME);
args.add(ArgsUtils.escape(appName));
}
if (!others.contains(Constants.SPARK_QUEUE) && StringUtils.isNotEmpty(queue)) {
args.add(Constants.SPARK_QUEUE);
args.add(queue);
String others = param.getOthers();
if (!SPARK_LOCAL.equals(deployMode)) {
if (StringUtils.isEmpty(others) || !others.contains(Constants.SPARK_QUEUE)) {
String queue = param.getQueue();
if (StringUtils.isNotEmpty(queue)) {
args.add(Constants.SPARK_QUEUE);
args.add(queue);
}
}
}
// --conf --files --jars --packages
if (StringUtils.isNotEmpty(others)) {
args.add(others);
} else if (StringUtils.isNotEmpty(queue)) {
args.add(Constants.SPARK_QUEUE);
args.add(queue);
}
ResourceInfo mainJar = param.getMainJar();

40
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java

@ -23,24 +23,30 @@ import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
/**
* mapreduce task
*/
public class MapReduceTask extends AbstractYarnTask {
/**
* map reduce command
* usage: hadoop jar <jar> [mainClass] [GENERIC_OPTIONS] args...
*/
private static final String MAP_REDUCE_COMMAND = Constants.HADOOP;
/**
* mapreduce parameters
@ -77,7 +83,6 @@ public class MapReduceTask extends AbstractYarnTask {
mapreduceParameters.setQueue(taskExecutionContext.getQueue());
setMainJarName();
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
@ -85,10 +90,10 @@ public class MapReduceTask extends AbstractYarnTask {
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
if (paramsMap != null){
if (paramsMap != null) {
String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap));
mapreduceParameters.setMainArgs(args);
if(mapreduceParameters.getProgramType() != null && mapreduceParameters.getProgramType() == ProgramType.PYTHON){
if (mapreduceParameters.getProgramType() != null && mapreduceParameters.getProgramType() == ProgramType.PYTHON) {
String others = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getOthers(), ParamUtils.convert(paramsMap));
mapreduceParameters.setOthers(others);
}
@ -102,9 +107,13 @@ public class MapReduceTask extends AbstractYarnTask {
*/
@Override
protected String buildCommand() throws Exception {
List<String> parameterList = buildParameters(mapreduceParameters);
// hadoop jar <jar> [mainClass] [GENERIC_OPTIONS] args...
List<String> args = new ArrayList<>();
args.add(MAP_REDUCE_COMMAND);
args.addAll(buildParameters(mapreduceParameters));
String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", parameterList),
String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args),
taskExecutionContext.getDefinedParams());
logger.info("mapreduce task command: {}", command);
@ -143,21 +152,18 @@ public class MapReduceTask extends AbstractYarnTask {
* @param mapreduceParameters mapreduce parameters
* @return parameter list
*/
private List<String> buildParameters(MapreduceParameters mapreduceParameters){
private List<String> buildParameters(MapreduceParameters mapreduceParameters) {
List<String> result = new ArrayList<>();
result.add(Constants.HADOOP);
// main jar
if(mapreduceParameters.getMainJar()!= null){
if (mapreduceParameters.getMainJar() != null) {
result.add(Constants.JAR);
result.add(mapreduceParameters.getMainJar().getRes());
}
// main class
if(!ProgramType.PYTHON.equals(mapreduceParameters.getProgramType())
&& StringUtils.isNotEmpty(mapreduceParameters.getMainClass())){
if (!ProgramType.PYTHON.equals(mapreduceParameters.getProgramType())
&& StringUtils.isNotEmpty(mapreduceParameters.getMainClass())) {
result.add(mapreduceParameters.getMainClass());
}
@ -170,13 +176,13 @@ public class MapReduceTask extends AbstractYarnTask {
}
result.add(mapreduceParameters.getOthers());
}else if (StringUtils.isNotEmpty(mapreduceParameters.getQueue())) {
} else if (StringUtils.isNotEmpty(mapreduceParameters.getQueue())) {
result.add(String.format("%s %s=%s", Constants.D, Constants.MR_QUEUE, mapreduceParameters.getQueue()));
}
// command args
if(StringUtils.isNotEmpty(mapreduceParameters.getMainArgs())){
if (StringUtils.isNotEmpty(mapreduceParameters.getMainArgs())) {
result.add(mapreduceParameters.getMainArgs());
}
return result;

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java

@ -44,11 +44,13 @@ public class SparkTask extends AbstractYarnTask {
/**
* spark1 command
* usage: spark-submit [options] <app jar | python file> [app arguments]
*/
private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";
/**
* spark2 command
* usage: spark-submit [options] <app jar | python file> [app arguments]
*/
private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";
@ -93,6 +95,7 @@ public class SparkTask extends AbstractYarnTask {
*/
@Override
protected String buildCommand() {
// spark-submit [options] <app jar | python file> [app arguments]
List<String> args = new ArrayList<>();
//spark version

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -77,7 +77,7 @@ public class ZKMasterClient extends AbstractZKClient {
public void start(MasterServer masterServer) {
InterProcessMutex mutex = null;
try {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/startup-masters
String znodeLock = getMasterStartUpLockPath();
mutex = new InterProcessMutex(getZkClient(), znodeLock);
mutex.acquire();

24
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java

@ -98,35 +98,35 @@ public class FlinkArgsUtilsTest {
assertEquals("yarn-cluster", result.get(1));
assertEquals("-ys", result.get(2));
assertSame(Integer.valueOf(result.get(3)),slot);
assertSame(slot, Integer.valueOf(result.get(3)));
assertEquals("-ynm",result.get(4));
assertEquals(result.get(5),appName);
assertEquals("-ynm", result.get(4));
assertEquals(appName, result.get(5));
assertEquals("-yn", result.get(6));
assertSame(Integer.valueOf(result.get(7)),taskManager);
assertSame(taskManager, Integer.valueOf(result.get(7)));
assertEquals("-yjm", result.get(8));
assertEquals(result.get(9),jobManagerMemory);
assertEquals(jobManagerMemory, result.get(9));
assertEquals("-ytm", result.get(10));
assertEquals(result.get(11),taskManagerMemory);
assertEquals(taskManagerMemory, result.get(11));
assertEquals("-yqu", result.get(12));
assertEquals(result.get(13),queue);
assertEquals(queue, result.get(13));
assertEquals("-p", result.get(14));
assertSame(Integer.valueOf(result.get(15)),parallelism);
assertSame(parallelism, Integer.valueOf(result.get(15)));
assertEquals("-sae", result.get(16));
assertEquals(result.get(17),others);
assertEquals(others, result.get(17));
assertEquals("-c", result.get(18));
assertEquals(result.get(19),mainClass);
assertEquals(mainClass, result.get(19));
assertEquals(result.get(20),mainJar.getRes());
assertEquals(result.get(21),mainArgs);
assertEquals(mainJar.getRes(), result.get(20));
assertEquals(mainArgs, result.get(21));
//Others param without -yqu
FlinkParameters param1 = new FlinkParameters();

15
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import java.util.ArrayList;
@ -40,7 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(PowerMockRunner.class)
@PrepareForTest({System.class, OSUtils.class, HadoopUtils.class})
@PrepareForTest({System.class, OSUtils.class, HadoopUtils.class, PropertyUtils.class})
public class ProcessUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(ProcessUtils.class);
@ -110,6 +111,18 @@ public class ProcessUtilsTest {
Assert.assertEquals(1, taskExecutionContext.getProcessId());
}
/**
 * Checks ProcessUtils.getKerberosInitCommand() against mocked properties:
 * the exact export/kinit script when Kerberos is switched on, and an empty
 * string when it is switched off.
 */
@Test
public void testGetKerberosInitCommand() {
    PowerMockito.mockStatic(PropertyUtils.class);
    PowerMockito.when(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)).thenReturn(true);
    PowerMockito.when(PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH)).thenReturn("/etc/krb5.conf");
    PowerMockito.when(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH)).thenReturn("/etc/krb5.keytab");
    PowerMockito.when(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME)).thenReturn("test@DS.COM");

    // Pin the whole generated script instead of only "not empty", so a wrong
    // argument order or a missing separator makes the test fail.
    String expectedCommand = "export KRB5_CONFIG=/etc/krb5.conf\n\n"
            + "kinit -k -t /etc/krb5.keytab test@DS.COM || true\n\n";
    Assert.assertEquals(expectedCommand, ProcessUtils.getKerberosInitCommand());

    // With Kerberos disabled nothing must be prepended to the command.
    PowerMockito.when(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)).thenReturn(false);
    Assert.assertEquals("", ProcessUtils.getKerberosInitCommand());
}
@Test
public void testCancelApplication() {
List<String> appIds = new ArrayList<>();

65
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java

@ -17,19 +17,20 @@
package org.apache.dolphinscheduler.server.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
/**
* Test SparkArgsUtils
*/
@ -48,12 +49,11 @@ public class SparkArgsUtilsTest {
public int executorCores = 6;
public String sparkVersion = "SPARK1";
public int numExecutors = 4;
public String appName = "spark test";
public String queue = "queue1";
@Before
public void setUp() throws Exception {
public void setUp() {
ResourceInfo main = new ResourceInfo();
main.setRes("testspark-1.0.0-SNAPSHOT.jar");
mainJar = main;
@ -78,6 +78,7 @@ public class SparkArgsUtilsTest {
param.setProgramType(programType);
param.setSparkVersion(sparkVersion);
param.setMainArgs(mainArgs);
param.setAppName(appName);
param.setQueue(queue);
//Invoke buildArgs
@ -87,42 +88,46 @@ public class SparkArgsUtilsTest {
}
//Expected values and order
assertEquals(result.size(),20);
assertEquals(22, result.size());
assertEquals("--master", result.get(0));
assertEquals("yarn", result.get(1));
assertEquals("--deploy-mode", result.get(2));
assertEquals(mode, result.get(3));
assertEquals(result.get(0),"--master");
assertEquals(result.get(1),"yarn");
assertEquals("--class", result.get(4));
assertEquals(mainClass, result.get(5));
assertEquals(result.get(2),"--deploy-mode");
assertEquals(result.get(3),mode);
assertEquals("--driver-cores", result.get(6));
assertSame(driverCores, Integer.valueOf(result.get(7)));
assertEquals(result.get(4),"--class");
assertEquals(result.get(5),mainClass);
assertEquals("--driver-memory", result.get(8));
assertEquals(driverMemory, result.get(9));
assertEquals(result.get(6),"--driver-cores");
assertSame(Integer.valueOf(result.get(7)),driverCores);
assertEquals("--num-executors", result.get(10));
assertSame(numExecutors, Integer.valueOf(result.get(11)));
assertEquals(result.get(8),"--driver-memory");
assertEquals(result.get(9),driverMemory);
assertEquals("--executor-cores", result.get(12));
assertSame(executorCores, Integer.valueOf(result.get(13)));
assertEquals(result.get(10),"--num-executors");
assertSame(Integer.valueOf(result.get(11)),numExecutors);
assertEquals("--executor-memory", result.get(14));
assertEquals(executorMemory, result.get(15));
assertEquals(result.get(12),"--executor-cores");
assertSame(Integer.valueOf(result.get(13)),executorCores);
assertEquals("--name", result.get(16));
assertEquals(ArgsUtils.escape(appName), result.get(17));
assertEquals(result.get(14),"--executor-memory");
assertEquals(result.get(15),executorMemory);
assertEquals("--queue", result.get(18));
assertEquals(queue, result.get(19));
assertEquals(result.get(16),"--queue");
assertEquals(result.get(17),queue);
assertEquals(result.get(18),mainJar.getRes());
assertEquals(result.get(19),mainArgs);
assertEquals(mainJar.getRes(), result.get(20));
assertEquals(mainArgs, result.get(21));
//Others param without --queue
SparkParameters param1 = new SparkParameters();
param1.setOthers("--files xxx/hive-site.xml");
param1.setQueue(queue);
result = SparkArgsUtils.buildArgs(param1);
assertEquals(result.size(),7);
assertEquals(7, result.size());
}
}

17
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue

@ -80,6 +80,18 @@
</el-radio-group>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('App Name')}}</div>
<div slot="content">
<el-input
:disabled="isDetails"
type="input"
size="small"
v-model="appName"
:placeholder="$t('Please enter app name(optional)')">
</el-input>
</div>
</m-list-box>
<m-list-4-box>
<div slot="text">{{$t('Driver Cores')}}</div>
<div slot="content">
@ -223,6 +235,8 @@
executorMemory: '2G',
// Executor cores
executorCores: 2,
// Spark app name
appName: '',
// Main arguments
mainArgs: '',
// Option parameters
@ -448,6 +462,7 @@
numExecutors: this.numExecutors,
executorMemory: this.executorMemory,
executorCores: this.executorCores,
appName: this.appName,
mainArgs: this.mainArgs,
others: this.others,
programType: this.programType,
@ -512,6 +527,7 @@
numExecutors: this.numExecutors,
executorMemory: this.executorMemory,
executorCores: this.executorCores,
appName: this.appName,
mainArgs: this.mainArgs,
others: this.others,
programType: this.programType,
@ -544,6 +560,7 @@
this.numExecutors = o.params.numExecutors || 2
this.executorMemory = o.params.executorMemory || '2G'
this.executorCores = o.params.executorCores || 2
this.appName = o.params.appName || ''
this.mainArgs = o.params.mainArgs || ''
this.others = o.params.others
this.programType = o.params.programType || 'SCALA'

Loading…
Cancel
Save