From f874e8dcce759456e1576be2d1616c11c09f39ae Mon Sep 17 00:00:00 2001 From: xsbai Date: Fri, 19 Jun 2020 16:43:56 +0800 Subject: [PATCH 01/10] fix issue #3018 [BUG]i18n incomplete --- .../js/conf/home/pages/security/pages/users/_source/list.vue | 2 +- dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js | 1 + dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/list.vue index cb4a78f0b4..78681075ca 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/list.vue @@ -26,7 +26,7 @@ {{$t('User Name')}} - 用户类型 + {{$t('User Type')}} {{$t('Tenant')}} diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js index c8d82d4246..cec9859bb9 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js @@ -181,6 +181,7 @@ export default { Tenant: 'Tenant', Email: 'Email', Phone: 'Phone', + 'User Type':'User Type', 'Please enter phone number': 'Please enter phone number', 'Please enter main class': 'Please enter main class', 'Please enter email': 'Please enter email', diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index e65449092e..540452efe4 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -184,6 +184,7 @@ export default { Tenant: '租户', Email: '邮件', Phone: '手机', + 'User Type':'用户类型', 'Please enter phone number': '请输入手机', 'Please enter email': '请输入邮箱', 'Please enter the correct email format': '请输入正确的邮箱格式', From 0654757f15ec40c5c44baa44304c8d48750a582a Mon Sep 17 00:00:00 2001 From: xsbai Date: Mon, 22 Jun 2020 09:10:24 +0800 Subject: [PATCH 02/10] modifiy the error comment --- .../dolphinscheduler/api/controller/ResourcesController.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java index cc09b2d650..0e63189e74 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java @@ -63,7 +63,7 @@ public class ResourcesController extends BaseController { private UdfFuncService udfFuncService; /** - * create resource + * create dirctory * * @param loginUser login user * @param alias alias From f7533e0bd0f78346a4c1f7687a03eed18ef6ef24 Mon Sep 17 00:00:00 2001 From: xsbai Date: Mon, 22 Jun 2020 09:15:13 +0800 Subject: [PATCH 03/10] modifiy the error comment --- .../dolphinscheduler/api/controller/ResourcesController.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java index 0e63189e74..a0ec666ed7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java @@ -63,7 +63,7 @@ public class ResourcesController extends BaseController { private UdfFuncService udfFuncService; /** - * create dirctory + * create directory * * @param loginUser login user * @param alias alias From 40bbef5e3dd4e9f5eae3a9a11e03234d4fd87a1a Mon Sep 17 00:00:00 2001 From: muzhongjiang Date: Tue, 23 Jun 2020 18:47:04 +0800 Subject: [PATCH 04/10] add state (#3040) Co-authored-by: mzjnumber1@163.com --- sql/dolphinscheduler_mysql.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql index a8fcc51564..7fe1511380 100644 --- a/sql/dolphinscheduler_mysql.sql +++ b/sql/dolphinscheduler_mysql.sql @@ -815,4 +815,4 @@ INSERT INTO `t_ds_relation_user_alertgroup` VALUES ('1', '1', '1', '2018-11-29 1 -- ---------------------------- -- Records of t_ds_user -- ---------------------------- -INSERT INTO `t_ds_user` VALUES ('1', 'admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22', null); +INSERT INTO `t_ds_user` VALUES ('1', 'admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22', null,1); From ad28c65160c63ea82768d72780c8e9bba495e218 Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Tue, 23 Jun 2020 22:35:12 +0800 Subject: [PATCH 05/10] [BUG_FIX]fix multi-threaded tests error. (#3044) * fix test mock error * fix test mock error --- .../TaskPriorityQueueConsumerTest.java | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index 180688edd9..b2a48d4edd 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -75,9 +75,9 @@ public class TaskPriorityQueueConsumerTest { tenant.setCreateTime(new Date()); tenant.setUpdateTime(new Date()); - Mockito.when(processService.getTenantForProcess(1,2)).thenReturn(tenant); + Mockito.doReturn(tenant).when(processService).getTenantForProcess(1,2); - Mockito.when(processService.queryUserQueueByProcessInstanceId(1)).thenReturn("default"); + Mockito.doReturn("default").when(processService).queryUserQueueByProcessInstanceId(1); } @@ -104,7 +104,7 @@ public class TaskPriorityQueueConsumerTest { processDefinition.setProjectId(1); taskInstance.setProcessDefine(processDefinition); - Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance); + Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); taskPriorityQueue.put("2_1_2_1_default"); Thread.sleep(10000); @@ -133,8 +133,7 @@ public class TaskPriorityQueueConsumerTest { processDefinition.setUserId(2); processDefinition.setProjectId(1); taskInstance.setProcessDefine(processDefinition); - - Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance); + Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); taskPriorityQueue.put("2_1_2_1_default"); DataSource dataSource = new DataSource(); @@ -146,7 +145,7 @@ public class TaskPriorityQueueConsumerTest { dataSource.setCreateTime(new Date()); dataSource.setUpdateTime(new Date()); - Mockito.when(processService.findDataSourceById(1)).thenReturn(dataSource); + Mockito.doReturn(dataSource).when(processService).findDataSourceById(1); Thread.sleep(10000); } @@ -174,8 +173,7 @@ public class TaskPriorityQueueConsumerTest { processDefinition.setUserId(2); processDefinition.setProjectId(1); taskInstance.setProcessDefine(processDefinition); - - Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance); + Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); taskPriorityQueue.put("2_1_2_1_default"); @@ -188,9 +186,7 @@ public class TaskPriorityQueueConsumerTest { dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}"); dataSource.setCreateTime(new Date()); dataSource.setUpdateTime(new Date()); - - Mockito.when(processService.findDataSourceById(80)).thenReturn(dataSource); - + Mockito.doReturn(dataSource).when(processService).findDataSourceById(80); Thread.sleep(10000); } @@ -217,12 +213,10 @@ public class TaskPriorityQueueConsumerTest { processDefinition.setUserId(2); processDefinition.setProjectId(1); taskInstance.setProcessDefine(processDefinition); - - Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance); + Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); taskPriorityQueue.put("2_1_2_1_default"); - DataSource dataSource = new DataSource(); dataSource.setId(1); dataSource.setName("datax"); @@ -250,7 +244,7 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setExecutorId(2); - Mockito.when( processService.findTaskInstanceById(1)).thenReturn(taskInstance); + Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); taskPriorityQueueConsumer.taskInstanceIsFinalState(1); } From 6dffa97fdd7007fc19a90617679d9511ce83461b Mon Sep 17 00:00:00 2001 From: liuhuijuan <67306399+liuhuijuan-bx@users.noreply.github.com> Date: Thu, 25 Jun 2020 14:50:31 +0800 Subject: [PATCH 06/10] fix release for nginx not package config files, eg: zookeeper.properties (#3041) * fix release for nginx bug * add state (#3040) Co-authored-by: mzjnumber1@163.com * [BUG_FIX]fix multi-threaded tests error. (#3044) * fix test mock error * fix test mock error Co-authored-by: muzhongjiang Co-authored-by: mzjnumber1@163.com Co-authored-by: CalvinKirs --- .../main/assembly/dolphinscheduler-nginx.xml | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-nginx.xml b/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-nginx.xml index f4e403e4b4..f41151cd09 100644 --- a/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-nginx.xml +++ b/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-nginx.xml @@ -98,25 +98,24 @@ conf + + - ${basedir}/../dolphinscheduler-common/src/main/resources + ${basedir}/../dolphinscheduler-dao/src/main/resources **/*.properties **/*.xml **/*.json + **/*.yml conf + + + + - ${basedir}/../dolphinscheduler-common/src/main/resources/bin - - *.* - - 755 - bin - - - ${basedir}/../dolphinscheduler-dao/src/main/resources + ${basedir}/../dolphinscheduler-service/src/main/resources **/*.properties **/*.xml @@ -125,7 +124,8 @@ conf - + + ${basedir}/../dolphinscheduler-server/target/dolphinscheduler-server-${project.version} From f1632e22c5d735dd5419dc2bb14b51bb80ac15e1 Mon Sep 17 00:00:00 2001 From: Jave-Chen Date: Thu, 25 Jun 2020 23:41:43 +0800 Subject: [PATCH 07/10] Set up JDK 11 for SonarCloud in github action. (#3052) * Set up JDK 11 for SonarCloud in github action. * Fix javadoc error with JDK 11. * Prevent Javadoc from stopping if it finds any html errors. --- .github/workflows/ci_ut.yml | 8 ++++++-- pom.xml | 4 ++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci_ut.yml b/.github/workflows/ci_ut.yml index b301cf77bd..230e85d1b1 100644 --- a/.github/workflows/ci_ut.yml +++ b/.github/workflows/ci_ut.yml @@ -67,10 +67,14 @@ jobs: - name: Upload coverage report to codecov run: | CODECOV_TOKEN="09c2663f-b091-4258-8a47-c981827eb29a" bash <(curl -s https://codecov.io/bash) + # Set up JDK 11 for SonarCloud. + - name: Set up JDK 1.11 + uses: actions/setup-java@v1 + with: + java-version: 1.11 - name: Run SonarCloud Analysis run: > - mvn verify --batch-mode - org.sonarsource.scanner.maven:sonar-maven-plugin:3.6.1.1688:sonar + mvn --batch-mode verify sonar:sonar -Dsonar.coverage.jacoco.xmlReportPaths=target/site/jacoco/jacoco.xml -Dmaven.test.skip=true -Dsonar.host.url=https://sonarcloud.io diff --git a/pom.xml b/pom.xml index 57269748cf..a0c719453c 100644 --- a/pom.xml +++ b/pom.xml @@ -599,6 +599,10 @@ org.apache.maven.plugins maven-javadoc-plugin ${maven-javadoc-plugin.version} + + 8 + false + From 8213da50d39ebea00d3d4dfdbcbca51107103157 Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Sun, 28 Jun 2020 10:20:59 +0800 Subject: [PATCH 08/10] [WIP] load balance #3054 (#3057) * Load balancing abstract * code smell * lower weight select --- .../host/assign/AbstractSelector.java | 45 +++++++++++++++++++ .../host/assign/LowerWeightRoundRobin.java | 4 +- .../dispatch/host/assign/RandomSelector.java | 15 +------ .../host/assign/RoundRobinSelector.java | 14 +----- 4 files changed, 51 insertions(+), 27 deletions(-) create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java new file mode 100644 index 0000000000..8560da957e --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.master.dispatch.host.assign; + +import org.apache.dolphinscheduler.common.utils.CollectionUtils; + +import java.util.Collection; + +/** + * AbstractSelector + */ +public abstract class AbstractSelector implements Selector{ + @Override + public T select(Collection source) { + + if (CollectionUtils.isEmpty(source)) { + throw new IllegalArgumentException("Empty source."); + } + + /** + * if only one , return directly + */ + if (source.size() == 1) { + return (T)source.toArray()[0]; + } + return doSelect(source); + } + + protected abstract T doSelect(Collection source); + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java index bdf0f412f4..843397e20c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java @@ -22,7 +22,7 @@ import java.util.Collection; /** * lower weight round robin */ -public class LowerWeightRoundRobin implements Selector{ +public class LowerWeightRoundRobin extends AbstractSelector{ /** * select @@ -30,7 +30,7 @@ public class LowerWeightRoundRobin implements Selector{ * @return HostWeight */ @Override - public HostWeight select(Collection sources){ + public HostWeight doSelect(Collection sources){ int totalWeight = 0; int lowWeight = 0; HostWeight lowerNode = null; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java index be52fcb1cf..e00d6f7a65 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java @@ -24,23 +24,12 @@ import java.util.Random; * random selector * @param T */ -public class RandomSelector implements Selector { +public class RandomSelector extends AbstractSelector { private final Random random = new Random(); @Override - public T select(final Collection source) { - - if (source == null || source.size() == 0) { - throw new IllegalArgumentException("Empty source."); - } - - /** - * if only one , return directly - */ - if (source.size() == 1) { - return (T) source.toArray()[0]; - } + public T doSelect(final Collection source) { int size = source.size(); /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java index 1eb30c8d5a..06e469fe6b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java @@ -26,22 +26,12 @@ import java.util.concurrent.atomic.AtomicInteger; * @param T */ @Service -public class RoundRobinSelector implements Selector { +public class RoundRobinSelector extends AbstractSelector { private final AtomicInteger index = new AtomicInteger(0); @Override - public T select(Collection source) { - if (source == null || source.size() == 0) { - throw new IllegalArgumentException("Empty source."); - } - - /** - * if only one , return directly - */ - if (source.size() == 1) { - return (T)source.toArray()[0]; - } + public T doSelect(Collection source) { int size = source.size(); /** From 9bf67d80d0db866d2ead7fce1b9d33767ee0f66e Mon Sep 17 00:00:00 2001 From: zixi0825 <649790970@qq.com> Date: Sun, 28 Jun 2020 10:48:14 +0800 Subject: [PATCH 09/10] Revise annotation spelling errors & Enhanced code robustness (#3042) * revise annotation spelling errors & enhanced code robustness * revise annotation spelling errors & enhanced code robustness Co-authored-by: sunchaohe Co-authored-by: dailidong Co-authored-by: qiaozhanwei --- .../api/utils/CheckUtils.java | 2 +- .../remote/command/TaskInfo.java | 2 +- .../server/entity/TaskExecutionContext.java | 2 +- .../worker/processor/NettyRemoteChannel.java | 2 +- .../worker/processor/TaskCallbackService.java | 5 +- .../processor/TaskExecuteProcessor.java | 14 ++++- .../processor/TaskCallbackServiceTest.java | 56 +++++++++++++++++-- 7 files changed, 68 insertions(+), 15 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java index 9c3bbe9cfd..9dee69be60 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java @@ -115,7 +115,7 @@ public class CheckUtils { * * @param parameter parameter * @param taskType task type - * @return true if taks node parameters are valid, otherwise return false + * @return true if task node parameters are valid, otherwise return false */ public static boolean checkTaskNodeParameters(String parameter, String taskType) { AbstractParameters abstractParameters = TaskParametersUtils.getParameters(taskType, parameter); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java index 196d0a73ae..1adf5a80ca 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java @@ -34,7 +34,7 @@ public class TaskInfo implements Serializable{ /** - * taks name + * task name */ private String taskName; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index 7b4c721422..3fc65c1853 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -39,7 +39,7 @@ public class TaskExecutionContext implements Serializable{ /** - * taks name + * task name */ private String taskName; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java index cbb8972a33..b1b67affcc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java @@ -34,7 +34,7 @@ public class NettyRemoteChannel { private final Channel channel; /** - * equest unique identification + * request unique identification */ private final long opaque; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 1e8bf9d0e7..0fe75240c0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.worker.processor; - import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -33,14 +32,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; - import java.util.Set; import java.util.concurrent.ConcurrentHashMap; - import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; /** - * taks callback service + * task callback service */ @Service public class TaskCallbackService { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 50f8989b0e..0af84b100f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -51,7 +51,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { private final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class); - /** * thread executor service */ @@ -83,9 +82,18 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { logger.info("received command : {}", taskRequestCommand); - String contextJson = taskRequestCommand.getTaskExecutionContext(); + if(taskRequestCommand == null){ + logger.error("task execute request command is null"); + return; + } + String contextJson = taskRequestCommand.getTaskExecutionContext(); TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class); + if(taskExecutionContext == null){ + logger.error("task execution context is null"); + return; + } + taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort()); // local execute path @@ -102,7 +110,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { // tell master that task is in executing final Command ackCommand = buildAckCommand(taskExecutionContext).convert2Command(); - + try { RetryerUtils.retryCall(() -> { taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(),ackCommand); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index 78ba3a6b44..e064f4cebe 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -18,14 +18,17 @@ package org.apache.dolphinscheduler.server.worker.processor; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; @@ -52,10 +55,23 @@ import java.util.Date; * test task call back service */ @RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, SpringApplicationContext.class, MasterRegistry.class, WorkerRegistry.class, - ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, - ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class, - TaskResponseService.class, TaskAckProcessor.class,TaskResponseProcessor.class}) +@ContextConfiguration(classes={ + TaskCallbackServiceTestConfig.class, + SpringZKServer.class, + SpringApplicationContext.class, + MasterRegistry.class, + WorkerRegistry.class, + ZookeeperRegistryCenter.class, + MasterConfig.class, + WorkerConfig.class, + ZookeeperCachedOperator.class, + ZookeeperConfig.class, + ZookeeperNodeManager.class, + TaskCallbackService.class, + TaskResponseService.class, + TaskAckProcessor.class, + TaskResponseProcessor.class, + TaskExecuteProcessor.class}) public class TaskCallbackServiceTest { @Autowired @@ -70,6 +86,9 @@ public class TaskCallbackServiceTest { @Autowired private TaskResponseProcessor taskResponseProcessor; + @Autowired + private TaskExecuteProcessor taskExecuteProcessor; + /** * send ack test * @throws Exception @@ -176,6 +195,35 @@ public class TaskCallbackServiceTest { } } + @Test + public void testTaskExecuteProcessor() throws Exception{ + final NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(30000); + NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); + nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor); + nettyRemotingServer.start(); + + final NettyClientConfig clientConfig = new NettyClientConfig(); + NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig); + + TaskExecuteRequestCommand taskExecuteRequestCommand = new TaskExecuteRequestCommand(); + + nettyRemotingClient.send(new Host("localhost",30000),taskExecuteRequestCommand.convert2Command()); + + taskExecuteRequestCommand.setTaskExecutionContext(JSONUtils.toJsonString(new TaskExecutionContext())); + + nettyRemotingClient.send(new Host("localhost",30000),taskExecuteRequestCommand.convert2Command()); + + Thread.sleep(5000); + + Stopper.stop(); + + Thread.sleep(5000); + + nettyRemotingServer.close(); + nettyRemotingClient.close(); + } + // @Test(expected = IllegalStateException.class) // public void testSendAckWithIllegalStateException2(){ // masterRegistry.registry(); From 38e485373de9a7301a19f57cb53dbf4cb6744696 Mon Sep 17 00:00:00 2001 From: Yichao Yang <1048262223@qq.com> Date: Sun, 28 Jun 2020 17:43:13 +0800 Subject: [PATCH 10/10] [Feature-2925][server] Init TaskLogger in TaskExecuteProcessor (#2925) (#2965) * [Feature-2925][common] Add exitVal judge in OSUtils.exeCmd (#2925) * optimize the logger utils --- .../common/shell/AbstractShell.java | 40 ++++--- .../common/utils/FileUtils.java | 60 +++++++--- .../common/utils/LoggerUtils.java | 27 ++++- .../common/utils/OSUtils.java | 103 +++++++++++------- .../common/utils/OSUtilsTest.java | 6 +- .../server/log/TaskLogFilter.java | 9 +- .../processor/TaskExecuteProcessor.java | 42 +++++-- .../worker/runner/TaskExecuteThread.java | 36 +++--- 8 files changed, 220 insertions(+), 103 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/shell/AbstractShell.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/shell/AbstractShell.java index f846b19741..aafdb86014 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/shell/AbstractShell.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/shell/AbstractShell.java @@ -16,9 +16,6 @@ */ package org.apache.dolphinscheduler.common.shell; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.BufferedReader; import java.io.File; import java.io.IOException; @@ -30,6 +27,9 @@ import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A base class for running a Unix command. @@ -128,7 +128,7 @@ public abstract class AbstractShell { /** * Run a command actual work */ - private void runCommand() throws IOException { + private void runCommand() throws IOException { ProcessBuilder builder = new ProcessBuilder(getExecString()); Timer timeOutTimer = null; ShellTimeoutTimerTask timeoutTimerTask = null; @@ -153,11 +153,11 @@ public abstract class AbstractShell { timeOutTimer.schedule(timeoutTimerTask, timeOutInterval); } final BufferedReader errReader = - new BufferedReader(new InputStreamReader(process - .getErrorStream())); - BufferedReader inReader = - new BufferedReader(new InputStreamReader(process - .getInputStream())); + new BufferedReader( + new InputStreamReader(process.getErrorStream())); + BufferedReader inReader = + new BufferedReader( + new InputStreamReader(process.getInputStream())); final StringBuilder errMsg = new StringBuilder(); // read error and input streams as this would free up the buffers @@ -177,23 +177,35 @@ public abstract class AbstractShell { } } }; + Thread inThread = new Thread() { + @Override + public void run() { + try { + parseExecResult(inReader); + } catch (IOException ioe) { + logger.warn("Error reading the in stream", ioe); + } + super.run(); + } + }; try { errThread.start(); + inThread.start(); } catch (IllegalStateException ise) { } try { // parse the output - parseExecResult(inReader); - exitCode = process.waitFor(); + exitCode = process.waitFor(); try { - // make sure that the error thread exits + // make sure that the error and in thread exits errThread.join(); + inThread.join(); } catch (InterruptedException ie) { - logger.warn("Interrupted while reading the error stream", ie); + logger.warn("Interrupted while reading the error and in stream", ie); } completed.set(true); //the timeout thread handling //taken care in finally block - if (exitCode != 0) { + if (exitCode != 0 || errMsg.length() > 0) { throw new ExitCodeException(exitCode, errMsg.toString()); } } catch (InterruptedException ie) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java index bae8f7f9bd..de3d42974a 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java @@ -16,18 +16,32 @@ */ package org.apache.dolphinscheduler.common.utils; +import static org.apache.dolphinscheduler.common.Constants.DATA_BASEDIR_PATH; +import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXS; +import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXS_DEFAULT_VALUE; +import static org.apache.dolphinscheduler.common.Constants.YYYYMMDDHHMMSS; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.StringReader; +import java.io.UnsupportedEncodingException; +import java.nio.charset.Charset; +import java.nio.charset.UnsupportedCharsetException; +import java.util.Optional; + import org.apache.commons.io.Charsets; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; -import java.nio.charset.Charset; -import java.nio.charset.UnsupportedCharsetException; - -import static org.apache.dolphinscheduler.common.Constants.*; - /** * file utils */ @@ -36,6 +50,8 @@ public class FileUtils { public static final String DATA_BASEDIR = PropertyUtils.getString(DATA_BASEDIR_PATH,"/tmp/dolphinscheduler"); + public static final ThreadLocal taskLoggerThreadLocal = new ThreadLocal<>(); + /** * get file suffix * @@ -118,7 +134,7 @@ public class FileUtils { String fileName = String.format("%s/exec/process/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId), Integer.toString(processDefineId), Integer.toString(processInstanceId)); File file = new File(fileName); - if (!file.getParentFile().exists()){ + if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); } @@ -138,24 +154,40 @@ public class FileUtils { * @param userName user name * @throws IOException errors */ - public static void createWorkDirAndUserIfAbsent(String execLocalPath, String userName) throws IOException{ + public static void createWorkDirAndUserIfAbsent(String execLocalPath, String userName) throws IOException { //if work dir exists, first delete File execLocalPathFile = new File(execLocalPath); - if (execLocalPathFile.exists()){ + if (execLocalPathFile.exists()) { org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile); } //create work dir org.apache.commons.io.FileUtils.forceMkdir(execLocalPathFile); - logger.info("create dir success {}" , execLocalPath); - + String mkdirLog = "create dir success " + execLocalPath; + LoggerUtils.logInfo(Optional.ofNullable(logger), mkdirLog); + LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), mkdirLog); //if not exists this user,then create - if (!OSUtils.getUserList().contains(userName)){ - OSUtils.createUser(userName); + OSUtils.taskLoggerThreadLocal.set(taskLoggerThreadLocal.get()); + try { + if (!OSUtils.getUserList().contains(userName)) { + boolean isSuccessCreateUser = OSUtils.createUser(userName); + + String infoLog; + if (isSuccessCreateUser) { + infoLog = String.format("create user name success %s", userName); + } else { + infoLog = String.format("create user name fail %s", userName); + } + LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog); + LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog); + } + } catch (Throwable e) { + LoggerUtils.logError(Optional.ofNullable(logger), e); + LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), e); } - logger.info("create user name success {}", userName); + OSUtils.taskLoggerThreadLocal.remove(); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java index 191df335c5..e3cf652efb 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java @@ -16,14 +16,15 @@ */ package org.apache.dolphinscheduler.common.utils; -import org.apache.dolphinscheduler.common.Constants; -import org.slf4j.Logger; - import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.dolphinscheduler.common.Constants; +import org.slf4j.Logger; + /** * logger utils */ @@ -93,4 +94,24 @@ public class LoggerUtils { } return appIds; } + + public static void logError(Optional optionalLogger + , String error) { + optionalLogger.ifPresent((Logger logger) -> logger.error(error)); + } + + public static void logError(Optional optionalLogger + , Throwable e) { + optionalLogger.ifPresent((Logger logger) -> logger.error(e.getMessage(), e)); + } + + public static void logError(Optional optionalLogger + , String error, Throwable e) { + optionalLogger.ifPresent((Logger logger) -> logger.error(error, e)); + } + + public static void logInfo(Optional optionalLogger + , String info) { + optionalLogger.ifPresent((Logger logger) -> logger.info(info)); + } } \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java index e3b2cc2720..171a017b4e 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java @@ -16,16 +16,6 @@ */ package org.apache.dolphinscheduler.common.utils; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.shell.ShellExecutor; -import org.apache.commons.configuration.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import oshi.SystemInfo; -import oshi.hardware.CentralProcessor; -import oshi.hardware.GlobalMemory; -import oshi.hardware.HardwareAbstractionLayer; - import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; @@ -40,8 +30,21 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; +import java.util.StringTokenizer; import java.util.regex.Pattern; +import org.apache.commons.configuration.Configuration; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.shell.ShellExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import oshi.SystemInfo; +import oshi.hardware.CentralProcessor; +import oshi.hardware.GlobalMemory; +import oshi.hardware.HardwareAbstractionLayer; + /** * os utils * @@ -50,6 +53,8 @@ public class OSUtils { private static final Logger logger = LoggerFactory.getLogger(OSUtils.class); + public static final ThreadLocal taskLoggerThreadLocal = new ThreadLocal<>(); + private static final SystemInfo SI = new SystemInfo(); public static final String TWO_DECIMAL = "0.00"; @@ -251,7 +256,9 @@ public class OSUtils { try { String userGroup = OSUtils.getGroup(); if (StringUtils.isEmpty(userGroup)) { - logger.error("{} group does not exist for this operating system.", userGroup); + String errorLog = String.format("%s group does not exist for this operating system.", userGroup); + LoggerUtils.logError(Optional.ofNullable(logger), errorLog); + LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), errorLog); return false; } if (isMacOS()) { @@ -263,7 +270,8 @@ public class OSUtils { } return true; } catch (Exception e) { - logger.error(e.getMessage(), e); + LoggerUtils.logError(Optional.ofNullable(logger), e); + LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), e); } return false; @@ -276,10 +284,14 @@ public class OSUtils { * @throws IOException in case of an I/O error */ private static void createLinuxUser(String userName, String userGroup) throws IOException { - logger.info("create linux os user : {}", userName); - String cmd = String.format("sudo useradd -g %s %s", userGroup, userName); + String infoLog1 = String.format("create linux os user : %s", userName); + LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog1); + LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog1); - logger.info("execute cmd : {}", cmd); + String cmd = String.format("sudo useradd -g %s %s", userGroup, userName); + String infoLog2 = String.format("execute cmd : %s", cmd); + LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog2); + LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog2); OSUtils.exeCmd(cmd); } @@ -290,13 +302,24 @@ public class OSUtils { * @throws IOException in case of an I/O error */ private static void createMacUser(String userName, String userGroup) throws IOException { - logger.info("create mac os user : {}", userName); - String userCreateCmd = String.format("sudo sysadminctl -addUser %s -password %s", userName, userName); - String appendGroupCmd = String.format("sudo dseditgroup -o edit -a %s -t user %s", userName, userGroup); - logger.info("create user command : {}", userCreateCmd); - OSUtils.exeCmd(userCreateCmd); - logger.info("append user to group : {}", appendGroupCmd); + Optional optionalLogger = Optional.ofNullable(logger); + Optional optionalTaskLogger = Optional.ofNullable(taskLoggerThreadLocal.get()); + + String infoLog1 = String.format("create mac os user : %s", userName); + LoggerUtils.logInfo(optionalLogger, infoLog1); + LoggerUtils.logInfo(optionalTaskLogger, infoLog1); + + String createUserCmd = String.format("sudo sysadminctl -addUser %s -password %s", userName, userName); + String infoLog2 = String.format("create user command : %s", createUserCmd); + LoggerUtils.logInfo(optionalLogger, infoLog2); + LoggerUtils.logInfo(optionalTaskLogger, infoLog2); + OSUtils.exeCmd(createUserCmd); + + String appendGroupCmd = String.format("sudo dseditgroup -o edit -a %s -t user %s", userName, userGroup); + String infoLog3 = String.format("append user to group : %s", appendGroupCmd); + LoggerUtils.logInfo(optionalLogger, infoLog3); + LoggerUtils.logInfo(optionalTaskLogger, infoLog3); OSUtils.exeCmd(appendGroupCmd); } @@ -307,14 +330,20 @@ public class OSUtils { * @throws IOException in case of an I/O error */ private static void createWindowsUser(String userName, String userGroup) throws IOException { - logger.info("create windows os user : {}", userName); - String userCreateCmd = String.format("net user \"%s\" /add", userName); - String appendGroupCmd = String.format("net localgroup \"%s\" \"%s\" /add", userGroup, userName); + String infoLog1 = String.format("create windows os user : %s", userName); + LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog1); + LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog1); - logger.info("execute create user command : {}", userCreateCmd); + String userCreateCmd = String.format("net user \"%s\" /add", userName); + String infoLog2 = String.format("execute create user command : %s", userCreateCmd); + LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog2); + LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog2); OSUtils.exeCmd(userCreateCmd); - logger.info("execute append user to group : {}", appendGroupCmd); + String appendGroupCmd = String.format("net localgroup \"%s\" \"%s\" /add", userGroup, userName); + String infoLog3 = String.format("execute append user to group : %s", appendGroupCmd); + LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog3); + LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog3); OSUtils.exeCmd(appendGroupCmd); } @@ -353,22 +382,12 @@ public class OSUtils { * @throws IOException errors */ public static String exeCmd(String command) throws IOException { - BufferedReader br = null; - - try { - Process p = Runtime.getRuntime().exec(command); - br = new BufferedReader(new InputStreamReader(p.getInputStream())); - String line; - StringBuilder sb = new StringBuilder(); - - while ((line = br.readLine()) != null) { - sb.append(line + "\n"); - } - - return sb.toString(); - } finally { - IOUtils.closeQuietly(br); + StringTokenizer st = new StringTokenizer(command); + String[] cmdArray = new String[st.countTokens()]; + for (int i = 0; st.hasMoreTokens(); i++) { + cmdArray[i] = st.nextToken(); } + return exeShell(cmdArray); } /** @@ -377,7 +396,7 @@ public class OSUtils { * @return result of execute the shell * @throws IOException errors */ - public static String exeShell(String command) throws IOException { + public static String exeShell(String[] command) throws IOException { return ShellExecutor.execCommand(command); } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java index e1fa0c563b..44c88f8b20 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java @@ -68,7 +68,11 @@ public class OSUtilsTest { @Test public void createUser() { boolean result = OSUtils.createUser("test123"); - Assert.assertTrue(result); + if (result) { + Assert.assertTrue("create user test123 success", true); + } else { + Assert.assertTrue("create user test123 fail", true); + } } @Test diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java index 954341659b..9c47fb901f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java @@ -16,11 +16,14 @@ */ package org.apache.dolphinscheduler.server.log; +import static org.apache.dolphinscheduler.common.utils.LoggerUtils.TASK_APPID_LOG_FORMAT; + +import org.apache.dolphinscheduler.common.utils.LoggerUtils; + import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.filter.Filter; import ch.qos.logback.core.spi.FilterReply; -import org.apache.dolphinscheduler.common.utils.LoggerUtils; /** * task log filter @@ -43,7 +46,9 @@ public class TaskLogFilter extends Filter { */ @Override public FilterReply decide(ILoggingEvent event) { - if (event.getThreadName().startsWith(LoggerUtils.TASK_LOGGER_THREAD_NAME) || event.getLevel().isGreaterOrEqual(level)) { + if (event.getThreadName().startsWith(LoggerUtils.TASK_LOGGER_THREAD_NAME) + || event.getLoggerName().startsWith(" - " + TASK_APPID_LOG_FORMAT) + || event.getLevel().isGreaterOrEqual(level)) { return FilterReply.ACCEPT; } return FilterReply.DENY; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 0af84b100f..4a2767f138 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -17,15 +17,22 @@ package org.apache.dolphinscheduler.server.worker.processor; -import ch.qos.logback.classic.LoggerContext; -import ch.qos.logback.classic.sift.SiftingAppender; -import com.github.rholder.retry.RetryException; -import io.netty.channel.Channel; + +import java.util.Date; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.common.utils.Preconditions; +import org.apache.dolphinscheduler.common.utils.RetryerUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; @@ -40,9 +47,11 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Date; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; +import com.github.rholder.retry.RetryException; + +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.sift.SiftingAppender; +import io.netty.channel.Channel; /** * worker request processor @@ -96,15 +105,26 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort()); + // custom logger + Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId())); + // local execute path String execLocalPath = getExecLocalPath(taskExecutionContext); logger.info("task instance local execute path : {} ", execLocalPath); + FileUtils.taskLoggerThreadLocal.set(taskLogger); try { FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode()); - } catch (Exception ex){ - logger.error(String.format("create execLocalPath : %s", execLocalPath), ex); + } catch (Throwable ex) { + String errorLog = String.format("create execLocalPath : %s", execLocalPath); + LoggerUtils.logError(Optional.ofNullable(logger), errorLog, ex); + LoggerUtils.logError(Optional.ofNullable(taskLogger), errorLog, ex); } + FileUtils.taskLoggerThreadLocal.remove(); + taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque())); @@ -117,7 +137,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { return Boolean.TRUE; }); // submit task - workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService)); + workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger)); } catch (ExecutionException | RetryException e) { logger.error(e.getMessage(), e); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index b6ab894020..a2ad762fde 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -16,7 +16,12 @@ */ package org.apache.dolphinscheduler.server.worker.runner; -import org.apache.dolphinscheduler.common.utils.*; +import java.io.File; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; @@ -24,6 +29,10 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.HadoopUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; @@ -32,10 +41,6 @@ import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.util.*; -import java.util.stream.Collectors; - /** * task scheduler thread @@ -62,14 +67,22 @@ public class TaskExecuteThread implements Runnable { */ private TaskCallbackService taskCallbackService; + /** + * task logger + */ + private Logger taskLogger; + /** * constructor * @param taskExecutionContext taskExecutionContext * @param taskCallbackService taskCallbackService */ - public TaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService){ + public TaskExecuteThread(TaskExecutionContext taskExecutionContext + , TaskCallbackService taskCallbackService + , Logger taskLogger) { this.taskExecutionContext = taskExecutionContext; this.taskCallbackService = taskCallbackService; + this.taskLogger = taskLogger; } @Override @@ -99,16 +112,7 @@ public class TaskExecuteThread implements Runnable { taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId())); - // custom logger - Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, - taskExecutionContext.getProcessDefineId(), - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId())); - - - - task = TaskManager.newTask(taskExecutionContext, - taskLogger); + task = TaskManager.newTask(taskExecutionContext, taskLogger); // task init task.init();