Browse Source

[fix] Fix the SERIAL_DISCARD execution_type not working (#10258)

* fix always stop process_instance when execution_type is SERIAL_DISCARD

* test: no running_process, submit it
3.1.0-release
cadl 2 years ago committed by GitHub
parent
commit
13af2adfc1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  2. 2
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

7
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -333,10 +333,13 @@ public class ProcessServiceImpl implements ProcessService {
} else if (processDefinition.getExecutionType().typeIsSerialDiscard()) { } else if (processDefinition.getExecutionType().typeIsSerialDiscard()) {
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(), List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId()); processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) { if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.STOP); processInstance.setState(ExecutionStatus.STOP);
saveProcessInstance(processInstance); saveProcessInstance(processInstance);
return;
} }
processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
saveProcessInstance(processInstance);
} else if (processDefinition.getExecutionType().typeIsSerialPriority()) { } else if (processDefinition.getExecutionType().typeIsSerialPriority()) {
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(), List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId()); processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
@ -3064,4 +3067,4 @@ public class ProcessServiceImpl implements ProcessService {
K8s k8s = k8sMapper.selectOne(nodeWrapper); K8s k8s = k8sMapper.selectOne(nodeWrapper);
return k8s.getK8sConfig(); return k8s.getK8sConfig();
} }
} }

2
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -421,7 +421,7 @@ public class ProcessServiceTest {
Mockito.when(commandMapper.deleteById(7)).thenReturn(1); Mockito.when(commandMapper.deleteById(7)).thenReturn(1);
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null); Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null);
ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7); ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7);
Assert.assertTrue(processInstance8 == null); Assert.assertTrue(processInstance8 != null);
ProcessDefinition processDefinition2 = new ProcessDefinition(); ProcessDefinition processDefinition2 = new ProcessDefinition();
processDefinition2.setId(123); processDefinition2.setId(123);

Loading…
Cancel
Save