Browse Source

Merge pull request #16 from apache/dev-1.3.0

Dev 1.3.0
pull/3/MERGE
Simon 5 years ago committed by GitHub
parent
commit
da4863cbee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      .github/workflows/ci_e2e.yml
  2. 2
      docker/docker-swarm/check
  3. 41
      docker/docker-swarm/docker-compose.yml
  4. 27
      docker/docker-swarm/docker-stack.yml
  5. 6
      docker/kubernetes/dolphinscheduler/values.yaml
  6. 4
      dockerfile/Dockerfile
  7. 10
      dockerfile/hooks/build
  8. 19
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/template/impl/DefaultHTMLTemplate.java
  9. 4
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java
  10. 2
      dolphinscheduler-alert/src/main/resources/logback-alert.xml
  11. 37
      dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/template/impl/DefaultHTMLTemplateTest.java
  12. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  13. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
  14. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
  15. 63
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java
  16. 63
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java
  17. 35
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
  18. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
  19. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  20. 12
      dolphinscheduler-server/src/main/resources/config/install_config.conf
  21. 4
      dolphinscheduler-server/src/main/resources/worker.properties
  22. 29
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
  23. 37
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  24. 4
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
  25. 11
      script/scp-hosts.sh
  26. 3
      script/start-all.sh
  27. 3
      script/stop-all.sh
  28. 3
      sql/upgrade/1.3.0_schema/mysql/dolphinscheduler_dml.sql
  29. 3
      sql/upgrade/1.3.0_schema/postgresql/dolphinscheduler_dml.sql

11
.github/workflows/ci_e2e.yml

@ -44,15 +44,14 @@ jobs:
${{ runner.os }}-maven-
- name: Build Image
run: |
export VERSION=`cat $(pwd)/pom.xml| grep "SNAPSHOT</version>" | awk -F "-SNAPSHOT" '{print $1}' | awk -F ">" '{print $2}'`
sh ./dockerfile/hooks/build
- name: Docker Run
run: |
VERSION=`cat $(pwd)/pom.xml| grep "SNAPSHOT</version>" | awk -F "-SNAPSHOT" '{print $1}' | awk -F ">" '{print $2}'`
mkdir -p /tmp/logs
docker run -dit -e POSTGRESQL_USERNAME=test -e POSTGRESQL_PASSWORD=test -v /tmp/logs:/opt/dolphinscheduler/logs -p 8888:8888 dolphinscheduler:$VERSION all
export VERSION=$(cat $(pwd)/pom.xml | grep '<version>' -m 1 | awk '{print $1}' | sed 's/<version>//' | sed 's/<\/version>//')
sed -i "s/apache\/dolphinscheduler:latest/apache\/dolphinscheduler:${VERSION}/g" $(pwd)/docker/docker-swarm/docker-compose.yml
docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml up -d
- name: Check Server Status
run: sh ./dockerfile/hooks/check
run: sh $(pwd)/docker/docker-swarm/check
- name: Prepare e2e env
run: |
sudo apt-get install -y libxss1 libappindicator1 libindicator7 xvfb unzip libgbm1
@ -70,6 +69,6 @@ jobs:
uses: actions/upload-artifact@v1
with:
name: dslogs
path: /tmp/logs
path: /var/lib/docker/volumes/docker-swarm_dolphinscheduler-logs/_data

2
dockerfile/hooks/check → docker/docker-swarm/check

@ -17,7 +17,7 @@
#
echo "------ dolphinscheduler check - server - status -------"
sleep 60
server_num=$(docker top `docker container list | grep '/sbin/tini' | awk '{print $1}'`| grep java | grep "dolphinscheduler" | awk -F 'classpath ' '{print $2}' | awk '{print $2}' | sort | uniq -c | wc -l)
server_num=$(docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml top | grep java | grep "dolphinscheduler" | awk -F 'classpath ' '{print $2}' | awk '{print $2}' | sort | uniq -c | wc -l)
if [ $server_num -eq 5 ]
then
echo "Server all start successfully"

41
docker/docker-swarm/docker-compose.yml

@ -41,13 +41,14 @@ services:
environment:
TZ: Asia/Shanghai
ALLOW_ANONYMOUS_LOGIN: "yes"
ZOO_4LW_COMMANDS_WHITELIST: srvr,ruok,wchs,cons
volumes:
- dolphinscheduler-zookeeper:/bitnami/zookeeper
networks:
- dolphinscheduler
dolphinscheduler-api:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
container_name: dolphinscheduler-api
command: ["api-server"]
ports:
@ -70,12 +71,12 @@ services:
- dolphinscheduler-postgresql
- dolphinscheduler-zookeeper
volumes:
- dolphinscheduler-api:/opt/dolphinscheduler/logs
networks:
- dolphinscheduler-logs:/opt/dolphinscheduler/logs
networks:
- dolphinscheduler
dolphinscheduler-frontend:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
container_name: dolphinscheduler-frontend
command: ["frontend"]
ports:
@ -93,12 +94,12 @@ services:
depends_on:
- dolphinscheduler-api
volumes:
- dolphinscheduler-frontend:/var/log/nginx
- dolphinscheduler-logs:/var/log/nginx
networks:
- dolphinscheduler
dolphinscheduler-alert:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
container_name: dolphinscheduler-alert
command: ["alert-server"]
environment:
@ -130,18 +131,18 @@ services:
start_period: 30s
depends_on:
- dolphinscheduler-postgresql
volumes:
- dolphinscheduler-alert:/opt/dolphinscheduler/logs
networks:
volumes:
- dolphinscheduler-logs:/opt/dolphinscheduler/logs
networks:
- dolphinscheduler
dolphinscheduler-master:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
container_name: dolphinscheduler-master
command: ["master-server"]
ports:
ports:
- 5678:5678
environment:
environment:
TZ: Asia/Shanghai
MASTER_EXEC_THREADS: "100"
MASTER_EXEC_TASK_NUM: "20"
@ -162,16 +163,16 @@ services:
timeout: 5s
retries: 3
start_period: 30s
depends_on:
depends_on:
- dolphinscheduler-postgresql
- dolphinscheduler-zookeeper
volumes:
- dolphinscheduler-master:/opt/dolphinscheduler/logs
volumes:
- dolphinscheduler-logs:/opt/dolphinscheduler/logs
networks:
- dolphinscheduler
dolphinscheduler-worker:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
container_name: dolphinscheduler-worker
command: ["worker-server"]
ports:
@ -209,7 +210,7 @@ services:
source: dolphinscheduler-worker-data
target: /tmp/dolphinscheduler
- type: volume
source: dolphinscheduler-worker-logs
source: dolphinscheduler-logs
target: /opt/dolphinscheduler/logs
networks:
- dolphinscheduler
@ -221,12 +222,8 @@ networks:
volumes:
dolphinscheduler-postgresql:
dolphinscheduler-zookeeper:
dolphinscheduler-api:
dolphinscheduler-frontend:
dolphinscheduler-alert:
dolphinscheduler-master:
dolphinscheduler-worker-data:
dolphinscheduler-worker-logs:
dolphinscheduler-logs:
configs:
dolphinscheduler-worker-task-env:

27
docker/docker-swarm/docker-stack.yml

@ -42,6 +42,7 @@ services:
environment:
TZ: Asia/Shanghai
ALLOW_ANONYMOUS_LOGIN: "yes"
ZOO_4LW_COMMANDS_WHITELIST: srvr,ruok,wchs,cons
volumes:
- dolphinscheduler-zookeeper:/bitnami/zookeeper
networks:
@ -51,7 +52,7 @@ services:
replicas: 1
dolphinscheduler-api:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
command: ["api-server"]
ports:
- 12345:12345
@ -70,7 +71,7 @@ services:
retries: 3
start_period: 30s
volumes:
- dolphinscheduler-api:/opt/dolphinscheduler/logs
- dolphinscheduler-logs:/opt/dolphinscheduler/logs
networks:
- dolphinscheduler
deploy:
@ -78,7 +79,7 @@ services:
replicas: 1
dolphinscheduler-frontend:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
command: ["frontend"]
ports:
- 8888:8888
@ -93,7 +94,7 @@ services:
retries: 3
start_period: 30s
volumes:
- dolphinscheduler-frontend:/var/log/nginx
- dolphinscheduler-logs:/var/log/nginx
networks:
- dolphinscheduler
deploy:
@ -101,7 +102,7 @@ services:
replicas: 1
dolphinscheduler-alert:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
command: ["alert-server"]
environment:
TZ: Asia/Shanghai
@ -131,7 +132,7 @@ services:
retries: 3
start_period: 30s
volumes:
- dolphinscheduler-alert:/opt/dolphinscheduler/logs
- dolphinscheduler-logs:/opt/dolphinscheduler/logs
networks:
- dolphinscheduler
deploy:
@ -139,7 +140,7 @@ services:
replicas: 1
dolphinscheduler-master:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
command: ["master-server"]
ports:
- 5678:5678
@ -165,7 +166,7 @@ services:
retries: 3
start_period: 30s
volumes:
- dolphinscheduler-master:/opt/dolphinscheduler/logs
- dolphinscheduler-logs:/opt/dolphinscheduler/logs
networks:
- dolphinscheduler
deploy:
@ -173,7 +174,7 @@ services:
replicas: 1
dolphinscheduler-worker:
image: registry.cn-qingdao.aliyuncs.com/sxyj/dolphinscheduler:dev
image: apache/dolphinscheduler:latest
command: ["worker-server"]
ports:
- 1234:1234
@ -201,7 +202,7 @@ services:
start_period: 30s
volumes:
- dolphinscheduler-worker-data:/tmp/dolphinscheduler
- dolphinscheduler-worker-logs:/opt/dolphinscheduler/logs
- dolphinscheduler-logs:/opt/dolphinscheduler/logs
configs:
- source: dolphinscheduler-worker-task-env
target: /opt/dolphinscheduler/conf/env/dolphinscheduler_env.sh
@ -218,12 +219,8 @@ networks:
volumes:
dolphinscheduler-postgresql:
dolphinscheduler-zookeeper:
dolphinscheduler-api:
dolphinscheduler-frontend:
dolphinscheduler-alert:
dolphinscheduler-master:
dolphinscheduler-worker-data:
dolphinscheduler-worker-logs:
dolphinscheduler-logs:
configs:
dolphinscheduler-worker-task-env:

6
docker/kubernetes/dolphinscheduler/values.yaml

@ -25,9 +25,9 @@ fullnameOverride: ""
timezone: "Asia/Shanghai"
image:
registry: "docker.io"
registry: "apache"
repository: "dolphinscheduler"
tag: "1.3.0"
tag: "latest"
pullPolicy: "IfNotPresent"
imagePullSecrets: []
@ -56,6 +56,8 @@ externalDatabase:
zookeeper:
enabled: true
taskQueue: "zookeeper"
config:
ZOO_4LW_COMMANDS_WHITELIST: srvr,ruok,wchs,cons
service:
port: "2181"
persistence:

4
dockerfile/Dockerfile

@ -53,8 +53,8 @@ ENV PATH $ZK_HOME/bin:$PATH
RUN apk add postgresql postgresql-contrib
#5. add dolphinscheduler
ADD ./apache-dolphinscheduler-incubating-${VERSION}-SNAPSHOT-dolphinscheduler-bin.tar.gz /opt/
RUN mv /opt/apache-dolphinscheduler-incubating-${VERSION}-SNAPSHOT-dolphinscheduler-bin/ /opt/dolphinscheduler/
ADD ./apache-dolphinscheduler-incubating-${VERSION}-dolphinscheduler-bin.tar.gz /opt/
RUN mv /opt/apache-dolphinscheduler-incubating-${VERSION}-dolphinscheduler-bin/ /opt/dolphinscheduler/
ENV DOLPHINSCHEDULER_HOME /opt/dolphinscheduler
#6. modify nginx

10
dockerfile/hooks/build

@ -24,13 +24,13 @@ printenv
if [ -z "${VERSION}" ]
then
echo "set default environment variable [VERSION]"
VERSION=$(cat $(pwd)/sql/soft_version)
export VERSION=$(cat $(pwd)/pom.xml | grep '<version>' -m 1 | awk '{print $1}' | sed 's/<version>//' | sed 's/<\/version>//')
fi
if [ "${DOCKER_REPO}x" = "x" ]
then
echo "set default environment variable [DOCKER_REPO]"
DOCKER_REPO='dolphinscheduler'
export DOCKER_REPO='apache/dolphinscheduler'
fi
echo "Version: $VERSION"
@ -43,11 +43,11 @@ echo -e "mvn -B clean compile package -Prelease -Dmaven.test.skip=true"
mvn -B clean compile package -Prelease -Dmaven.test.skip=true
# mv dolphinscheduler-bin.tar.gz file to dockerfile directory
echo -e "mv $(pwd)/dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-${VERSION}-SNAPSHOT-dolphinscheduler-bin.tar.gz $(pwd)/dockerfile/\n"
mv $(pwd)/dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-${VERSION}-SNAPSHOT-dolphinscheduler-bin.tar.gz $(pwd)/dockerfile/
echo -e "mv $(pwd)/dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-${VERSION}-dolphinscheduler-bin.tar.gz $(pwd)/dockerfile/\n"
mv $(pwd)/dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-${VERSION}-dolphinscheduler-bin.tar.gz $(pwd)/dockerfile/
# docker build
echo -e "docker build --build-arg VERSION=${VERSION} -t $DOCKER_REPO:${VERSION} $(pwd)/dockerfile/\n"
docker build --build-arg VERSION=${VERSION} -t $DOCKER_REPO:${VERSION} $(pwd)/dockerfile/
sudo docker build --build-arg VERSION=${VERSION} -t $DOCKER_REPO:${VERSION} $(pwd)/dockerfile/
echo "------ dolphinscheduler end - build -------"

19
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/template/impl/DefaultHTMLTemplate.java

@ -35,6 +35,7 @@ public class DefaultHTMLTemplate implements AlertTemplate {
public static final Logger logger = LoggerFactory.getLogger(DefaultHTMLTemplate.class);
@Override
public String getMessageFromTemplate(String content, ShowType showType,boolean showAll) {
@ -140,21 +141,7 @@ public class DefaultHTMLTemplate implements AlertTemplate {
checkNotNull(content);
String htmlTableThead = StringUtils.isEmpty(title) ? "" : String.format("<thead>%s</thead>\n",title);
return "<html>\n" +
" <head>\n" +
" <title>dolphinscheduler</title>\n" +
" <meta name='Keywords' content=''>\n" +
" <meta name='Description' content=''>\n" +
" <style type=\"text/css\">\n" +
" table {margin-top:0px;padding-top:0px;border:1px solid;font-size: 14px;color: #333333;border-width: 1px;border-color: #666666;border-collapse: collapse;}\n" +
" table th {border-width: 1px;padding: 8px;border-style: solid;border-color: #666666;background-color: #dedede;text-align: right;}\n" +
" table td {border-width: 1px;padding: 8px;border-style: solid;border-color: #666666;background-color: #ffffff;text-align: right;}\n" +
" </style>\n" +
" </head>\n" +
" <body style=\"margin:0;padding:0\">\n" +
" <table border=\"1px\" cellpadding=\"5px\" cellspacing=\"-10px\">\n" + htmlTableThead + content +
" </table>\n" +
" </body>\n" +
"</html>";
return Constants.HTML_HEADER_PREFIX +htmlTableThead + content + Constants.TABLE_BODY_HTML_TAIL;
}
}

4
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java

@ -157,6 +157,10 @@ public class Constants {
public static final String ENTERPRISE_WECHAT_USERS = "enterprise.wechat.users";
public static final String HTML_HEADER_PREFIX = "<!DOCTYPE HTML PUBLIC '-//W3C//DTD HTML 4.01 Transitional//EN' 'http://www.w3.org/TR/html4/loose.dtd'><html><head><title>dolphinscheduler</title><meta name='Keywords' content=''><meta name='Description' content=''><style type=\"text/css\">table {margin-top:0px;padding-top:0px;border:1px solid;font-size: 14px;color: #333333;border-width: 1px;border-color: #666666;border-collapse: collapse;}table th {border-width: 1px;padding: 8px;border-style: solid;border-color: #666666;background-color: #dedede;text-align: left;}table td {border-width: 1px;padding: 8px;border-style: solid;border-color: #666666;background-color: #ffffff;text-align: left;}</style></head><body style=\"margin:0;padding:0\"><table border=\"1px\" cellpadding=\"5px\" cellspacing=\"-10px\"> ";
public static final String TABLE_BODY_HTML_TAIL = "</table></body></html>";
/**
* plugin config
*/

2
dolphinscheduler-alert/src/main/resources/logback-alert.xml

@ -46,7 +46,7 @@
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="APILOGFILE"/>
<appender-ref ref="ALERTLOGFILE"/>
</root>
</configuration>

37
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/template/impl/DefaultHTMLTemplateTest.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.alert.template.impl;
import org.apache.dolphinscheduler.alert.utils.Constants;
import org.apache.dolphinscheduler.alert.utils.JSONUtils;
import org.apache.dolphinscheduler.common.enums.ShowType;
import org.junit.Test;
@ -82,42 +83,14 @@ public class DefaultHTMLTemplateTest{
private String generateMockTableTypeResultByHand(){
return "<html>\n" +
" <head>\n" +
" <title>dolphinscheduler</title>\n" +
" <meta name='Keywords' content=''>\n" +
" <meta name='Description' content=''>\n" +
" <style type=\"text/css\">\n" +
" table {margin-top:0px;padding-top:0px;border:1px solid;font-size: 14px;color: #333333;border-width: 1px;border-color: #666666;border-collapse: collapse;}\n" +
" table th {border-width: 1px;padding: 8px;border-style: solid;border-color: #666666;background-color: #dedede;text-align: right;}\n" +
" table td {border-width: 1px;padding: 8px;border-style: solid;border-color: #666666;background-color: #ffffff;text-align: right;}\n" +
" </style>\n" +
" </head>\n" +
" <body style=\"margin:0;padding:0\">\n" +
" <table border=\"1px\" cellpadding=\"5px\" cellspacing=\"-10px\">\n" +
return Constants.HTML_HEADER_PREFIX +
"<thead><tr><th>mysql service name</th><th>mysql address</th><th>port</th><th>no index of number</th><th>database client connections</th></tr></thead>\n" +
"<tr><td>mysql200</td><td>192.168.xx.xx</td><td>3306</td><td>80</td><td>190</td></tr><tr><td>mysql210</td><td>192.168.xx.xx</td><td>3306</td><td>10</td><td>90</td></tr> </table>\n" +
" </body>\n" +
"</html>";
"<tr><td>mysql200</td><td>192.168.xx.xx</td><td>3306</td><td>80</td><td>190</td></tr><tr><td>mysql210</td><td>192.168.xx.xx</td><td>3306</td><td>10</td><td>90</td></tr>" + Constants.TABLE_BODY_HTML_TAIL;
}
private String generateMockTextTypeResultByHand(){
return "<html>\n" +
" <head>\n" +
" <title>dolphinscheduler</title>\n" +
" <meta name='Keywords' content=''>\n" +
" <meta name='Description' content=''>\n" +
" <style type=\"text/css\">\n" +
" table {margin-top:0px;padding-top:0px;border:1px solid;font-size: 14px;color: #333333;border-width: 1px;border-color: #666666;border-collapse: collapse;}\n" +
" table th {border-width: 1px;padding: 8px;border-style: solid;border-color: #666666;background-color: #dedede;text-align: right;}\n" +
" table td {border-width: 1px;padding: 8px;border-style: solid;border-color: #666666;background-color: #ffffff;text-align: right;}\n" +
" </style>\n" +
" </head>\n" +
" <body style=\"margin:0;padding:0\">\n" +
" <table border=\"1px\" cellpadding=\"5px\" cellspacing=\"-10px\">\n" +
"<tr><td>{\"mysql service name\":\"mysql200\",\"mysql address\":\"192.168.xx.xx\",\"database client connections\":\"190\",\"port\":\"3306\",\"no index of number\":\"80\"}</td></tr><tr><td>{\"mysql service name\":\"mysql210\",\"mysql address\":\"192.168.xx.xx\",\"database client connections\":\"90\",\"port\":\"3306\",\"no index of number\":\"10\"}</td></tr> </table>\n" +
" </body>\n" +
"</html>";
return Constants.HTML_HEADER_PREFIX + "<tr><td>{\"mysql service name\":\"mysql200\",\"mysql address\":\"192.168.xx.xx\",\"database client connections\":\"190\",\"port\":\"3306\",\"no index of number\":\"80\"}</td></tr><tr><td>{\"mysql service name\":\"mysql210\",\"mysql address\":\"192.168.xx.xx\",\"database client connections\":\"90\",\"port\":\"3306\",\"no index of number\":\"10\"}</td></tr>" + Constants.TABLE_BODY_HTML_TAIL;
}
}

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -476,8 +476,6 @@ public class ProcessInstanceService extends BaseDAGService {
return checkResult;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId);
if (null == processInstance) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
@ -485,8 +483,11 @@ public class ProcessInstanceService extends BaseDAGService {
processService.removeTaskLogFile(processInstanceId);
// delete database cascade
int delete = processService.deleteWorkProcessInstanceById(processInstanceId);
processService.deleteAllSubWorkProcessByParentId(processInstanceId);
processService.deleteWorkProcessMapByParentId(processInstanceId);

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java

@ -76,7 +76,7 @@ public enum ExecutionStatus {
* @return status
*/
public boolean typeIsFailure(){
return this == FAILURE || this == NEED_FAULT_TOLERANCE;
return this == FAILURE || this == NEED_FAULT_TOLERANCE || this == KILL;
}
/**

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ TASK_EXECUTE_REQUEST, /** * execute task ack */ TASK_EXECUTE_ACK, /** * execute task response */ TASK_EXECUTE_RESPONSE, /** * kill task */ TASK_KILL_REQUEST, /** * kill task response */ TASK_KILL_RESPONSE, /** * ping */ PING, /** * pong */ PONG; }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * remove task log request, */ REMOVE_TAK_LOG_REQUEST, /** * remove task log response */ REMOVE_TAK_LOG_RESPONSE, /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ TASK_EXECUTE_REQUEST, /** * execute task ack */ TASK_EXECUTE_ACK, /** * execute task response */ TASK_EXECUTE_RESPONSE, /** * kill task */ TASK_KILL_REQUEST, /** * kill task response */ TASK_KILL_RESPONSE, /** * ping */ PING, /** * pong */ PONG; }

63
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* remove task log request command
*/
public class RemoveTaskLogRequestCommand implements Serializable {
/**
* log path
*/
private String path;
public RemoveTaskLogRequestCommand() {
}
public RemoveTaskLogRequestCommand(String path) {
this.path = path;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.REMOVE_TAK_LOG_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}

63
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* remove task log request command
*/
public class RemoveTaskLogResponseCommand implements Serializable {
/**
* log path
*/
private Boolean status;
public RemoveTaskLogResponseCommand() {
}
public RemoveTaskLogResponseCommand(Boolean status) {
this.status = status;
}
public Boolean getStatus() {
return status;
}
public void setStatus(Boolean status) {
this.status = status;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.REMOVE_TAK_LOG_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}

35
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java

@ -60,14 +60,14 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
*/
final CommandType commandType = command.getType();
switch (commandType){
case GET_LOG_BYTES_REQUEST:
GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize(
command.getBody(), GetLogBytesRequestCommand.class);
byte[] bytes = getFileContentBytes(getLogRequest.getPath());
GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes);
channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
break;
case VIEW_WHOLE_LOG_REQUEST:
case GET_LOG_BYTES_REQUEST:
GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize(
command.getBody(), GetLogBytesRequestCommand.class);
byte[] bytes = getFileContentBytes(getLogRequest.getPath());
GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes);
channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
break;
case VIEW_WHOLE_LOG_REQUEST:
ViewLogRequestCommand viewLogRequest = FastJsonSerializer.deserialize(
command.getBody(), ViewLogRequestCommand.class);
String msg = readWholeFileContent(viewLogRequest.getPath());
@ -86,6 +86,25 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
RollViewLogResponseCommand rollViewLogRequestResponse = new RollViewLogResponseCommand(builder.toString());
channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque()));
break;
case REMOVE_TAK_LOG_REQUEST:
RemoveTaskLogRequestCommand removeTaskLogRequest = FastJsonSerializer.deserialize(
command.getBody(), RemoveTaskLogRequestCommand.class);
String taskLogPath = removeTaskLogRequest.getPath();
File taskLogFile = new File(taskLogPath);
Boolean status = true;
try {
if (taskLogFile.exists()){
taskLogFile.delete();
}
}catch (Exception e){
status = false;
}
RemoveTaskLogResponseCommand removeTaskLogResponse = new RemoveTaskLogResponseCommand(status);
channel.writeAndFlush(removeTaskLogResponse.convert2Command(command.getOpaque()));
break;
default:
throw new IllegalArgumentException("unknown commandType");
}

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java

@ -55,6 +55,7 @@ public class LoggerServer {
this.server.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, requestProcessor, requestProcessor.getExecutor());
this.server.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
this.server.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
this.server.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
}
/**

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -496,6 +496,7 @@ public class MasterExecThread implements Runnable {
}
String processWorkerGroup = processInstance.getWorkerGroup();
processWorkerGroup = StringUtils.isBlank(processWorkerGroup) ? DEFAULT_WORKER_GROUP : processWorkerGroup;
String taskWorkerGroup = StringUtils.isBlank(taskNode.getWorkerGroup()) ? processWorkerGroup : taskNode.getWorkerGroup();
if (!processWorkerGroup.equals(DEFAULT_WORKER_GROUP) && taskWorkerGroup.equals(DEFAULT_WORKER_GROUP)) {
taskInstance.setWorkerGroup(processWorkerGroup);

12
dolphinscheduler-server/src/main/resources/config/install_config.conf

@ -118,7 +118,7 @@ apiServerPort="12345"
# install hosts
# Note: install the scheduled hostname list. If it is pseudo-distributed, just write a pseudo-distributed hostname
ips="ark0,ark1,ark2,ark3,ark4"
ips="ds1,ds2,ds3,ds4,ds5"
# ssh port, default 22
# Note: if ssh port is not default, modify here
@ -126,19 +126,19 @@ sshPort="22"
# run master machine
# Note: list of hosts hostname for deploying master
masters="ark0,ark1"
masters="ds1,ds2"
# run worker machine
# note: list of machine hostnames for deploying workers
workers="ark2,ark3,ark4"
# note: need to write the worker group name of each worker, the default value is "default"
workersGroup=(["ds1"]="default" ["ds2"]="default" ["ds3"]="default" ["ds4"]="default" ["ds5"]="default")
# run alert machine
# note: list of machine hostnames for deploying alert server
alertServer="ark3"
alertServer="ds3"
# run api machine
# note: list of machine hostnames for deploying api server
apiServers="ark1"
apiServers="ds1"
# whether to start monitoring self-starting scripts
monitorServerState="false"

4
dolphinscheduler-server/src/main/resources/worker.properties

@ -21,7 +21,7 @@
# worker heartbeat interval
#worker.heartbeat.interval=10
# submit the number of tasks at a time
# submit the number of tasks at a time TODO
#worker.fetch.task.num = 3
# only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2
@ -34,4 +34,4 @@
#worker.listen.port: 1234
# default worker group
#worker.group=default
worker.group=default

29
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java

@ -144,4 +144,33 @@ public class LogClientService {
}
return result;
}
/**
* remove task log
* @param host host
* @param port port
* @param path path
* @return remove task status
*/
public Boolean removeTaskLog(String host, int port, String path) {
logger.info("log path {}", path);
RemoveTaskLogRequestCommand request = new RemoveTaskLogRequestCommand(path);
Boolean result = false;
final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if(response != null){
RemoveTaskLogResponseCommand taskLogResponse = FastJsonSerializer.deserialize(
response.getBody(), RemoveTaskLogResponseCommand.class);
return taskLogResponse.getStatus();
}
} catch (Exception e) {
logger.error("remove task log error", e);
} finally {
this.client.closeChannel(address);
}
return result;
}
}

37
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -29,6 +29,8 @@ import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
@ -37,6 +39,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.util.*;
import java.util.stream.Collectors;
@ -238,7 +241,7 @@ public class ProcessService {
* @param defineId
* @return
*/
public List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId){
public List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId){
ProcessDefinition processDefinition = processDefineMapper.selectById(defineId);
if (processDefinition == null) {
logger.info("process define not exists");
@ -293,15 +296,44 @@ public class ProcessService {
List<Integer> subProcessIdList = processInstanceMapMapper.querySubIdListByParentId(processInstanceId);
for(Integer subId : subProcessIdList ){
for(Integer subId : subProcessIdList){
deleteAllSubWorkProcessByParentId(subId);
deleteWorkProcessMapByParentId(subId);
removeTaskLogFile(subId);
deleteWorkProcessInstanceById(subId);
}
return 1;
}
/**
* remove task log file
* @param processInstanceId processInstanceId
*/
public void removeTaskLogFile(Integer processInstanceId){
LogClientService logClient = new LogClientService();
List<TaskInstance> taskInstanceList = findValidTaskListByProcessId(processInstanceId);
if (CollectionUtils.isEmpty(taskInstanceList)){
return;
}
for (TaskInstance taskInstance : taskInstanceList){
String taskLogPath = taskInstance.getLogPath();
if (StringUtils.isEmpty(taskInstance.getHost())){
continue;
}
String ip = Host.of(taskInstance.getHost()).getIp();
int port = Constants.RPC_PORT;
// remove task log from loggerserver
logClient.removeTaskLog(ip,port,taskLogPath);
}
}
/**
* calculate sub process number in the process define.
* @param processDefinitionId processDefinitionId
@ -926,6 +958,7 @@ public class ProcessService {
command.setCommandParam(processMapStr);
command.setCommandType(commandType);
command.setProcessInstancePriority(parentProcessInstance.getProcessInstancePriority());
command.setWorkerGroup(parentProcessInstance.getWorkerGroup());
createCommand(command);
logger.info("sub process command created: {} ", command.toString());
}

4
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java

@ -32,6 +32,7 @@ import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.util.Date;
@ -98,7 +99,8 @@ public class ProcessScheduleJob implements Job {
command.setScheduleTime(scheduledFireTime);
command.setStartTime(fireTime);
command.setWarningGroupId(schedule.getWarningGroupId());
command.setWorkerGroup(schedule.getWorkerGroup());
String workerGroup = StringUtils.isEmpty(schedule.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP : schedule.getWorkerGroup();
command.setWorkerGroup(workerGroup);
command.setWarningType(schedule.getWarningType());
command.setProcessInstancePriority(schedule.getProcessInstancePriority());

11
script/scp-hosts.sh

@ -20,6 +20,12 @@ workDir=`dirname $0`
workDir=`cd ${workDir};pwd`
source $workDir/../conf/config/install_config.conf
txt=""
if [[ "$OSTYPE" == "darwin"* ]]; then
# Mac OSX
txt="''"
fi
hostsArr=(${ips//,/ })
for host in ${hostsArr[@]}
do
@ -33,6 +39,11 @@ do
for dsDir in bin conf lib script sql ui install.sh
do
# if worker in workersGroup
if [[ "${map[${host}]}" ]] && [[ "${dsDir}" -eq "conf" ]]; then
sed -i ${txt} "s#worker.group.*#worker.group=${map[${host}]}#g" $workDir/../conf/worker.properties
fi
echo "start to scp $dsDir to $host/$installPath"
scp -P $sshPort -r $workDir/../$dsDir $host:$installPath
done

3
script/start-all.sh

@ -28,8 +28,7 @@ do
done
workersHost=(${workers//,/ })
for worker in ${workersHost[@]}
for worker in ${!workersGroup[*]}
do
echo "$worker worker server is starting"

3
script/stop-all.sh

@ -29,8 +29,7 @@ do
done
workersHost=(${workers//,/ })
for worker in ${workersHost[@]}
for worker in ${!workersGroup[*]}
do
echo "$worker worker server is stopping"
ssh -p $sshPort $worker "cd $installPath/; sh bin/dolphinscheduler-daemon.sh stop worker-server;"

3
sql/upgrade/1.3.0_schema/mysql/dolphinscheduler_dml.sql

@ -17,4 +17,5 @@
SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));
SET FOREIGN_KEY_CHECKS=0;
UPDATE t_ds_resources SET pid=-1,is_directory=false WHERE pid IS NULL;
UPDATE t_ds_resources SET full_name = concat('/',alias) WHERE pid=-1 and full_name IS NULL;
UPDATE t_ds_resources SET full_name = concat('/',alias) WHERE pid=-1 and full_name IS NULL;
UPDATE QRTZ_JOB_DETAILS SET JOB_CLASS_NAME='org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob' WHERE JOB_CLASS_NAME='org.apache.dolphinscheduler.server.quartz.ProcessScheduleJob';

3
sql/upgrade/1.3.0_schema/postgresql/dolphinscheduler_dml.sql

@ -15,4 +15,5 @@
* limitations under the License.
*/
UPDATE t_ds_resources SET pid=-1,is_directory=false WHERE pid IS NULL;
UPDATE t_ds_resources SET full_name = concat('/',alias) WHERE pid=-1 and full_name IS NULL;
UPDATE t_ds_resources SET full_name = concat('/',alias) WHERE pid=-1 and full_name IS NULL;
UPDATE QRTZ_JOB_DETAILS SET JOB_CLASS_NAME='org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob' WHERE JOB_CLASS_NAME='org.apache.dolphinscheduler.server.quartz.ProcessScheduleJob';
Loading…
Cancel
Save