From 801216bd35634ed2b1bb1f0ea9fa80140fb17e78 Mon Sep 17 00:00:00 2001 From: Drake Youngkun Min Date: Wed, 10 May 2023 14:02:21 +0900 Subject: [PATCH] RECOVER_TOLERANCE_FAULT_PROCESS CommandType needs the start parameters (#13958) --- .../service/process/ProcessServiceImpl.java | 20 ++++++++++++--- .../service/process/ProcessServiceTest.java | 25 +++++++++++++++++++ 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index de19938dd9..d0f56f329f 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -23,6 +23,7 @@ import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.C import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_EMPTY_SUB_PROCESS; import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_PARAMS; import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS; import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; @@ -770,8 +771,9 @@ public class ProcessServiceImpl implements ProcessService { } CommandType commandTypeIfComplement = getCommandTypeIfComplement(processInstance, command); - // reset global params while repeat running is needed by cmdParam - if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) { + // reset global params while repeat running and recover tolerance fault process is needed by cmdParam + if (commandTypeIfComplement == CommandType.REPEAT_RUNNING || + commandTypeIfComplement == CommandType.RECOVER_TOLERANCE_FAULT_PROCESS) { setGlobalParamIfCommanded(processDefinition, cmdParam); } @@ -1590,8 +1592,7 @@ public class ProcessServiceImpl implements ProcessService { cmd.setProcessDefinitionCode(processInstance.getProcessDefinitionCode()); cmd.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion()); cmd.setProcessInstanceId(processInstance.getId()); - cmd.setCommandParam( - String.format("{\"%s\":%d}", CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId())); + cmd.setCommandParam(JSONUtils.toJsonString(createCommandParams(processInstance))); cmd.setExecutorId(processInstance.getExecutorId()); cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS); cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority()); @@ -2605,4 +2606,15 @@ public class ProcessServiceImpl implements ProcessService { triggerRelationService.saveCommandTrigger(commandId, processInstanceId); } + private Map createCommandParams(ProcessInstance processInstance) { + Map commandMap = + JSONUtils.parseObject(processInstance.getCommandParam(), new TypeReference>() { + }); + Map recoverFailoverCommandParams = new HashMap<>(); + Optional.ofNullable(MapUtils.getObject(commandMap, CMD_PARAM_START_PARAMS)) + .ifPresent(startParams -> recoverFailoverCommandParams.put(CMD_PARAM_START_PARAMS, startParams)); + recoverFailoverCommandParams.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()); + return recoverFailoverCommandParams; + } + } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 81ef7a214f..1e94407a4b 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -370,6 +370,31 @@ public class ProcessServiceTest { Mockito.when(commandMapper.deleteById(9)).thenReturn(1); ProcessInstance processInstance10 = processService.handleCommand(host, command9); Assertions.assertNotNull(processInstance10); + + // build command same as processService.processNeedFailoverProcessInstances(processInstance); + Command command12 = new Command(); + command12.setId(12); + command12.setProcessDefinitionCode(definitionCode); + command12.setProcessDefinitionVersion(definitionVersion); + command12.setProcessInstanceId(processInstanceId); + command12.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS); + HashMap startParams12 = new HashMap<>(); + startParams12.put("startParam11", "testStartParam11"); + HashMap commandParams12 = new HashMap<>(); + commandParams12.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(startParams12)); + commandParams12.put("ProcessInstanceId", "222"); + command12.setCommandParam(JSONUtils.toJsonString(commandParams12)); + Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance); + Mockito.when(commandMapper.deleteById(12)).thenReturn(1); + Mockito.when(curingGlobalParamsService.curingGlobalParams(222, + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + CommandType.RECOVER_TOLERANCE_FAULT_PROCESS, + processInstance.getScheduleTime(), null)).thenReturn("\"testStartParam11\""); + ProcessInstance processInstance13 = processService.handleCommand(host, command12); + Assertions.assertNotNull(processInstance13); + Assertions.assertNotNull(processInstance13.getGlobalParams()); + Assertions.assertTrue(processInstance13.getGlobalParams().contains("\"testStartParam11\"")); } @Test