From 8951b2988094f0f2d2c838ec08086ccd68f99dd0 Mon Sep 17 00:00:00 2001 From: zhangchunyang Date: Fri, 22 May 2020 10:39:42 +0800 Subject: [PATCH 1/3] fixed #2788 resolve the bug #2788 (#2789) * Modify the ambari-plugin readme for packing RPM packages and add a description of the replication plug-in directory * fixed #2788 --- dolphinscheduler-dist/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dolphinscheduler-dist/pom.xml b/dolphinscheduler-dist/pom.xml index f4b8d2b7ab..dd4807666e 100644 --- a/dolphinscheduler-dist/pom.xml +++ b/dolphinscheduler-dist/pom.xml @@ -203,7 +203,7 @@ dolphinscheduler /opt/soft - + false __os_install_post %(echo '%{__os_install_post}' | sed -e 's!/usr/lib[^[:space:]]*/brp-python-bytecompile[[:space:]].*$!!g') From 3d857bba925348f4b92b9ba857c37fbca28549dd Mon Sep 17 00:00:00 2001 From: bao liang <29528966+lenboo@users.noreply.github.com> Date: Fri, 22 May 2020 10:42:24 +0800 Subject: [PATCH 2/3] [BUG FIX] fix bug:cannot pause work flow when task state is "submit success" (#2783) * feature: add number configuration for master dispatch tasks * fix bug(#2762) the master would be blocked when worker group not exists * fix bug(#2762) the master would be blocked when worker group not exists * fix ut * fix ut * fix bug(2781): cannot pause work flow when task state is "submit success" * fix code smell * add mysql other param blank judge * test * update comments * update comments * add ut Co-authored-by: baoliang --- .../dao/datasource/MySQLDataSource.java | 4 ++ .../master/runner/MasterTaskExecThread.java | 30 ++++++++++---- .../TaskPriorityQueueConsumerTest.java | 1 - .../runner/MasterTaskExecThreadTest.java | 39 +++++++++++++++++-- 4 files changed, 63 insertions(+), 11 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/MySQLDataSource.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/MySQLDataSource.java index d2024ecfde..50e5e7b996 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/MySQLDataSource.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/MySQLDataSource.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.dao.datasource; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +52,9 @@ public class MySQLDataSource extends BaseDataSource { @Override protected String filterOther(String other){ + if(StringUtils.isBlank(other)){ + return ""; + } if(other.contains(sensitiveParam)){ int index = other.indexOf(sensitiveParam); String tmp = sensitiveParam; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index 105584fe99..8c4c2bac02 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; @@ -38,7 +39,6 @@ import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.springframework.beans.factory.annotation.Autowired; import java.util.Date; import java.util.Set; @@ -142,6 +142,9 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){ cancelTaskInstance(); } + if(processInstance.getState() == ExecutionStatus.READY_PAUSE){ + pauseTask(); + } // task instance finished if (taskInstance.getState().typeIsFinished()){ // if task is final result , then remove taskInstance from cache @@ -176,20 +179,33 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { return true; } + /** + * pause task if task have not been dispatched to worker, do not dispatch anymore. + * + */ + public void pauseTask() { + taskInstance = processService.findTaskInstanceById(taskInstance.getId()); + if(taskInstance == null){ + return; + } + if(StringUtils.isBlank(taskInstance.getHost())){ + taskInstance.setState(ExecutionStatus.PAUSE); + taskInstance.setEndTime(new Date()); + processService.updateTaskInstance(taskInstance); + } + } + /** * task instance add queue , waiting worker to kill */ private void cancelTaskInstance() throws Exception{ if(alreadyKilled){ - return ; + return; } alreadyKilled = true; - - String taskInstanceWorkerGroup = taskInstance.getWorkerGroup(); - - // not exists - if (!existsValidWorkerGroup(taskInstanceWorkerGroup)){ + taskInstance = processService.findTaskInstanceById(taskInstance.getId()); + if(StringUtils.isBlank(taskInstance.getHost())){ taskInstance.setState(ExecutionStatus.KILL); taskInstance.setEndTime(new Date()); processService.updateTaskInstance(taskInstance); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index b14dbc3ccf..b247dc41e6 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -235,7 +235,6 @@ public class TaskPriorityQueueConsumerTest { dataSource.setUpdateTime(new Date()); Mockito.when(processService.findDataSourceById(1)).thenReturn(dataSource); - Thread.sleep(10000); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java index 2606dc1c1f..83095a371a 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java @@ -17,8 +17,8 @@ package org.apache.dolphinscheduler.server.master.runner; -import junit.framework.Assert; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; @@ -27,12 +27,13 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; +import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; +import org.springframework.context.ApplicationContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @@ -45,6 +46,7 @@ import java.util.Set; ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class}) public class MasterTaskExecThreadTest { + @Test public void testExistsValidWorkerGroup1(){ ZookeeperRegistryCenter zookeeperRegistryCenter = Mockito.mock(ZookeeperRegistryCenter.class); @@ -76,5 +78,36 @@ public class MasterTaskExecThreadTest { masterTaskExecThread.existsValidWorkerGroup("test1"); } + @Test + public void testPauseTask(){ + + + ProcessService processService = Mockito.mock(ProcessService.class); + ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + + TaskInstance taskInstance = getTaskInstance(); + Mockito.when(processService.findTaskInstanceById(252612)) + .thenReturn(taskInstance); + + Mockito.when(processService.updateTaskInstance(taskInstance)) + .thenReturn(true); + + MasterTaskExecThread masterTaskExecThread = new MasterTaskExecThread(taskInstance); + masterTaskExecThread.pauseTask(); + org.junit.Assert.assertEquals(ExecutionStatus.PAUSE, taskInstance.getState()); + } + + private TaskInstance getTaskInstance(){ + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setTaskType("SHELL"); + taskInstance.setId(252612); + taskInstance.setName("C"); + taskInstance.setProcessInstanceId(10111); + taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); + return taskInstance; + } } From 396b1716e4e9b0f55f3686ba8e0283555c65c085 Mon Sep 17 00:00:00 2001 From: tlhhup <137273278@qq.com> Date: Fri, 22 May 2020 11:20:54 +0800 Subject: [PATCH 3/3] fix bug:#2735 (#2770) 1. store process id in zk. 2. resolve host and port through zk path. 3. modify ui key. --- .../org/apache/dolphinscheduler/common/Constants.java | 2 +- .../apache/dolphinscheduler/common/utils/ResInfo.java | 4 ++-- .../server/master/registry/MasterRegistry.java | 4 +++- .../server/registry/HeartBeatTask.java | 4 +++- .../server/worker/registry/WorkerRegistry.java | 4 +++- .../dolphinscheduler/service/zk/AbstractZKClient.java | 11 +++++++---- .../conf/home/pages/monitor/pages/servers/master.vue | 2 +- .../conf/home/pages/monitor/pages/servers/worker.vue | 2 +- .../src/js/module/i18n/locale/zh_CN.js | 2 +- 9 files changed, 22 insertions(+), 13 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index d9589320ca..686dce3dd6 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -507,7 +507,7 @@ public final class Constants { /** * heartbeat for zk info length */ - public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 9; + public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 10; /** diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java index 9c1d8806c4..fa52a91f91 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java @@ -18,8 +18,6 @@ package org.apache.dolphinscheduler.common.utils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.model.Server; -import java.util.Date; - /** * heartbeat for ZK reigster res info */ @@ -109,6 +107,8 @@ public class ResInfo { Double.parseDouble(masterArray[2]))); masterServer.setCreateTime(DateUtils.stringToDate(masterArray[6])); masterServer.setLastHeartbeatTime(DateUtils.stringToDate(masterArray[7])); + //set process id + masterServer.setId(Integer.parseInt(masterArray[9])); return masterServer; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java index de6d3bc2cf..3ddb9e73d2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.registry; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; @@ -131,6 +132,7 @@ public class MasterRegistry { * @return */ private String getLocalAddress(){ - return OSUtils.getHost() + ":" + masterConfig.getListenPort(); + return OSUtils.getHost() + Constants.COLON + masterConfig.getListenPort(); } + } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java index 6d0eae9316..2345ce9533 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java @@ -73,7 +73,9 @@ public class HeartBeatTask extends Thread{ builder.append(reservedMemory).append(Constants.COMMA); builder.append(startTime).append(Constants.COMMA); builder.append(DateUtils.dateToString(new Date())).append(Constants.COMMA); - builder.append(status); + builder.append(status).append(COMMA); + //save process id + builder.append(OSUtils.getProcessID()); zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, builder.toString()); } catch (Throwable ex){ logger.error("error write heartbeat info", ex); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index f7093a1ec7..715db3966e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.registry; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -148,6 +149,7 @@ public class WorkerRegistry { * @return */ private String getLocalAddress(){ - return OSUtils.getHost() + ":" + workerConfig.getListenPort(); + return OSUtils.getHost() + Constants.COLON + workerConfig.getListenPort(); } + } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java index e75e20becb..acbbe76188 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java @@ -123,12 +123,15 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { String parentPath = getZNodeParentPath(zkNodeType); List masterServers = new ArrayList<>(); - int i = 0; for (Map.Entry entry : masterMap.entrySet()) { Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue()); - masterServer.setZkDirectory(parentPath + "/"+ entry.getKey()); - masterServer.setId(i); - i ++; + String key = entry.getKey(); + masterServer.setZkDirectory(parentPath + "/"+ key); + //set host and port + String[] hostAndPort=key.split(COLON); + String[] hosts=hostAndPort[0].split(DIVISION_STRING); + masterServer.setHost(hosts[hosts.length-1]);// fetch the last one + masterServer.setPort(Integer.parseInt(hostAndPort[1])); masterServers.add(masterServer); } return masterServers; diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue index dc06712cd2..ecc414c5f6 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue @@ -22,7 +22,7 @@
IP: {{item.host}} - {{$t('Process Pid')}}: {{item.port}} + {{$t('Process Pid')}}: {{item.id}} {{$t('Zk registration directory')}}: {{item.zkDirectory}}
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue index b386ea0cc0..ceaed89675 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue @@ -22,7 +22,7 @@
IP: {{item.host}} - {{$t('Process Pid')}}: {{item.port}} + {{$t('Process Pid')}}: {{item.id}} {{$t('Zk registration directory')}}: {{item.zkDirectory}}
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index 2cdf09db7b..777173f6ce 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -168,7 +168,7 @@ export default { 'Project Name': '项目名称', 'Please enter name': '请输入名称', 'Owned Users': '所属用户', - 'Process Pid': '进程pid', + 'Process Pid': '进程Pid', 'Zk registration directory': 'zk注册目录', cpuUsage: 'cpuUsage', memoryUsage: 'memoryUsage',