From 3d857bba925348f4b92b9ba857c37fbca28549dd Mon Sep 17 00:00:00 2001 From: bao liang <29528966+lenboo@users.noreply.github.com> Date: Fri, 22 May 2020 10:42:24 +0800 Subject: [PATCH] [BUG FIX] fix bug:cannot pause work flow when task state is "submit success" (#2783) * feature: add number configuration for master dispatch tasks * fix bug(#2762) the master would be blocked when worker group not exists * fix bug(#2762) the master would be blocked when worker group not exists * fix ut * fix ut * fix bug(2781): cannot pause work flow when task state is "submit success" * fix code smell * add mysql other param blank judge * test * update comments * update comments * add ut Co-authored-by: baoliang --- .../dao/datasource/MySQLDataSource.java | 4 ++ .../master/runner/MasterTaskExecThread.java | 30 ++++++++++---- .../TaskPriorityQueueConsumerTest.java | 1 - .../runner/MasterTaskExecThreadTest.java | 39 +++++++++++++++++-- 4 files changed, 63 insertions(+), 11 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/MySQLDataSource.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/MySQLDataSource.java index d2024ecfde..50e5e7b996 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/MySQLDataSource.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/MySQLDataSource.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.dao.datasource; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +52,9 @@ public class MySQLDataSource extends BaseDataSource { @Override protected String filterOther(String other){ + if(StringUtils.isBlank(other)){ + return ""; + } if(other.contains(sensitiveParam)){ int index = other.indexOf(sensitiveParam); String tmp = sensitiveParam; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index 105584fe99..8c4c2bac02 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; @@ -38,7 +39,6 @@ import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.springframework.beans.factory.annotation.Autowired; import java.util.Date; import java.util.Set; @@ -142,6 +142,9 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){ cancelTaskInstance(); } + if(processInstance.getState() == ExecutionStatus.READY_PAUSE){ + pauseTask(); + } // task instance finished if (taskInstance.getState().typeIsFinished()){ // if task is final result , then remove taskInstance from cache @@ -176,20 +179,33 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { return true; } + /** + * pause task if task have not been dispatched to worker, do not dispatch anymore. + * + */ + public void pauseTask() { + taskInstance = processService.findTaskInstanceById(taskInstance.getId()); + if(taskInstance == null){ + return; + } + if(StringUtils.isBlank(taskInstance.getHost())){ + taskInstance.setState(ExecutionStatus.PAUSE); + taskInstance.setEndTime(new Date()); + processService.updateTaskInstance(taskInstance); + } + } + /** * task instance add queue , waiting worker to kill */ private void cancelTaskInstance() throws Exception{ if(alreadyKilled){ - return ; + return; } alreadyKilled = true; - - String taskInstanceWorkerGroup = taskInstance.getWorkerGroup(); - - // not exists - if (!existsValidWorkerGroup(taskInstanceWorkerGroup)){ + taskInstance = processService.findTaskInstanceById(taskInstance.getId()); + if(StringUtils.isBlank(taskInstance.getHost())){ taskInstance.setState(ExecutionStatus.KILL); taskInstance.setEndTime(new Date()); processService.updateTaskInstance(taskInstance); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index b14dbc3ccf..b247dc41e6 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -235,7 +235,6 @@ public class TaskPriorityQueueConsumerTest { dataSource.setUpdateTime(new Date()); Mockito.when(processService.findDataSourceById(1)).thenReturn(dataSource); - Thread.sleep(10000); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java index 2606dc1c1f..83095a371a 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java @@ -17,8 +17,8 @@ package org.apache.dolphinscheduler.server.master.runner; -import junit.framework.Assert; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; @@ -27,12 +27,13 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; +import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; +import org.springframework.context.ApplicationContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @@ -45,6 +46,7 @@ import java.util.Set; ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class}) public class MasterTaskExecThreadTest { + @Test public void testExistsValidWorkerGroup1(){ ZookeeperRegistryCenter zookeeperRegistryCenter = Mockito.mock(ZookeeperRegistryCenter.class); @@ -76,5 +78,36 @@ public class MasterTaskExecThreadTest { masterTaskExecThread.existsValidWorkerGroup("test1"); } + @Test + public void testPauseTask(){ + + + ProcessService processService = Mockito.mock(ProcessService.class); + ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + + TaskInstance taskInstance = getTaskInstance(); + Mockito.when(processService.findTaskInstanceById(252612)) + .thenReturn(taskInstance); + + Mockito.when(processService.updateTaskInstance(taskInstance)) + .thenReturn(true); + + MasterTaskExecThread masterTaskExecThread = new MasterTaskExecThread(taskInstance); + masterTaskExecThread.pauseTask(); + org.junit.Assert.assertEquals(ExecutionStatus.PAUSE, taskInstance.getState()); + } + + private TaskInstance getTaskInstance(){ + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setTaskType("SHELL"); + taskInstance.setId(252612); + taskInstance.setName("C"); + taskInstance.setProcessInstanceId(10111); + taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); + return taskInstance; + } }