Browse Source

cherry-pick [Bug-13951 ][dolphinscheduler-service] StartParams is not applied when task is failover(RECOVER_TOLERANCE_FAULT_PROCESS CommandType) #13958

3.1.7-release
Drake Youngkun Min 2 years ago committed by zhuangchong
parent
commit
9b094e980b
  1. 19
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  2. 25
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

19
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -938,8 +938,9 @@ public class ProcessServiceImpl implements ProcessService {
}
if (cmdParam != null) {
CommandType commandTypeIfComplement = getCommandTypeIfComplement(processInstance, command);
// reset global params while repeat running is needed by cmdParam
if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
// reset global params while repeat running and recover tolerance fault process is needed by cmdParam
if (commandTypeIfComplement == CommandType.REPEAT_RUNNING ||
commandTypeIfComplement == CommandType.RECOVER_TOLERANCE_FAULT_PROCESS) {
setGlobalParamIfCommanded(processDefinition, cmdParam);
}
@ -2081,8 +2082,7 @@ public class ProcessServiceImpl implements ProcessService {
cmd.setProcessDefinitionCode(processDefinition.getCode());
cmd.setProcessDefinitionVersion(processDefinition.getVersion());
cmd.setProcessInstanceId(processInstance.getId());
cmd.setCommandParam(
String.format("{\"%s\":%d}", CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
cmd.setCommandParam(JSONUtils.toJsonString(createCommandParams(processInstance)));
cmd.setExecutorId(processInstance.getExecutorId());
cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority());
@ -3202,4 +3202,15 @@ public class ProcessServiceImpl implements ProcessService {
}
}
}
private Map<String, Object> createCommandParams(ProcessInstance processInstance) {
Map<String, Object> commandMap =
JSONUtils.parseObject(processInstance.getCommandParam(), new TypeReference<Map<String, Object>>() {
});
Map<String, Object> recoverFailoverCommandParams = new HashMap<>();
Optional.ofNullable(MapUtils.getObject(commandMap, CMD_PARAM_START_PARAMS))
.ifPresent(startParams -> recoverFailoverCommandParams.put(CMD_PARAM_START_PARAMS, startParams));
recoverFailoverCommandParams.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId());
return recoverFailoverCommandParams;
}
}

25
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -477,6 +477,31 @@ public class ProcessServiceTest {
Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
ProcessInstance processInstance10 = processService.handleCommand(host, command9);
Assert.assertTrue(processInstance10 != null);
// build command same as processService.processNeedFailoverProcessInstances(processInstance);
Command command12 = new Command();
command12.setId(12);
command12.setProcessDefinitionCode(definitionCode);
command12.setProcessDefinitionVersion(definitionVersion);
command12.setProcessInstanceId(processInstanceId);
command12.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
HashMap<String, String> startParams12 = new HashMap<>();
startParams12.put("startParam11", "testStartParam11");
HashMap<String, String> commandParams12 = new HashMap<>();
commandParams12.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(startParams12));
commandParams12.put("ProcessInstanceId", "222");
command12.setCommandParam(JSONUtils.toJsonString(commandParams12));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
Mockito.when(commandMapper.deleteById(12)).thenReturn(1);
Mockito.when(curingGlobalParamsService.curingGlobalParams(222,
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.RECOVER_TOLERANCE_FAULT_PROCESS,
processInstance.getScheduleTime(), null)).thenReturn("\"testStartParam11\"");
ProcessInstance processInstance13 = processService.handleCommand(host, command12);
Assert.assertNotNull(processInstance13);
Assert.assertNotNull(processInstance13.getGlobalParams());
Assert.assertTrue(processInstance13.getGlobalParams().contains("\"testStartParam11\""));
}
@Test(expected = ServiceException.class)

Loading…
Cancel
Save