Browse Source

[BUG FIX] fix bug:cannot pause work flow when task state is "submit success" (#2783)

* feature: add number configuration for master dispatch tasks

* fix bug(#2762) the master would be blocked when worker group not exists

* fix bug(#2762) the master would be blocked when worker group not exists

* fix ut

* fix ut

* fix bug(2781): cannot pause work flow when task state is "submit success"

* fix code smell

* add mysql other param blank judge

* test

* update comments

* update comments

* add ut

Co-authored-by: baoliang <baoliang@analysys.com.cn>
pull/3/MERGE
bao liang 5 years ago committed by GitHub
parent
commit
3d857bba92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/MySQLDataSource.java
  2. 30
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
  3. 1
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
  4. 39
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/MySQLDataSource.java

@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.dao.datasource;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -51,6 +52,9 @@ public class MySQLDataSource extends BaseDataSource {
@Override
protected String filterOther(String other){
if(StringUtils.isBlank(other)){
return "";
}
if(other.contains(sensitiveParam)){
int index = other.indexOf(sensitiveParam);
String tmp = sensitiveParam;

30
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java

@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
@ -38,7 +39,6 @@ import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Date;
import java.util.Set;
@ -142,6 +142,9 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){
cancelTaskInstance();
}
if(processInstance.getState() == ExecutionStatus.READY_PAUSE){
pauseTask();
}
// task instance finished
if (taskInstance.getState().typeIsFinished()){
// if task is final result , then remove taskInstance from cache
@ -176,20 +179,33 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
return true;
}
/**
* pause task if task have not been dispatched to worker, do not dispatch anymore.
*
*/
public void pauseTask() {
taskInstance = processService.findTaskInstanceById(taskInstance.getId());
if(taskInstance == null){
return;
}
if(StringUtils.isBlank(taskInstance.getHost())){
taskInstance.setState(ExecutionStatus.PAUSE);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
}
}
/**
* task instance add queue , waiting worker to kill
*/
private void cancelTaskInstance() throws Exception{
if(alreadyKilled){
return ;
return;
}
alreadyKilled = true;
String taskInstanceWorkerGroup = taskInstance.getWorkerGroup();
// not exists
if (!existsValidWorkerGroup(taskInstanceWorkerGroup)){
taskInstance = processService.findTaskInstanceById(taskInstance.getId());
if(StringUtils.isBlank(taskInstance.getHost())){
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);

1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -235,7 +235,6 @@ public class TaskPriorityQueueConsumerTest {
dataSource.setUpdateTime(new Date());
Mockito.when(processService.findDataSourceById(1)).thenReturn(dataSource);
Thread.sleep(10000);
}

39
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java

@ -17,8 +17,8 @@
package org.apache.dolphinscheduler.server.master.runner;
import junit.framework.Assert;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
@ -27,12 +27,13 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.springframework.context.ApplicationContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@ -45,6 +46,7 @@ import java.util.Set;
ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class})
public class MasterTaskExecThreadTest {
@Test
public void testExistsValidWorkerGroup1(){
ZookeeperRegistryCenter zookeeperRegistryCenter = Mockito.mock(ZookeeperRegistryCenter.class);
@ -76,5 +78,36 @@ public class MasterTaskExecThreadTest {
masterTaskExecThread.existsValidWorkerGroup("test1");
}
@Test
public void testPauseTask(){
ProcessService processService = Mockito.mock(ProcessService.class);
ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
TaskInstance taskInstance = getTaskInstance();
Mockito.when(processService.findTaskInstanceById(252612))
.thenReturn(taskInstance);
Mockito.when(processService.updateTaskInstance(taskInstance))
.thenReturn(true);
MasterTaskExecThread masterTaskExecThread = new MasterTaskExecThread(taskInstance);
masterTaskExecThread.pauseTask();
org.junit.Assert.assertEquals(ExecutionStatus.PAUSE, taskInstance.getState());
}
private TaskInstance getTaskInstance(){
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("SHELL");
taskInstance.setId(252612);
taskInstance.setName("C");
taskInstance.setProcessInstanceId(10111);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
return taskInstance;
}
}

Loading…
Cancel
Save