diff --git a/.github/workflows/ci_ut.yml b/.github/workflows/ci_ut.yml
index 66e7660359..55f1259d2b 100644
--- a/.github/workflows/ci_ut.yml
+++ b/.github/workflows/ci_ut.yml
@@ -21,7 +21,6 @@ on:
branches:
- dev
env:
- DOCKER_DIR: ./docker
LOG_DIR: /tmp/dolphinscheduler
name: Unit Test
@@ -49,6 +48,8 @@ jobs:
- name: Bootstrap database
run: |
sed -i "s/: root/: test/g" $(pwd)/docker/docker-swarm/docker-compose.yml
+ docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml create --force-recreate dolphinscheduler-zookeeper dolphinscheduler-postgresql
+ sudo cp $(pwd)/sql/dolphinscheduler-postgre.sql $(docker volume inspect docker-swarm_dolphinscheduler-postgresql-initdb | grep "Mountpoint" | awk -F "\"" '{print $4}')
docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml up -d dolphinscheduler-zookeeper dolphinscheduler-postgresql
- name: Set up JDK 1.8
uses: actions/setup-java@v1
@@ -84,6 +85,5 @@ jobs:
- name: Collect logs
run: |
mkdir -p ${LOG_DIR}
- cd ${DOCKER_DIR}
- docker-compose logs db > ${LOG_DIR}/db.txt
+ docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml logs dolphinscheduler-postgresql > ${LOG_DIR}/db.txt
continue-on-error: true
diff --git a/docker/build/Dockerfile b/docker/build/Dockerfile
index 3b02a1364e..bb69347797 100644
--- a/docker/build/Dockerfile
+++ b/docker/build/Dockerfile
@@ -37,14 +37,13 @@ RUN apk add openjdk8
ENV JAVA_HOME /usr/lib/jvm/java-1.8-openjdk
ENV PATH $JAVA_HOME/bin:$PATH
-#3. install pg
-RUN apk add postgresql postgresql-contrib
-
-#4. add dolphinscheduler
+#3. add dolphinscheduler
ADD ./apache-dolphinscheduler-incubating-${VERSION}-dolphinscheduler-bin.tar.gz /opt/
RUN mv /opt/apache-dolphinscheduler-incubating-${VERSION}-dolphinscheduler-bin/ /opt/dolphinscheduler/
ENV DOLPHINSCHEDULER_HOME /opt/dolphinscheduler
+#4. install pg
+RUN apk add postgresql postgresql-contrib
#5. modify nginx
RUN echo "daemon off;" >> /etc/nginx/nginx.conf && \
rm -rf /etc/nginx/conf.d/*
@@ -73,8 +72,9 @@ RUN chmod +x /root/checkpoint.sh && \
ln -s /bin/bash /bin/sh && \
mkdir -p /tmp/xls
-#7. remove apk index cache
-RUN rm -rf /var/cache/apk/*
+#7. remove apk index cache and disable coredup for sudo
+RUN rm -rf /var/cache/apk/* && \
+ echo "Set disable_coredump false" >> /etc/sudo.conf
#8. expose port
EXPOSE 2181 2888 3888 5432 5678 1234 12345 50051 8888
diff --git a/docker/build/startup.sh b/docker/build/startup.sh
index 71ab506777..00bb9cdc52 100644
--- a/docker/build/startup.sh
+++ b/docker/build/startup.sh
@@ -24,31 +24,6 @@ DOLPHINSCHEDULER_LOGS=${DOLPHINSCHEDULER_HOME}/logs
# start postgresql
initPostgreSQL() {
- echo "checking postgresql"
- if [[ "${POSTGRESQL_HOST}" = "127.0.0.1" || "${POSTGRESQL_HOST}" = "localhost" ]]; then
- export PGPORT=${POSTGRESQL_PORT}
-
- echo "start postgresql service"
- rc-service postgresql restart
-
- # role if not exists, create
- flag=$(sudo -u postgres psql -tAc "SELECT 1 FROM pg_roles WHERE rolname='${POSTGRESQL_USERNAME}'")
- if [ -z "${flag}" ]; then
- echo "create user"
- sudo -u postgres psql -tAc "create user ${POSTGRESQL_USERNAME} with password '${POSTGRESQL_PASSWORD}'"
- fi
-
- # database if not exists, create
- flag=$(sudo -u postgres psql -tAc "select 1 from pg_database where datname='dolphinscheduler'")
- if [ -z "${flag}" ]; then
- echo "init db"
- sudo -u postgres psql -tAc "create database dolphinscheduler owner ${POSTGRESQL_USERNAME}"
- fi
-
- # grant
- sudo -u postgres psql -tAc "grant all privileges on database dolphinscheduler to ${POSTGRESQL_USERNAME}"
- fi
-
echo "test postgresql service"
while ! nc -z ${POSTGRESQL_HOST} ${POSTGRESQL_PORT}; do
counter=$((counter+1))
@@ -73,24 +48,18 @@ initPostgreSQL() {
# start zk
initZK() {
- echo -e "checking zookeeper"
- if [[ "${ZOOKEEPER_QUORUM}" = "127.0.0.1:2181" || "${ZOOKEEPER_QUORUM}" = "localhost:2181" ]]; then
- echo "start local zookeeper"
- /opt/zookeeper/bin/zkServer.sh restart
- else
- echo "connect remote zookeeper"
- echo "${ZOOKEEPER_QUORUM}" | awk -F ',' 'BEGIN{ i=1 }{ while( i <= NF ){ print $i; i++ } }' | while read line; do
- while ! nc -z ${line%:*} ${line#*:}; do
- counter=$((counter+1))
- if [ $counter == 30 ]; then
- echo "Error: Couldn't connect to zookeeper."
- exit 1
- fi
- echo "Trying to connect to zookeeper at ${line}. Attempt $counter."
- sleep 5
- done
+ echo "connect remote zookeeper"
+ echo "${ZOOKEEPER_QUORUM}" | awk -F ',' 'BEGIN{ i=1 }{ while( i <= NF ){ print $i; i++ } }' | while read line; do
+ while ! nc -z ${line%:*} ${line#*:}; do
+ counter=$((counter+1))
+ if [ $counter == 30 ]; then
+ echo "Error: Couldn't connect to zookeeper."
+ exit 1
+ fi
+ echo "Trying to connect to zookeeper at ${line}. Attempt $counter."
+ sleep 5
done
- fi
+ done
}
# start nginx
diff --git a/docker/docker-swarm/docker-compose.yml b/docker/docker-swarm/docker-compose.yml
index b564eda842..ee8be2570d 100644
--- a/docker/docker-swarm/docker-compose.yml
+++ b/docker/docker-swarm/docker-compose.yml
@@ -30,6 +30,7 @@ services:
POSTGRESQL_DATABASE: dolphinscheduler
volumes:
- dolphinscheduler-postgresql:/bitnami/postgresql
+ - dolphinscheduler-postgresql-initdb:/docker-entrypoint-initdb.d
networks:
- dolphinscheduler
@@ -175,10 +176,10 @@ services:
image: apache/dolphinscheduler:latest
container_name: dolphinscheduler-worker
command: ["worker-server"]
- ports:
+ ports:
- 1234:1234
- 50051:50051
- environment:
+ environment:
TZ: Asia/Shanghai
WORKER_EXEC_THREADS: "100"
WORKER_HEARTBEAT_INTERVAL: "10"
@@ -221,6 +222,7 @@ networks:
volumes:
dolphinscheduler-postgresql:
+ dolphinscheduler-postgresql-initdb:
dolphinscheduler-zookeeper:
dolphinscheduler-worker-data:
dolphinscheduler-logs:
diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/template/impl/DefaultHTMLTemplate.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/template/impl/DefaultHTMLTemplate.java
index 7039d7c5c4..b5ba75522b 100644
--- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/template/impl/DefaultHTMLTemplate.java
+++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/template/impl/DefaultHTMLTemplate.java
@@ -35,6 +35,7 @@ public class DefaultHTMLTemplate implements AlertTemplate {
public static final Logger logger = LoggerFactory.getLogger(DefaultHTMLTemplate.class);
+
@Override
public String getMessageFromTemplate(String content, ShowType showType,boolean showAll) {
@@ -140,7 +141,7 @@ public class DefaultHTMLTemplate implements AlertTemplate {
checkNotNull(content);
String htmlTableThead = StringUtils.isEmpty(title) ? "" : String.format("%s\n",title);
- return "
dolphinscheduler " +htmlTableThead + content +"
";
+ return Constants.HTML_HEADER_PREFIX +htmlTableThead + content + Constants.TABLE_BODY_HTML_TAIL;
}
}
diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java
index 8fa38c62fc..540b1a9409 100644
--- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java
+++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java
@@ -157,6 +157,10 @@ public class Constants {
public static final String ENTERPRISE_WECHAT_USERS = "enterprise.wechat.users";
+ public static final String HTML_HEADER_PREFIX = "dolphinscheduler ";
+
+ public static final String TABLE_BODY_HTML_TAIL = "
";
+
/**
* plugin config
*/
diff --git a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/template/impl/DefaultHTMLTemplateTest.java b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/template/impl/DefaultHTMLTemplateTest.java
index 58609c07cb..52d5853b92 100644
--- a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/template/impl/DefaultHTMLTemplateTest.java
+++ b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/template/impl/DefaultHTMLTemplateTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.alert.template.impl;
+import org.apache.dolphinscheduler.alert.utils.Constants;
import org.apache.dolphinscheduler.alert.utils.JSONUtils;
import org.apache.dolphinscheduler.common.enums.ShowType;
import org.junit.Test;
@@ -82,42 +83,14 @@ public class DefaultHTMLTemplateTest{
private String generateMockTableTypeResultByHand(){
- return "\n" +
- " \n" +
- " dolphinscheduler\n" +
- " \n" +
- " \n" +
- " \n" +
- " \n" +
- " \n" +
- " \n" +
+ return Constants.HTML_HEADER_PREFIX +
"mysql service name | mysql address | port | no index of number | database client connections |
\n" +
- "mysql200 | 192.168.xx.xx | 3306 | 80 | 190 |
mysql210 | 192.168.xx.xx | 3306 | 10 | 90 |
\n" +
- " \n" +
- "";
+ "mysql200 | 192.168.xx.xx | 3306 | 80 | 190 |
mysql210 | 192.168.xx.xx | 3306 | 10 | 90 |
" + Constants.TABLE_BODY_HTML_TAIL;
+
}
private String generateMockTextTypeResultByHand(){
- return "\n" +
- " \n" +
- " dolphinscheduler\n" +
- " \n" +
- " \n" +
- " \n" +
- " \n" +
- " \n" +
- " \n" +
- "{\"mysql service name\":\"mysql200\",\"mysql address\":\"192.168.xx.xx\",\"database client connections\":\"190\",\"port\":\"3306\",\"no index of number\":\"80\"} |
{\"mysql service name\":\"mysql210\",\"mysql address\":\"192.168.xx.xx\",\"database client connections\":\"90\",\"port\":\"3306\",\"no index of number\":\"10\"} |
\n" +
- " \n" +
- "";
+ return Constants.HTML_HEADER_PREFIX + "{\"mysql service name\":\"mysql200\",\"mysql address\":\"192.168.xx.xx\",\"database client connections\":\"190\",\"port\":\"3306\",\"no index of number\":\"80\"} |
{\"mysql service name\":\"mysql210\",\"mysql address\":\"192.168.xx.xx\",\"database client connections\":\"90\",\"port\":\"3306\",\"no index of number\":\"10\"} |
" + Constants.TABLE_BODY_HTML_TAIL;
}
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index a5a341376e..f8ad4c6e8e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -476,8 +476,6 @@ public class ProcessInstanceService extends BaseDAGService {
return checkResult;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
- List taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId);
-
if (null == processInstance) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
@@ -485,8 +483,11 @@ public class ProcessInstanceService extends BaseDAGService {
+ processService.removeTaskLogFile(processInstanceId);
// delete database cascade
int delete = processService.deleteWorkProcessInstanceById(processInstanceId);
+
+
processService.deleteAllSubWorkProcessByParentId(processInstanceId);
processService.deleteWorkProcessMapByParentId(processInstanceId);
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java
index 9dafbe138c..e92ec54540 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java
@@ -73,6 +73,22 @@ public class ProcessDefinitionMapperTest {
return processDefinition;
}
+ /**
+ * insert
+ * @return ProcessDefinition
+ */
+ private ProcessDefinition insertTwo(){
+ //insertOne
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setName("def 2");
+ processDefinition.setProjectId(1010);
+ processDefinition.setUserId(101);
+ processDefinition.setUpdateTime(new Date());
+ processDefinition.setCreateTime(new Date());
+ processDefinitionMapper.insert(processDefinition);
+ return processDefinition;
+ }
+
/**
* test update
*/
@@ -177,7 +193,7 @@ public class ProcessDefinitionMapperTest {
public void testQueryDefinitionListByIdList() {
ProcessDefinition processDefinition = insertOne();
- ProcessDefinition processDefinition1 = insertOne();
+ ProcessDefinition processDefinition1 = insertTwo();
Integer[] array = new Integer[2];
array[0] = processDefinition.getId();
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index c8d56597ee..d1ffc65f57 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
TASK_EXECUTE_REQUEST,
/**
* execute task ack
*/
TASK_EXECUTE_ACK,
/**
* execute task response
*/
TASK_EXECUTE_RESPONSE,
/**
* kill task
*/
TASK_KILL_REQUEST,
/**
* kill task response
*/
TASK_KILL_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG;
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* remove task log request,
*/
REMOVE_TAK_LOG_REQUEST,
/**
* remove task log response
*/
REMOVE_TAK_LOG_RESPONSE,
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
TASK_EXECUTE_REQUEST,
/**
* execute task ack
*/
TASK_EXECUTE_ACK,
/**
* execute task response
*/
TASK_EXECUTE_RESPONSE,
/**
* kill task
*/
TASK_KILL_REQUEST,
/**
* kill task response
*/
TASK_KILL_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG;
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java
new file mode 100644
index 0000000000..4cf66265cf
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ * remove task log request command
+ */
+public class RemoveTaskLogRequestCommand implements Serializable {
+
+ /**
+ * log path
+ */
+ private String path;
+
+ public RemoveTaskLogRequestCommand() {
+ }
+
+ public RemoveTaskLogRequestCommand(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ /**
+ * package request command
+ *
+ * @return command
+ */
+ public Command convert2Command(){
+ Command command = new Command();
+ command.setType(CommandType.REMOVE_TAK_LOG_REQUEST);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java
new file mode 100644
index 0000000000..a72f84ab41
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ * remove task log request command
+ */
+public class RemoveTaskLogResponseCommand implements Serializable {
+
+ /**
+ * log path
+ */
+ private Boolean status;
+
+ public RemoveTaskLogResponseCommand() {
+ }
+
+ public RemoveTaskLogResponseCommand(Boolean status) {
+ this.status = status;
+ }
+
+ public Boolean getStatus() {
+ return status;
+ }
+
+ public void setStatus(Boolean status) {
+ this.status = status;
+ }
+
+ /**
+ * package request command
+ *
+ * @return command
+ */
+ public Command convert2Command(long opaque){
+ Command command = new Command(opaque);
+ command.setType(CommandType.REMOVE_TAK_LOG_RESPONSE);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
index 44ec68f89f..a73e47397b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
@@ -60,14 +60,14 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
*/
final CommandType commandType = command.getType();
switch (commandType){
- case GET_LOG_BYTES_REQUEST:
- GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize(
- command.getBody(), GetLogBytesRequestCommand.class);
- byte[] bytes = getFileContentBytes(getLogRequest.getPath());
- GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes);
- channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
- break;
- case VIEW_WHOLE_LOG_REQUEST:
+ case GET_LOG_BYTES_REQUEST:
+ GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize(
+ command.getBody(), GetLogBytesRequestCommand.class);
+ byte[] bytes = getFileContentBytes(getLogRequest.getPath());
+ GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes);
+ channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
+ break;
+ case VIEW_WHOLE_LOG_REQUEST:
ViewLogRequestCommand viewLogRequest = FastJsonSerializer.deserialize(
command.getBody(), ViewLogRequestCommand.class);
String msg = readWholeFileContent(viewLogRequest.getPath());
@@ -86,6 +86,25 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
RollViewLogResponseCommand rollViewLogRequestResponse = new RollViewLogResponseCommand(builder.toString());
channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque()));
break;
+ case REMOVE_TAK_LOG_REQUEST:
+ RemoveTaskLogRequestCommand removeTaskLogRequest = FastJsonSerializer.deserialize(
+ command.getBody(), RemoveTaskLogRequestCommand.class);
+
+ String taskLogPath = removeTaskLogRequest.getPath();
+
+ File taskLogFile = new File(taskLogPath);
+ Boolean status = true;
+ try {
+ if (taskLogFile.exists()){
+ taskLogFile.delete();
+ }
+ }catch (Exception e){
+ status = false;
+ }
+
+ RemoveTaskLogResponseCommand removeTaskLogResponse = new RemoveTaskLogResponseCommand(status);
+ channel.writeAndFlush(removeTaskLogResponse.convert2Command(command.getOpaque()));
+ break;
default:
throw new IllegalArgumentException("unknown commandType");
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
index 3520fb09ec..f1999e641c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
@@ -55,6 +55,7 @@ public class LoggerServer {
this.server.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, requestProcessor, requestProcessor.getExecutor());
this.server.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
this.server.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
+ this.server.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
}
/**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index e0110adfcd..462e30ca19 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -496,6 +496,7 @@ public class MasterExecThread implements Runnable {
}
String processWorkerGroup = processInstance.getWorkerGroup();
+ processWorkerGroup = StringUtils.isBlank(processWorkerGroup) ? DEFAULT_WORKER_GROUP : processWorkerGroup;
String taskWorkerGroup = StringUtils.isBlank(taskNode.getWorkerGroup()) ? processWorkerGroup : taskNode.getWorkerGroup();
if (!processWorkerGroup.equals(DEFAULT_WORKER_GROUP) && taskWorkerGroup.equals(DEFAULT_WORKER_GROUP)) {
taskInstance.setWorkerGroup(processWorkerGroup);
diff --git a/dolphinscheduler-server/src/main/resources/config/install_config.conf b/dolphinscheduler-server/src/main/resources/config/install_config.conf
index fab6b32d51..f6b09b9cb8 100644
--- a/dolphinscheduler-server/src/main/resources/config/install_config.conf
+++ b/dolphinscheduler-server/src/main/resources/config/install_config.conf
@@ -118,7 +118,7 @@ apiServerPort="12345"
# install hosts
# Note: install the scheduled hostname list. If it is pseudo-distributed, just write a pseudo-distributed hostname
-ips="ark0,ark1,ark2,ark3,ark4"
+ips="ds1,ds2,ds3,ds4,ds5"
# ssh port, default 22
# Note: if ssh port is not default, modify here
@@ -126,19 +126,19 @@ sshPort="22"
# run master machine
# Note: list of hosts hostname for deploying master
-masters="ark0,ark1"
+masters="ds1,ds2"
# run worker machine
-# note: list of machine hostnames for deploying workers
-workers="ark2,ark3,ark4"
+# note: need to write the worker group name of each worker, the default value is "default"
+workersGroup=(["ds1"]="default" ["ds2"]="default" ["ds3"]="default" ["ds4"]="default" ["ds5"]="default")
# run alert machine
# note: list of machine hostnames for deploying alert server
-alertServer="ark3"
+alertServer="ds3"
# run api machine
# note: list of machine hostnames for deploying api server
-apiServers="ark1"
+apiServers="ds1"
# whether to start monitoring self-starting scripts
monitorServerState="false"
diff --git a/dolphinscheduler-server/src/main/resources/worker.properties b/dolphinscheduler-server/src/main/resources/worker.properties
index d078f26ca6..36bc132743 100644
--- a/dolphinscheduler-server/src/main/resources/worker.properties
+++ b/dolphinscheduler-server/src/main/resources/worker.properties
@@ -21,7 +21,7 @@
# worker heartbeat interval
#worker.heartbeat.interval=10
-# submit the number of tasks at a time
+# submit the number of tasks at a time TODO
#worker.fetch.task.num = 3
# only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2
@@ -34,4 +34,4 @@
#worker.listen.port: 1234
# default worker group
-#worker.group=default
\ No newline at end of file
+worker.group=default
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
index 8e63c89405..4567d8051d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
@@ -144,4 +144,33 @@ public class LogClientService {
}
return result;
}
+
+
+ /**
+ * remove task log
+ * @param host host
+ * @param port port
+ * @param path path
+ * @return remove task status
+ */
+ public Boolean removeTaskLog(String host, int port, String path) {
+ logger.info("log path {}", path);
+ RemoveTaskLogRequestCommand request = new RemoveTaskLogRequestCommand(path);
+ Boolean result = false;
+ final Host address = new Host(host, port);
+ try {
+ Command command = request.convert2Command();
+ Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
+ if(response != null){
+ RemoveTaskLogResponseCommand taskLogResponse = FastJsonSerializer.deserialize(
+ response.getBody(), RemoveTaskLogResponseCommand.class);
+ return taskLogResponse.getStatus();
+ }
+ } catch (Exception e) {
+ logger.error("remove task log error", e);
+ } finally {
+ this.client.closeChannel(address);
+ }
+ return result;
+ }
}
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index e0ebaf47df..f0ae76ea4d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -29,6 +29,8 @@ import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
@@ -37,6 +39,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
+import java.io.File;
import java.util.*;
import java.util.stream.Collectors;
@@ -238,7 +241,7 @@ public class ProcessService {
* @param defineId
* @return
*/
- public List getTaskNodeListByDefinitionId(Integer defineId){
+ public List getTaskNodeListByDefinitionId(Integer defineId){
ProcessDefinition processDefinition = processDefineMapper.selectById(defineId);
if (processDefinition == null) {
logger.info("process define not exists");
@@ -293,15 +296,44 @@ public class ProcessService {
List subProcessIdList = processInstanceMapMapper.querySubIdListByParentId(processInstanceId);
- for(Integer subId : subProcessIdList ){
+ for(Integer subId : subProcessIdList){
deleteAllSubWorkProcessByParentId(subId);
deleteWorkProcessMapByParentId(subId);
+ removeTaskLogFile(subId);
deleteWorkProcessInstanceById(subId);
}
return 1;
}
+ /**
+ * remove task log file
+ * @param processInstanceId processInstanceId
+ */
+ public void removeTaskLogFile(Integer processInstanceId){
+
+ LogClientService logClient = new LogClientService();
+
+ List taskInstanceList = findValidTaskListByProcessId(processInstanceId);
+
+ if (CollectionUtils.isEmpty(taskInstanceList)){
+ return;
+ }
+
+ for (TaskInstance taskInstance : taskInstanceList){
+ String taskLogPath = taskInstance.getLogPath();
+ if (StringUtils.isEmpty(taskInstance.getHost())){
+ continue;
+ }
+ String ip = Host.of(taskInstance.getHost()).getIp();
+ int port = Constants.RPC_PORT;
+
+ // remove task log from loggerserver
+ logClient.removeTaskLog(ip,port,taskLogPath);
+ }
+ }
+
+
/**
* calculate sub process number in the process define.
* @param processDefinitionId processDefinitionId
@@ -926,6 +958,7 @@ public class ProcessService {
command.setCommandParam(processMapStr);
command.setCommandType(commandType);
command.setProcessInstancePriority(parentProcessInstance.getProcessInstancePriority());
+ command.setWorkerGroup(parentProcessInstance.getWorkerGroup());
createCommand(command);
logger.info("sub process command created: {} ", command.toString());
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
index c89b7affb8..6ac847b8db 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
@@ -32,6 +32,7 @@ import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
import java.util.Date;
@@ -98,7 +99,8 @@ public class ProcessScheduleJob implements Job {
command.setScheduleTime(scheduledFireTime);
command.setStartTime(fireTime);
command.setWarningGroupId(schedule.getWarningGroupId());
- command.setWorkerGroup(schedule.getWorkerGroup());
+ String workerGroup = StringUtils.isEmpty(schedule.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP : schedule.getWorkerGroup();
+ command.setWorkerGroup(workerGroup);
command.setWarningType(schedule.getWarningType());
command.setProcessInstancePriority(schedule.getProcessInstancePriority());
diff --git a/script/scp-hosts.sh b/script/scp-hosts.sh
index f4949f311a..adee7d76c7 100644
--- a/script/scp-hosts.sh
+++ b/script/scp-hosts.sh
@@ -20,6 +20,12 @@ workDir=`dirname $0`
workDir=`cd ${workDir};pwd`
source $workDir/../conf/config/install_config.conf
+txt=""
+if [[ "$OSTYPE" == "darwin"* ]]; then
+ # Mac OSX
+ txt="''"
+fi
+
hostsArr=(${ips//,/ })
for host in ${hostsArr[@]}
do
@@ -33,6 +39,11 @@ do
for dsDir in bin conf lib script sql ui install.sh
do
+ # if worker in workersGroup
+ if [[ "${map[${host}]}" ]] && [[ "${dsDir}" -eq "conf" ]]; then
+ sed -i ${txt} "s#worker.group.*#worker.group=${map[${host}]}#g" $workDir/../conf/worker.properties
+ fi
+
echo "start to scp $dsDir to $host/$installPath"
scp -P $sshPort -r $workDir/../$dsDir $host:$installPath
done
diff --git a/script/start-all.sh b/script/start-all.sh
index 11e4572059..5579a7d87e 100644
--- a/script/start-all.sh
+++ b/script/start-all.sh
@@ -28,8 +28,7 @@ do
done
-workersHost=(${workers//,/ })
-for worker in ${workersHost[@]}
+for worker in ${!workersGroup[*]}
do
echo "$worker worker server is starting"
diff --git a/script/stop-all.sh b/script/stop-all.sh
index f761579cc8..e4ccf75a29 100644
--- a/script/stop-all.sh
+++ b/script/stop-all.sh
@@ -29,8 +29,7 @@ do
done
-workersHost=(${workers//,/ })
-for worker in ${workersHost[@]}
+for worker in ${!workersGroup[*]}
do
echo "$worker worker server is stopping"
ssh -p $sshPort $worker "cd $installPath/; sh bin/dolphinscheduler-daemon.sh stop worker-server;"
diff --git a/sql/upgrade/1.3.0_schema/mysql/dolphinscheduler_dml.sql b/sql/upgrade/1.3.0_schema/mysql/dolphinscheduler_dml.sql
index 7fb3bc2fd5..67b005fb71 100644
--- a/sql/upgrade/1.3.0_schema/mysql/dolphinscheduler_dml.sql
+++ b/sql/upgrade/1.3.0_schema/mysql/dolphinscheduler_dml.sql
@@ -17,4 +17,5 @@
SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));
SET FOREIGN_KEY_CHECKS=0;
UPDATE t_ds_resources SET pid=-1,is_directory=false WHERE pid IS NULL;
-UPDATE t_ds_resources SET full_name = concat('/',alias) WHERE pid=-1 and full_name IS NULL;
\ No newline at end of file
+UPDATE t_ds_resources SET full_name = concat('/',alias) WHERE pid=-1 and full_name IS NULL;
+UPDATE QRTZ_JOB_DETAILS SET JOB_CLASS_NAME='org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob' WHERE JOB_CLASS_NAME='org.apache.dolphinscheduler.server.quartz.ProcessScheduleJob';
diff --git a/sql/upgrade/1.3.0_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.0_schema/postgresql/dolphinscheduler_ddl.sql
index 61cbfed4fe..16a49a897e 100644
--- a/sql/upgrade/1.3.0_schema/postgresql/dolphinscheduler_ddl.sql
+++ b/sql/upgrade/1.3.0_schema/postgresql/dolphinscheduler_ddl.sql
@@ -368,9 +368,9 @@ DROP FUNCTION dc_dolphin_T_t_ds_error_command_D_worker_group_id();
delimiter d//
CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_process_definition_A_process_definition_unique() RETURNS void AS $$
BEGIN
- IF NOT EXISTS (SELECT 1 FROM information_schema.STATISTICS
- WHERE TABLE_NAME='t_ds_process_definition'
- AND INDEX_NAME ='process_definition_unique')
+ IF NOT EXISTS (SELECT 1 FROM pg_stat_all_indexes
+ WHERE relname='t_ds_process_definition'
+ AND indexrelname ='process_definition_unique')
THEN
ALTER TABLE t_ds_process_definition ADD CONSTRAINT process_definition_unique UNIQUE (name,project_id);
END IF;
diff --git a/sql/upgrade/1.3.0_schema/postgresql/dolphinscheduler_dml.sql b/sql/upgrade/1.3.0_schema/postgresql/dolphinscheduler_dml.sql
index f892fa8c91..abf85cdf7c 100644
--- a/sql/upgrade/1.3.0_schema/postgresql/dolphinscheduler_dml.sql
+++ b/sql/upgrade/1.3.0_schema/postgresql/dolphinscheduler_dml.sql
@@ -15,4 +15,5 @@
* limitations under the License.
*/
UPDATE t_ds_resources SET pid=-1,is_directory=false WHERE pid IS NULL;
-UPDATE t_ds_resources SET full_name = concat('/',alias) WHERE pid=-1 and full_name IS NULL;
\ No newline at end of file
+UPDATE t_ds_resources SET full_name = concat('/',alias) WHERE pid=-1 and full_name IS NULL;
+UPDATE QRTZ_JOB_DETAILS SET JOB_CLASS_NAME='org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob' WHERE JOB_CLASS_NAME='org.apache.dolphinscheduler.server.quartz.ProcessScheduleJob';
\ No newline at end of file