Browse Source

Solve transaction problems under multi-threading (#10917)

3.0.0/version-upgrade
WangJPLeo 2 years ago committed by caishunfeng
parent
commit
d1cf6fe86b
  1. 17
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

17
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -312,18 +312,11 @@ public class ProcessServiceImpl implements ProcessService {
//serial wait //serial wait
//when we get the running instance(or waiting instance) only get the priority instance(by id) //when we get the running instance(or waiting instance) only get the priority instance(by id)
if (processDefinition.getExecutionType().typeIsSerialWait()) { if (processDefinition.getExecutionType().typeIsSerialWait()) {
while (true) { List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId()); if (CollectionUtils.isEmpty(runningProcessInstances)) {
if (CollectionUtils.isEmpty(runningProcessInstances)) { processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); saveProcessInstance(processInstance);
saveProcessInstance(processInstance);
return;
}
ProcessInstance runningProcess = runningProcessInstances.get(0);
if (this.processInstanceMapper.updateNextProcessIdById(processInstance.getId(), runningProcess.getId())) {
return;
}
} }
} else if (processDefinition.getExecutionType().typeIsSerialDiscard()) { } else if (processDefinition.getExecutionType().typeIsSerialDiscard()) {
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(), List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),

Loading…
Cancel
Save