Browse Source

Merge remote-tracking branch 'origin/dev' into dev

pull/3/MERGE
dailidong 4 years ago
parent
commit
bbeb03e0dd
  1. 13
      .github/workflows/ci_e2e.yml
  2. 10
      .github/workflows/ci_ut.yml
  3. 36
      docker/build/Dockerfile
  4. 6
      docker/build/README.md
  5. 6
      docker/build/README_zh_CN.md
  6. 0
      docker/build/checkpoint.sh
  7. 0
      docker/build/conf/dolphinscheduler/alert.properties.tpl
  8. 0
      docker/build/conf/dolphinscheduler/application-api.properties.tpl
  9. 0
      docker/build/conf/dolphinscheduler/common.properties.tpl
  10. 0
      docker/build/conf/dolphinscheduler/datasource.properties.tpl
  11. 0
      docker/build/conf/dolphinscheduler/env/dolphinscheduler_env.sh
  12. 0
      docker/build/conf/dolphinscheduler/logback/logback-alert.xml
  13. 0
      docker/build/conf/dolphinscheduler/logback/logback-api.xml
  14. 0
      docker/build/conf/dolphinscheduler/logback/logback-master.xml
  15. 0
      docker/build/conf/dolphinscheduler/logback/logback-worker.xml
  16. 0
      docker/build/conf/dolphinscheduler/master.properties.tpl
  17. 0
      docker/build/conf/dolphinscheduler/quartz.properties.tpl
  18. 0
      docker/build/conf/dolphinscheduler/worker.properties.tpl
  19. 0
      docker/build/conf/dolphinscheduler/zookeeper.properties.tpl
  20. 0
      docker/build/conf/nginx/dolphinscheduler.conf
  21. 0
      docker/build/conf/zookeeper/zoo.cfg
  22. 14
      docker/build/hooks/build
  23. 12
      docker/build/hooks/build.bat
  24. 0
      docker/build/hooks/push
  25. 0
      docker/build/hooks/push.bat
  26. 0
      docker/build/startup-init-conf.sh
  27. 53
      docker/build/startup.sh
  28. 40
      docker/docker-compose.yml
  29. 2
      docker/docker-swarm/check
  30. 47
      docker/docker-swarm/docker-compose.yml
  31. 27
      docker/docker-swarm/docker-stack.yml
  32. 6
      docker/kubernetes/dolphinscheduler/values.yaml
  33. 762
      docker/postgres/docker-entrypoint-initdb/init.sql
  34. 53
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/manager/DingTalkManager.java
  35. 8
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPlugin.java
  36. 18
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java
  37. 136
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/DingTalkUtils.java
  38. 11
      dolphinscheduler-alert/src/main/resources/alert.properties
  39. 125
      dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/DingTalkUtilsTest.java
  40. 25
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/UsersController.java
  41. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitor.java
  42. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  43. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptor.java
  44. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/security/PasswordAuthenticator.java
  45. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
  46. 14
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java
  47. 1
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/LoginControllerTest.java
  48. 6
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptorTest.java
  49. 8
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/security/PasswordAuthenticatorTest.java
  50. 18
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
  51. 14
      dolphinscheduler-common/pom.xml
  52. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
  53. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
  54. 11
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
  55. 108
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/RetryerUtils.java
  56. 86
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessBuilderForWin32.java
  57. 38
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32.java
  58. 89
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32.java
  59. 216
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/RetryerUtilsTest.java
  60. 20
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstanceMap.java
  61. 16
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Queue.java
  62. 20
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Session.java
  63. 8
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Tenant.java
  64. 14
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/User.java
  65. 8
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/MysqlPerformance.java
  66. 2
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UserMapper.xml
  67. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
  68. 27
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  69. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
  70. 15
      dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue
  71. 12
      dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/list.vue
  72. 4
      dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
  73. 4
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
  74. 2
      e2e/src/test/java/org/apache/dolphinscheduler/locator/security/UserManageLocator.java
  75. 9
      pom.xml
  76. 2
      script/scp-hosts.sh
  77. 3
      sql/dolphinscheduler-postgre.sql
  78. 1
      sql/dolphinscheduler_mysql.sql
  79. 2
      sql/soft_version
  80. 21
      sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql
  81. 20
      sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql
  82. 1
      tools/dependencies/known-dependencies.txt

13
.github/workflows/ci_e2e.yml

@ -44,15 +44,14 @@ jobs:
${{ runner.os }}-maven-
- name: Build Image
run: |
export VERSION=`cat $(pwd)/pom.xml| grep "SNAPSHOT</version>" | awk -F "-SNAPSHOT" '{print $1}' | awk -F ">" '{print $2}'`
sh ./dockerfile/hooks/build
sh ./docker/build/hooks/build
- name: Docker Run
run: |
VERSION=`cat $(pwd)/pom.xml| grep "SNAPSHOT</version>" | awk -F "-SNAPSHOT" '{print $1}' | awk -F ">" '{print $2}'`
mkdir -p /tmp/logs
docker run -dit -e POSTGRESQL_USERNAME=test -e POSTGRESQL_PASSWORD=test -v /tmp/logs:/opt/dolphinscheduler/logs -p 8888:8888 dolphinscheduler:$VERSION all
export VERSION=$(cat $(pwd)/pom.xml | grep '<version>' -m 1 | awk '{print $1}' | sed 's/<version>//' | sed 's/<\/version>//')
sed -i "s/apache\/dolphinscheduler:latest/apache\/dolphinscheduler:${VERSION}/g" $(pwd)/docker/docker-swarm/docker-compose.yml
docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml up -d
- name: Check Server Status
run: sh ./dockerfile/hooks/check
run: sh $(pwd)/docker/docker-swarm/check
- name: Prepare e2e env
run: |
sudo apt-get install -y libxss1 libappindicator1 libindicator7 xvfb unzip libgbm1
@ -70,6 +69,6 @@ jobs:
uses: actions/upload-artifact@v1
with:
name: dslogs
path: /tmp/logs
path: /var/lib/docker/volumes/docker-swarm_dolphinscheduler-logs/_data

10
.github/workflows/ci_ut.yml

@ -21,7 +21,6 @@ on:
branches:
- dev
env:
DOCKER_DIR: ./docker
LOG_DIR: /tmp/dolphinscheduler
name: Unit Test
@ -47,7 +46,11 @@ jobs:
restore-keys: |
${{ runner.os }}-maven-
- name: Bootstrap database
run: cd ${DOCKER_DIR} && docker-compose up -d
run: |
sed -i "s/: root/: test/g" $(pwd)/docker/docker-swarm/docker-compose.yml
docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml create --force-recreate dolphinscheduler-zookeeper dolphinscheduler-postgresql
sudo cp $(pwd)/sql/dolphinscheduler-postgre.sql $(docker volume inspect docker-swarm_dolphinscheduler-postgresql-initdb | grep "Mountpoint" | awk -F "\"" '{print $4}')
docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml up -d dolphinscheduler-zookeeper dolphinscheduler-postgresql
- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
@ -82,6 +85,5 @@ jobs:
- name: Collect logs
run: |
mkdir -p ${LOG_DIR}
cd ${DOCKER_DIR}
docker-compose logs db > ${LOG_DIR}/db.txt
docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml logs dolphinscheduler-postgresql > ${LOG_DIR}/db.txt
continue-on-error: true

36
dockerfile/Dockerfile → docker/build/Dockerfile

@ -37,32 +37,19 @@ RUN apk add openjdk8
ENV JAVA_HOME /usr/lib/jvm/java-1.8-openjdk
ENV PATH $JAVA_HOME/bin:$PATH
#3. install zk
RUN cd /opt && \
wget https://downloads.apache.org/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz && \
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz && \
mv apache-zookeeper-3.5.7-bin zookeeper && \
mkdir -p /tmp/zookeeper && \
rm -rf ./zookeeper-*tar.gz && \
rm -rf /opt/zookeeper/conf/zoo_sample.cfg
ADD ./conf/zookeeper/zoo.cfg /opt/zookeeper/conf
ENV ZK_HOME /opt/zookeeper
ENV PATH $ZK_HOME/bin:$PATH
#3. add dolphinscheduler
ADD ./apache-dolphinscheduler-incubating-${VERSION}-dolphinscheduler-bin.tar.gz /opt/
RUN mv /opt/apache-dolphinscheduler-incubating-${VERSION}-dolphinscheduler-bin/ /opt/dolphinscheduler/
ENV DOLPHINSCHEDULER_HOME /opt/dolphinscheduler
#4. install pg
RUN apk add postgresql postgresql-contrib
#5. add dolphinscheduler
ADD ./apache-dolphinscheduler-incubating-${VERSION}-SNAPSHOT-dolphinscheduler-bin.tar.gz /opt/
RUN mv /opt/apache-dolphinscheduler-incubating-${VERSION}-SNAPSHOT-dolphinscheduler-bin/ /opt/dolphinscheduler/
ENV DOLPHINSCHEDULER_HOME /opt/dolphinscheduler
#6. modify nginx
#5. modify nginx
RUN echo "daemon off;" >> /etc/nginx/nginx.conf && \
rm -rf /etc/nginx/conf.d/*
ADD ./conf/nginx/dolphinscheduler.conf /etc/nginx/conf.d
#7. add configuration and modify permissions and set soft links
#6. add configuration and modify permissions and set soft links
ADD ./checkpoint.sh /root/checkpoint.sh
ADD ./startup-init-conf.sh /root/startup-init-conf.sh
ADD ./startup.sh /root/startup.sh
@ -75,22 +62,21 @@ RUN chmod +x /root/checkpoint.sh && \
chmod +x /opt/dolphinscheduler/conf/env/dolphinscheduler_env.sh && \
chmod +x /opt/dolphinscheduler/script/*.sh && \
chmod +x /opt/dolphinscheduler/bin/*.sh && \
chmod +x /opt/zookeeper/bin/*.sh && \
dos2unix /root/checkpoint.sh && \
dos2unix /root/startup-init-conf.sh && \
dos2unix /root/startup.sh && \
dos2unix /opt/dolphinscheduler/conf/env/dolphinscheduler_env.sh && \
dos2unix /opt/dolphinscheduler/script/*.sh && \
dos2unix /opt/dolphinscheduler/bin/*.sh && \
dos2unix /opt/zookeeper/bin/*.sh && \
rm -rf /bin/sh && \
ln -s /bin/bash /bin/sh && \
mkdir -p /tmp/xls
#8. remove apk index cache
RUN rm -rf /var/cache/apk/*
#7. remove apk index cache and disable coredup for sudo
RUN rm -rf /var/cache/apk/* && \
echo "Set disable_coredump false" >> /etc/sudo.conf
#9. expose port
#8. expose port
EXPOSE 2181 2888 3888 5432 5678 1234 12345 50051 8888
ENTRYPOINT ["/sbin/tini", "--", "/root/startup.sh"]
ENTRYPOINT ["/sbin/tini", "--", "/root/startup.sh"]

6
dockerfile/README.md → docker/build/README.md

@ -109,16 +109,16 @@ In Unix-Like, Example:
```bash
$ cd path/incubator-dolphinscheduler
$ sh ./dockerfile/hooks/build
$ sh ./docker/build/hooks/build
```
In Windows, Example:
```bat
c:\incubator-dolphinscheduler>.\dockerfile\hooks\build.bat
c:\incubator-dolphinscheduler>.\docker\build\hooks\build.bat
```
Please read `./dockerfile/hooks/build` `./dockerfile/hooks/build.bat` script files if you don't understand
Please read `./docker/build/hooks/build` `./docker/build/hooks/build.bat` script files if you don't understand
## Environment Variables

6
dockerfile/README_zh_CN.md → docker/build/README_zh_CN.md

@ -109,16 +109,16 @@ dolphinscheduler frontend
```bash
$ cd path/incubator-dolphinscheduler
$ sh ./dockerfile/hooks/build
$ sh ./docker/build/hooks/build
```
Windows系统, 如下:
```bat
c:\incubator-dolphinscheduler>.\dockerfile\hooks\build.bat
c:\incubator-dolphinscheduler>.\docker\build\hooks\build.bat
```
如果你不理解这些脚本 `./dockerfile/hooks/build` `./dockerfile/hooks/build.bat`,请阅读里面的内容。
如果你不理解这些脚本 `./docker/build/hooks/build` `./docker/build/hooks/build.bat`,请阅读里面的内容。
## 环境变量

0
dockerfile/checkpoint.sh → docker/build/checkpoint.sh

0
dockerfile/conf/dolphinscheduler/alert.properties.tpl → docker/build/conf/dolphinscheduler/alert.properties.tpl

0
dockerfile/conf/dolphinscheduler/application-api.properties.tpl → docker/build/conf/dolphinscheduler/application-api.properties.tpl

0
dockerfile/conf/dolphinscheduler/common.properties.tpl → docker/build/conf/dolphinscheduler/common.properties.tpl

0
dockerfile/conf/dolphinscheduler/datasource.properties.tpl → docker/build/conf/dolphinscheduler/datasource.properties.tpl

0
dockerfile/conf/dolphinscheduler/env/dolphinscheduler_env.sh → docker/build/conf/dolphinscheduler/env/dolphinscheduler_env.sh vendored

0
dockerfile/conf/dolphinscheduler/logback/logback-alert.xml → docker/build/conf/dolphinscheduler/logback/logback-alert.xml

0
dockerfile/conf/dolphinscheduler/logback/logback-api.xml → docker/build/conf/dolphinscheduler/logback/logback-api.xml

0
dockerfile/conf/dolphinscheduler/logback/logback-master.xml → docker/build/conf/dolphinscheduler/logback/logback-master.xml

0
dockerfile/conf/dolphinscheduler/logback/logback-worker.xml → docker/build/conf/dolphinscheduler/logback/logback-worker.xml

0
dockerfile/conf/dolphinscheduler/master.properties.tpl → docker/build/conf/dolphinscheduler/master.properties.tpl

0
dockerfile/conf/dolphinscheduler/quartz.properties.tpl → docker/build/conf/dolphinscheduler/quartz.properties.tpl

0
dockerfile/conf/dolphinscheduler/worker.properties.tpl → docker/build/conf/dolphinscheduler/worker.properties.tpl

0
dockerfile/conf/dolphinscheduler/zookeeper.properties.tpl → docker/build/conf/dolphinscheduler/zookeeper.properties.tpl

0
dockerfile/conf/nginx/dolphinscheduler.conf → docker/build/conf/nginx/dolphinscheduler.conf

0
dockerfile/conf/zookeeper/zoo.cfg → docker/build/conf/zookeeper/zoo.cfg

14
dockerfile/hooks/build → docker/build/hooks/build

@ -24,13 +24,13 @@ printenv
if [ -z "${VERSION}" ]
then
echo "set default environment variable [VERSION]"
VERSION=$(cat $(pwd)/sql/soft_version)
export VERSION=$(cat $(pwd)/pom.xml | grep '<version>' -m 1 | awk '{print $1}' | sed 's/<version>//' | sed 's/<\/version>//')
fi
if [ "${DOCKER_REPO}x" = "x" ]
then
echo "set default environment variable [DOCKER_REPO]"
DOCKER_REPO='dolphinscheduler'
export DOCKER_REPO='apache/dolphinscheduler'
fi
echo "Version: $VERSION"
@ -42,12 +42,12 @@ echo -e "Current Directory is $(pwd)\n"
echo -e "mvn -B clean compile package -Prelease -Dmaven.test.skip=true"
mvn -B clean compile package -Prelease -Dmaven.test.skip=true
# mv dolphinscheduler-bin.tar.gz file to dockerfile directory
echo -e "mv $(pwd)/dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-${VERSION}-SNAPSHOT-dolphinscheduler-bin.tar.gz $(pwd)/dockerfile/\n"
mv $(pwd)/dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-${VERSION}-SNAPSHOT-dolphinscheduler-bin.tar.gz $(pwd)/dockerfile/
# mv dolphinscheduler-bin.tar.gz file to docker/build directory
echo -e "mv $(pwd)/dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-${VERSION}-dolphinscheduler-bin.tar.gz $(pwd)/docker/build/\n"
mv $(pwd)/dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-${VERSION}-dolphinscheduler-bin.tar.gz $(pwd)/docker/build/
# docker build
echo -e "docker build --build-arg VERSION=${VERSION} -t $DOCKER_REPO:${VERSION} $(pwd)/dockerfile/\n"
docker build --build-arg VERSION=${VERSION} -t $DOCKER_REPO:${VERSION} $(pwd)/dockerfile/
echo -e "docker build --build-arg VERSION=${VERSION} -t $DOCKER_REPO:${VERSION} $(pwd)/docker/build/\n"
sudo docker build --build-arg VERSION=${VERSION} -t $DOCKER_REPO:${VERSION} $(pwd)/docker/build/
echo "------ dolphinscheduler end - build -------"

12
dockerfile/hooks/build.bat → docker/build/hooks/build.bat

@ -38,13 +38,13 @@ echo "call mvn clean compile package -Prelease"
call mvn clean compile package -Prelease -DskipTests=true
if "%errorlevel%"=="1" goto :mvnFailed
:: move dolphinscheduler-bin.tar.gz file to dockerfile directory
echo "move %cd%\dolphinscheduler-dist\target\apache-dolphinscheduler-incubating-%VERSION%-SNAPSHOT-dolphinscheduler-bin.tar.gz %cd%\dockerfile\"
move %cd%\dolphinscheduler-dist\target\apache-dolphinscheduler-incubating-%VERSION%-SNAPSHOT-dolphinscheduler-bin.tar.gz %cd%\dockerfile\
:: move dolphinscheduler-bin.tar.gz file to docker/build directory
echo "move %cd%\dolphinscheduler-dist\target\apache-dolphinscheduler-incubating-%VERSION%-SNAPSHOT-dolphinscheduler-bin.tar.gz %cd%\docker\build\"
move %cd%\dolphinscheduler-dist\target\apache-dolphinscheduler-incubating-%VERSION%-SNAPSHOT-dolphinscheduler-bin.tar.gz %cd%\docker\build\
:: docker build
echo "docker build --build-arg VERSION=%VERSION% -t %DOCKER_REPO%:%VERSION% %cd%\dockerfile\"
docker build --build-arg VERSION=%VERSION% -t %DOCKER_REPO%:%VERSION% %cd%\dockerfile\
echo "docker build --build-arg VERSION=%VERSION% -t %DOCKER_REPO%:%VERSION% %cd%\docker\build\"
docker build --build-arg VERSION=%VERSION% -t %DOCKER_REPO%:%VERSION% %cd%\docker\build\
if "%errorlevel%"=="1" goto :dockerBuildFailed
echo "------ dolphinscheduler end - build -------"
@ -53,4 +53,4 @@ echo "------ dolphinscheduler end - build -------"
echo "MAVEN PACKAGE FAILED!"
:dockerBuildFailed
echo "DOCKER BUILD FAILED!"
echo "DOCKER BUILD FAILED!"

0
dockerfile/hooks/push → docker/build/hooks/push

0
dockerfile/hooks/push.bat → docker/build/hooks/push.bat

0
dockerfile/startup-init-conf.sh → docker/build/startup-init-conf.sh

53
dockerfile/startup.sh → docker/build/startup.sh

@ -24,31 +24,6 @@ DOLPHINSCHEDULER_LOGS=${DOLPHINSCHEDULER_HOME}/logs
# start postgresql
initPostgreSQL() {
echo "checking postgresql"
if [[ "${POSTGRESQL_HOST}" = "127.0.0.1" || "${POSTGRESQL_HOST}" = "localhost" ]]; then
export PGPORT=${POSTGRESQL_PORT}
echo "start postgresql service"
rc-service postgresql restart
# role if not exists, create
flag=$(sudo -u postgres psql -tAc "SELECT 1 FROM pg_roles WHERE rolname='${POSTGRESQL_USERNAME}'")
if [ -z "${flag}" ]; then
echo "create user"
sudo -u postgres psql -tAc "create user ${POSTGRESQL_USERNAME} with password '${POSTGRESQL_PASSWORD}'"
fi
# database if not exists, create
flag=$(sudo -u postgres psql -tAc "select 1 from pg_database where datname='dolphinscheduler'")
if [ -z "${flag}" ]; then
echo "init db"
sudo -u postgres psql -tAc "create database dolphinscheduler owner ${POSTGRESQL_USERNAME}"
fi
# grant
sudo -u postgres psql -tAc "grant all privileges on database dolphinscheduler to ${POSTGRESQL_USERNAME}"
fi
echo "test postgresql service"
while ! nc -z ${POSTGRESQL_HOST} ${POSTGRESQL_PORT}; do
counter=$((counter+1))
@ -73,24 +48,18 @@ initPostgreSQL() {
# start zk
initZK() {
echo -e "checking zookeeper"
if [[ "${ZOOKEEPER_QUORUM}" = "127.0.0.1:2181" || "${ZOOKEEPER_QUORUM}" = "localhost:2181" ]]; then
echo "start local zookeeper"
/opt/zookeeper/bin/zkServer.sh restart
else
echo "connect remote zookeeper"
echo "${ZOOKEEPER_QUORUM}" | awk -F ',' 'BEGIN{ i=1 }{ while( i <= NF ){ print $i; i++ } }' | while read line; do
while ! nc -z ${line%:*} ${line#*:}; do
counter=$((counter+1))
if [ $counter == 30 ]; then
echo "Error: Couldn't connect to zookeeper."
exit 1
fi
echo "Trying to connect to zookeeper at ${line}. Attempt $counter."
sleep 5
done
echo "connect remote zookeeper"
echo "${ZOOKEEPER_QUORUM}" | awk -F ',' 'BEGIN{ i=1 }{ while( i <= NF ){ print $i; i++ } }' | while read line; do
while ! nc -z ${line%:*} ${line#*:}; do
counter=$((counter+1))
if [ $counter == 30 ]; then
echo "Error: Couldn't connect to zookeeper."
exit 1
fi
echo "Trying to connect to zookeeper at ${line}. Attempt $counter."
sleep 5
done
fi
done
}
# start nginx

40
docker/docker-compose.yml

@ -1,40 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: '2'
services:
zookeeper:
image: zookeeper
restart: always
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_4LW_COMMANDS_WHITELIST: srvr,ruok,wchs,cons
db:
image: postgres
container_name: postgres
environment:
- POSTGRES_USER=test
- POSTGRES_PASSWORD=test
- POSTGRES_DB=dolphinscheduler
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
- ./postgres/docker-entrypoint-initdb:/docker-entrypoint-initdb.d
volumes:
pgdata:

2
dockerfile/hooks/check → docker/docker-swarm/check

@ -17,7 +17,7 @@
#
echo "------ dolphinscheduler check - server - status -------"
sleep 60
server_num=$(docker top `docker container list | grep '/sbin/tini' | awk '{print $1}'`| grep java | grep "dolphinscheduler" | awk -F 'classpath ' '{print $2}' | awk '{print $2}' | sort | uniq -c | wc -l)
server_num=$(docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml top | grep java | grep "dolphinscheduler" | awk -F 'classpath ' '{print $2}' | awk '{print $2}' | sort | uniq -c | wc -l)
if [ $server_num -eq 5 ]
then
echo "Server all start successfully"

47
docker/docker-swarm/docker-compose.yml

@ -30,6 +30,7 @@ services:
POSTGRESQL_DATABASE: dolphinscheduler
volumes:
- dolphinscheduler-postgresql:/bitnami/postgresql
- dolphinscheduler-postgresql-initdb:/docker-entrypoint-initdb.d
networks:
- dolphinscheduler
@ -41,13 +42,14 @@ services:
environment:
TZ: Asia/Shanghai
ALLOW_ANONYMOUS_LOGIN: "yes"
ZOO_4LW_COMMANDS_WHITELIST: srvr,ruok,wchs,cons
volumes:
- dolphinscheduler-zookeeper:/bitnami/zookeeper
networks:
- dolphinscheduler
dolphinscheduler-api:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
container_name: dolphinscheduler-api
command: ["api-server"]
ports:
@ -70,12 +72,12 @@ services:
- dolphinscheduler-postgresql
- dolphinscheduler-zookeeper
volumes:
- dolphinscheduler-api:/opt/dolphinscheduler/logs
networks:
- dolphinscheduler-logs:/opt/dolphinscheduler/logs
networks:
- dolphinscheduler
dolphinscheduler-frontend:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
container_name: dolphinscheduler-frontend
command: ["frontend"]
ports:
@ -93,12 +95,12 @@ services:
depends_on:
- dolphinscheduler-api
volumes:
- dolphinscheduler-frontend:/var/log/nginx
- dolphinscheduler-logs:/var/log/nginx
networks:
- dolphinscheduler
dolphinscheduler-alert:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
container_name: dolphinscheduler-alert
command: ["alert-server"]
environment:
@ -130,18 +132,18 @@ services:
start_period: 30s
depends_on:
- dolphinscheduler-postgresql
volumes:
- dolphinscheduler-alert:/opt/dolphinscheduler/logs
networks:
volumes:
- dolphinscheduler-logs:/opt/dolphinscheduler/logs
networks:
- dolphinscheduler
dolphinscheduler-master:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
container_name: dolphinscheduler-master
command: ["master-server"]
ports:
ports:
- 5678:5678
environment:
environment:
TZ: Asia/Shanghai
MASTER_EXEC_THREADS: "100"
MASTER_EXEC_TASK_NUM: "20"
@ -162,22 +164,22 @@ services:
timeout: 5s
retries: 3
start_period: 30s
depends_on:
depends_on:
- dolphinscheduler-postgresql
- dolphinscheduler-zookeeper
volumes:
- dolphinscheduler-master:/opt/dolphinscheduler/logs
volumes:
- dolphinscheduler-logs:/opt/dolphinscheduler/logs
networks:
- dolphinscheduler
dolphinscheduler-worker:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
container_name: dolphinscheduler-worker
command: ["worker-server"]
ports:
ports:
- 1234:1234
- 50051:50051
environment:
environment:
TZ: Asia/Shanghai
WORKER_EXEC_THREADS: "100"
WORKER_HEARTBEAT_INTERVAL: "10"
@ -209,7 +211,7 @@ services:
source: dolphinscheduler-worker-data
target: /tmp/dolphinscheduler
- type: volume
source: dolphinscheduler-worker-logs
source: dolphinscheduler-logs
target: /opt/dolphinscheduler/logs
networks:
- dolphinscheduler
@ -220,13 +222,10 @@ networks:
volumes:
dolphinscheduler-postgresql:
dolphinscheduler-postgresql-initdb:
dolphinscheduler-zookeeper:
dolphinscheduler-api:
dolphinscheduler-frontend:
dolphinscheduler-alert:
dolphinscheduler-master:
dolphinscheduler-worker-data:
dolphinscheduler-worker-logs:
dolphinscheduler-logs:
configs:
dolphinscheduler-worker-task-env:

27
docker/docker-swarm/docker-stack.yml

@ -42,6 +42,7 @@ services:
environment:
TZ: Asia/Shanghai
ALLOW_ANONYMOUS_LOGIN: "yes"
ZOO_4LW_COMMANDS_WHITELIST: srvr,ruok,wchs,cons
volumes:
- dolphinscheduler-zookeeper:/bitnami/zookeeper
networks:
@ -51,7 +52,7 @@ services:
replicas: 1
dolphinscheduler-api:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
command: ["api-server"]
ports:
- 12345:12345
@ -70,7 +71,7 @@ services:
retries: 3
start_period: 30s
volumes:
- dolphinscheduler-api:/opt/dolphinscheduler/logs
- dolphinscheduler-logs:/opt/dolphinscheduler/logs
networks:
- dolphinscheduler
deploy:
@ -78,7 +79,7 @@ services:
replicas: 1
dolphinscheduler-frontend:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
command: ["frontend"]
ports:
- 8888:8888
@ -93,7 +94,7 @@ services:
retries: 3
start_period: 30s
volumes:
- dolphinscheduler-frontend:/var/log/nginx
- dolphinscheduler-logs:/var/log/nginx
networks:
- dolphinscheduler
deploy:
@ -101,7 +102,7 @@ services:
replicas: 1
dolphinscheduler-alert:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
command: ["alert-server"]
environment:
TZ: Asia/Shanghai
@ -131,7 +132,7 @@ services:
retries: 3
start_period: 30s
volumes:
- dolphinscheduler-alert:/opt/dolphinscheduler/logs
- dolphinscheduler-logs:/opt/dolphinscheduler/logs
networks:
- dolphinscheduler
deploy:
@ -139,7 +140,7 @@ services:
replicas: 1
dolphinscheduler-master:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
command: ["master-server"]
ports:
- 5678:5678
@ -165,7 +166,7 @@ services:
retries: 3
start_period: 30s
volumes:
- dolphinscheduler-master:/opt/dolphinscheduler/logs
- dolphinscheduler-logs:/opt/dolphinscheduler/logs
networks:
- dolphinscheduler
deploy:
@ -173,7 +174,7 @@ services:
replicas: 1
dolphinscheduler-worker:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
command: ["worker-server"]
ports:
- 1234:1234
@ -201,7 +202,7 @@ services:
start_period: 30s
volumes:
- dolphinscheduler-worker-data:/tmp/dolphinscheduler
- dolphinscheduler-worker-logs:/opt/dolphinscheduler/logs
- dolphinscheduler-logs:/opt/dolphinscheduler/logs
configs:
- source: dolphinscheduler-worker-task-env
target: /opt/dolphinscheduler/conf/env/dolphinscheduler_env.sh
@ -218,12 +219,8 @@ networks:
volumes:
dolphinscheduler-postgresql:
dolphinscheduler-zookeeper:
dolphinscheduler-api:
dolphinscheduler-frontend:
dolphinscheduler-alert:
dolphinscheduler-master:
dolphinscheduler-worker-data:
dolphinscheduler-worker-logs:
dolphinscheduler-logs:
configs:
dolphinscheduler-worker-task-env:

6
docker/kubernetes/dolphinscheduler/values.yaml

@ -25,9 +25,9 @@ fullnameOverride: ""
timezone: "Asia/Shanghai"
image:
registry: "docker.io"
registry: "apache"
repository: "dolphinscheduler"
tag: "1.3.0"
tag: "latest"
pullPolicy: "IfNotPresent"
imagePullSecrets: []
@ -56,6 +56,8 @@ externalDatabase:
zookeeper:
enabled: true
taskQueue: "zookeeper"
config:
ZOO_4LW_COMMANDS_WHITELIST: srvr,ruok,wchs,cons
service:
port: "2181"
persistence:

762
docker/postgres/docker-entrypoint-initdb/init.sql

@ -1,762 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;
CREATE TABLE QRTZ_JOB_DETAILS(
SCHED_NAME character varying(120) NOT NULL,
JOB_NAME character varying(200) NOT NULL,
JOB_GROUP character varying(200) NOT NULL,
DESCRIPTION character varying(250) NULL,
JOB_CLASS_NAME character varying(250) NOT NULL,
IS_DURABLE boolean NOT NULL,
IS_NONCONCURRENT boolean NOT NULL,
IS_UPDATE_DATA boolean NOT NULL,
REQUESTS_RECOVERY boolean NOT NULL,
JOB_DATA bytea NULL);
alter table QRTZ_JOB_DETAILS add primary key(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE TABLE QRTZ_TRIGGERS (
SCHED_NAME character varying(120) NOT NULL,
TRIGGER_NAME character varying(200) NOT NULL,
TRIGGER_GROUP character varying(200) NOT NULL,
JOB_NAME character varying(200) NOT NULL,
JOB_GROUP character varying(200) NOT NULL,
DESCRIPTION character varying(250) NULL,
NEXT_FIRE_TIME BIGINT NULL,
PREV_FIRE_TIME BIGINT NULL,
PRIORITY INTEGER NULL,
TRIGGER_STATE character varying(16) NOT NULL,
TRIGGER_TYPE character varying(8) NOT NULL,
START_TIME BIGINT NOT NULL,
END_TIME BIGINT NULL,
CALENDAR_NAME character varying(200) NULL,
MISFIRE_INSTR SMALLINT NULL,
JOB_DATA bytea NULL) ;
alter table QRTZ_TRIGGERS add primary key(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE TABLE QRTZ_SIMPLE_TRIGGERS (
SCHED_NAME character varying(120) NOT NULL,
TRIGGER_NAME character varying(200) NOT NULL,
TRIGGER_GROUP character varying(200) NOT NULL,
REPEAT_COUNT BIGINT NOT NULL,
REPEAT_INTERVAL BIGINT NOT NULL,
TIMES_TRIGGERED BIGINT NOT NULL) ;
alter table QRTZ_SIMPLE_TRIGGERS add primary key(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE TABLE QRTZ_CRON_TRIGGERS (
SCHED_NAME character varying(120) NOT NULL,
TRIGGER_NAME character varying(200) NOT NULL,
TRIGGER_GROUP character varying(200) NOT NULL,
CRON_EXPRESSION character varying(120) NOT NULL,
TIME_ZONE_ID character varying(80)) ;
alter table QRTZ_CRON_TRIGGERS add primary key(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE TABLE QRTZ_SIMPROP_TRIGGERS
(
SCHED_NAME character varying(120) NOT NULL,
TRIGGER_NAME character varying(200) NOT NULL,
TRIGGER_GROUP character varying(200) NOT NULL,
STR_PROP_1 character varying(512) NULL,
STR_PROP_2 character varying(512) NULL,
STR_PROP_3 character varying(512) NULL,
INT_PROP_1 INT NULL,
INT_PROP_2 INT NULL,
LONG_PROP_1 BIGINT NULL,
LONG_PROP_2 BIGINT NULL,
DEC_PROP_1 NUMERIC(13,4) NULL,
DEC_PROP_2 NUMERIC(13,4) NULL,
BOOL_PROP_1 boolean NULL,
BOOL_PROP_2 boolean NULL) ;
alter table QRTZ_SIMPROP_TRIGGERS add primary key(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE TABLE QRTZ_BLOB_TRIGGERS (
SCHED_NAME character varying(120) NOT NULL,
TRIGGER_NAME character varying(200) NOT NULL,
TRIGGER_GROUP character varying(200) NOT NULL,
BLOB_DATA bytea NULL) ;
alter table QRTZ_BLOB_TRIGGERS add primary key(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE TABLE QRTZ_CALENDARS (
SCHED_NAME character varying(120) NOT NULL,
CALENDAR_NAME character varying(200) NOT NULL,
CALENDAR bytea NOT NULL) ;
alter table QRTZ_CALENDARS add primary key(SCHED_NAME,CALENDAR_NAME);
CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS (
SCHED_NAME character varying(120) NOT NULL,
TRIGGER_GROUP character varying(200) NOT NULL) ;
alter table QRTZ_PAUSED_TRIGGER_GRPS add primary key(SCHED_NAME,TRIGGER_GROUP);
CREATE TABLE QRTZ_FIRED_TRIGGERS (
SCHED_NAME character varying(120) NOT NULL,
ENTRY_ID character varying(95) NOT NULL,
TRIGGER_NAME character varying(200) NOT NULL,
TRIGGER_GROUP character varying(200) NOT NULL,
INSTANCE_NAME character varying(200) NOT NULL,
FIRED_TIME BIGINT NOT NULL,
SCHED_TIME BIGINT NOT NULL,
PRIORITY INTEGER NOT NULL,
STATE character varying(16) NOT NULL,
JOB_NAME character varying(200) NULL,
JOB_GROUP character varying(200) NULL,
IS_NONCONCURRENT boolean NULL,
REQUESTS_RECOVERY boolean NULL) ;
alter table QRTZ_FIRED_TRIGGERS add primary key(SCHED_NAME,ENTRY_ID);
CREATE TABLE QRTZ_SCHEDULER_STATE (
SCHED_NAME character varying(120) NOT NULL,
INSTANCE_NAME character varying(200) NOT NULL,
LAST_CHECKIN_TIME BIGINT NOT NULL,
CHECKIN_INTERVAL BIGINT NOT NULL) ;
alter table QRTZ_SCHEDULER_STATE add primary key(SCHED_NAME,INSTANCE_NAME);
CREATE TABLE QRTZ_LOCKS (
SCHED_NAME character varying(120) NOT NULL,
LOCK_NAME character varying(40) NOT NULL) ;
alter table QRTZ_LOCKS add primary key(SCHED_NAME,LOCK_NAME);
CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);
CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);
CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
--
-- Table structure for table t_ds_access_token
--
DROP TABLE IF EXISTS t_ds_access_token;
CREATE TABLE t_ds_access_token (
id int NOT NULL ,
user_id int DEFAULT NULL ,
token varchar(64) DEFAULT NULL ,
expire_time timestamp DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_alert
--
DROP TABLE IF EXISTS t_ds_alert;
CREATE TABLE t_ds_alert (
id int NOT NULL ,
title varchar(64) DEFAULT NULL ,
show_type int DEFAULT NULL ,
content text ,
alert_type int DEFAULT NULL ,
alert_status int DEFAULT '0' ,
·log· text ,
alertgroup_id int DEFAULT NULL ,
receivers text ,
receivers_cc text ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_alertgroup
--
DROP TABLE IF EXISTS t_ds_alertgroup;
CREATE TABLE t_ds_alertgroup (
id int NOT NULL ,
group_name varchar(255) DEFAULT NULL ,
group_type int DEFAULT NULL ,
description varchar(255) DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_command
--
DROP TABLE IF EXISTS t_ds_command;
CREATE TABLE t_ds_command (
id int NOT NULL ,
command_type int DEFAULT NULL ,
process_definition_id int DEFAULT NULL ,
command_param text ,
task_depend_type int DEFAULT NULL ,
failure_strategy int DEFAULT '0' ,
warning_type int DEFAULT '0' ,
warning_group_id int DEFAULT NULL ,
schedule_time timestamp DEFAULT NULL ,
start_time timestamp DEFAULT NULL ,
executor_id int DEFAULT NULL ,
dependence varchar(255) DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
process_instance_priority int DEFAULT NULL ,
worker_group varchar(64),
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_datasource
--
DROP TABLE IF EXISTS t_ds_datasource;
CREATE TABLE t_ds_datasource (
id int NOT NULL ,
name varchar(64) NOT NULL ,
note varchar(256) DEFAULT NULL ,
type int NOT NULL ,
user_id int NOT NULL ,
connection_params text NOT NULL ,
create_time timestamp NOT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_error_command
--
DROP TABLE IF EXISTS t_ds_error_command;
CREATE TABLE t_ds_error_command (
id int NOT NULL ,
command_type int DEFAULT NULL ,
executor_id int DEFAULT NULL ,
process_definition_id int DEFAULT NULL ,
command_param text ,
task_depend_type int DEFAULT NULL ,
failure_strategy int DEFAULT '0' ,
warning_type int DEFAULT '0' ,
warning_group_id int DEFAULT NULL ,
schedule_time timestamp DEFAULT NULL ,
start_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
dependence text ,
process_instance_priority int DEFAULT NULL ,
worker_group varchar(64),
message text ,
PRIMARY KEY (id)
);
--
-- Table structure for table t_ds_master_server
--
--
-- Table structure for table t_ds_process_definition
--
DROP TABLE IF EXISTS t_ds_process_definition;
CREATE TABLE t_ds_process_definition (
id int NOT NULL ,
name varchar(255) DEFAULT NULL ,
version int DEFAULT NULL ,
release_state int DEFAULT NULL ,
project_id int DEFAULT NULL ,
user_id int DEFAULT NULL ,
process_definition_json text ,
description text ,
global_params text ,
flag int DEFAULT NULL ,
locations text ,
connects text ,
receivers text ,
receivers_cc text ,
create_time timestamp DEFAULT NULL ,
timeout int DEFAULT '0' ,
tenant_id int NOT NULL DEFAULT '-1' ,
update_time timestamp DEFAULT NULL ,
modify_by varchar(36) DEFAULT '' ,
resource_ids varchar(64),
PRIMARY KEY (id)
) ;
create index process_definition_index on t_ds_process_definition (project_id,id);
--
-- Table structure for table t_ds_process_instance
--
DROP TABLE IF EXISTS t_ds_process_instance;
CREATE TABLE t_ds_process_instance (
id int NOT NULL ,
name varchar(255) DEFAULT NULL ,
process_definition_id int DEFAULT NULL ,
state int DEFAULT NULL ,
recovery int DEFAULT NULL ,
start_time timestamp DEFAULT NULL ,
end_time timestamp DEFAULT NULL ,
run_times int DEFAULT NULL ,
host varchar(45) DEFAULT NULL ,
command_type int DEFAULT NULL ,
command_param text ,
task_depend_type int DEFAULT NULL ,
max_try_times int DEFAULT '0' ,
failure_strategy int DEFAULT '0' ,
warning_type int DEFAULT '0' ,
warning_group_id int DEFAULT NULL ,
schedule_time timestamp DEFAULT NULL ,
command_start_time timestamp DEFAULT NULL ,
global_params text ,
process_instance_json text ,
flag int DEFAULT '1' ,
update_time timestamp NULL ,
is_sub_process int DEFAULT '0' ,
executor_id int NOT NULL ,
locations text ,
connects text ,
history_cmd text ,
dependence_schedule_times text ,
process_instance_priority int DEFAULT NULL ,
worker_group varchar(64) ,
timeout int DEFAULT '0' ,
tenant_id int NOT NULL DEFAULT '-1' ,
PRIMARY KEY (id)
) ;
create index process_instance_index on t_ds_process_instance (process_definition_id,id);
create index start_time_index on t_ds_process_instance (start_time);
--
-- Table structure for table t_ds_project
--
DROP TABLE IF EXISTS t_ds_project;
CREATE TABLE t_ds_project (
id int NOT NULL ,
name varchar(100) DEFAULT NULL ,
description varchar(200) DEFAULT NULL ,
user_id int DEFAULT NULL ,
flag int DEFAULT '1' ,
create_time timestamp DEFAULT CURRENT_TIMESTAMP ,
update_time timestamp DEFAULT CURRENT_TIMESTAMP ,
PRIMARY KEY (id)
) ;
create index user_id_index on t_ds_project (user_id);
--
-- Table structure for table t_ds_queue
--
DROP TABLE IF EXISTS t_ds_queue;
CREATE TABLE t_ds_queue (
id int NOT NULL ,
queue_name varchar(64) DEFAULT NULL ,
queue varchar(64) DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
);
--
-- Table structure for table t_ds_relation_datasource_user
--
DROP TABLE IF EXISTS t_ds_relation_datasource_user;
CREATE TABLE t_ds_relation_datasource_user (
id int NOT NULL ,
user_id int NOT NULL ,
datasource_id int DEFAULT NULL ,
perm int DEFAULT '1' ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
;
--
-- Table structure for table t_ds_relation_process_instance
--
DROP TABLE IF EXISTS t_ds_relation_process_instance;
CREATE TABLE t_ds_relation_process_instance (
id int NOT NULL ,
parent_process_instance_id int DEFAULT NULL ,
parent_task_instance_id int DEFAULT NULL ,
process_instance_id int DEFAULT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_relation_project_user
--
DROP TABLE IF EXISTS t_ds_relation_project_user;
CREATE TABLE t_ds_relation_project_user (
id int NOT NULL ,
user_id int NOT NULL ,
project_id int DEFAULT NULL ,
perm int DEFAULT '1' ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
create index relation_project_user_id_index on t_ds_relation_project_user (user_id);
--
-- Table structure for table t_ds_relation_resources_user
--
DROP TABLE IF EXISTS t_ds_relation_resources_user;
CREATE TABLE t_ds_relation_resources_user (
id int NOT NULL ,
user_id int NOT NULL ,
resources_id int DEFAULT NULL ,
perm int DEFAULT '1' ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_relation_udfs_user
--
DROP TABLE IF EXISTS t_ds_relation_udfs_user;
CREATE TABLE t_ds_relation_udfs_user (
id int NOT NULL ,
user_id int NOT NULL ,
udf_id int DEFAULT NULL ,
perm int DEFAULT '1' ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
;
--
-- Table structure for table t_ds_relation_user_alertgroup
--
DROP TABLE IF EXISTS t_ds_relation_user_alertgroup;
CREATE TABLE t_ds_relation_user_alertgroup (
id int NOT NULL,
alertgroup_id int DEFAULT NULL,
user_id int DEFAULT NULL,
create_time timestamp DEFAULT NULL,
update_time timestamp DEFAULT NULL,
PRIMARY KEY (id)
);
--
-- Table structure for table t_ds_resources
--
DROP TABLE IF EXISTS t_ds_resources;
CREATE TABLE t_ds_resources (
id int NOT NULL ,
alias varchar(64) DEFAULT NULL ,
file_name varchar(64) DEFAULT NULL ,
description varchar(256) DEFAULT NULL ,
user_id int DEFAULT NULL ,
type int DEFAULT NULL ,
size bigint DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
pid int,
full_name varchar(64),
is_directory int,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_schedules
--
DROP TABLE IF EXISTS t_ds_schedules;
CREATE TABLE t_ds_schedules (
id int NOT NULL ,
process_definition_id int NOT NULL ,
start_time timestamp NOT NULL ,
end_time timestamp NOT NULL ,
crontab varchar(256) NOT NULL ,
failure_strategy int NOT NULL ,
user_id int NOT NULL ,
release_state int NOT NULL ,
warning_type int NOT NULL ,
warning_group_id int DEFAULT NULL ,
process_instance_priority int DEFAULT NULL ,
worker_group varchar(64),
create_time timestamp NOT NULL ,
update_time timestamp NOT NULL ,
PRIMARY KEY (id)
);
--
-- Table structure for table t_ds_session
--
DROP TABLE IF EXISTS t_ds_session;
CREATE TABLE t_ds_session (
id varchar(64) NOT NULL ,
user_id int DEFAULT NULL ,
ip varchar(45) DEFAULT NULL ,
last_login_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
);
--
-- Table structure for table t_ds_task_instance
--
DROP TABLE IF EXISTS t_ds_task_instance;
CREATE TABLE t_ds_task_instance (
id int NOT NULL ,
name varchar(255) DEFAULT NULL ,
task_type varchar(64) DEFAULT NULL ,
process_definition_id int DEFAULT NULL ,
process_instance_id int DEFAULT NULL ,
task_json text ,
state int DEFAULT NULL ,
submit_time timestamp DEFAULT NULL ,
start_time timestamp DEFAULT NULL ,
end_time timestamp DEFAULT NULL ,
host varchar(45) DEFAULT NULL ,
execute_path varchar(200) DEFAULT NULL ,
log_path varchar(200) DEFAULT NULL ,
alert_flag int DEFAULT NULL ,
retry_times int DEFAULT '0' ,
pid int DEFAULT NULL ,
app_link varchar(255) DEFAULT NULL ,
flag int DEFAULT '1' ,
retry_interval int DEFAULT NULL ,
max_retry_times int DEFAULT NULL ,
task_instance_priority int DEFAULT NULL ,
worker_group varchar(64),
executor_id int DEFAULT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_tenant
--
DROP TABLE IF EXISTS t_ds_tenant;
CREATE TABLE t_ds_tenant (
id int NOT NULL ,
tenant_code varchar(64) DEFAULT NULL ,
tenant_name varchar(64) DEFAULT NULL ,
description varchar(256) DEFAULT NULL ,
queue_id int DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_udfs
--
DROP TABLE IF EXISTS t_ds_udfs;
CREATE TABLE t_ds_udfs (
id int NOT NULL ,
user_id int NOT NULL ,
func_name varchar(100) NOT NULL ,
class_name varchar(255) NOT NULL ,
type int NOT NULL ,
arg_types varchar(255) DEFAULT NULL ,
database varchar(255) DEFAULT NULL ,
description varchar(255) DEFAULT NULL ,
resource_id int NOT NULL ,
resource_name varchar(255) NOT NULL ,
create_time timestamp NOT NULL ,
update_time timestamp NOT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_user
--
DROP TABLE IF EXISTS t_ds_user;
CREATE TABLE t_ds_user (
id int NOT NULL ,
user_name varchar(64) DEFAULT NULL ,
user_password varchar(64) DEFAULT NULL ,
user_type int DEFAULT NULL ,
email varchar(64) DEFAULT NULL ,
phone varchar(11) DEFAULT NULL ,
tenant_id int DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
queue varchar(64) DEFAULT NULL ,
PRIMARY KEY (id)
);
--
-- Table structure for table t_ds_version
--
DROP TABLE IF EXISTS t_ds_version;
CREATE TABLE t_ds_version (
id int NOT NULL ,
version varchar(200) NOT NULL,
PRIMARY KEY (id)
) ;
create index version_index on t_ds_version(version);
--
-- Table structure for table t_ds_worker_group
--
DROP TABLE IF EXISTS t_ds_worker_group;
CREATE TABLE t_ds_worker_group (
id bigint NOT NULL ,
name varchar(256) DEFAULT NULL ,
ip_list varchar(256) DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
--
-- Table structure for table t_ds_worker_server
--
DROP TABLE IF EXISTS t_ds_worker_server;
CREATE TABLE t_ds_worker_server (
id int NOT NULL ,
host varchar(45) DEFAULT NULL ,
port int DEFAULT NULL ,
zk_directory varchar(64) DEFAULT NULL ,
res_info varchar(255) DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
last_heartbeat_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
DROP SEQUENCE IF EXISTS t_ds_access_token_id_sequence;
CREATE SEQUENCE t_ds_access_token_id_sequence;
ALTER TABLE t_ds_access_token ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_access_token_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_alert_id_sequence;
CREATE SEQUENCE t_ds_alert_id_sequence;
ALTER TABLE t_ds_alert ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_alert_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_alertgroup_id_sequence;
CREATE SEQUENCE t_ds_alertgroup_id_sequence;
ALTER TABLE t_ds_alertgroup ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_alertgroup_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_command_id_sequence;
CREATE SEQUENCE t_ds_command_id_sequence;
ALTER TABLE t_ds_command ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_command_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_datasource_id_sequence;
CREATE SEQUENCE t_ds_datasource_id_sequence;
ALTER TABLE t_ds_datasource ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_datasource_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_process_definition_id_sequence;
CREATE SEQUENCE t_ds_process_definition_id_sequence;
ALTER TABLE t_ds_process_definition ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_process_definition_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_process_instance_id_sequence;
CREATE SEQUENCE t_ds_process_instance_id_sequence;
ALTER TABLE t_ds_process_instance ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_process_instance_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_project_id_sequence;
CREATE SEQUENCE t_ds_project_id_sequence;
ALTER TABLE t_ds_project ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_project_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_queue_id_sequence;
CREATE SEQUENCE t_ds_queue_id_sequence;
ALTER TABLE t_ds_queue ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_queue_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_relation_datasource_user_id_sequence;
CREATE SEQUENCE t_ds_relation_datasource_user_id_sequence;
ALTER TABLE t_ds_relation_datasource_user ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_relation_datasource_user_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_relation_process_instance_id_sequence;
CREATE SEQUENCE t_ds_relation_process_instance_id_sequence;
ALTER TABLE t_ds_relation_process_instance ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_relation_process_instance_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_relation_project_user_id_sequence;
CREATE SEQUENCE t_ds_relation_project_user_id_sequence;
ALTER TABLE t_ds_relation_project_user ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_relation_project_user_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_relation_resources_user_id_sequence;
CREATE SEQUENCE t_ds_relation_resources_user_id_sequence;
ALTER TABLE t_ds_relation_resources_user ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_relation_resources_user_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_relation_udfs_user_id_sequence;
CREATE SEQUENCE t_ds_relation_udfs_user_id_sequence;
ALTER TABLE t_ds_relation_udfs_user ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_relation_udfs_user_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_relation_user_alertgroup_id_sequence;
CREATE SEQUENCE t_ds_relation_user_alertgroup_id_sequence;
ALTER TABLE t_ds_relation_user_alertgroup ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_relation_user_alertgroup_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_resources_id_sequence;
CREATE SEQUENCE t_ds_resources_id_sequence;
ALTER TABLE t_ds_resources ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_resources_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_schedules_id_sequence;
CREATE SEQUENCE t_ds_schedules_id_sequence;
ALTER TABLE t_ds_schedules ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_schedules_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_task_instance_id_sequence;
CREATE SEQUENCE t_ds_task_instance_id_sequence;
ALTER TABLE t_ds_task_instance ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_task_instance_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_tenant_id_sequence;
CREATE SEQUENCE t_ds_tenant_id_sequence;
ALTER TABLE t_ds_tenant ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_tenant_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_udfs_id_sequence;
CREATE SEQUENCE t_ds_udfs_id_sequence;
ALTER TABLE t_ds_udfs ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_udfs_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_user_id_sequence;
CREATE SEQUENCE t_ds_user_id_sequence;
ALTER TABLE t_ds_user ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_user_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_version_id_sequence;
CREATE SEQUENCE t_ds_version_id_sequence;
ALTER TABLE t_ds_version ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_version_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_worker_group_id_sequence;
CREATE SEQUENCE t_ds_worker_group_id_sequence;
ALTER TABLE t_ds_worker_group ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_group_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_worker_server_id_sequence;
CREATE SEQUENCE t_ds_worker_server_id_sequence;
ALTER TABLE t_ds_worker_server ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_server_id_sequence');
-- Records of t_ds_user?user : admin , password : dolphinscheduler123
INSERT INTO t_ds_user(user_name,user_password,user_type,email,phone,tenant_id,create_time,update_time) VALUES ('admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22');
-- Records of t_ds_alertgroup,dolphinscheduler warning group
INSERT INTO t_ds_alertgroup(group_name,group_type,description,create_time,update_time) VALUES ('dolphinscheduler warning group', '0', 'dolphinscheduler warning group','2018-11-29 10:20:39', '2018-11-29 10:20:39');
INSERT INTO t_ds_relation_user_alertgroup(alertgroup_id,user_id,create_time,update_time) VALUES ( '1', '1', '2018-11-29 10:22:33', '2018-11-29 10:22:33');
-- Records of t_ds_queue,default queue name : default
INSERT INTO t_ds_queue(queue_name,queue,create_time,update_time) VALUES ('default', 'default','2018-11-29 10:22:33', '2018-11-29 10:22:33');
-- Records of t_ds_queue,default queue name : default
INSERT INTO t_ds_version(version) VALUES ('2.0.0');

53
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/manager/DingTalkManager.java

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.manager;
import org.apache.dolphinscheduler.alert.utils.Constants;
import org.apache.dolphinscheduler.alert.utils.DingTalkUtils;
import org.apache.dolphinscheduler.plugin.model.AlertInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* Ding Talk Manager
*/
public class DingTalkManager {
private static final Logger logger = LoggerFactory.getLogger(EnterpriseWeChatManager.class);
public Map<String,Object> send(AlertInfo alert) {
Map<String,Object> retMap = new HashMap<>();
retMap.put(Constants.STATUS, false);
logger.info("send message {}", alert.getAlertData().getTitle());
try {
String msg = buildMessage(alert);
DingTalkUtils.sendDingTalkMsg(msg, Constants.UTF_8);
} catch (IOException e) {
logger.error(e.getMessage(),e);
}
retMap.put(Constants.STATUS, true);
return retMap;
}
private String buildMessage(AlertInfo alert) {
String msg = alert.getAlertData().getContent();
return msg;
}
}

8
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPlugin.java

@ -16,9 +16,11 @@
*/
package org.apache.dolphinscheduler.alert.plugin;
import org.apache.dolphinscheduler.alert.manager.DingTalkManager;
import org.apache.dolphinscheduler.alert.manager.EmailManager;
import org.apache.dolphinscheduler.alert.manager.EnterpriseWeChatManager;
import org.apache.dolphinscheduler.alert.utils.Constants;
import org.apache.dolphinscheduler.alert.utils.DingTalkUtils;
import org.apache.dolphinscheduler.alert.utils.EnterpriseWeChatUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
@ -44,6 +46,7 @@ public class EmailAlertPlugin implements AlertPlugin {
private static final EmailManager emailManager = new EmailManager();
private static final EnterpriseWeChatManager weChatManager = new EnterpriseWeChatManager();
private static final DingTalkManager dingTalkManager = new DingTalkManager();
public EmailAlertPlugin() {
this.pluginName = new PluginName();
@ -121,6 +124,11 @@ public class EmailAlertPlugin implements AlertPlugin {
logger.error(e.getMessage(), e);
}
}
if (DingTalkUtils.isEnableDingTalk) {
logger.info("Ding Talk is enable.");
dingTalkManager.send(info);
}
} else {
retMaps.put(Constants.MESSAGE, "alert send error.");

18
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java

@ -156,6 +156,23 @@ public class Constants {
public static final String ENTERPRISE_WECHAT_AGENT_ID = "enterprise.wechat.agent.id";
public static final String ENTERPRISE_WECHAT_USERS = "enterprise.wechat.users";
public static final String DINGTALK_WEBHOOK = "dingtalk.webhook";
public static final String DINGTALK_KEYWORD = "dingtalk.keyword";
public static final String DINGTALK_PROXY_ENABLE = "dingtalk.isEnableProxy";
public static final String DINGTALK_PROXY = "dingtalk.proxy";
public static final String DINGTALK_PORT = "dingtalk.port";
public static final String DINGTALK_USER = "dingtalk.user";
public static final String DINGTALK_PASSWORD = "dingtalk.password";
public static final String DINGTALK_ENABLE = "dingtalk.isEnable";
/**
* plugin config
@ -173,4 +190,5 @@ public class Constants {
public static final String PLUGIN_DEFAULT_EMAIL_RECEIVERCCS = "receiverCcs";
public static final String RETMAP_MSG = "msg";
}

136
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/DingTalkUtils.java

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.utils;
import com.alibaba.fastjson.JSON;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* DingTalkUtils utils
* support send msg to ding talk by robot message push function.
* support proxy setting
*/
public class DingTalkUtils {
public static final Logger logger = LoggerFactory.getLogger(DingTalkUtils.class);
public static final boolean isEnableDingTalk = PropertyUtils.getBoolean(Constants.DINGTALK_ENABLE);
private static final String dingTaskUrl = PropertyUtils.getString(Constants.DINGTALK_WEBHOOK);
private static final String keyword = PropertyUtils.getString(Constants.DINGTALK_KEYWORD);
private static final Boolean isEnableProxy = PropertyUtils.getBoolean(Constants.DINGTALK_PROXY_ENABLE);
private static final String proxy = PropertyUtils.getString(Constants.DINGTALK_PROXY);
private static final String user = PropertyUtils.getString(Constants.DINGTALK_USER);
private static final String passwd = PropertyUtils.getString(Constants.DINGTALK_PASSWORD);
private static final Integer port = PropertyUtils.getInt(Constants.DINGTALK_PORT);
/**
* send message interface
* only support text message format now.
* @param msg message context to send
* @param charset charset type
* @return result of sending msg
* @throws IOException the IOException
*/
public static String sendDingTalkMsg(String msg, String charset) throws IOException {
String msgToJson = textToJsonString(msg + "#" + keyword);
HttpPost httpPost = constructHttpPost(msgToJson, charset);
CloseableHttpClient httpClient;
if (isEnableProxy) {
httpClient = getProxyClient();
RequestConfig rcf = getProxyConfig();
httpPost.setConfig(rcf);
} else {
httpClient = getDefaultClient();
}
try {
CloseableHttpResponse response = httpClient.execute(httpPost);
String resp;
try {
HttpEntity entity = response.getEntity();
resp = EntityUtils.toString(entity, charset);
EntityUtils.consume(entity);
} finally {
response.close();
}
logger.info("Ding Talk send [{}], resp:{%s}", msg, resp);
return resp;
} finally {
httpClient.close();
}
}
public static HttpPost constructHttpPost(String msg, String charset) {
HttpPost post = new HttpPost(dingTaskUrl);
StringEntity entity = new StringEntity(msg, charset);
post.setEntity(entity);
post.addHeader("Content-Type", "application/json; charset=utf-8");
return post;
}
public static CloseableHttpClient getProxyClient() {
HttpHost httpProxy = new HttpHost(proxy, port);
CredentialsProvider provider = new BasicCredentialsProvider();
provider.setCredentials(new AuthScope(httpProxy), new UsernamePasswordCredentials(user, passwd));
CloseableHttpClient httpClient = HttpClients.custom().setDefaultCredentialsProvider(provider).build();
return httpClient;
}
public static CloseableHttpClient getDefaultClient() {
return HttpClients.createDefault();
}
public static RequestConfig getProxyConfig() {
HttpHost httpProxy = new HttpHost(proxy, port);
return RequestConfig.custom().setProxy(httpProxy).build();
}
public static String textToJsonString(String text) {
Map<String, Object> items = new HashMap<String, Object>();
items.put("msgtype", "text");
Map<String, String> textContent = new HashMap<String, String>();
byte[] byt = StringUtils.getBytesUtf8(text);
String txt = StringUtils.newStringUtf8(byt);
textContent.put("content", txt);
items.put("text", textContent);
return JSON.toJSONString(items);
}
}

11
dolphinscheduler-alert/src/main/resources/alert.properties

@ -36,6 +36,7 @@ mail.smtp.ssl.trust=xxx.xxx.com
# Enterprise WeChat configuration
enterprise.wechat.enable=false
#enterprise.wechat.corp.id=xxxxxxx
#enterprise.wechat.secret=xxxxxxx
#enterprise.wechat.agent.id=xxxxxxx
@ -47,3 +48,13 @@ enterprise.wechat.enable=false
plugin.dir=/Users/xx/your/path/to/plugin/dir
#ding talk configuration
dingtalk.isEnable=flase
dingtalk.webhook=https://oapi.dingtalk.com/robot/send?access_token=xxxxx
dingtalk.keyword=
dingtalk.proxy=
dingtalk.port=80
dingtalk.user=
dingtalk.password=
dingtalk.isEnableProxy=false

125
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/DingTalkUtilsTest.java

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.utils;
import com.alibaba.fastjson.JSON;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import static org.junit.Assert.*;
@PrepareForTest(PropertyUtils.class)
@RunWith(PowerMockRunner.class)
@PowerMockIgnore("javax.net.ssl.*")
public class DingTalkUtilsTest {
Logger logger = LoggerFactory.getLogger(DingTalkUtilsTest.class);
private static final String mockUrl = "https://oapi.dingtalk.com/robot/send?access_token=test";
private static final String mockKeyWords = "onway";
private static final String msg = "ding talk test";
@Before
public void init(){
PowerMockito.mockStatic(PropertyUtils.class);
Mockito.when(PropertyUtils.getString(Constants.DINGTALK_WEBHOOK)).thenReturn(mockUrl);
Mockito.when(PropertyUtils.getString(Constants.DINGTALK_KEYWORD)).thenReturn(mockKeyWords);
Mockito.when(PropertyUtils.getBoolean(Constants.DINGTALK_PROXY_ENABLE)).thenReturn(true);
Mockito.when(PropertyUtils.getString(Constants.DINGTALK_PROXY)).thenReturn("proxy.com.cn");
Mockito.when(PropertyUtils.getString(Constants.DINGTALK_USER)).thenReturn("user");
Mockito.when(PropertyUtils.getString(Constants.DINGTALK_PASSWORD)).thenReturn("pswd");
Mockito.when(PropertyUtils.getInt(Constants.DINGTALK_PORT)).thenReturn(80);
}
// @Test
// @Ignore
// public void testSendMsg() {
// try {
// String msgTosend = "msg to send";
// logger.info(PropertyUtils.getString(Constants.DINGTALK_WEBHOOK));
// String rsp = DingTalkUtils.sendDingTalkMsg(msgTosend, Constants.UTF_8);
// logger.info("send msg result:{}",rsp);
// String errmsg = JSON.parseObject(rsp).getString("errmsg");
// Assert.assertEquals("ok", errmsg);
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
@Test
public void testCreateDefaultClient() {
CloseableHttpClient client = DingTalkUtils.getDefaultClient();;
try {
Assert.assertNotNull(client);
client.close();
} catch (IOException ex) {
logger.info("close exception",ex.getMessage());
new Throwable();
}
}
@Test
public void testCreateProxyClient() {
CloseableHttpClient client = DingTalkUtils.getProxyClient();
try {
Assert.assertNotNull(client);
client.close();
} catch (IOException ex) {
logger.info("close exception",ex.getMessage());
new Throwable();
}
}
@Test
public void testProxyConfig() {
RequestConfig rc = DingTalkUtils.getProxyConfig();
Assert.assertEquals(rc.getProxy().getPort(), 80);
Assert.assertEquals(rc.getProxy().getHostName(), "proxy.com.cn");
}
@Test
public void testDingTalkMsgToJson() {
String jsonString = DingTalkUtils.textToJsonString("this is test");
logger.info(jsonString);
String expect = "{\"text\":{\"content\":\"this is test\"},\"msgtype\":\"text\"}";
Assert.assertEquals(expect, jsonString);
}
@Test
public void testDingTalkMsgUtf8() {
String msg = DingTalkUtils.textToJsonString("this is test:中文");
logger.info("test support utf8, actual:" + msg);
logger.info("test support utf8, actual:" + DingTalkUtils.isEnableDingTalk);
String expect = "{\"text\":{\"content\":\"this is test:中文\"},\"msgtype\":\"text\"}";
Assert.assertEquals(expect, msg);
}
}

25
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/UsersController.java

@ -72,7 +72,8 @@ public class UsersController extends BaseController {
@ApiImplicitParam(name = "tenantId", value = "TENANT_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "queue", value = "QUEUE", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "email", value = "EMAIL", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "phone", value = "PHONE", dataType = "Int", example = "100")
@ApiImplicitParam(name = "phone", value = "PHONE", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "state", value = "STATE", dataType = "Int", example = "1")
})
@PostMapping(value = "/create")
@ResponseStatus(HttpStatus.CREATED)
@ -83,11 +84,11 @@ public class UsersController extends BaseController {
@RequestParam(value = "tenantId") int tenantId,
@RequestParam(value = "queue", required = false, defaultValue = "") String queue,
@RequestParam(value = "email") String email,
@RequestParam(value = "phone", required = false) String phone) throws Exception {
logger.info("login user {}, create user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, user queue: {}",
loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone, queue);
Map<String, Object> result = usersService.createUser(loginUser, userName, userPassword, email, tenantId, phone, queue);
@RequestParam(value = "phone", required = false) String phone,
@RequestParam(value = "state", required = false) int state) throws Exception {
logger.info("login user {}, create user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, user queue: {}, state: {}",
loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone, queue, state);
Map<String, Object> result = usersService.createUser(loginUser, userName, userPassword, email, tenantId, phone, queue, state);
return returnDataList(result);
}
@ -146,7 +147,8 @@ public class UsersController extends BaseController {
@ApiImplicitParam(name = "tenantId", value = "TENANT_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "queue", value = "QUEUE", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "email", value = "EMAIL", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "phone", value = "PHONE", dataType = "Int", example = "100")
@ApiImplicitParam(name = "phone", value = "PHONE", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "state", value = "STATE", dataType = "Int", example = "1")
})
@PostMapping(value = "/update")
@ResponseStatus(HttpStatus.OK)
@ -158,10 +160,11 @@ public class UsersController extends BaseController {
@RequestParam(value = "queue", required = false, defaultValue = "") String queue,
@RequestParam(value = "email") String email,
@RequestParam(value = "tenantId") int tenantId,
@RequestParam(value = "phone", required = false) String phone) throws Exception {
logger.info("login user {}, updateProcessInstance user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, user queue: {}",
loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone, queue);
Map<String, Object> result = usersService.updateUser(id, userName, userPassword, email, tenantId, phone, queue);
@RequestParam(value = "phone", required = false) String phone,
@RequestParam(value = "state", required = false) int state) throws Exception {
logger.info("login user {}, updateProcessInstance user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, user queue: {}, state: {}",
loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone, queue, state);
Map<String, Object> result = usersService.updateUser(id, userName, userPassword, email, tenantId, phone, queue, state);
return returnDataList(result);
}

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitor.java

@ -50,6 +50,7 @@ public class ResourceTreeVisitor implements Visitor{
* visit
* @return resoruce component
*/
@Override
public ResourceComponent visit() {
ResourceComponent rootDirectory = new Directory();
for (Resource resource : resourceList) {
@ -117,6 +118,7 @@ public class ResourceTreeVisitor implements Visitor{
}else{
tempResourceComponent = new FileLeaf();
}
tempResourceComponent.setName(resource.getAlias());
tempResourceComponent.setFullName(resource.getFullName().replaceFirst("/",""));
tempResourceComponent.setId(resource.getId());

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -175,6 +175,7 @@ public enum Status {
QUERY_WORKER_GROUP_FAIL(10146,"query worker group fail ", "查询worker分组失败"),
DELETE_WORKER_GROUP_FAIL(10147,"delete worker group fail ", "删除worker分组失败"),
COPY_PROCESS_DEFINITION_ERROR(10148,"copy process definition error", "复制工作流错误"),
USER_DISABLED(10149,"The current user is disabled", "当前用户已停用"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptor.java

@ -16,9 +16,11 @@
*/
package org.apache.dolphinscheduler.api.interceptor;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.security.Authenticator;
import org.apache.dolphinscheduler.api.service.SessionService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.commons.httpclient.HttpStatus;
@ -85,6 +87,14 @@ public class LoginHandlerInterceptor implements HandlerInterceptor {
return false;
}
}
// check user state
if (user.getState() == Flag.NO.ordinal()) {
response.setStatus(HttpStatus.SC_UNAUTHORIZED);
logger.info(Status.USER_DISABLED.getMsg());
return false;
}
request.setAttribute(Constants.SESSION_USER, user);
return true;
}

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/security/PasswordAuthenticator.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.service.SessionService;
import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.Session;
import org.apache.dolphinscheduler.dao.entity.User;
import org.slf4j.Logger;
@ -49,6 +50,13 @@ public class PasswordAuthenticator implements Authenticator {
return result;
}
// check user state
if (user.getState() == Flag.NO.ordinal()) {
result.setCode(Status.USER_DISABLED.getCode());
result.setMsg(Status.USER_DISABLED.getMsg());
return result;
}
// create session
String sessionId = sessionService.createSession(user, extra);
if (sessionId == null) {

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java

@ -104,7 +104,7 @@ public class ResourcesService extends BaseService {
putMsg(result, Status.HDFS_NOT_STARTUP);
return result;
}
String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
String fullName = "/".equals(currentDir) ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
if (pid != -1) {
Resource parentResource = resourcesMapper.selectById(pid);
@ -229,7 +229,7 @@ public class ResourcesService extends BaseService {
}
// check resoure name exists
String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
String fullName = "/".equals(currentDir) ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
if (checkResourceExists(fullName, 0, type.ordinal())) {
logger.error("resource {} has exist, can't recreate", name);
putMsg(result, Status.RESOURCE_EXIST);
@ -839,7 +839,7 @@ public class ResourcesService extends BaseService {
}
String name = fileName.trim() + "." + nameSuffix;
String fullName = currentDirectory.equals("/") ? String.format("%s%s",currentDirectory,name):String.format("%s/%s",currentDirectory,name);
String fullName = "/".equals(currentDirectory) ? String.format("%s%s",currentDirectory,name):String.format("%s/%s",currentDirectory,name);
result = verifyResourceName(fullName,type,loginUser);
if (!result.getCode().equals(Status.SUCCESS.getCode())) {

14
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java

@ -94,7 +94,8 @@ public class UsersService extends BaseService {
String email,
int tenantId,
String phone,
String queue) throws Exception {
String queue,
int state) throws Exception {
Map<String, Object> result = new HashMap<>(5);
@ -115,7 +116,7 @@ public class UsersService extends BaseService {
return result;
}
User user = createUser(userName, userPassword, email, tenantId, phone, queue);
User user = createUser(userName, userPassword, email, tenantId, phone, queue, state);
Tenant tenant = tenantMapper.queryById(tenantId);
// resource upload startup
@ -139,7 +140,8 @@ public class UsersService extends BaseService {
String email,
int tenantId,
String phone,
String queue) throws Exception {
String queue,
int state) throws Exception {
User user = new User();
Date now = new Date();
@ -148,6 +150,7 @@ public class UsersService extends BaseService {
user.setEmail(email);
user.setTenantId(tenantId);
user.setPhone(phone);
user.setState(state);
// create general users, administrator users are currently built-in
user.setUserType(UserType.GENERAL_USER);
user.setCreateTime(now);
@ -260,7 +263,8 @@ public class UsersService extends BaseService {
String email,
int tenantId,
String phone,
String queue) throws Exception {
String queue,
int state) throws Exception {
Map<String, Object> result = new HashMap<>(5);
result.put(Constants.STATUS, false);
@ -309,7 +313,9 @@ public class UsersService extends BaseService {
}
user.setPhone(phone);
}
user.setQueue(queue);
user.setState(state);
Date now = new Date();
user.setUpdateTime(now);

1
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/LoginControllerTest.java

@ -56,7 +56,6 @@ public class LoginControllerTest extends AbstractControllerTest{
logger.info(mvcResult.getResponse().getContentAsString());
}
@Test
public void testSignOut() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();

6
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptorTest.java

@ -57,6 +57,7 @@ public class LoginHandlerInterceptorTest {
User mockUser = new User();
mockUser.setId(1);
mockUser.setUserType(UserType.GENERAL_USER);
mockUser.setState(1);
// test no token
when(authenticator.getAuthUser(request)).thenReturn(mockUser);
@ -67,5 +68,10 @@ public class LoginHandlerInterceptorTest {
when(request.getHeader("token")).thenReturn(token);
when(userMapper.queryUserByToken(token)).thenReturn(mockUser);
Assert.assertTrue(interceptor.preHandle(request, response, null));
// test disable user
mockUser.setState(0);
when(authenticator.getAuthUser(request)).thenReturn(mockUser);
Assert.assertFalse(interceptor.preHandle(request, response, null));
}
}

8
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/security/PasswordAuthenticatorTest.java

@ -67,6 +67,7 @@ public class PasswordAuthenticatorTest {
mockUser.setEmail("test@test.com");
mockUser.setUserPassword("test");
mockUser.setId(1);
mockUser.setState(1);
mockSession = new Session();
mockSession.setId(UUID.randomUUID().toString());
@ -82,6 +83,13 @@ public class PasswordAuthenticatorTest {
Result result = authenticator.authenticate("test", "test", "127.0.0.1");
Assert.assertEquals(Status.SUCCESS.getCode(), (int) result.getCode());
logger.info(result.toString());
mockUser.setState(0);
when(usersService.queryUser("test", "test")).thenReturn(mockUser);
when(sessionService.createSession(mockUser, "127.0.0.1")).thenReturn(mockSession.getId());
Result result1 = authenticator.authenticate("test", "test", "127.0.0.1");
Assert.assertEquals(Status.USER_DISABLED.getCode(), (int) result1.getCode());
logger.info(result1.toString());
}
@Test

18
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java

@ -98,41 +98,42 @@ public class UsersServiceTest {
String email = "123@qq.com";
int tenantId = Integer.MAX_VALUE;
String phone= "13456432345";
int state = 1;
try {
//userName error
Map<String, Object> result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName);
Map<String, Object> result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName, state);
logger.info(result.toString());
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, result.get(Constants.STATUS));
userName = "userTest0001";
userPassword = "userTest000111111111111111";
//password error
result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName);
result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName, state);
logger.info(result.toString());
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, result.get(Constants.STATUS));
userPassword = "userTest0001";
email = "1q.com";
//email error
result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName);
result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName, state);
logger.info(result.toString());
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, result.get(Constants.STATUS));
email = "122222@qq.com";
phone ="2233";
//phone error
result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName);
result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName, state);
logger.info(result.toString());
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, result.get(Constants.STATUS));
phone = "13456432345";
//tenantId not exists
result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName);
result = usersService.createUser(user, userName, userPassword, email, tenantId, phone, queueName, state);
logger.info(result.toString());
Assert.assertEquals(Status.TENANT_NOT_EXIST, result.get(Constants.STATUS));
//success
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
result = usersService.createUser(user, userName, userPassword, email, 1, phone, queueName);
result = usersService.createUser(user, userName, userPassword, email, 1, phone, queueName, state);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
@ -225,13 +226,13 @@ public class UsersServiceTest {
String userPassword = "userTest0001";
try {
//user not exist
Map<String, Object> result = usersService.updateUser(0,userName,userPassword,"3443@qq.com",1,"13457864543","queue");
Map<String, Object> result = usersService.updateUser(0,userName,userPassword,"3443@qq.com",1,"13457864543","queue", 1);
Assert.assertEquals(Status.USER_NOT_EXIST, result.get(Constants.STATUS));
logger.info(result.toString());
//success
when(userMapper.selectById(1)).thenReturn(getUser());
result = usersService.updateUser(1,userName,userPassword,"32222s@qq.com",1,"13457864543","queue");
result = usersService.updateUser(1,userName,userPassword,"32222s@qq.com",1,"13457864543","queue", 1);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
} catch (Exception e) {
@ -482,6 +483,7 @@ public class UsersServiceTest {
user.setUserType(UserType.ADMIN_USER);
user.setUserName("userTest0001");
user.setUserPassword("userTest0001");
user.setState(1);
return user;
}

14
dolphinscheduler-common/pom.xml

@ -594,5 +594,19 @@
<artifactId>janino</artifactId>
<version>${codehaus.janino.version}</version>
</dependency>
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java

@ -76,7 +76,7 @@ public enum ExecutionStatus {
* @return status
*/
public boolean typeIsFailure(){
return this == FAILURE || this == NEED_FAULT_TOLERANCE;
return this == FAILURE || this == NEED_FAULT_TOLERANCE || this == KILL;
}
/**

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java

@ -187,7 +187,9 @@ public class DataxParameters extends AbstractParameters {
@Override
public boolean checkParameters() {
if (customConfig == null) return false;
if (customConfig == null) {
return false;
}
if (customConfig == 0) {
return dataSource != 0
&& dataTarget != 0

11
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java

@ -57,6 +57,12 @@ public class OSUtils {
private OSUtils() {}
/**
* Initialization regularization, solve the problem of pre-compilation performance,
* avoid the thread safety problem of multi-thread operation
*/
private static final Pattern PATTERN = Pattern.compile("\\s+");
/**
* get memory usage
@ -219,8 +225,7 @@ public class OSUtils {
List<String> users = new ArrayList<>();
while (startPos <= endPos) {
Pattern pattern = Pattern.compile("\\s+");
users.addAll(Arrays.asList(pattern.split(lines[startPos])));
users.addAll(Arrays.asList(PATTERN.split(lines[startPos])));
startPos++;
}
@ -313,7 +318,7 @@ public class OSUtils {
String currentProcUserName = System.getProperty("user.name");
String result = exeCmd(String.format("net user \"%s\"", currentProcUserName));
String line = result.split("\n")[22];
String group = Pattern.compile("\\s+").split(line)[1];
String group = PATTERN.split(line)[1];
if (group.charAt(0) == '*') {
return group.substring(1);
} else {

108
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/RetryerUtils.java

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import com.github.rholder.retry.*;
import org.apache.dolphinscheduler.common.Constants;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* The Retryer util.
*/
public class RetryerUtils {
private static Retryer<Boolean> defaultRetryerResultCheck;
private static Retryer<Boolean> defaultRetryerResultNoCheck;
private RetryerUtils() {
}
private static Retryer<Boolean> getDefaultRetryerResultNoCheck() {
if (defaultRetryerResultNoCheck == null) {
defaultRetryerResultNoCheck = RetryerBuilder
.<Boolean>newBuilder()
.retryIfException()
.withWaitStrategy(WaitStrategies.fixedWait(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();
}
return defaultRetryerResultNoCheck;
}
/**
* Gets default retryer.
* the retryer will retry 3 times if exceptions throw
* and wait 1 second between each retry
*
* @param checkResult true means the callable must return true before retrying
* false means that retry callable only throw exceptions
* @return the default retryer
*/
public static Retryer<Boolean> getDefaultRetryer(boolean checkResult) {
return checkResult ? getDefaultRetryer() : getDefaultRetryerResultNoCheck();
}
/**
* Gets default retryer.
* the retryer will retry 3 times if exceptions throw
* and wait 1 second between each retry
*
* @return the default retryer
*/
public static Retryer<Boolean> getDefaultRetryer() {
if (defaultRetryerResultCheck == null) {
defaultRetryerResultCheck = RetryerBuilder
.<Boolean>newBuilder()
.retryIfResult(Boolean.FALSE::equals)
.retryIfException()
.withWaitStrategy(WaitStrategies.fixedWait(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();
}
return defaultRetryerResultCheck;
}
/**
* Use RETRYER to invoke the Callable
*
* @param callable the callable
* @param checkResult true means that retry callable before returning true
* false means that retry callable only throw exceptions
* @return the final result of callable
* @throws ExecutionException the execution exception
* @throws RetryException the retry exception
*/
public static Boolean retryCall(final Callable<Boolean> callable, boolean checkResult) throws ExecutionException, RetryException {
return getDefaultRetryer(checkResult).call(callable);
}
/**
* Use RETRYER to invoke the Callable before returning true
*
* @param callable the callable
* @return the boolean
* @throws ExecutionException the execution exception
* @throws RetryException the retry exception
*/
public static Boolean retryCall(final Callable<Boolean> callable) throws ExecutionException, RetryException {
return retryCall(callable, true);
}
}

86
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessBuilderForWin32.java

@ -189,8 +189,9 @@ public class ProcessBuilderForWin32 {
* @throws NullPointerException if the argument is null
*/
public ProcessBuilderForWin32(List<String> command) {
if (command == null)
if (command == null) {
throw new NullPointerException();
}
this.command = command;
}
@ -207,8 +208,9 @@ public class ProcessBuilderForWin32 {
*/
public ProcessBuilderForWin32(String... command) {
this.command = new ArrayList<>(command.length);
for (String arg : command)
for (String arg : command) {
this.command.add(arg);
}
}
/**
@ -238,8 +240,9 @@ public class ProcessBuilderForWin32 {
* @throws NullPointerException if the argument is null
*/
public ProcessBuilderForWin32 command(List<String> command) {
if (command == null)
if (command == null) {
throw new NullPointerException();
}
this.command = command;
return this;
}
@ -257,8 +260,9 @@ public class ProcessBuilderForWin32 {
*/
public ProcessBuilderForWin32 command(String... command) {
this.command = new ArrayList<>(command.length);
for (String arg : command)
for (String arg : command) {
this.command.add(arg);
}
return this;
}
@ -344,11 +348,13 @@ public class ProcessBuilderForWin32 {
*/
public Map<String,String> environment() {
SecurityManager security = System.getSecurityManager();
if (security != null)
if (security != null) {
security.checkPermission(new RuntimePermission("getenv.*"));
}
if (environment == null)
if (environment == null) {
environment = ProcessEnvironmentForWin32.environment();
}
assert environment != null;
@ -369,15 +375,17 @@ public class ProcessBuilderForWin32 {
// for compatibility with old broken code.
// Silently discard any trailing junk.
if (envstring.indexOf((int) '\u0000') != -1)
if (envstring.indexOf((int) '\u0000') != -1) {
envstring = envstring.replaceFirst("\u0000.*", "");
}
int eqlsign =
envstring.indexOf('=', ProcessEnvironmentForWin32.MIN_NAME_LENGTH);
// Silently ignore envstrings lacking the required `='.
if (eqlsign != -1)
if (eqlsign != -1) {
environment.put(envstring.substring(0,eqlsign),
envstring.substring(eqlsign+1));
}
}
}
return this;
@ -425,6 +433,7 @@ public class ProcessBuilderForWin32 {
static class NullInputStream extends InputStream {
static final ProcessBuilderForWin32.NullInputStream INSTANCE = new ProcessBuilderForWin32.NullInputStream();
private NullInputStream() {}
@Override
public int read() { return -1; }
@Override
public int available() { return 0; }
@ -436,6 +445,7 @@ public class ProcessBuilderForWin32 {
static class NullOutputStream extends OutputStream {
static final ProcessBuilderForWin32.NullOutputStream INSTANCE = new ProcessBuilderForWin32.NullOutputStream();
private NullOutputStream() {}
@Override
public void write(int b) throws IOException {
throw new IOException("Stream closed");
}
@ -516,7 +526,9 @@ public class ProcessBuilderForWin32 {
* }</pre>
*/
public static final ProcessBuilderForWin32.Redirect PIPE = new ProcessBuilderForWin32.Redirect() {
@Override
public Type type() { return Type.PIPE; }
@Override
public String toString() { return type().toString(); }};
/**
@ -531,7 +543,9 @@ public class ProcessBuilderForWin32 {
* }</pre>
*/
public static final ProcessBuilderForWin32.Redirect INHERIT = new ProcessBuilderForWin32.Redirect() {
@Override
public Type type() { return Type.INHERIT; }
@Override
public String toString() { return type().toString(); }};
/**
@ -565,12 +579,15 @@ public class ProcessBuilderForWin32 {
* @return a redirect to read from the specified file
*/
public static ProcessBuilderForWin32.Redirect from(final File file) {
if (file == null)
if (file == null) {
throw new NullPointerException();
}
return new ProcessBuilderForWin32.Redirect() {
@Override
public Type type() { return Type.READ; }
@Override
public File file() { return file; }
@Override
public String toString() {
return "redirect to read from file \"" + file + "\"";
}
@ -593,12 +610,15 @@ public class ProcessBuilderForWin32 {
* @return a redirect to write to the specified file
*/
public static ProcessBuilderForWin32.Redirect to(final File file) {
if (file == null)
if (file == null) {
throw new NullPointerException();
}
return new ProcessBuilderForWin32.Redirect() {
@Override
public Type type() { return Type.WRITE; }
@Override
public File file() { return file; }
@Override
public String toString() {
return "redirect to write to file \"" + file + "\"";
}
@ -626,12 +646,15 @@ public class ProcessBuilderForWin32 {
* @return a redirect to append to the specified file
*/
public static ProcessBuilderForWin32.Redirect appendTo(final File file) {
if (file == null)
if (file == null) {
throw new NullPointerException();
}
return new ProcessBuilderForWin32.Redirect() {
@Override
public Type type() { return Type.APPEND; }
@Override
public File file() { return file; }
@Override
public String toString() {
return "redirect to append to file \"" + file + "\"";
}
@ -647,14 +670,18 @@ public class ProcessBuilderForWin32 {
* instances of the same type associated with non-null equal
* {@code File} instances.
*/
@Override
public boolean equals(Object obj) {
if (obj == this)
if (obj == this) {
return true;
if (! (obj instanceof ProcessBuilderForWin32.Redirect))
}
if (! (obj instanceof ProcessBuilderForWin32.Redirect)) {
return false;
}
ProcessBuilderForWin32.Redirect r = (ProcessBuilderForWin32.Redirect) obj;
if (r.type() != this.type())
if (r.type() != this.type()) {
return false;
}
assert this.file() != null;
return this.file().equals(r.file());
}
@ -663,12 +690,14 @@ public class ProcessBuilderForWin32 {
* Returns a hash code value for this {@code Redirect}.
* @return a hash code value for this {@code Redirect}
*/
@Override
public int hashCode() {
File file = file();
if (file == null)
if (file == null) {
return super.hashCode();
else
} else {
return file.hashCode();
}
}
/**
@ -679,10 +708,11 @@ public class ProcessBuilderForWin32 {
}
private ProcessBuilderForWin32.Redirect[] redirects() {
if (redirects == null)
redirects = new ProcessBuilderForWin32.Redirect[] {
ProcessBuilderForWin32.Redirect.PIPE, ProcessBuilderForWin32.Redirect.PIPE, ProcessBuilderForWin32.Redirect.PIPE
if (redirects == null) {
redirects = new Redirect[] {
Redirect.PIPE, Redirect.PIPE, Redirect.PIPE
};
}
return redirects;
}
@ -711,9 +741,10 @@ public class ProcessBuilderForWin32 {
*/
public ProcessBuilderForWin32 redirectInput(ProcessBuilderForWin32.Redirect source) {
if (source.type() == ProcessBuilderForWin32.Redirect.Type.WRITE ||
source.type() == ProcessBuilderForWin32.Redirect.Type.APPEND)
source.type() == ProcessBuilderForWin32.Redirect.Type.APPEND) {
throw new IllegalArgumentException(
"Redirect invalid for reading: " + source);
}
redirects()[0] = source;
return this;
}
@ -741,9 +772,10 @@ public class ProcessBuilderForWin32 {
* @since 1.7
*/
public ProcessBuilderForWin32 redirectOutput(ProcessBuilderForWin32.Redirect destination) {
if (destination.type() == ProcessBuilderForWin32.Redirect.Type.READ)
if (destination.type() == ProcessBuilderForWin32.Redirect.Type.READ) {
throw new IllegalArgumentException(
"Redirect invalid for writing: " + destination);
}
redirects()[1] = destination;
return this;
}
@ -775,9 +807,10 @@ public class ProcessBuilderForWin32 {
* @since 1.7
*/
public ProcessBuilderForWin32 redirectError(ProcessBuilderForWin32.Redirect destination) {
if (destination.type() == ProcessBuilderForWin32.Redirect.Type.READ)
if (destination.type() == ProcessBuilderForWin32.Redirect.Type.READ) {
throw new IllegalArgumentException(
"Redirect invalid for writing: " + destination);
}
redirects()[2] = destination;
return this;
}
@ -1019,15 +1052,18 @@ public class ProcessBuilderForWin32 {
String[] cmdarray = command.toArray(new String[command.size()]);
cmdarray = cmdarray.clone();
for (String arg : cmdarray)
if (arg == null)
for (String arg : cmdarray) {
if (arg == null) {
throw new NullPointerException();
}
}
// Throws IndexOutOfBoundsException if command is empty
String prog = cmdarray[0];
SecurityManager security = System.getSecurityManager();
if (security != null)
if (security != null) {
security.checkExec(prog);
}
String dir = directory == null ? null : directory.toString();

38
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32.java

@ -27,22 +27,25 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
private static String validateName(String name) {
// An initial `=' indicates a magic Windows variable name -- OK
if (name.indexOf('=', 1) != -1 ||
name.indexOf('\u0000') != -1)
name.indexOf('\u0000') != -1) {
throw new IllegalArgumentException
("Invalid environment variable name: \"" + name + "\"");
}
return name;
}
private static String validateValue(String value) {
if (value.indexOf('\u0000') != -1)
if (value.indexOf('\u0000') != -1) {
throw new IllegalArgumentException
("Invalid environment variable value: \"" + value + "\"");
}
return value;
}
private static String nonNullString(Object o) {
if (o == null)
if (o == null) {
throw new NullPointerException();
}
return (String) o;
}
@ -70,26 +73,38 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
private static class CheckedEntry implements Entry<String,String> {
private final Entry<String,String> e;
public CheckedEntry(Entry<String,String> e) {this.e = e;}
@Override
public String getKey() { return e.getKey();}
@Override
public String getValue() { return e.getValue();}
@Override
public String setValue(String value) {
return e.setValue(validateValue(value));
}
@Override
public String toString() { return getKey() + "=" + getValue();}
@Override
public boolean equals(Object o) {return e.equals(o);}
@Override
public int hashCode() {return e.hashCode();}
}
private static class CheckedEntrySet extends AbstractSet<Entry<String,String>> {
private final Set<Entry<String,String>> s;
public CheckedEntrySet(Set<Entry<String,String>> s) {this.s = s;}
@Override
public int size() {return s.size();}
@Override
public boolean isEmpty() {return s.isEmpty();}
@Override
public void clear() { s.clear();}
@Override
public Iterator<Entry<String,String>> iterator() {
return new Iterator<Entry<String,String>>() {
Iterator<Entry<String,String>> i = s.iterator();
@Override
public boolean hasNext() { return i.hasNext();}
@Override
public Entry<String,String> next() {
return new CheckedEntry(i.next());
}
@ -104,18 +119,22 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
nonNullString(e.getValue());
return e;
}
@Override
public boolean contains(Object o) {return s.contains(checkedEntry(o));}
@Override
public boolean remove(Object o) {return s.remove(checkedEntry(o));}
}
private static class CheckedValues extends AbstractCollection<String> {
private final Collection<String> c;
public CheckedValues(Collection<String> c) {this.c = c;}
@Override
public int size() {return c.size();}
@Override
public boolean isEmpty() {return c.isEmpty();}
@Override
public void clear() { c.clear();}
@Override
public Iterator<String> iterator() {return c.iterator();}
@Override
public boolean contains(Object o) {return c.contains(nonNullString(o));}
@ -126,11 +145,17 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
private static class CheckedKeySet extends AbstractSet<String> {
private final Set<String> s;
public CheckedKeySet(Set<String> s) {this.s = s;}
@Override
public int size() {return s.size();}
@Override
public boolean isEmpty() {return s.isEmpty();}
@Override
public void clear() { s.clear();}
@Override
public Iterator<String> iterator() {return s.iterator();}
@Override
public boolean contains(Object o) {return s.contains(nonNullString(o));}
@Override
public boolean remove(Object o) {return s.remove(nonNullString(o));}
}
@Override
@ -147,6 +172,7 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
}
private static final class NameComparator implements Comparator<String> {
@Override
public int compare(String s1, String s2) {
// We can't use String.compareToIgnoreCase since it
// canonicalizes to lower case, while Windows
@ -163,7 +189,9 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
c2 = Character.toUpperCase(c2);
if (c1 != c2)
// No overflow because of numeric promotion
{
return c1 - c2;
}
}
}
return n1 - n2;
@ -171,6 +199,7 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
}
private static final class EntryComparator implements Comparator<Entry<String,String>> {
@Override
public int compare(Entry<String,String> e1,
Entry<String,String> e2) {
return nameComparator.compare(e1.getKey(), e2.getKey());
@ -278,8 +307,9 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
// add the environment variable to the child, if it exists in parent
private static void addToEnvIfSet(StringBuilder sb, String name) {
String s = getenv(name);
if (s != null)
if (s != null) {
addToEnv(sb, name, s);
}
}
private static void addToEnv(StringBuilder sb, String name, String val) {

89
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32.java

@ -93,13 +93,15 @@ public class ProcessImplForWin32 extends Process {
if (append) {
String path = f.getPath();
SecurityManager sm = System.getSecurityManager();
if (sm != null)
if (sm != null) {
sm.checkWrite(path);
}
long handle = openForAtomicAppend(path);
final FileDescriptor fd = new FileDescriptor();
setHandle(fd, handle);
return AccessController.doPrivileged(
new PrivilegedAction<FileOutputStream>() {
@Override
public FileOutputStream run() {
return new FileOutputStream(fd);
}
@ -133,30 +135,30 @@ public class ProcessImplForWin32 extends Process {
} else {
stdHandles = new long[3];
if (redirects[0] == ProcessBuilderForWin32.Redirect.PIPE)
if (redirects[0] == ProcessBuilderForWin32.Redirect.PIPE) {
stdHandles[0] = -1L;
else if (redirects[0] == ProcessBuilderForWin32.Redirect.INHERIT)
} else if (redirects[0] == ProcessBuilderForWin32.Redirect.INHERIT) {
stdHandles[0] = getHandle(FileDescriptor.in);
else {
} else {
f0 = new FileInputStream(redirects[0].file());
stdHandles[0] = getHandle(f0.getFD());
}
if (redirects[1] == ProcessBuilderForWin32.Redirect.PIPE)
if (redirects[1] == ProcessBuilderForWin32.Redirect.PIPE) {
stdHandles[1] = -1L;
else if (redirects[1] == ProcessBuilderForWin32.Redirect.INHERIT)
} else if (redirects[1] == ProcessBuilderForWin32.Redirect.INHERIT) {
stdHandles[1] = getHandle(FileDescriptor.out);
else {
} else {
f1 = newFileOutputStream(redirects[1].file(),
redirects[1].append());
stdHandles[1] = getHandle(f1.getFD());
}
if (redirects[2] == ProcessBuilderForWin32.Redirect.PIPE)
if (redirects[2] == ProcessBuilderForWin32.Redirect.PIPE) {
stdHandles[2] = -1L;
else if (redirects[2] == ProcessBuilderForWin32.Redirect.INHERIT)
} else if (redirects[2] == ProcessBuilderForWin32.Redirect.INHERIT) {
stdHandles[2] = getHandle(FileDescriptor.err);
else {
} else {
f2 = newFileOutputStream(redirects[2].file(),
redirects[2].append());
stdHandles[2] = getHandle(f2.getFD());
@ -167,10 +169,19 @@ public class ProcessImplForWin32 extends Process {
} finally {
// In theory, close() can throw IOException
// (although it is rather unlikely to happen here)
try { if (f0 != null) f0.close(); }
try { if (f0 != null) {
f0.close();
}
}
finally {
try { if (f1 != null) f1.close(); }
finally { if (f2 != null) f2.close(); }
try { if (f1 != null) {
f1.close();
}
}
finally { if (f2 != null) {
f2.close();
}
}
}
}
@ -193,8 +204,9 @@ public class ProcessImplForWin32 extends Process {
private static String[] getTokensFromCommand(String command) {
ArrayList<String> matchList = new ArrayList<>(8);
Matcher regexMatcher = ProcessImplForWin32.LazyPattern.PATTERN.matcher(command);
while (regexMatcher.find())
while (regexMatcher.find()) {
matchList.add(regexMatcher.group());
}
return matchList.toArray(new String[matchList.size()]);
}
@ -378,8 +390,9 @@ public class ProcessImplForWin32 extends Process {
// .bat files don't include backslashes as part of the quote
private static int countLeadingBackslash(int verificationType,
CharSequence input, int start) {
if (verificationType == VERIFICATION_CMD_BAT)
if (verificationType == VERIFICATION_CMD_BAT) {
return 0;
}
int j;
for (j = start - 1; j >= 0 && input.charAt(j) == BACKSLASH; j--) {
// just scanning backwards
@ -417,8 +430,9 @@ public class ProcessImplForWin32 extends Process {
String executablePath = new File(cmd[0]).getPath();
// No worry about internal, unpaired ["], and redirection/piping.
if (needsEscaping(VERIFICATION_LEGACY, executablePath) )
if (needsEscaping(VERIFICATION_LEGACY, executablePath) ) {
executablePath = quoteString(executablePath);
}
cmdstr = createCommandLine(
//legacy mode doesn't worry about extended verification
@ -442,16 +456,18 @@ public class ProcessImplForWin32 extends Process {
// Restore original command line.
StringBuilder join = new StringBuilder();
// terminal space in command line is ok
for (String s : cmd)
for (String s : cmd) {
join.append(s).append(' ');
}
// Parse the command line again.
cmd = getTokensFromCommand(join.toString());
executablePath = getExecutablePath(cmd[0]);
// Check new executable name once more
if (security != null)
if (security != null) {
security.checkExec(executablePath);
}
}
// Quotation protects from interpretation of the [path] argument as
@ -471,28 +487,29 @@ public class ProcessImplForWin32 extends Process {
AccessController.doPrivileged(
new PrivilegedAction<Void>() {
@Override
public Void run() {
if (stdHandles[0] == -1L)
if (stdHandles[0] == -1L) {
stdinStream = ProcessBuilderForWin32.NullOutputStream.INSTANCE;
else {
} else {
FileDescriptor stdinFd = new FileDescriptor();
setHandle(stdinFd, stdHandles[0]);
stdinStream = new BufferedOutputStream(
new FileOutputStream(stdinFd));
}
if (stdHandles[1] == -1L)
if (stdHandles[1] == -1L) {
stdoutStream = ProcessBuilderForWin32.NullInputStream.INSTANCE;
else {
} else {
FileDescriptor stdoutFd = new FileDescriptor();
setHandle(stdoutFd, stdHandles[1]);
stdoutStream = new BufferedInputStream(
new FileInputStream(stdoutFd));
}
if (stdHandles[2] == -1L)
if (stdHandles[2] == -1L) {
stderrStream = ProcessBuilderForWin32.NullInputStream.INSTANCE;
else {
} else {
FileDescriptor stderrFd = new FileDescriptor();
setHandle(stderrFd, stdHandles[2]);
stderrStream = new FileInputStream(stderrFd);
@ -501,33 +518,41 @@ public class ProcessImplForWin32 extends Process {
return null; }});
}
@Override
public OutputStream getOutputStream() {
return stdinStream;
}
@Override
public InputStream getInputStream() {
return stdoutStream;
}
@Override
public InputStream getErrorStream() {
return stderrStream;
}
@Override
protected void finalize() {
closeHandle(handle);
}
@Override
public int exitValue() {
int exitCode = getExitCodeProcess(handle);
if (exitCode == STILL_ACTIVE)
if (exitCode == STILL_ACTIVE) {
throw new IllegalThreadStateException("process has not exited");
}
return exitCode;
}
@Override
public int waitFor() throws InterruptedException {
waitForInterruptibly(handle);
if (Thread.interrupted())
if (Thread.interrupted()) {
throw new InterruptedException();
}
return exitValue();
}
@ -535,8 +560,12 @@ public class ProcessImplForWin32 extends Process {
public boolean waitFor(long timeout, TimeUnit unit)
throws InterruptedException
{
if (getExitCodeProcess(handle) != STILL_ACTIVE) return true;
if (timeout <= 0) return false;
if (getExitCodeProcess(handle) != STILL_ACTIVE) {
return true;
}
if (timeout <= 0) {
return false;
}
long remainingNanos = unit.toNanos(timeout);
long deadline = System.nanoTime() + remainingNanos ;
@ -545,8 +574,9 @@ public class ProcessImplForWin32 extends Process {
// Round up to next millisecond
long msTimeout = TimeUnit.NANOSECONDS.toMillis(remainingNanos + 999_999L);
waitForTimeoutInterruptibly(handle, msTimeout);
if (Thread.interrupted())
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (getExitCodeProcess(handle) != STILL_ACTIVE) {
return true;
}
@ -556,6 +586,7 @@ public class ProcessImplForWin32 extends Process {
return (getExitCodeProcess(handle) != STILL_ACTIVE);
}
@Override
public void destroy() { terminateProcess(handle); }
@Override

216
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/RetryerUtilsTest.java

@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ExecutionException;
public class RetryerUtilsTest {
@Test
public void testDefaultRetryer() {
Retryer<Boolean> retryer = RetryerUtils.getDefaultRetryer();
Assert.assertNotNull(retryer);
try {
boolean result = retryer.call(() -> true);
Assert.assertTrue(result);
} catch (ExecutionException | RetryException e) {
Assert.fail("Retry call failed " + e.getMessage());
}
Retryer<Boolean> retryer1 = RetryerUtils.getDefaultRetryer(true);
Assert.assertEquals(retryer, retryer1);
}
@Test
public void testDefaultRetryerResultCheck() {
Retryer<Boolean> retryer = RetryerUtils.getDefaultRetryer();
Assert.assertNotNull(retryer);
try {
for (int execTarget = 1; execTarget <= 3; execTarget++) {
int finalExecTarget = execTarget;
int[] execTime = {0};
boolean result = retryer.call(() -> {
execTime[0]++;
return execTime[0] == finalExecTarget;
});
Assert.assertEquals(finalExecTarget, execTime[0]);
Assert.assertTrue(result);
}
} catch (ExecutionException | RetryException e) {
Assert.fail("Retry call failed " + e.getMessage());
}
int[] execTime = {0};
try {
retryer.call(() -> {
execTime[0]++;
return execTime[0] == 4;
});
Assert.fail("Retry times not reached");
} catch (RetryException e) {
Assert.assertEquals(3, e.getNumberOfFailedAttempts());
Assert.assertEquals(3, execTime[0]);
} catch (ExecutionException e) {
Assert.fail("Retry call failed " + e.getMessage());
}
}
@Test
public void testDefaultRetryerResultNoCheck() {
Retryer<Boolean> retryer = RetryerUtils.getDefaultRetryer(false);
Assert.assertNotNull(retryer);
try {
for (int execTarget = 1; execTarget <= 5; execTarget++) {
int[] execTime = {0};
boolean result = retryer.call(() -> {
execTime[0]++;
return execTime[0] > 1;
});
Assert.assertEquals(1, execTime[0]);
Assert.assertFalse(result);
}
} catch (ExecutionException | RetryException e) {
Assert.fail("Retry call failed " + e.getMessage());
}
}
@Test
public void testRecallResultCheck() {
try {
for (int execTarget = 1; execTarget <= 3; execTarget++) {
int finalExecTarget = execTarget;
int[] execTime = {0};
boolean result = RetryerUtils.retryCall(() -> {
execTime[0]++;
return execTime[0] == finalExecTarget;
});
Assert.assertEquals(finalExecTarget, execTime[0]);
Assert.assertTrue(result);
}
} catch (ExecutionException | RetryException e) {
Assert.fail("Retry call failed " + e.getMessage());
}
int[] execTime = {0};
try {
RetryerUtils.retryCall(() -> {
execTime[0]++;
return execTime[0] == 4;
});
Assert.fail("Recall times not reached");
} catch (RetryException e) {
Assert.assertEquals(3, e.getNumberOfFailedAttempts());
Assert.assertEquals(3, execTime[0]);
} catch (ExecutionException e) {
Assert.fail("Retry call failed " + e.getMessage());
}
}
@Test
public void testRecallResultCheckWithPara() {
try {
for (int execTarget = 1; execTarget <= 3; execTarget++) {
int finalExecTarget = execTarget;
int[] execTime = {0};
boolean result = RetryerUtils.retryCall(() -> {
execTime[0]++;
return execTime[0] == finalExecTarget;
}, true);
Assert.assertEquals(finalExecTarget, execTime[0]);
Assert.assertTrue(result);
}
} catch (ExecutionException | RetryException e) {
Assert.fail("Retry call failed " + e.getMessage());
}
int[] execTime = {0};
try {
RetryerUtils.retryCall(() -> {
execTime[0]++;
return execTime[0] == 4;
}, true);
Assert.fail("Recall times not reached");
} catch (RetryException e) {
Assert.assertEquals(3, e.getNumberOfFailedAttempts());
Assert.assertEquals(3, execTime[0]);
} catch (ExecutionException e) {
Assert.fail("Retry call failed " + e.getMessage());
}
}
@Test
public void testRecallResultNoCheck() {
try {
for (int execTarget = 1; execTarget <= 5; execTarget++) {
int[] execTime = {0};
boolean result = RetryerUtils.retryCall(() -> {
execTime[0]++;
return execTime[0] > 1;
}, false);
Assert.assertEquals(1, execTime[0]);
Assert.assertFalse(result);
}
} catch (ExecutionException | RetryException e) {
Assert.fail("Retry call failed " + e.getMessage());
}
}
private void testRetryExceptionWithPara(boolean checkResult) {
try {
for (int execTarget = 1; execTarget <= 3; execTarget++) {
int finalExecTarget = execTarget;
int[] execTime = {0};
boolean result = RetryerUtils.retryCall(() -> {
execTime[0]++;
if (execTime[0] != finalExecTarget) {
throw new IllegalArgumentException(String.valueOf(execTime[0]));
}
return true;
}, checkResult);
Assert.assertEquals(finalExecTarget, execTime[0]);
Assert.assertTrue(result);
}
} catch (ExecutionException | RetryException e) {
Assert.fail("Retry call failed " + e.getMessage());
}
int[] execTime = {0};
try {
RetryerUtils.retryCall(() -> {
execTime[0]++;
if (execTime[0] != 4) {
throw new IllegalArgumentException(String.valueOf(execTime[0]));
}
return true;
}, checkResult);
Assert.fail("Recall times not reached");
} catch (RetryException e) {
Assert.assertEquals(3, e.getNumberOfFailedAttempts());
Assert.assertEquals(3, execTime[0]);
Assert.assertNotNull(e.getCause());
Assert.assertEquals(3, Integer.parseInt(e.getCause().getMessage()));
} catch (ExecutionException e) {
Assert.fail("Retry call failed " + e.getMessage());
}
}
@Test
public void testRetryException() {
testRetryExceptionWithPara(true);
testRetryExceptionWithPara(false);
}
}

20
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstanceMap.java

@ -91,14 +91,24 @@ public class ProcessInstanceMap {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ProcessInstanceMap that = (ProcessInstanceMap) o;
if (id != that.id) return false;
if (parentProcessInstanceId != that.parentProcessInstanceId) return false;
if (parentTaskInstanceId != that.parentTaskInstanceId) return false;
if (id != that.id) {
return false;
}
if (parentProcessInstanceId != that.parentProcessInstanceId) {
return false;
}
if (parentTaskInstanceId != that.parentTaskInstanceId) {
return false;
}
return processInstanceId == that.processInstanceId;
}

16
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Queue.java

@ -104,13 +104,21 @@ public class Queue {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Queue queue1 = (Queue) o;
if (id != queue1.id) return false;
if (!queueName.equals(queue1.queueName)) return false;
if (id != queue1.id) {
return false;
}
if (!queueName.equals(queue1.queueName)) {
return false;
}
return queue.equals(queue1.queue);
}

20
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Session.java

@ -93,14 +93,24 @@ public class Session {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Session session = (Session) o;
if (userId != session.userId) return false;
if (!id.equals(session.id)) return false;
if (!lastLoginTime.equals(session.lastLoginTime)) return false;
if (userId != session.userId) {
return false;
}
if (!id.equals(session.id)) {
return false;
}
if (!lastLoginTime.equals(session.lastLoginTime)) {
return false;
}
return ip.equals(session.ip);
}

8
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Tenant.java

@ -166,8 +166,12 @@ public class Tenant {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Tenant tenant = (Tenant) o;

14
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/User.java

@ -67,6 +67,11 @@ public class User {
*/
private int tenantId;
/**
* user state
*/
private int state;
/**
* tenant code
*/
@ -219,6 +224,14 @@ public class User {
this.queue = queue;
}
public int getState() {
return state;
}
public void setState(int state) {
this.state = state;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -254,6 +267,7 @@ public class User {
", phone='" + phone + '\'' +
", userType=" + userType +
", tenantId=" + tenantId +
", state=" + state +
", tenantCode='" + tenantCode + '\'' +
", tenantName='" + tenantName + '\'' +
", queueName='" + queueName + '\'' +

8
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/MysqlPerformance.java

@ -56,7 +56,7 @@ public class MysqlPerformance extends BaseDBPerformance{
try (ResultSet rs1 = pstmt.executeQuery("show global variables")) {
while(rs1.next()){
if(rs1.getString(VARIABLE_NAME).equalsIgnoreCase("MAX_CONNECTIONS")){
if("MAX_CONNECTIONS".equalsIgnoreCase(rs1.getString(VARIABLE_NAME))){
monitorRecord.setMaxConnections( Long.parseLong(rs1.getString("value")));
}
}
@ -64,11 +64,11 @@ public class MysqlPerformance extends BaseDBPerformance{
try (ResultSet rs2 = pstmt.executeQuery("show global status")) {
while(rs2.next()){
if(rs2.getString(VARIABLE_NAME).equalsIgnoreCase("MAX_USED_CONNECTIONS")){
if("MAX_USED_CONNECTIONS".equalsIgnoreCase(rs2.getString(VARIABLE_NAME))){
monitorRecord.setMaxUsedConnections(Long.parseLong(rs2.getString("value")));
}else if(rs2.getString(VARIABLE_NAME).equalsIgnoreCase("THREADS_CONNECTED")){
}else if("THREADS_CONNECTED".equalsIgnoreCase(rs2.getString(VARIABLE_NAME))){
monitorRecord.setThreadsConnections(Long.parseLong(rs2.getString("value")));
}else if(rs2.getString(VARIABLE_NAME).equalsIgnoreCase("THREADS_RUNNING")){
}else if("THREADS_RUNNING".equalsIgnoreCase(rs2.getString(VARIABLE_NAME))){
monitorRecord.setThreadsRunningConnections(Long.parseLong(rs2.getString("value")));
}
}

2
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UserMapper.xml

@ -32,7 +32,7 @@
</select>
<select id="queryUserPaging" resultType="org.apache.dolphinscheduler.dao.entity.User">
select u.id,u.user_name,u.user_password,u.user_type,u.email,u.phone,u.tenant_id,u.create_time,
u.update_time,t.tenant_name,
u.update_time,t.tenant_name,u.state,
case when u.queue <![CDATA[ <> ]]> '' then u.queue else q.queue_name end as queue, q.queue_name
from t_ds_user u
left join t_ds_tenant t on u.tenant_id=t.id

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java

@ -46,6 +46,14 @@ public class ProcessUtils {
*/
private final static Logger logger = LoggerFactory.getLogger(ProcessUtils.class);
/**
* Initialization regularization, solve the problem of pre-compilation performance,
* avoid the thread safety problem of multi-thread operation
*/
private static final Pattern MACPATTERN = Pattern.compile("-[+|-]-\\s(\\d+)");
private static final Pattern WINDOWSATTERN = Pattern.compile("(\\d+)");
/**
* build command line characters
* @param commandList command list
@ -356,10 +364,10 @@ public class ProcessUtils {
// pstree pid get sub pids
if (OSUtils.isMacOS()) {
String pids = OSUtils.exeCmd("pstree -sp " + processId);
mat = Pattern.compile("-[+|-]-\\s(\\d+)").matcher(pids);
mat = MACPATTERN.matcher(pids);
} else {
String pids = OSUtils.exeCmd("pstree -p " + processId);
mat = Pattern.compile("(\\d+)").matcher(pids);
mat = WINDOWSATTERN.matcher(pids);
}
while (mat.find()){

27
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -20,11 +20,13 @@ package org.apache.dolphinscheduler.server.worker.processor;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import com.alibaba.fastjson.JSONObject;
import com.github.rholder.retry.RetryException;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.RetryerUtils;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
@ -43,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
/**
@ -101,21 +104,19 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
// tell master that task is in executing
final Command ackCommand = buildAckCommand(taskExecutionContext).convert2Command();
try {
this.doAck(taskExecutionContext);
}catch (Exception e){
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
this.doAck(taskExecutionContext);
RetryerUtils.retryCall(() -> {
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(),ackCommand);
return Boolean.TRUE;
});
// submit task
workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService));
} catch (ExecutionException | RetryException e) {
logger.error(e.getMessage(), e);
}
// submit task
workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService));
}
private void doAck(TaskExecutionContext taskExecutionContext){
// tell master that task is in executing
TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext);
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
}
/**

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java

@ -272,7 +272,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
try {
mutex.release();
} catch (Exception e) {
if(e.getMessage().equals("instance must be started before calling this method")){
if("instance must be started before calling this method".equals(e.getMessage())){
logger.warn("lock release");
}else{
logger.error("lock release failed",e);

15
dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue

@ -93,6 +93,15 @@
</x-input>
</template>
</m-list-box-f>
<m-list-box-f>
<template slot="name">{{$t('State')}}</template>
<template slot="content">
<x-radio-group v-model="userState" >
<x-radio :label="'1'">{{$t('Enable')}}</x-radio>
<x-radio :label="'0'">{{$t('Disable')}}</x-radio>
</x-radio-group>
</template>
</m-list-box-f>
</div>
</template>
</m-popup>
@ -118,6 +127,7 @@
queueName: '',
email: '',
phone: '',
userState: '1',
tenantList: [],
// Source admin user information
isADMIN: store.state.user.userInfo.userType === 'ADMIN_USER' && router.history.current.name !== 'account'
@ -241,7 +251,8 @@
tenantId: this.tenantId,
email: this.email,
queue: queueCode,
phone: this.phone
phone: this.phone,
state: this.userState
}
if (this.item) {
@ -270,6 +281,7 @@
this.userPassword = ''
this.email = this.item.email
this.phone = this.item.phone
this.userState = this.item.state + '' || '1'
this.tenantId = this.item.tenantId
this.$nextTick(() => {
this.queueName = _.find(this.queueList, ['code', this.item.queue]).id||''
@ -282,6 +294,7 @@
this.userPassword = ''
this.email = this.item.email
this.phone = this.item.phone
this.userState = this.state + '' || '1'
this.tenantId = this.item.tenantId
if(this.queueList.length>0) {
this.queueName = _.find(this.queueList, ['code', this.item.queue]).id

12
dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/list.vue

@ -40,7 +40,9 @@
<th>
<span>{{$t('Phone')}}</span>
</th>
<th id="state">
<span>{{$t('State')}}</span>
</th>
<th>
<span>{{$t('Create Time')}}</span>
</th>
@ -71,6 +73,10 @@
<td>
<span>{{item.phone || '-'}}</span>
</td>
<td>
<span v-if="item.state == 1">{{$t('Enable')}}</span>
<span v-else>{{$t('Disable')}}</span>
</td>
<td>
<span v-if="item.createTime">{{item.createTime | formatDate}}</span>
<span v-else>-</span>
@ -232,7 +238,7 @@
getLeaf(data)
return result
},
_authFile (item, i) {
_authFile (item, i) {
this.$refs[`poptip-auth-${i}`][0].doClose()
this.getResourceList({
id: item.id,
@ -257,7 +263,7 @@
})
let fileTargetList = []
let udfTargetList = []
let pathId = []
data[1].forEach(v=>{
let arr = []

4
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@ -590,5 +590,7 @@ export default {
'Cannot select the same node for successful branch flow and failed branch flow': 'Cannot select the same node for successful branch flow and failed branch flow',
'Successful branch flow and failed branch flow are required': 'Successful branch flow and failed branch flow are required',
'Unauthorized or deleted resources': 'Unauthorized or deleted resources',
'Please delete all non-existent resources': 'Please delete all non-existent resources'
'Please delete all non-existent resources': 'Please delete all non-existent resources',
'Enable': 'Enable',
'Disable': 'Disable'
}

4
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -590,5 +590,7 @@ export default {
'Cannot select the same node for successful branch flow and failed branch flow': '成功分支流转和失败分支流转不能选择同一个节点',
'Successful branch flow and failed branch flow are required': '成功分支流转和失败分支流转必填',
'Unauthorized or deleted resources': '未授权或已删除资源',
'Please delete all non-existent resources': '请删除所有未授权或已删除资源'
'Please delete all non-existent resources': '请删除所有未授权或已删除资源',
'Enable': '启用',
'Disable': '停用'
}

2
e2e/src/test/java/org/apache/dolphinscheduler/locator/security/UserManageLocator.java

@ -42,7 +42,7 @@ public class UserManageLocator {
public static final By SUBMIT = By.xpath("//div[3]/button[2]/span");
public static final By DELETE_USER_BUTTON = By.xpath("//span[2]/button/i");
public static final By DELETE_USER_BUTTON = By.xpath("//span[2]/button");
public static final By CONFIRM_DELETE_USER_BUTTON = By.xpath("//div[2]/div/button[2]/span");
}

9
pom.xml

@ -118,6 +118,7 @@
<servlet-api.version>2.5</servlet-api.version>
<swagger.version>1.9.3</swagger.version>
<springfox.version>2.9.2</springfox.version>
<guava-retry.version>2.0.0</guava-retry.version>
</properties>
<dependencyManagement>
@ -544,6 +545,12 @@
<artifactId>swagger-bootstrap-ui</artifactId>
<version>${swagger.version}</version>
</dependency>
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>${guava-retry.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
@ -687,6 +694,7 @@
<version>${maven-surefire-plugin.version}</version>
<configuration>
<includes>
<include>**/alert/utils/DingTalkUtilsTest.java</include>
<include>**/alert/template/AlertTemplateFactoryTest.java</include>
<include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include>
<include>**/alert/utils/EnterpriseWeChatUtilsTest.java</include>
@ -770,6 +778,7 @@
<include>**/common/utils/HttpUtilsTest.java</include>
<include>**/common/ConstantsTest.java</include>
<include>**/common/utils/HadoopUtils.java</include>
<include>**/common/utils/RetryerUtilsTest.java</include>
<include>**/common/plugin/FilePluginManagerTest</include>
<include>**/common/plugin/PluginClassLoaderTest</include>
<include>**/dao/mapper/AccessTokenMapperTest.java</include>

2
script/scp-hosts.sh

@ -49,4 +49,4 @@ do
done
echo "scp dirs to $host/$installPath complete"
done
done

3
sql/dolphinscheduler-postgre.sql

@ -623,6 +623,7 @@ CREATE TABLE t_ds_user (
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
queue varchar(64) DEFAULT NULL ,
state int DEFAULT 1 ,
PRIMARY KEY (id)
);
@ -759,4 +760,4 @@ INSERT INTO t_ds_relation_user_alertgroup(alertgroup_id,user_id,create_time,upda
INSERT INTO t_ds_queue(queue_name,queue,create_time,update_time) VALUES ('default', 'default','2018-11-29 10:22:33', '2018-11-29 10:22:33');
-- Records of t_ds_queue,default queue name : default
INSERT INTO t_ds_version(version) VALUES ('2.0.0');
INSERT INTO t_ds_version(version) VALUES ('2.0.0');

1
sql/dolphinscheduler_mysql.sql

@ -775,6 +775,7 @@ CREATE TABLE `t_ds_user` (
`create_time` datetime DEFAULT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
`queue` varchar(64) DEFAULT NULL COMMENT 'queue',
`state` int(1) DEFAULT 1 COMMENT 'state 0:disable 1:enable',
PRIMARY KEY (`id`),
UNIQUE KEY `user_name_unique` (`user_name`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

2
sql/soft_version

@ -1 +1 @@
1.2.2
1.3.0

21
sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql

@ -357,3 +357,24 @@ delimiter ;
CALL dc_dolphin_T_t_ds_error_command_D_worker_group_id;
DROP PROCEDURE dc_dolphin_T_t_ds_error_command_D_worker_group_id;
-- ac_dolphin_T_t_ds_user_A_state
drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_user_A_state;
delimiter d//
CREATE PROCEDURE ac_dolphin_T_t_ds_user_A_state()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_user'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='state')
THEN
ALTER TABLE t_ds_user ADD `state` int(1) DEFAULT 1 COMMENT 'state 0:disable 1:enable';
END IF;
END;
d//
delimiter ;
CALL ac_dolphin_T_t_ds_user_A_state;
DROP PROCEDURE ac_dolphin_T_t_ds_user_A_state;

20
sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql

@ -364,3 +364,23 @@ select dc_dolphin_T_t_ds_error_command_D_worker_group_id();
DROP FUNCTION dc_dolphin_T_t_ds_error_command_D_worker_group_id();
-- ac_dolphin_T_t_ds_user_A_state
delimiter ;
DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_user_A_state();
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_user_A_state() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_user'
AND COLUMN_NAME ='state')
THEN
ALTER TABLE t_ds_user ADD COLUMN state int DEFAULT 1;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_user_A_state();
DROP FUNCTION ac_dolphin_T_t_ds_user_A_state();

1
tools/dependencies/known-dependencies.txt

@ -209,3 +209,4 @@ xml-apis-1.4.01.jar
xmlenc-0.52.jar
xz-1.0.jar
zookeeper-3.4.14.jar
guava-retrying-2.0.0.jar

Loading…
Cancel
Save