Browse Source

Merge remote-tracking branch 'upstream/dev' into spilit

pull/3/MERGE
lenboo 4 years ago
parent
commit
5bda7c344c
  1. 4
      .github/workflows/ci_e2e.yml
  2. 2
      .github/workflows/ci_ut.yml
  3. 1
      .licenserc.yaml
  4. 2
      docker/README.md
  5. 3
      docker/build/Dockerfile
  6. 2
      docker/build/README.md
  7. 2
      docker/build/README_zh_CN.md
  8. 9
      docker/build/conf/dolphinscheduler/logback/logback-alert.xml
  9. 9
      docker/build/conf/dolphinscheduler/logback/logback-api.xml
  10. 9
      docker/build/conf/dolphinscheduler/logback/logback-master.xml
  11. 9
      docker/build/conf/dolphinscheduler/logback/logback-worker.xml
  12. 92
      docker/build/conf/dolphinscheduler/supervisor/supervisor.ini
  13. 23
      docker/build/startup-init-conf.sh
  14. 79
      docker/build/startup.sh
  15. 5
      docker/kubernetes/dolphinscheduler/README.md
  16. 11
      docker/kubernetes/dolphinscheduler/templates/NOTES.txt
  17. 14
      docker/kubernetes/dolphinscheduler/templates/configmap-dolphinscheduler-alert.yaml
  18. 73
      docker/kubernetes/dolphinscheduler/templates/deployment-dolphinscheduler-alert.yaml
  19. 4
      docker/kubernetes/dolphinscheduler/templates/deployment-dolphinscheduler-api.yaml
  20. 22
      docker/kubernetes/dolphinscheduler/templates/ingress.yaml
  21. 79
      docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml
  22. 35
      docker/kubernetes/dolphinscheduler/templates/svc-dolphinscheduler-alert.yaml
  23. 4
      docker/kubernetes/dolphinscheduler/templates/svc-dolphinscheduler-api.yaml
  24. 40
      docker/kubernetes/dolphinscheduler/values.yaml
  25. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
  26. 21
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
  27. 4
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
  28. 7
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  29. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
  30. 386
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
  31. 13
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java
  32. 46
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/CommonTest.java
  33. 30
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java
  34. 36
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  35. 19
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
  36. 34
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  37. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  38. 32
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
  39. 13
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
  40. 15
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
  41. 26
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
  42. 2
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/index.vue
  43. 8
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue
  44. 12
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue
  45. 1
      dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/subdirectory/index.vue
  46. 2
      dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
  47. 2
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
  48. 47
      script/dolphinscheduler-daemon.sh

4
.github/workflows/ci_e2e.yml

@ -62,8 +62,8 @@ jobs:
unzip chromedriver_linux64.zip
sudo mv -f chromedriver /usr/local/share/chromedriver
sudo ln -s /usr/local/share/chromedriver /usr/local/bin/chromedriver
- name: Run e2e Test
run: cd ./e2e && mvn -B clean test
# - name: Run e2e Test
# run: cd ./e2e && mvn -B clean test
- name: Collect logs
if: failure()
uses: actions/upload-artifact@v2

2
.github/workflows/ci_ut.yml

@ -48,7 +48,7 @@ jobs:
- name: Bootstrap database
run: |
sed -i "s/: root/: test/g" $(pwd)/docker/docker-swarm/docker-compose.yml
docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml create --force-recreate dolphinscheduler-zookeeper dolphinscheduler-postgresql
docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml up --no-start --force-recreate dolphinscheduler-zookeeper dolphinscheduler-postgresql
sudo cp $(pwd)/sql/dolphinscheduler_postgre.sql $(docker volume inspect docker-swarm_dolphinscheduler-postgresql-initdb | grep "Mountpoint" | awk -F "\"" '{print $4}')
docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml up -d dolphinscheduler-zookeeper dolphinscheduler-postgresql
- name: Set up JDK 1.8

1
.licenserc.yaml

@ -34,6 +34,7 @@ header:
- '**/*.md'
- '**/*.json'
- '**/*.iml'
- '**/*.ini'
- '**/.babelrc'
- '**/.eslintignore'
- '**/.gitignore'

2
docker/README.md

@ -1 +1 @@
# Dolphin Scheduler for Docker
# DolphinScheduler for Docker

3
docker/build/Dockerfile

@ -28,7 +28,7 @@ ENV DOCKER true
# RUN sed -i "s/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g" /etc/apk/repositories
# RUN sed -i 's/dl-cdn.alpinelinux.org/mirror.tuna.tsinghua.edu.cn/g' /etc/apk/repositories
RUN apk update && \
apk add --no-cache tzdata dos2unix bash python2 python3 procps sudo shadow tini postgresql-client && \
apk add --no-cache tzdata dos2unix bash python2 python3 supervisor procps sudo shadow tini postgresql-client && \
cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \
apk del tzdata && \
rm -rf /var/cache/apk/*
@ -44,6 +44,7 @@ COPY ./startup-init-conf.sh /root/startup-init-conf.sh
COPY ./startup.sh /root/startup.sh
COPY ./conf/dolphinscheduler/*.tpl /opt/dolphinscheduler/conf/
COPY ./conf/dolphinscheduler/logback/* /opt/dolphinscheduler/conf/
COPY ./conf/dolphinscheduler/supervisor/supervisor.ini /etc/supervisor.d/
COPY ./conf/dolphinscheduler/env/dolphinscheduler_env.sh.tpl /opt/dolphinscheduler/conf/env/
RUN dos2unix /root/checkpoint.sh && \
dos2unix /root/startup-init-conf.sh && \

2
docker/build/README.md

@ -332,7 +332,7 @@ server.port=${API_SERVER_PORT}
`/root/start-init-conf.sh` will dynamically generate config file:
```sh
echo "generate app config"
echo "generate dolphinscheduler config"
ls ${DOLPHINSCHEDULER_HOME}/conf/ | grep ".tpl" | while read line; do
eval "cat << EOF
$(cat ${DOLPHINSCHEDULER_HOME}/conf/${line})

2
docker/build/README_zh_CN.md

@ -332,7 +332,7 @@ server.port=${API_SERVER_PORT}
`/root/start-init-conf.sh`将根据模板文件动态的生成配置文件:
```sh
echo "generate app config"
echo "generate dolphinscheduler config"
ls ${DOLPHINSCHEDULER_HOME}/conf/ | grep ".tpl" | while read line; do
eval "cat << EOF
$(cat ${DOLPHINSCHEDULER_HOME}/conf/${line})

9
docker/build/conf/dolphinscheduler/logback/logback-alert.xml

@ -20,6 +20,14 @@
<configuration scan="true" scanPeriod="120 seconds"> <!--debug="true" -->
<property name="log.base" value="logs"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="ALERTLOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.base}/dolphinscheduler-alert.log</file>
@ -37,6 +45,7 @@
</appender>
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="ALERTLOGFILE"/>
</root>

9
docker/build/conf/dolphinscheduler/logback/logback-api.xml

@ -20,6 +20,14 @@
<configuration scan="true" scanPeriod="120 seconds"> <!--debug="true" -->
<property name="log.base" value="logs"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!-- api server logback config start -->
<appender name="APILOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
@ -47,6 +55,7 @@
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="APILOGFILE"/>
</root>

9
docker/build/conf/dolphinscheduler/logback/logback-master.xml

@ -20,6 +20,14 @@
<configuration scan="true" scanPeriod="120 seconds"> <!--debug="true" -->
<property name="log.base" value="logs"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<conversionRule conversionWord="messsage"
converterClass="org.apache.dolphinscheduler.server.log.SensitiveDataConverter"/>
@ -66,6 +74,7 @@
<!-- master server logback config end -->
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="TASKLOGFILE"/>
<appender-ref ref="MASTERLOGFILE"/>
</root>

9
docker/build/conf/dolphinscheduler/logback/logback-worker.xml

@ -20,6 +20,14 @@
<configuration scan="true" scanPeriod="120 seconds"> <!--debug="true" -->
<property name="log.base" value="logs"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!-- worker server logback config start -->
<conversionRule conversionWord="messsage"
@ -66,6 +74,7 @@
<!-- worker server logback config end -->
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="TASKLOGFILE"/>
<appender-ref ref="WORKERLOGFILE"/>
</root>

92
docker/build/conf/dolphinscheduler/supervisor/supervisor.ini

@ -0,0 +1,92 @@
; Licensed to the Apache Software Foundation (ASF) under one
; or more contributor license agreements. See the NOTICE file
; distributed with this work for additional information
; regarding copyright ownership. The ASF licenses this file
; to you under the Apache License, Version 2.0 (the
; "License"); you may not use this file except in compliance
; with the License. You may obtain a copy of the License at
;
; http://www.apache.org/licenses/LICENSE-2.0
;
; Unless required by applicable law or agreed to in writing, software
; distributed under the License is distributed on an "AS IS" BASIS,
; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
; See the License for the specific language governing permissions and
; limitations under the License.
; program config file
[program:master]
command=%(ENV_DOLPHINSCHEDULER_BIN)s/dolphinscheduler-daemon.sh start master-server
directory=%(ENV_DOLPHINSCHEDULER_HOME)s
priority=999
autostart=%(ENV_MASTER_START_ENABLED)s
autorestart=true
startsecs=10
stopwaitsecs=3
exitcodes=0
stopasgroup=true
killasgroup=true
redirect_stderr=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0
[program:worker]
command=%(ENV_DOLPHINSCHEDULER_BIN)s/dolphinscheduler-daemon.sh start worker-server
directory=%(ENV_DOLPHINSCHEDULER_HOME)s
priority=999
autostart=%(ENV_WORKER_START_ENABLED)s
autorestart=true
startsecs=10
stopwaitsecs=3
exitcodes=0
stopasgroup=true
killasgroup=true
redirect_stderr=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0
[program:api]
command=%(ENV_DOLPHINSCHEDULER_BIN)s/dolphinscheduler-daemon.sh start api-server
directory=%(ENV_DOLPHINSCHEDULER_HOME)s
priority=999
autostart=%(ENV_API_START_ENABLED)s
autorestart=true
startsecs=10
stopwaitsecs=3
exitcodes=0
stopasgroup=true
killasgroup=true
redirect_stderr=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0
[program:alert]
command=%(ENV_DOLPHINSCHEDULER_BIN)s/dolphinscheduler-daemon.sh start alert-server
directory=%(ENV_DOLPHINSCHEDULER_HOME)s
priority=999
autostart=%(ENV_ALERT_START_ENABLED)s
autorestart=true
startsecs=5
stopwaitsecs=3
exitcodes=0
stopasgroup=true
killasgroup=true
redirect_stderr=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0
[program:logger]
command=%(ENV_DOLPHINSCHEDULER_BIN)s/dolphinscheduler-daemon.sh start logger-server
directory=%(ENV_DOLPHINSCHEDULER_HOME)s
priority=999
autostart=%(ENV_LOGGER_START_ENABLED)s
autorestart=true
startsecs=5
stopwaitsecs=3
exitcodes=0
stopasgroup=true
killasgroup=true
redirect_stderr=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0

23
docker/build/startup-init-conf.sh

@ -25,13 +25,13 @@ echo "init env variables"
#============================================================================
# Database Source
#============================================================================
export DATABASE_TYPE=${DATABASE_TYPE:-"postgresql"}
export DATABASE_DRIVER=${DATABASE_DRIVER:-"org.postgresql.Driver"}
export DATABASE_HOST=${DATABASE_HOST:-"127.0.0.1"}
export DATABASE_PORT=${DATABASE_PORT:-"5432"}
export DATABASE_USERNAME=${DATABASE_USERNAME:-"root"}
export DATABASE_PASSWORD=${DATABASE_PASSWORD:-"root"}
export DATABASE_DATABASE=${DATABASE_DATABASE:-"dolphinscheduler"}
export DATABASE_TYPE=${DATABASE_TYPE:-"postgresql"}
export DATABASE_DRIVER=${DATABASE_DRIVER:-"org.postgresql.Driver"}
export DATABASE_PARAMS=${DATABASE_PARAMS:-"characterEncoding=utf8"}
#============================================================================
@ -92,11 +92,20 @@ export ALERT_LISTEN_HOST=${ALERT_LISTEN_HOST:-"127.0.0.1"}
#============================================================================
export ALERT_PLUGIN_DIR=${ALERT_PLUGIN_DIR:-"lib/plugin/alert"}
echo "generate app config"
find ${DOLPHINSCHEDULER_HOME}/conf/ -name "*.tpl" | while read file; do
echo "generate dolphinscheduler config"
ls ${DOLPHINSCHEDULER_HOME}/conf/ | grep ".tpl" | while read line; do
eval "cat << EOF
$(cat ${file})
$(cat ${DOLPHINSCHEDULER_HOME}/conf/${line})
EOF
" > ${file%.*}
" > ${DOLPHINSCHEDULER_HOME}/conf/${line%.*}
done
find ${DOLPHINSCHEDULER_HOME}/conf/ -name "*.sh" -exec chmod +x {} \;
# generate dolphinscheduler env only in docker
DOLPHINSCHEDULER_ENV_PATH=${DOLPHINSCHEDULER_HOME}/conf/env/dolphinscheduler_env.sh
if [ -z "${KUBERNETES_SERVICE_HOST}" ] && [ -r "${DOLPHINSCHEDULER_ENV_PATH}.tpl" ]; then
eval "cat << EOF
$(cat ${DOLPHINSCHEDULER_ENV_PATH}.tpl)
EOF
" > ${DOLPHINSCHEDULER_ENV_PATH}
chmod +x ${DOLPHINSCHEDULER_ENV_PATH}
fi

79
docker/build/startup.sh

@ -18,9 +18,12 @@
set -e
DOLPHINSCHEDULER_BIN=${DOLPHINSCHEDULER_HOME}/bin
DOLPHINSCHEDULER_SCRIPT=${DOLPHINSCHEDULER_HOME}/script
DOLPHINSCHEDULER_LOGS=${DOLPHINSCHEDULER_HOME}/logs
export DOLPHINSCHEDULER_BIN=${DOLPHINSCHEDULER_HOME}/bin
export MASTER_START_ENABLED=false
export WORKER_START_ENABLED=false
export API_START_ENABLED=false
export ALERT_START_ENABLED=false
export LOGGER_START_ENABLED=false
# wait database
waitDatabase() {
@ -54,7 +57,7 @@ waitDatabase() {
# init database
initDatabase() {
echo "import sql data"
${DOLPHINSCHEDULER_SCRIPT}/create-dolphinscheduler.sh
${DOLPHINSCHEDULER_HOME}/script/create-dolphinscheduler.sh
}
# check ds version
@ -102,41 +105,6 @@ waitZK() {
done
}
# start master-server
initMasterServer() {
echo "start master-server"
${DOLPHINSCHEDULER_BIN}/dolphinscheduler-daemon.sh stop master-server
${DOLPHINSCHEDULER_BIN}/dolphinscheduler-daemon.sh start master-server
}
# start worker-server
initWorkerServer() {
echo "start worker-server"
${DOLPHINSCHEDULER_BIN}/dolphinscheduler-daemon.sh stop worker-server
${DOLPHINSCHEDULER_BIN}/dolphinscheduler-daemon.sh start worker-server
}
# start api-server
initApiServer() {
echo "start api-server"
${DOLPHINSCHEDULER_BIN}/dolphinscheduler-daemon.sh stop api-server
${DOLPHINSCHEDULER_BIN}/dolphinscheduler-daemon.sh start api-server
}
# start logger-server
initLoggerServer() {
echo "start logger-server"
${DOLPHINSCHEDULER_BIN}/dolphinscheduler-daemon.sh stop logger-server
${DOLPHINSCHEDULER_BIN}/dolphinscheduler-daemon.sh start logger-server
}
# start alert-server
initAlertServer() {
echo "start alert-server"
${DOLPHINSCHEDULER_BIN}/dolphinscheduler-daemon.sh stop alert-server
${DOLPHINSCHEDULER_BIN}/dolphinscheduler-daemon.sh start alert-server
}
# print usage
printUsage() {
echo -e "Dolphin Scheduler is a distributed and easy-to-expand visual DAG workflow scheduling system,"
@ -157,38 +125,33 @@ case "$1" in
waitZK
waitDatabase
initDatabase
initMasterServer
initWorkerServer
initApiServer
initAlertServer
initLoggerServer
LOGFILE=${DOLPHINSCHEDULER_LOGS}/dolphinscheduler-api.log
export MASTER_START_ENABLED=true
export WORKER_START_ENABLED=true
export API_START_ENABLED=true
export ALERT_START_ENABLED=true
export LOGGER_START_ENABLED=true
;;
(master-server)
waitZK
waitDatabase
initMasterServer
LOGFILE=${DOLPHINSCHEDULER_LOGS}/dolphinscheduler-master.log
export MASTER_START_ENABLED=true
;;
(worker-server)
waitZK
waitDatabase
initWorkerServer
initLoggerServer
LOGFILE=${DOLPHINSCHEDULER_LOGS}/dolphinscheduler-worker.log
export WORKER_START_ENABLED=true
export LOGGER_START_ENABLED=true
;;
(api-server)
waitZK
waitDatabase
initDatabase
initApiServer
LOGFILE=${DOLPHINSCHEDULER_LOGS}/dolphinscheduler-api.log
export API_START_ENABLED=true
;;
(alert-server)
waitDatabase
checkInitDatabase
initAlertServer
LOGFILE=${DOLPHINSCHEDULER_LOGS}/dolphinscheduler-alert.log
export ALERT_START_ENABLED=true
;;
(help)
printUsage
@ -200,8 +163,8 @@ case "$1" in
;;
esac
# init directories and log files
mkdir -p ${DOLPHINSCHEDULER_LOGS} && cat /dev/null >> ${LOGFILE}
# init directories
mkdir -p ${DOLPHINSCHEDULER_HOME}/logs
echo "tail begin"
exec bash -c "tail -n 1 -f ${LOGFILE}"
# start supervisord
supervisord -n -u root

5
docker/kubernetes/dolphinscheduler/README.md

@ -82,7 +82,7 @@ The Configuration file is `values.yaml`, and the following tables lists the conf
| `fullnameOverride` | String to fully override common.names.fullname | `nil` |
| `timezone` | World time and date for cities in all time zones | `Asia/Shanghai` |
| `image.registry` | Docker image registry for the DolphinScheduler | `docker.io` |
| `image.repository` | Docker image repository for the DolphinScheduler | `dolphinscheduler` |
| `image.repository` | Docker image repository for the DolphinScheduler | `apache/dolphinscheduler` |
| `image.tag` | Docker image version for the DolphinScheduler | `latest` |
| `image.pullPolicy` | Image pull policy. One of Always, Never, IfNotPresent | `IfNotPresent` |
| `image.pullSecrets` | Image pull secrets. An optional list of references to secrets in the same namespace to use for pulling any of the images | `[]` |
@ -253,9 +253,8 @@ The Configuration file is `values.yaml`, and the following tables lists the conf
| | | |
| `ingress.enabled` | Enable ingress | `false` |
| `ingress.host` | Ingress host | `dolphinscheduler.org` |
| `ingress.path` | Ingress path | `/` |
| `ingress.path` | Ingress path | `/dolphinscheduler` |
| `ingress.tls.enabled` | Enable ingress tls | `false` |
| `ingress.tls.hosts` | Ingress tls hosts | `dolphinscheduler.org` |
| `ingress.tls.secretName` | Ingress tls secret name | `dolphinscheduler-tls` |
## FAQ

11
docker/kubernetes/dolphinscheduler/templates/NOTES.txt

@ -15,17 +15,18 @@
# limitations under the License.
#
** Please be patient while the chart Dolphinscheduler {{ .Chart.AppVersion }} is being deployed **
** Please be patient while the chart DolphinScheduler {{ .Chart.AppVersion }} is being deployed **
Get the Dolphinscheduler URL by running:
Access DolphinScheduler by:
{{- if .Values.ingress.enabled }}
export HOSTNAME=$(kubectl get ingress --namespace {{ .Release.Namespace }} {{ template "dolphinscheduler.fullname" . }} -o jsonpath='{.spec.rules[0].host}')
echo "Dolphinscheduler URL: http://$HOSTNAME/"
DolphinScheduler URL: http://{{ .Values.ingress.host }}/dolphinscheduler
{{- else }}
kubectl port-forward --namespace {{ .Release.Namespace }} svc/{{ template "dolphinscheduler.fullname" . }}-api 12345:12345
{{- end }}
DolphinScheduler URL: http://127.0.0.1:12345/dolphinscheduler
{{- end }}

14
docker/kubernetes/dolphinscheduler/templates/configmap-dolphinscheduler-alert.yaml

@ -26,18 +26,4 @@ metadata:
data:
DOLPHINSCHEDULER_OPTS: {{ .Values.alert.configmap.DOLPHINSCHEDULER_OPTS | quote }}
ALERT_PLUGIN_DIR: {{ .Values.alert.configmap.ALERT_PLUGIN_DIR | quote }}
XLS_FILE_PATH: {{ .Values.alert.configmap.XLS_FILE_PATH | quote }}
MAIL_SERVER_HOST: {{ .Values.alert.configmap.MAIL_SERVER_HOST | quote }}
MAIL_SERVER_PORT: {{ .Values.alert.configmap.MAIL_SERVER_PORT | quote }}
MAIL_SENDER: {{ .Values.alert.configmap.MAIL_SENDER | quote }}
MAIL_USER: {{ .Values.alert.configmap.MAIL_USER | quote }}
MAIL_PASSWD: {{ .Values.alert.configmap.MAIL_PASSWD | quote }}
MAIL_SMTP_STARTTLS_ENABLE: {{ .Values.alert.configmap.MAIL_SMTP_STARTTLS_ENABLE | quote }}
MAIL_SMTP_SSL_ENABLE: {{ .Values.alert.configmap.MAIL_SMTP_SSL_ENABLE | quote }}
MAIL_SMTP_SSL_TRUST: {{ .Values.alert.configmap.MAIL_SMTP_SSL_TRUST | quote }}
ENTERPRISE_WECHAT_ENABLE: {{ .Values.alert.configmap.ENTERPRISE_WECHAT_ENABLE | quote }}
ENTERPRISE_WECHAT_CORP_ID: {{ .Values.alert.configmap.ENTERPRISE_WECHAT_CORP_ID | quote }}
ENTERPRISE_WECHAT_SECRET: {{ .Values.alert.configmap.ENTERPRISE_WECHAT_SECRET | quote }}
ENTERPRISE_WECHAT_AGENT_ID: {{ .Values.alert.configmap.ENTERPRISE_WECHAT_AGENT_ID | quote }}
ENTERPRISE_WECHAT_USERS: {{ .Values.alert.configmap.ENTERPRISE_WECHAT_USERS | quote }}
{{- end }}

73
docker/kubernetes/dolphinscheduler/templates/deployment-dolphinscheduler-alert.yaml

@ -67,6 +67,9 @@ spec:
imagePullPolicy: {{ .Values.image.pullPolicy }}
args:
- "alert-server"
ports:
- containerPort: 50052
name: "alert-port"
env:
- name: TZ
value: {{ .Values.timezone }}
@ -80,76 +83,6 @@ spec:
configMapKeyRef:
key: ALERT_PLUGIN_DIR
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: XLS_FILE_PATH
valueFrom:
configMapKeyRef:
key: XLS_FILE_PATH
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: MAIL_SERVER_HOST
valueFrom:
configMapKeyRef:
key: MAIL_SERVER_HOST
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: MAIL_SERVER_PORT
valueFrom:
configMapKeyRef:
key: MAIL_SERVER_PORT
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: MAIL_SENDER
valueFrom:
configMapKeyRef:
key: MAIL_SENDER
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: MAIL_USER
valueFrom:
configMapKeyRef:
key: MAIL_USER
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: MAIL_PASSWD
valueFrom:
configMapKeyRef:
key: MAIL_PASSWD
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: MAIL_SMTP_STARTTLS_ENABLE
valueFrom:
configMapKeyRef:
key: MAIL_SMTP_STARTTLS_ENABLE
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: MAIL_SMTP_SSL_ENABLE
valueFrom:
configMapKeyRef:
key: MAIL_SMTP_SSL_ENABLE
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: MAIL_SMTP_SSL_TRUST
valueFrom:
configMapKeyRef:
key: MAIL_SMTP_SSL_TRUST
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: ENTERPRISE_WECHAT_ENABLE
valueFrom:
configMapKeyRef:
key: ENTERPRISE_WECHAT_ENABLE
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: ENTERPRISE_WECHAT_CORP_ID
valueFrom:
configMapKeyRef:
key: ENTERPRISE_WECHAT_CORP_ID
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: ENTERPRISE_WECHAT_SECRET
valueFrom:
configMapKeyRef:
key: ENTERPRISE_WECHAT_SECRET
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: ENTERPRISE_WECHAT_AGENT_ID
valueFrom:
configMapKeyRef:
key: ENTERPRISE_WECHAT_AGENT_ID
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: ENTERPRISE_WECHAT_USERS
valueFrom:
configMapKeyRef:
key: ENTERPRISE_WECHAT_USERS
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: DATABASE_TYPE
{{- if .Values.postgresql.enabled }}
value: "postgresql"

4
docker/kubernetes/dolphinscheduler/templates/deployment-dolphinscheduler-api.yaml

@ -69,7 +69,7 @@ spec:
- "api-server"
ports:
- containerPort: 12345
name: tcp-port
name: "api-port"
env:
- name: TZ
value: {{ .Values.timezone }}
@ -132,7 +132,7 @@ spec:
{{- end }}
- name: ZOOKEEPER_QUORUM
{{- if .Values.zookeeper.enabled }}
value: "{{ template "dolphinscheduler.zookeeper.quorum" . }}"
value: {{ template "dolphinscheduler.zookeeper.quorum" . }}
{{- else }}
value: {{ .Values.externalZookeeper.zookeeperQuorum }}
{{- end }}

22
docker/kubernetes/dolphinscheduler/templates/ingress.yaml

@ -15,7 +15,13 @@
# limitations under the License.
#
{{- if .Values.ingress.enabled }}
{{- if .Capabilities.APIVersions.Has "networking.k8s.io/v1/Ingress" }}
apiVersion: networking.k8s.io/v1
{{- else if .Capabilities.APIVersions.Has "networking.k8s.io/v1beta1/Ingress" }}
apiVersion: networking.k8s.io/v1beta1
{{- else }}
apiVersion: extensions/v1beta1
{{- end }}
kind: Ingress
metadata:
name: {{ include "dolphinscheduler.fullname" . }}
@ -30,14 +36,22 @@ spec:
paths:
- path: {{ .Values.ingress.path }}
backend:
{{- if .Capabilities.APIVersions.Has "networking.k8s.io/v1/Ingress" }}
service:
name: {{ include "dolphinscheduler.fullname" . }}-api
port:
name: api-port
{{- else }}
serviceName: {{ include "dolphinscheduler.fullname" . }}-api
servicePort: tcp-port
servicePort: api-port
{{- end }}
{{- if .Capabilities.APIVersions.Has "networking.k8s.io/v1/Ingress" }}
pathType: Prefix
{{- end }}
{{- if .Values.ingress.tls.enabled }}
tls:
hosts:
{{- range .Values.ingress.tls.hosts }}
- {{ . | quote }}
{{- end }}
- {{ .Values.ingress.host }}
secretName: {{ .Values.ingress.tls.secretName }}
{{- end }}
{{- end }}

79
docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml

@ -117,56 +117,8 @@ spec:
configMapKeyRef:
name: {{ include "dolphinscheduler.fullname" . }}-common
key: DOLPHINSCHEDULER_DATA_BASEDIR_PATH
- name: ALERT_PLUGIN_DIR
valueFrom:
configMapKeyRef:
key: ALERT_PLUGIN_DIR
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: XLS_FILE_PATH
valueFrom:
configMapKeyRef:
key: XLS_FILE_PATH
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: MAIL_SERVER_HOST
valueFrom:
configMapKeyRef:
key: MAIL_SERVER_HOST
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: MAIL_SERVER_PORT
valueFrom:
configMapKeyRef:
key: MAIL_SERVER_PORT
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: MAIL_SENDER
valueFrom:
configMapKeyRef:
key: MAIL_SENDER
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: MAIL_USER
valueFrom:
configMapKeyRef:
key: MAIL_USER
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: MAIL_PASSWD
valueFrom:
configMapKeyRef:
key: MAIL_PASSWD
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: MAIL_SMTP_STARTTLS_ENABLE
valueFrom:
configMapKeyRef:
key: MAIL_SMTP_STARTTLS_ENABLE
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: MAIL_SMTP_SSL_ENABLE
valueFrom:
configMapKeyRef:
key: MAIL_SMTP_SSL_ENABLE
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: MAIL_SMTP_SSL_TRUST
valueFrom:
configMapKeyRef:
key: MAIL_SMTP_SSL_TRUST
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: ALERT_LISTEN_HOST
value: {{ include "dolphinscheduler.fullname" . }}-alert
- name: DATABASE_TYPE
{{- if .Values.postgresql.enabled }}
value: "postgresql"
@ -221,7 +173,7 @@ spec:
{{- end }}
- name: ZOOKEEPER_QUORUM
{{- if .Values.zookeeper.enabled }}
value: "{{ template "dolphinscheduler.zookeeper.quorum" . }}"
value: {{ template "dolphinscheduler.zookeeper.quorum" . }}
{{- else }}
value: {{ .Values.externalZookeeper.zookeeperQuorum }}
{{- end }}
@ -263,31 +215,6 @@ spec:
key: fs-s3a-secret-key
name: {{ printf "%s-%s" .Release.Name "fs-s3a" }}
{{- end }}
- name: ENTERPRISE_WECHAT_ENABLE
valueFrom:
configMapKeyRef:
key: ENTERPRISE_WECHAT_ENABLE
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: ENTERPRISE_WECHAT_CORP_ID
valueFrom:
configMapKeyRef:
key: ENTERPRISE_WECHAT_CORP_ID
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: ENTERPRISE_WECHAT_SECRET
valueFrom:
configMapKeyRef:
key: ENTERPRISE_WECHAT_SECRET
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: ENTERPRISE_WECHAT_AGENT_ID
valueFrom:
configMapKeyRef:
key: ENTERPRISE_WECHAT_AGENT_ID
name: {{ include "dolphinscheduler.fullname" . }}-alert
- name: ENTERPRISE_WECHAT_USERS
valueFrom:
configMapKeyRef:
key: ENTERPRISE_WECHAT_USERS
name: {{ include "dolphinscheduler.fullname" . }}-alert
{{- if .Values.worker.resources }}
resources:
limits:

35
docker/kubernetes/dolphinscheduler/templates/svc-dolphinscheduler-alert.yaml

@ -0,0 +1,35 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
apiVersion: v1
kind: Service
metadata:
name: {{ include "dolphinscheduler.fullname" . }}-alert
labels:
app.kubernetes.io/name: {{ include "dolphinscheduler.fullname" . }}-alert
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
spec:
ports:
- port: 50052
targetPort: alert-port
protocol: TCP
name: alert-port
selector:
app.kubernetes.io/name: {{ include "dolphinscheduler.fullname" . }}-alert
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
app.kubernetes.io/component: alert

4
docker/kubernetes/dolphinscheduler/templates/svc-dolphinscheduler-api.yaml

@ -25,9 +25,9 @@ metadata:
spec:
ports:
- port: 12345
targetPort: tcp-port
targetPort: api-port
protocol: TCP
name: tcp-port
name: api-port
selector:
app.kubernetes.io/name: {{ include "dolphinscheduler.fullname" . }}-api
app.kubernetes.io/instance: {{ .Release.Name }}

40
docker/kubernetes/dolphinscheduler/values.yaml

@ -25,13 +25,13 @@ fullnameOverride: ""
timezone: "Asia/Shanghai"
image:
registry: "apache"
repository: "dolphinscheduler"
registry: "docker.io"
repository: "apache/dolphinscheduler"
tag: "latest"
pullPolicy: "IfNotPresent"
pullSecrets: []
# If not exists external database, by default, Dolphinscheduler's database will use it.
## If not exists external database, by default, Dolphinscheduler's database will use it.
postgresql:
enabled: true
postgresqlUsername: "root"
@ -42,8 +42,8 @@ postgresql:
size: "20Gi"
storageClass: "-"
# If exists external database, and set postgresql.enable value to false.
# external database will be used, otherwise Dolphinscheduler's database will be used.
## If exists external database, and set postgresql.enable value to false.
## external database will be used, otherwise Dolphinscheduler's database will be used.
externalDatabase:
type: "postgresql"
driver: "org.postgresql.Driver"
@ -52,13 +52,13 @@ externalDatabase:
username: "root"
password: "root"
database: "dolphinscheduler"
# multi params should join with & char
## multi params should join with & char
params: "characterEncoding=utf8"
# If not exists external zookeeper, by default, Dolphinscheduler's zookeeper will use it.
## If not exists external zookeeper, by default, Dolphinscheduler's zookeeper will use it.
zookeeper:
enabled: true
fourlwCommandsWhitelist: srvr,ruok,wchs,cons
fourlwCommandsWhitelist: "srvr,ruok,wchs,cons"
service:
port: "2181"
persistence:
@ -67,8 +67,8 @@ zookeeper:
storageClass: "-"
zookeeperRoot: "/dolphinscheduler"
# If exists external zookeeper, and set zookeeper.enable value to false.
# If zookeeper.enable is false, Dolphinscheduler's zookeeper will use it.
## If exists external zookeeper, and set zookeeper.enable value to false.
## If zookeeper.enable is false, Dolphinscheduler's zookeeper will use it.
externalZookeeper:
zookeeperQuorum: "127.0.0.1:2181"
zookeeperRoot: "/dolphinscheduler"
@ -283,21 +283,7 @@ alert:
## ConfigMap
configmap:
DOLPHINSCHEDULER_OPTS: ""
ALERT_PLUGIN_DIR: "/opt/dolphinscheduler/alert/plugin"
XLS_FILE_PATH: "/tmp/xls"
MAIL_SERVER_HOST: ""
MAIL_SERVER_PORT: ""
MAIL_SENDER: ""
MAIL_USER: ""
MAIL_PASSWD: ""
MAIL_SMTP_STARTTLS_ENABLE: false
MAIL_SMTP_SSL_ENABLE: false
MAIL_SMTP_SSL_TRUST: ""
ENTERPRISE_WECHAT_ENABLE: false
ENTERPRISE_WECHAT_CORP_ID: ""
ENTERPRISE_WECHAT_SECRET: ""
ENTERPRISE_WECHAT_AGENT_ID: ""
ENTERPRISE_WECHAT_USERS: ""
ALERT_PLUGIN_DIR: "lib/plugin/alert"
## Periodic probe of container liveness. Container will be restarted if the probe fails. Cannot be updated.
## More info: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes
livenessProbe:
@ -390,9 +376,7 @@ api:
ingress:
enabled: false
host: "dolphinscheduler.org"
path: "/"
path: "/dolphinscheduler"
tls:
enabled: false
hosts:
- "dolphinscheduler.org"
secretName: "dolphinscheduler-tls"

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java

@ -167,8 +167,8 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
return result;
}
// check whether the task instance state type is failure
if (!task.getState().typeIsFailure()) {
// check whether the task instance state type is failure or cancel
if (!task.getState().typeIsFailure() && !task.getState().typeIsCancel()) {
putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString());
return result;
}

21
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java

@ -40,6 +40,8 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -50,7 +52,7 @@ import org.springframework.stereotype.Service;
@Service
public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGroupService {
private static final String NO_NODE_EXCEPTION_REGEX = "KeeperException$NoNodeException";
private static final Logger logger = LoggerFactory.getLogger(WorkerGroupServiceImpl.class);
@Autowired
protected ZookeeperCachedOperator zookeeperCachedOperator;
@ -137,27 +139,22 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
* @return WorkerGroup list
*/
private List<WorkerGroup> getWorkerGroups(boolean isPaging) {
String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
List<WorkerGroup> workerGroups = new ArrayList<>();
List<String> workerGroupList;
List<String> workerGroupList = null;
try {
workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath);
} catch (Exception e) {
if (e.getMessage().contains(NO_NODE_EXCEPTION_REGEX)) {
if (isPaging) {
return workerGroups;
}
logger.error("getWorkerGroups exception: {}, workerPath: {}, isPaging: {}", e.getMessage(), workerPath, isPaging);
}
//ignore noNodeException return Default
if (workerGroupList == null || workerGroupList.isEmpty()) {
if (!isPaging) {
WorkerGroup wg = new WorkerGroup();
wg.setName(DEFAULT_WORKER_GROUP);
workerGroups.add(wg);
return workerGroups;
} else {
throw e;
}
return workerGroups;
}
for (String workerGroup : workerGroupList) {

4
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java

@ -110,9 +110,7 @@ public class WorkerGroupServiceTest {
}
@Test
public void testQueryAllGroupWithNoNodeException() {
String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath)).thenThrow(new RuntimeException("KeeperException$NoNodeException"));
public void testQueryAllGroupWithDefault() {
Map<String, Object> result = workerGroupService.queryAllGroup();
Set<String> workerGroups = (Set<String>) result.get(Constants.DATA_LIST);
Assert.assertEquals(1, workerGroups.size());

7
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -842,7 +842,9 @@ public final class Constants {
*/
public static final String HIVE_CONF = "hiveconf:";
//flink ??
/**
* flink
*/
public static final String FLINK_YARN_CLUSTER = "yarn-cluster";
public static final String FLINK_RUN_MODE = "-m";
public static final String FLINK_YARN_SLOT = "-ys";
@ -852,8 +854,9 @@ public final class Constants {
public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
public static final String FLINK_TASK_MANAGE_MEM = "-ytm";
public static final String FLINK_DETACH = "-d";
public static final String FLINK_MAIN_CLASS = "-c";
public static final String FLINK_PARALLELISM = "-p";
public static final String FLINK_SHUTDOWN_ON_ATTACHED_EXIT = "-sae";
public static final int[] NOT_TERMINATED_STATES = new int[] {

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java

@ -90,7 +90,7 @@ public enum ExecutionStatus {
* @return status
*/
public boolean typeIsFailure() {
return this == FAILURE || this == NEED_FAULT_TOLERANCE || this == KILL;
return this == FAILURE || this == NEED_FAULT_TOLERANCE;
}
/**

386
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java

@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.flink;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
@ -29,201 +29,213 @@ import java.util.List;
*/
public class FlinkParameters extends AbstractParameters {
/**
* major jar
*/
private ResourceInfo mainJar;
/**
* major class
*/
private String mainClass;
/**
* deploy mode yarn-cluster yarn-client yarn-local
*/
private String deployMode;
/**
* arguments
*/
private String mainArgs;
/**
* slot count
*/
private int slot;
/**
*Yarn application name
*/
private String appName;
/**
* taskManager count
*/
private int taskManager;
/**
* job manager memory
*/
private String jobManagerMemory ;
/**
* task manager memory
*/
private String taskManagerMemory;
/**
* resource list
*/
private List<ResourceInfo> resourceList = new ArrayList<>();
/**
* The YARN queue to submit to
*/
private String queue;
/**
* other arguments
*/
private String others;
/**
* flink version
*/
private String flinkVersion;
/**
* program type
* 0 JAVA,1 SCALA,2 PYTHON
*/
private ProgramType programType;
public ResourceInfo getMainJar() {
return mainJar;
}
public void setMainJar(ResourceInfo mainJar) {
this.mainJar = mainJar;
}
public String getMainClass() {
return mainClass;
}
public void setMainClass(String mainClass) {
this.mainClass = mainClass;
}
public String getDeployMode() {
return deployMode;
}
public void setDeployMode(String deployMode) {
this.deployMode = deployMode;
}
public String getMainArgs() {
return mainArgs;
}
public void setMainArgs(String mainArgs) {
this.mainArgs = mainArgs;
}
public int getSlot() {
return slot;
}
public void setSlot(int slot) {
this.slot = slot;
}
public String getAppName() {
return appName;
}
public void setAppName(String appName) {
this.appName = appName;
}
public int getTaskManager() {
return taskManager;
}
public void setTaskManager(int taskManager) {
this.taskManager = taskManager;
}
public String getJobManagerMemory() {
return jobManagerMemory;
}
public void setJobManagerMemory(String jobManagerMemory) {
this.jobManagerMemory = jobManagerMemory;
}
public String getTaskManagerMemory() {
return taskManagerMemory;
}
public void setTaskManagerMemory(String taskManagerMemory) {
this.taskManagerMemory = taskManagerMemory;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public List<ResourceInfo> getResourceList() {
return resourceList;
}
/**
* major jar
*/
private ResourceInfo mainJar;
/**
* major class
*/
private String mainClass;
/**
* deploy mode yarn-cluster yarn-local
*/
private String deployMode;
/**
* arguments
*/
private String mainArgs;
/**
* slot count
*/
private int slot;
/**
* parallelism
*/
private int parallelism;
/**
* yarn application name
*/
private String appName;
/**
* taskManager count
*/
private int taskManager;
/**
* job manager memory
*/
private String jobManagerMemory;
/**
* task manager memory
*/
private String taskManagerMemory;
/**
* resource list
*/
private List<ResourceInfo> resourceList = new ArrayList<>();
/**
* The YARN queue to submit to
*/
private String queue;
/**
* other arguments
*/
private String others;
/**
* flink version
*/
private String flinkVersion;
/**
* program type
* 0 JAVA,1 SCALA,2 PYTHON
*/
private ProgramType programType;
public ResourceInfo getMainJar() {
return mainJar;
}
public void setMainJar(ResourceInfo mainJar) {
this.mainJar = mainJar;
}
public String getMainClass() {
return mainClass;
}
public void setMainClass(String mainClass) {
this.mainClass = mainClass;
}
public String getDeployMode() {
return deployMode;
}
public void setDeployMode(String deployMode) {
this.deployMode = deployMode;
}
public String getMainArgs() {
return mainArgs;
}
public void setMainArgs(String mainArgs) {
this.mainArgs = mainArgs;
}
public int getSlot() {
return slot;
}
public void setSlot(int slot) {
this.slot = slot;
}
public int getParallelism() {
return parallelism;
}
public void setParallelism(int parallelism) {
this.parallelism = parallelism;
}
public String getAppName() {
return appName;
}
public void setAppName(String appName) {
this.appName = appName;
}
public int getTaskManager() {
return taskManager;
}
public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}
public void setTaskManager(int taskManager) {
this.taskManager = taskManager;
}
public String getOthers() {
return others;
}
public String getJobManagerMemory() {
return jobManagerMemory;
}
public void setJobManagerMemory(String jobManagerMemory) {
this.jobManagerMemory = jobManagerMemory;
}
public void setOthers(String others) {
this.others = others;
}
public String getTaskManagerMemory() {
return taskManagerMemory;
}
public ProgramType getProgramType() {
return programType;
}
public void setTaskManagerMemory(String taskManagerMemory) {
this.taskManagerMemory = taskManagerMemory;
}
public void setProgramType(ProgramType programType) {
this.programType = programType;
}
public String getQueue() {
return queue;
}
public String getFlinkVersion() {
return flinkVersion;
}
public void setQueue(String queue) {
this.queue = queue;
}
public void setFlinkVersion(String flinkVersion) {
this.flinkVersion = flinkVersion;
}
public List<ResourceInfo> getResourceList() {
return resourceList;
}
@Override
public boolean checkParameters() {
return mainJar != null && programType != null;
}
public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}
public String getOthers() {
return others;
}
@Override
public List<ResourceInfo> getResourceFilesList() {
if (mainJar != null && !resourceList.contains(mainJar)) {
resourceList.add(mainJar);
public void setOthers(String others) {
this.others = others;
}
return resourceList;
}
public ProgramType getProgramType() {
return programType;
}
public void setProgramType(ProgramType programType) {
this.programType = programType;
}
public String getFlinkVersion() {
return flinkVersion;
}
public void setFlinkVersion(String flinkVersion) {
this.flinkVersion = flinkVersion;
}
@Override
public boolean checkParameters() {
return mainJar != null && programType != null;
}
@Override
public List<ResourceInfo> getResourceFilesList() {
if (mainJar != null && !resourceList.contains(mainJar)) {
resourceList.add(mainJar);
}
return resourceList;
}
}

13
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java

@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
*/
public class NetUtils {
private static final Pattern STS_PATTERN = Pattern.compile("-\\d+$"); // StatefulSet pattern
private static final Pattern IP_PATTERN = Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3,5}$");
private static final String NETWORK_PRIORITY_DEFAULT = "default";
private static final String NETWORK_PRIORITY_INNER = "inner";
@ -78,7 +79,17 @@ public class NetUtils {
*/
public static String getHost(InetAddress inetAddress) {
if (inetAddress != null) {
return Constants.KUBERNETES_MODE ? inetAddress.getHostName() : inetAddress.getHostAddress();
if (Constants.KUBERNETES_MODE) {
String canonicalHost = inetAddress.getCanonicalHostName();
if (!canonicalHost.contains(".") || IP_PATTERN.matcher(canonicalHost).matches()) {
String host = inetAddress.getHostName();
if (STS_PATTERN.matcher(host).find()) {
return String.format("%s.%s", host, host.replaceFirst("\\d+$", "headless"));
}
}
return canonicalHost;
}
return inetAddress.getHostAddress();
}
return null;
}

46
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/CommonTest.java

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common;
import static org.junit.Assert.assertTrue;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import org.junit.Test;
/**
* CommonTest
*/
public class CommonTest {
public static void setFinalStatic(Field field, Object newValue) throws NoSuchFieldException, IllegalAccessException {
field.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
field.set(null, newValue);
}
@Test
public void testSetFinalStatic() throws Exception {
setFinalStatic(Constants.class.getDeclaredField("KUBERNETES_MODE"), true);
assertTrue(Constants.KUBERNETES_MODE);
}
}

30
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java

@ -16,13 +16,19 @@
*/
package org.apache.dolphinscheduler.common.utils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.common.CommonTest;
import org.apache.dolphinscheduler.common.Constants;
import java.net.InetAddress;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.junit.Test;
/**
* NetUtilsTest
@ -36,6 +42,22 @@ public class NetUtilsTest {
assertEquals("localhost:1234", NetUtils.getAddr("localhost", 1234));
}
@Test
public void testGetHost() throws Exception {
InetAddress address = mock(InetAddress.class);
when(address.getCanonicalHostName()).thenReturn("dolphinscheduler-worker-0.dolphinscheduler-worker-headless.default.svc.cluster.local");
when(address.getHostName()).thenReturn("dolphinscheduler-worker-0");
when(address.getHostAddress()).thenReturn("172.17.0.15");
assertEquals("172.17.0.15", NetUtils.getHost(address));
CommonTest.setFinalStatic(Constants.class.getDeclaredField("KUBERNETES_MODE"), true);
assertEquals("dolphinscheduler-worker-0.dolphinscheduler-worker-headless.default.svc.cluster.local", NetUtils.getHost(address));
address = mock(InetAddress.class);
when(address.getCanonicalHostName()).thenReturn("dolphinscheduler-worker-0");
when(address.getHostName()).thenReturn("dolphinscheduler-worker-0");
CommonTest.setFinalStatic(Constants.class.getDeclaredField("KUBERNETES_MODE"), true);
assertEquals("dolphinscheduler-worker-0.dolphinscheduler-worker-headless", NetUtils.getHost(address));
}
@Test
public void testGetLocalHost() {
assertNotNull(NetUtils.getHost());

36
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.server.worker.WorkerServer;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
@ -73,12 +72,6 @@ public class MasterServer implements IStoppable {
*/
private NettyRemotingServer nettyRemotingServer;
/**
* master registry
*/
@Autowired
private MasterRegistry masterRegistry;
/**
* zk master client
*/
@ -108,25 +101,17 @@ public class MasterServer implements IStoppable {
*/
@PostConstruct
public void run() {
try {
//init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.start();
this.masterRegistry.getZookeeperRegistryCenter().setStoppable(this);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
}
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.start();
// self tolerant
this.zkMasterClient.start();
this.zkMasterClient.start(this);
// scheduler start
this.masterSchedulerService.start();
@ -183,10 +168,9 @@ public class MasterServer implements IStoppable {
} catch (Exception e) {
logger.warn("thread sleep exception ", e);
}
//
//close
this.masterSchedulerService.close();
this.nettyRemotingServer.close();
this.masterRegistry.unRegistry();
this.zkMasterClient.close();
//close quartz
try {

19
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java

@ -17,11 +17,11 @@
package org.apache.dolphinscheduler.server.utils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
@ -53,7 +53,7 @@ public class FlinkArgsUtils {
args.add(Constants.FLINK_YARN_CLUSTER); //yarn-cluster
int slot = param.getSlot();
if (slot != 0) {
if (slot > 0) {
args.add(Constants.FLINK_YARN_SLOT);
args.add(String.format("%d", slot)); //-ys
}
@ -68,7 +68,7 @@ public class FlinkArgsUtils {
String flinkVersion = param.getFlinkVersion();
if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
int taskManager = param.getTaskManager();
if (taskManager != 0) { //-yn
if (taskManager > 0) { //-yn
args.add(Constants.FLINK_TASK_MANAGE);
args.add(String.format("%d", taskManager));
}
@ -92,12 +92,19 @@ public class FlinkArgsUtils {
args.add(queue);
}
}
}
args.add(Constants.FLINK_DETACH); //-d
int parallelism = param.getParallelism();
if (parallelism > 0) {
args.add(Constants.FLINK_PARALLELISM);
args.add(String.format("%d", parallelism)); // -p
}
// -p -s -yqu -yat -sae -yD -D
// If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly
// The task status will be synchronized with the cluster job status
args.add(Constants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
// -s -yqu -yat -yD -D
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}

34
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -105,37 +105,35 @@ public class WorkerServer implements IStoppable {
new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args);
}
/**
* worker server run
*/
@PostConstruct
public void run() {
// alert-server client registry
alertClientService = new AlertClientService(workerConfig.getAlertListenHost(), Constants.ALERT_RPC_PORT);
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
this.nettyRemotingServer.start();
// worker registry
try {
logger.info("start worker server...");
//init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
this.nettyRemotingServer.start();
this.workerRegistry.registry();
this.workerRegistry.getZookeeperRegistryCenter().setStoppable(this);
Set<String> workerZkPaths = this.workerRegistry.getWorkerZkPaths();
this.workerRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(workerZkPaths, ZKNodeType.WORKER, Constants.DELETE_ZK_OP);
// worker registry
this.workerRegistry.registry();
// retry report task status
this.retryReportTaskStatusThread.start();
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
}
// task execute manager
this.workerManagerThread.start();

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.MasterServer;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -73,8 +74,7 @@ public class ZKMasterClient extends AbstractZKClient {
@Autowired
private MasterRegistry masterRegistry;
public void start() {
public void start(MasterServer masterServer) {
InterProcessMutex mutex = null;
try {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
@ -82,9 +82,9 @@ public class ZKMasterClient extends AbstractZKClient {
mutex = new InterProcessMutex(getZkClient(), znodeLock);
mutex.acquire();
// Master registry
// master registry
masterRegistry.registry();
masterRegistry.getZookeeperRegistryCenter().setStoppable(masterServer);
String registPath = this.masterRegistry.getMasterPath();
masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registPath, ZKNodeType.MASTER, Constants.DELETE_ZK_OP);

32
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java

@ -17,19 +17,20 @@
package org.apache.dolphinscheduler.server.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
/**
* Test FlinkArgsUtils
*/
@ -39,6 +40,7 @@ public class FlinkArgsUtilsTest {
public String mode = "cluster";
public int slot = 2;
public int parallelism = 3;
public String appName = "testFlink";
public int taskManager = 4;
public String taskManagerMemory = "2G";
@ -48,7 +50,7 @@ public class FlinkArgsUtilsTest {
public ResourceInfo mainJar = null;
public String mainArgs = "testArgs --input file:///home";
public String queue = "queue1";
public String others = "-p 4";
public String others = "-s hdfs:///flink/savepoint-1537";
public String flinkVersion = "<1.10";
@ -72,6 +74,7 @@ public class FlinkArgsUtilsTest {
param.setMainClass(mainClass);
param.setAppName(appName);
param.setSlot(slot);
param.setParallelism(parallelism);
param.setTaskManager(taskManager);
param.setJobManagerMemory(jobManagerMemory);
param.setTaskManagerMemory(taskManagerMemory);
@ -89,7 +92,7 @@ public class FlinkArgsUtilsTest {
}
//Expected values and order
assertEquals(20, result.size());
assertEquals(22, result.size());
assertEquals("-m", result.get(0));
assertEquals("yarn-cluster", result.get(1));
@ -112,15 +115,18 @@ public class FlinkArgsUtilsTest {
assertEquals("-yqu", result.get(12));
assertEquals(result.get(13),queue);
assertEquals("-d", result.get(14));
assertEquals("-p", result.get(14));
assertSame(Integer.valueOf(result.get(15)),parallelism);
assertEquals("-sae", result.get(16));
assertEquals(result.get(15),others);
assertEquals(result.get(17),others);
assertEquals("-c", result.get(16));
assertEquals(result.get(17),mainClass);
assertEquals("-c", result.get(18));
assertEquals(result.get(19),mainClass);
assertEquals(result.get(18),mainJar.getRes());
assertEquals(result.get(19),mainArgs);
assertEquals(result.get(20),mainJar.getRes());
assertEquals(result.get(21),mainArgs);
//Others param without -yqu
FlinkParameters param1 = new FlinkParameters();

13
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java

@ -29,12 +29,13 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
@ -136,10 +137,10 @@ public class ZookeeperOperator implements InitializingBean {
}
public List<String> getChildrenKeys(final String key) {
List<String> values;
try {
values = zkClient.getChildren().forPath(key);
return values;
return zkClient.getChildren().forPath(key);
} catch (NoNodeException ex) {
return new ArrayList<>();
} catch (InterruptedException ex) {
logger.error("getChildrenKeys key : {} InterruptedException", key);
throw new IllegalStateException(ex);
@ -193,7 +194,7 @@ public class ZookeeperOperator implements InitializingBean {
if (isExisted(key)) {
try {
zkClient.delete().deletingChildrenIfNeeded().forPath(key);
} catch (KeeperException.NoNodeException ignore) {
} catch (NoNodeException ignore) {
//NOP
}
}
@ -230,7 +231,7 @@ public class ZookeeperOperator implements InitializingBean {
if (isExisted(key)) {
zkClient.delete().deletingChildrenIfNeeded().forPath(key);
}
} catch (KeeperException.NoNodeException ignore) {
} catch (NoNodeException ignore) {
//NOP
} catch (final Exception ex) {
logger.error("remove key : {}", key, ex);

15
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java

@ -113,4 +113,19 @@ public class RegisterOperatorTest {
Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE)));
}
@Test
public void testGetChildrenKeysWithNoNodeException() throws Exception {
testAfterPropertiesSet();
String path = registerOperator.getDeadZNodeParentPath();
Assert.assertEquals(0, registerOperator.getChildrenKeys(path).size());
}
@Test
public void testNoNodeException() throws Exception {
testAfterPropertiesSet();
String path = registerOperator.getDeadZNodeParentPath();
registerOperator.persistEphemeral(path, "test");
registerOperator.remove(path);
}
}

26
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue

@ -80,7 +80,7 @@
</el-select>
</div>
</m-list-box>
<m-list-4-box v-if="deployMode === 'cluster'">
<m-list-box v-if="deployMode === 'cluster'">
<div slot="text">{{$t('App Name')}}</div>
<div slot="content">
<el-input
@ -91,7 +91,7 @@
:placeholder="$t('Please enter app name(optional)')">
</el-input>
</div>
</m-list-4-box>
</m-list-box>
<m-list-4-box v-if="deployMode === 'cluster'">
<div slot="text">{{$t('JobManager Memory')}}</div>
<div slot="content">
@ -136,6 +136,18 @@
</el-input>
</div>
</m-list-4-box>
<m-list-4-box>
<div slot="text">{{$t('Parallelism')}}</div>
<div slot="content">
<el-input
:disabled="isDetails"
type="input"
size="small"
v-model="parallelism"
:placeholder="$t('Please enter Parallelism')">
</el-input>
</div>
</m-list-4-box>
<m-list-box>
<div slot="text">{{$t('Main Arguments')}}</div>
<div slot="content">
@ -215,6 +227,8 @@
localParams: [],
// Slot number
slot: 1,
// Parallelism
parallelism: 1,
// TaskManager mumber
taskManager: '2',
// JobManager memory
@ -320,6 +334,11 @@
return false
}
if (!Number.isInteger(parseInt(this.parallelism))) {
this.$message.warning(`${i18n.$t('Please enter Parallelism')}`)
return false
}
if (this.flinkVersion === '<1.10' && !Number.isInteger(parseInt(this.taskManager))) {
this.$message.warning(`${i18n.$t('Please enter TaskManager number')}`)
return false
@ -349,6 +368,7 @@
localParams: this.localParams,
flinkVersion: this.flinkVersion,
slot: this.slot,
parallelism: this.parallelism,
taskManager: this.taskManager,
jobManagerMemory: this.jobManagerMemory,
taskManagerMemory: this.taskManagerMemory,
@ -485,6 +505,7 @@
resourceList: this.resourceIdArr,
localParams: this.localParams,
slot: this.slot,
parallelism: this.parallelism,
taskManager: this.taskManager,
jobManagerMemory: this.jobManagerMemory,
taskManagerMemory: this.taskManagerMemory,
@ -516,6 +537,7 @@
this.deployMode = o.params.deployMode || ''
this.flinkVersion = o.params.flinkVersion || '<1.10'
this.slot = o.params.slot || 1
this.parallelism = o.params.parallelism || 1
this.taskManager = o.params.taskManager || '2'
this.jobManagerMemory = o.params.jobManagerMemory || '1G'
this.taskManagerMemory = o.params.taskManagerMemory || '2G'

2
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/index.vue

@ -98,6 +98,8 @@
this.searchParams.projectId = this.id === 0 ? 0 : localStore.getItem('projectId')
this.dataTime[0] = dayjs().format('YYYY-MM-DD 00:00:00')
this.dataTime[1] = dayjs().format('YYYY-MM-DD HH:mm:ss')
this.searchParams.startDate = this.dataTime[0]
this.searchParams.endDate = this.dataTime[1]
},
components: {
mListConstruction,

8
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue

@ -56,11 +56,15 @@
<span>{{scope.row.endTime | formatDate}}</span>
</template>
</el-table-column>
<el-table-column prop="duration" :label="$t('Duration')"></el-table-column>
<el-table-column :label="$t('Duration')">
<template slot-scope="scope">
<span>{{scope.row.duration | filterNull}}</span>
</template>
</el-table-column>
<el-table-column prop="runTimes" :label="$t('Run Times')"></el-table-column>
<el-table-column prop="recovery" :label="$t('fault-tolerant sign')"></el-table-column>
<el-table-column prop="executorName" :label="$t('Executor')"></el-table-column>
<el-table-column prop="host" :label="$t('host')" width="150"></el-table-column>
<el-table-column prop="host" :label="$t('host')" min-width="190"></el-table-column>
<el-table-column :label="$t('Operation')" width="240" fixed="right">
<template slot-scope="scope">
<div v-show="scope.row.disabled">

12
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue

@ -53,9 +53,17 @@
<span>{{scope.row.endTime | formatDate}}</span>
</template>
</el-table-column>
<el-table-column prop="host" :label="$t('host')" width="150"></el-table-column>
<el-table-column prop="duration" :label="$t('Duration')"></el-table-column>
<el-table-column :label="$t('Duration')">
<template slot-scope="scope">
<span>{{scope.row.duration | filterNull}}</span>
</template>
</el-table-column>
<el-table-column prop="retryTimes" :label="$t('Retry Count')"></el-table-column>
<el-table-column :label="$t('host')" min-width="190">
<template slot-scope="scope">
<span>{{scope.row.host | filterNull}}</span>
</template>
</el-table-column>
<el-table-column :label="$t('Operation')" width="80" fixed="right">
<template slot-scope="scope">
<div>

1
dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/subdirectory/index.vue

@ -112,6 +112,7 @@
this.isLeft = true
}
this.isLoading = !flag
this.searchParams.id = this.$route.params.id
this.getResourcesListP(this.searchParams).then(res => {
if (this.searchParams.pageNo > 1 && res.totalList.length === 0) {
this.searchParams.pageNo = this.searchParams.pageNo - 1

2
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@ -123,6 +123,8 @@ export default {
'Please enter TaskManager memory': 'Please enter TaskManager memory',
'Slot Number': 'Slot Number',
'Please enter Slot number': 'Please enter Slot number',
Parallelism: 'Parallelism',
'Please enter Parallelism': 'Please enter Parallelism',
'TaskManager Number': 'TaskManager Number',
'Please enter TaskManager number': 'Please enter TaskManager number',
'App Name': 'App Name',

2
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -123,6 +123,8 @@ export default {
'Please enter TaskManager memory': '请输入TaskManager内存数',
'Slot Number': 'Slot数量',
'Please enter Slot number': '请输入Slot数量',
Parallelism: '并行度',
'Please enter Parallelism': '请输入并行度',
'TaskManager Number': 'TaskManager数量',
'Please enter TaskManager number': '请输入TaskManager数量',
'App Name': '任务名称',

47
script/dolphinscheduler-daemon.sh

@ -54,42 +54,36 @@ fi
log=$DOLPHINSCHEDULER_LOG_DIR/dolphinscheduler-$command-$HOSTNAME.out
pid=$DOLPHINSCHEDULER_PID_DIR/dolphinscheduler-$command.pid
# print logs to /dev/null in docker
if [ "$DOCKER" = "true" ]; then
echo "start in docker"
log=/dev/null
fi
cd $DOLPHINSCHEDULER_HOME
if [ "$command" = "api-server" ]; then
HEAP_INITIAL_SIZE=1g
HEAP_MAX_SIZE=1g
HEAP_NEW_GENERATION__SIZE=500m
HEAP_NEW_GENERATION_SIZE=512m
LOG_FILE="-Dlogging.config=classpath:logback-api.xml -Dspring.profiles.active=api"
CLASS=org.apache.dolphinscheduler.api.ApiApplicationServer
elif [ "$command" = "master-server" ]; then
HEAP_INITIAL_SIZE=4g
HEAP_MAX_SIZE=4g
HEAP_NEW_GENERATION__SIZE=2g
HEAP_NEW_GENERATION_SIZE=2g
LOG_FILE="-Dlogging.config=classpath:logback-master.xml -Ddruid.mysql.usePingMethod=false"
CLASS=org.apache.dolphinscheduler.server.master.MasterServer
elif [ "$command" = "worker-server" ]; then
HEAP_INITIAL_SIZE=2g
HEAP_MAX_SIZE=2g
HEAP_NEW_GENERATION__SIZE=1g
HEAP_NEW_GENERATION_SIZE=1g
LOG_FILE="-Dlogging.config=classpath:logback-worker.xml -Ddruid.mysql.usePingMethod=false"
CLASS=org.apache.dolphinscheduler.server.worker.WorkerServer
elif [ "$command" = "alert-server" ]; then
HEAP_INITIAL_SIZE=1g
HEAP_MAX_SIZE=1g
HEAP_NEW_GENERATION__SIZE=500m
HEAP_NEW_GENERATION_SIZE=512m
LOG_FILE="-Dlogback.configurationFile=conf/logback-alert.xml"
CLASS=org.apache.dolphinscheduler.alert.AlertServer
elif [ "$command" = "logger-server" ]; then
HEAP_INITIAL_SIZE=1g
HEAP_MAX_SIZE=1g
HEAP_NEW_GENERATION__SIZE=500m
HEAP_NEW_GENERATION_SIZE=512m
CLASS=org.apache.dolphinscheduler.server.log.LoggerServer
elif [ "$command" = "zookeeper-server" ]; then
#note: this command just for getting a quick experience,not recommended for production. this operation will start a standalone zookeeper server
@ -100,26 +94,29 @@ else
exit 1
fi
export DOLPHINSCHEDULER_OPTS="-server -Xms$HEAP_INITIAL_SIZE -Xmx$HEAP_MAX_SIZE -Xmn$HEAP_NEW_GENERATION__SIZE -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m -Xss512k -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof $DOLPHINSCHEDULER_OPTS"
export DOLPHINSCHEDULER_OPTS="-server -Xms$HEAP_INITIAL_SIZE -Xmx$HEAP_MAX_SIZE -Xmn$HEAP_NEW_GENERATION_SIZE -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m -Xss512k -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -Xloggc:$DOLPHINSCHEDULER_LOG_DIR/gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof $DOLPHINSCHEDULER_OPTS"
case $startStop in
(start)
[ -w "$DOLPHINSCHEDULER_PID_DIR" ] || mkdir -p "$DOLPHINSCHEDULER_PID_DIR"
exec_command="$LOG_FILE $DOLPHINSCHEDULER_OPTS -classpath $DOLPHINSCHEDULER_CONF_DIR:$DOLPHINSCHEDULER_LIB_JARS $CLASS"
if [ "$DOCKER" = "true" ]; then
echo "start in docker"
$JAVA_HOME/bin/java $exec_command
else
[ -w "$DOLPHINSCHEDULER_PID_DIR" ] || mkdir -p "$DOLPHINSCHEDULER_PID_DIR"
if [ -f $pid ]; then
if kill -0 `cat $pid` > /dev/null 2>&1; then
echo $command running as process `cat $pid`. Stop it first.
exit 1
if [ -f $pid ]; then
if kill -0 `cat $pid` > /dev/null 2>&1; then
echo $command running as process `cat $pid`. Stop it first.
exit 1
fi
fi
fi
echo starting $command, logging to $log
exec_command="$LOG_FILE $DOLPHINSCHEDULER_OPTS -classpath $DOLPHINSCHEDULER_CONF_DIR:$DOLPHINSCHEDULER_LIB_JARS $CLASS"
echo "nohup $JAVA_HOME/bin/java $exec_command > $log 2>&1 &"
nohup $JAVA_HOME/bin/java $exec_command > $log 2>&1 &
echo $! > $pid
echo starting $command, logging to $log
echo "nohup $JAVA_HOME/bin/java $exec_command > $log 2>&1 &"
nohup $JAVA_HOME/bin/java $exec_command > $log 2>&1 &
echo $! > $pid
fi
;;
(stop)

Loading…
Cancel
Save