Browse Source

Fix serial wait will go into block when there is no task need to submit (#13486)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
6300a9b8de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

8
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -336,7 +336,7 @@ public class ProcessServiceImpl implements ProcessService {
processInstance.getProcessDefinitionVersion()); processInstance.getProcessDefinitionVersion());
if (processDefinition.getExecutionType().typeIsSerial()) { if (processDefinition.getExecutionType().typeIsSerial()) {
saveSerialProcess(processInstance, processDefinition); saveSerialProcess(processInstance, processDefinition);
if (processInstance.getState() != WorkflowExecutionStatus.SUBMITTED_SUCCESS) { if (processInstance.getState() != WorkflowExecutionStatus.RUNNING_EXECUTION) {
setSubProcessParam(processInstance); setSubProcessParam(processInstance);
triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId()); triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());
deleteCommandWithCheck(command.getId()); deleteCommandWithCheck(command.getId());
@ -364,7 +364,7 @@ public class ProcessServiceImpl implements ProcessService {
org.apache.dolphinscheduler.service.utils.Constants.RUNNING_PROCESS_STATE, org.apache.dolphinscheduler.service.utils.Constants.RUNNING_PROCESS_STATE,
processInstance.getId()); processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) { if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setStateWithDesc(WorkflowExecutionStatus.SUBMITTED_SUCCESS, processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
"submit from serial_wait strategy"); "submit from serial_wait strategy");
processInstanceDao.upsertProcessInstance(processInstance); processInstanceDao.upsertProcessInstance(processInstance);
} }
@ -380,7 +380,7 @@ public class ProcessServiceImpl implements ProcessService {
processInstanceDao.upsertProcessInstance(processInstance); processInstanceDao.upsertProcessInstance(processInstance);
return; return;
} }
processInstance.setStateWithDesc(WorkflowExecutionStatus.SUBMITTED_SUCCESS, processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
"submit from serial_discard strategy"); "submit from serial_discard strategy");
processInstanceDao.upsertProcessInstance(processInstance); processInstanceDao.upsertProcessInstance(processInstance);
} else if (processDefinition.getExecutionType().typeIsSerialPriority()) { } else if (processDefinition.getExecutionType().typeIsSerialPriority()) {
@ -408,7 +408,7 @@ public class ProcessServiceImpl implements ProcessService {
} }
} }
} }
processInstance.setStateWithDesc(WorkflowExecutionStatus.SUBMITTED_SUCCESS, processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
"submit by serial_priority strategy"); "submit by serial_priority strategy");
processInstanceDao.upsertProcessInstance(processInstance); processInstanceDao.upsertProcessInstance(processInstance);
} }

Loading…
Cancel
Save