Browse Source

[Fix-9868] A task flow definition isolates the runs of different execution strategies by version numbers. (#9869)

* The thread cache task flow definition should get the latest version.

* Coverage on New Code

* Coverage on New Code

* Coverage on New Code

* use an existing method.

* Increase unit test coverage.

* Task flow definitions enforce policy isolation.
3.0.0/version-upgrade
WangJPLeo 2 years ago committed by GitHub
parent
commit
fb0f96ed94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
  2. 3
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
  3. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  4. 12
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  5. 6
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

3
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

@ -247,7 +247,8 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
List<ProcessInstance> queryByProcessDefineCodeAndStatus(@Param("processDefinitionCode") Long processDefinitionCode,
@Param("states") int[] states);
List<ProcessInstance> queryByProcessDefineCodeAndStatusAndNextId(@Param("processDefinitionCode") Long processDefinitionCode,
List<ProcessInstance> queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(@Param("processDefinitionCode") Long processDefinitionCode,
@Param("processDefinitionVersion") int processDefinitionVersion,
@Param("states") int[] states, @Param("id") int id);
int updateGlobalParamsById(@Param("globalParams") String globalParams,

3
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -231,11 +231,12 @@
</foreach>
order by id asc
</select>
<select id="queryByProcessDefineCodeAndStatusAndNextId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
<select id="queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_code=#{processDefinitionCode}
and process_definition_version = #{processDefinitionVersion}
and state in
<foreach collection="states" item="i" open="(" close=")" separator=",">
#{i}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -802,7 +802,7 @@ public class WorkflowExecuteThread {
/**
* process end handle
*/
private void endProcess() {
public void endProcess() {
this.stateEvents.clear();
if (processDefinition.getExecutionType().typeIsSerialWait()) {
checkSerialProcess(processDefinition);

12
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -313,8 +313,8 @@ public class ProcessServiceImpl implements ProcessService {
//when we get the running instance(or waiting instance) only get the priority instance(by id)
if (processDefinition.getExecutionType().typeIsSerialWait()) {
while (true) {
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
saveProcessInstance(processInstance);
@ -326,15 +326,15 @@ public class ProcessServiceImpl implements ProcessService {
}
}
} else if (processDefinition.getExecutionType().typeIsSerialDiscard()) {
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.STOP);
saveProcessInstance(processInstance);
}
} else if (processDefinition.getExecutionType().typeIsSerialPriority()) {
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
for (ProcessInstance info : runningProcessInstances) {
info.setCommandType(CommandType.STOP);

6
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -398,7 +398,7 @@ public class ProcessServiceTest {
command6.setCommandParam("{\"ProcessInstanceId\":223}");
command6.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
command6.setProcessDefinitionVersion(1);
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L, Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists);
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateNextProcessIdById(223, 222)).thenReturn(true);
Mockito.when(commandMapper.deleteById(6)).thenReturn(1);
ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6);
@ -419,7 +419,7 @@ public class ProcessServiceTest {
command7.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
command7.setProcessDefinitionVersion(1);
Mockito.when(commandMapper.deleteById(7)).thenReturn(1);
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null);
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null);
ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7);
Assert.assertTrue(processInstance8 == null);
@ -441,7 +441,7 @@ public class ProcessServiceTest {
command9.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
command9.setProcessDefinitionVersion(1);
Mockito.when(processInstanceMapper.queryDetailById(225)).thenReturn(processInstance9);
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(12L, Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists);
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(12L, 1, Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1);
Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9);

Loading…
Cancel
Save