Browse Source

[BUG FIX] fix bug: dependent task failed when conditions task exists (#2768)

* fix bug 2464: change dependent task for process.

* remove unused code

* add ut

* add ut

* update comments

Co-authored-by: baoliang <baoliang@analysys.com.cn>
pull/3/MERGE
bao liang 5 years ago committed by GitHub
parent
commit
184d64e852
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
  2. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
  3. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  4. 32
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
  5. 64
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  6. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

11
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java

@ -86,14 +86,14 @@ public enum ExecutionStatus {
public boolean typeIsFinished(){ public boolean typeIsFinished(){
return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause() return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause()
|| typeIsWaittingThread(); || typeIsStop();
} }
/** /**
* status is waiting thread * status is waiting thread
* @return status * @return status
*/ */
public boolean typeIsWaittingThread(){ public boolean typeIsWaitingThread(){
return this == WAITTING_THREAD; return this == WAITTING_THREAD;
} }
@ -104,6 +104,13 @@ public enum ExecutionStatus {
public boolean typeIsPause(){ public boolean typeIsPause(){
return this == PAUSE; return this == PAUSE;
} }
/**
* status is pause
* @return status
*/
public boolean typeIsStop(){
return this == STOP;
}
/** /**
* status is running * status is running

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java

@ -146,7 +146,7 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
if ( allDependentTaskFinish() || taskInstance.getState().typeIsFinished()){ if ( allDependentTaskFinish() || taskInstance.getState().typeIsFinished()){
break; break;
} }
// updateProcessInstance task instance // update process task
taskInstance = processService.findTaskInstanceById(taskInstance.getId()); taskInstance = processService.findTaskInstanceById(taskInstance.getId());
processInstance = processService.findProcessInstanceById(processInstance.getId()); processInstance = processService.findProcessInstanceById(processInstance.getId());
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -338,7 +338,7 @@ public class MasterExecThread implements Runnable {
private void endProcess() { private void endProcess() {
processInstance.setEndTime(new Date()); processInstance.setEndTime(new Date());
processService.updateProcessInstance(processInstance); processService.updateProcessInstance(processInstance);
if(processInstance.getState().typeIsWaittingThread()){ if(processInstance.getState().typeIsWaitingThread()){
processService.createRecoveryWaitingThreadCommand(null, processInstance); processService.createRecoveryWaitingThreadCommand(null, processInstance);
} }
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(processInstance.getId()); List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(processInstance.getId());

32
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java

@ -123,28 +123,16 @@ public class DependentExecute {
/** /**
* depend type = depend_all * depend type = depend_all
* skip the condition tasks.
* judge all the task
* @return * @return
*/ */
private DependResult dependResultByProcessInstance(ProcessInstance processInstance){ private DependResult dependResultByProcessInstance(ProcessInstance processInstance){
DependResult result = DependResult.FAILED; if(!processInstance.getState().typeIsFinished()){
List<TaskNode> taskNodes = return DependResult.WAITING;
processService.getTaskNodeListByDefinitionId(processInstance.getProcessDefinitionId());
if(CollectionUtils.isEmpty(taskNodes)) {
return result;
} }
for(TaskNode taskNode:taskNodes){ if(processInstance.getState().typeIsSuccess()){
if(taskNode.isConditionsTask() return DependResult.SUCCESS;
|| DagHelper.haveConditionsAfterNode(taskNode.getName(), taskNodes)){
continue;
}
DependResult tmpResult = getDependTaskResult(taskNode.getName(),processInstance);
if(DependResult.SUCCESS != tmpResult){
return tmpResult;
}
} }
return DependResult.SUCCESS; return DependResult.FAILED;
} }
/** /**
@ -168,7 +156,11 @@ public class DependentExecute {
if(taskInstance == null){ if(taskInstance == null){
// cannot find task in the process instance // cannot find task in the process instance
// maybe because process instance is running or failed. // maybe because process instance is running or failed.
result = getDependResultByProcessStateWhenTaskNull(processInstance.getState()); if(processInstance.getState().typeIsFinished()){
result = DependResult.FAILED;
}else{
return DependResult.WAITING;
}
}else{ }else{
result = getDependResultByState(taskInstance.getState()); result = getDependResultByState(taskInstance.getState());
} }
@ -217,9 +209,7 @@ public class DependentExecute {
*/ */
private DependResult getDependResultByState(ExecutionStatus state) { private DependResult getDependResultByState(ExecutionStatus state) {
if(state.typeIsRunning() if(!state.typeIsFinished()){
|| state == ExecutionStatus.SUBMITTED_SUCCESS
|| state == ExecutionStatus.WAITTING_THREAD){
return DependResult.WAITING; return DependResult.WAITING;
}else if(state.typeIsSuccess()){ }else if(state.typeIsSuccess()){
return DependResult.SUCCESS; return DependResult.SUCCESS;

64
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -94,11 +94,12 @@ public class DependentTaskTest {
} }
@Test @Test
public void test() throws Exception{ public void testDependAll() throws Exception{
TaskInstance taskInstance = getTaskInstance(); TaskInstance taskInstance = getTaskInstance();
String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"ALL\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}"; String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"ALL\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}";
taskInstance.setDependency(dependString); taskInstance.setDependency(dependString);
Mockito.when(processService.submitTask(taskInstance)) Mockito.when(processService.submitTask(taskInstance))
.thenReturn(taskInstance); .thenReturn(taskInstance);
DependentTaskExecThread dependentTask = DependentTaskExecThread dependentTask =
@ -107,6 +108,54 @@ public class DependentTaskTest {
dependentTask.call(); dependentTask.call();
Assert.assertEquals(ExecutionStatus.SUCCESS, dependentTask.getTaskInstance().getState()); Assert.assertEquals(ExecutionStatus.SUCCESS, dependentTask.getTaskInstance().getState());
DateInterval dateInterval =DependentDateUtils.getTodayInterval(new Date()).get(0);
Mockito.when(processService
.findLastRunningProcess(4, dateInterval.getStartTime(),
dateInterval.getEndTime()))
.thenReturn(findLastStopProcessInterval());
DependentTaskExecThread dependentFailure = new DependentTaskExecThread(taskInstance);
dependentFailure.call();
Assert.assertEquals(ExecutionStatus.FAILURE, dependentFailure.getTaskInstance().getState());
}
@Test
public void testDependTask() throws Exception{
TaskInstance taskInstance = getTaskInstance();
String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"D\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}";
taskInstance.setDependency(dependString);
Mockito.when(processService.submitTask(taskInstance))
.thenReturn(taskInstance);
DependentTaskExecThread dependentTask =
new DependentTaskExecThread(taskInstance);
dependentTask.call();
Assert.assertEquals(ExecutionStatus.SUCCESS, dependentTask.getTaskInstance().getState());
DateInterval dateInterval =DependentDateUtils.getTodayInterval(new Date()).get(0);
Mockito.when(processService
.findLastRunningProcess(4, dateInterval.getStartTime(),
dateInterval.getEndTime()))
.thenReturn(findLastStopProcessInterval());
Mockito.when(processService
.findValidTaskListByProcessId(11))
.thenReturn(getErrorTaskInstances());
DependentTaskExecThread dependentFailure = new DependentTaskExecThread(taskInstance);
dependentFailure.call();
Assert.assertEquals(ExecutionStatus.FAILURE, dependentFailure.getTaskInstance().getState());
}
private ProcessInstance findLastStopProcessInterval(){
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(11);
processInstance.setProcessDefinitionId(4);
processInstance.setState(ExecutionStatus.STOP);
return processInstance;
} }
private ProcessInstance findLastProcessInterval(){ private ProcessInstance findLastProcessInterval(){
@ -142,7 +191,7 @@ public class DependentTaskTest {
return list; return list;
} }
private List<TaskInstance> getTaskInstances(){ private List<TaskInstance> getErrorTaskInstances(){
List<TaskInstance> list = new ArrayList<>(); List<TaskInstance> list = new ArrayList<>();
TaskInstance taskInstance = new TaskInstance(); TaskInstance taskInstance = new TaskInstance();
taskInstance.setName("C"); taskInstance.setName("C");
@ -152,12 +201,23 @@ public class DependentTaskTest {
return list; return list;
} }
private List<TaskInstance> getTaskInstances(){
List<TaskInstance> list = new ArrayList<>();
TaskInstance taskInstance = new TaskInstance();
taskInstance.setName("D");
taskInstance.setState(ExecutionStatus.SUCCESS);
taskInstance.setDependency("1231");
list.add(taskInstance);
return list;
}
private TaskInstance getTaskInstance(){ private TaskInstance getTaskInstance(){
TaskInstance taskInstance = new TaskInstance(); TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("DEPENDENT"); taskInstance.setTaskType("DEPENDENT");
taskInstance.setId(252612); taskInstance.setId(252612);
taskInstance.setName("C"); taskInstance.setName("C");
taskInstance.setProcessInstanceId(10111); taskInstance.setProcessInstanceId(10111);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
return taskInstance; return taskInstance;
} }

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -254,7 +254,7 @@ public class ProcessService {
//process data check //process data check
if (null == processData) { if (null == processData) {
logger.error("process data is null"); logger.error("process data is null");
return null; return new ArrayList<>();
} }
return processData.getTasks(); return processData.getTasks();

Loading…
Cancel
Save