From e47e4a70abdeb84fc89be935adce1888a8867f49 Mon Sep 17 00:00:00 2001 From: WilliamChen-luckbob <58684828+WilliamChen-luckbob@users.noreply.github.com> Date: Wed, 24 Feb 2021 23:48:23 +0800 Subject: [PATCH] [Fix] while repeat running, the global parameters will not set to the user specified start parameters. (#4779) * [fix]when repeat running, there are no startparams which was set in the first start * [fix] cannot set global param while rerun process instance * fix code format * delete unrelated test * Update ExecutorControllerTest.java * fix code smell * Update ExecutorService2Test.java don't know why e2e failed * replace duplicate code --- .../api/service/impl/ExecutorServiceImpl.java | 33 ++++++++---- .../api/service/ExecutorService2Test.java | 11 +++- .../service/process/ProcessService.java | 52 +++++++++++++------ 3 files changed, 69 insertions(+), 27 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index b3e2b2cd74..8e75eb9c5c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -24,6 +24,7 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT; +import org.apache.commons.collections.MapUtils; import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.ExecutorService; @@ -261,15 +262,22 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ putMsg(result, Status.TENANT_NOT_SUITABLE); } + //get the startParams user specified at the first starting while repeat running is needed + Map commandMap = JSONUtils.toMap(processInstance.getCommandParam(), String.class, Object.class); + String startParams = null; + if (MapUtils.isNotEmpty(commandMap) && executeType == ExecuteType.REPEAT_RUNNING) { + startParams = (commandMap.get(Constants.CMD_PARAM_START_PARAMS)).toString(); + } + switch (executeType) { case REPEAT_RUNNING: - result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING); + result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING, startParams); break; case RECOVER_SUSPENDED_PROCESS: - result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.RECOVER_SUSPENDED_PROCESS); + result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams); break; case START_FAILURE_TASK_PROCESS: - result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.START_FAILURE_TASK_PROCESS); + result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.START_FAILURE_TASK_PROCESS, startParams); break; case STOP: if (processInstance.getState() == ExecutionStatus.READY_STOP) { @@ -379,19 +387,26 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ /** * insert command, used in the implementation of the page, re run, recovery (pause / failure) execution * - * @param loginUser login user - * @param instanceId instance id + * @param loginUser login user + * @param instanceId instance id * @param processDefinitionId process definition id - * @param commandType command type + * @param commandType command type * @return insert result code */ - private Map insertCommand(User loginUser, Integer instanceId, Integer processDefinitionId, CommandType commandType) { + private Map insertCommand(User loginUser, Integer instanceId, Integer processDefinitionId, CommandType commandType, String startParams) { Map result = new HashMap<>(); + + //To add startParams only when repeat running is needed + Map cmdParam = new HashMap<>(); + cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId); + if (StringUtils.isNotEmpty(startParams)) { + cmdParam.put(CMD_PARAM_START_PARAMS, startParams); + } + Command command = new Command(); command.setCommandType(commandType); command.setProcessDefinitionId(processDefinitionId); - command.setCommandParam(String.format("{\"%s\":%d}", - CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId)); + command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setExecutorId(loginUser.getId()); if (!processService.verifyIsNeedCreateCommand(command)) { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java index b3a79ccd62..b0af84f41f 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java @@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; @@ -260,6 +261,14 @@ public class ExecutorService2Test { } + @Test + public void testExecuteRepeatRunning() throws Exception { + Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true); + + Map result = executorService.execute(loginUser, projectName, processInstanceId, ExecuteType.REPEAT_RUNNING); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + } + private List getMasterServersList() { List masterServerList = new ArrayList<>(); Server masterServer1 = new Server(); @@ -295,4 +304,4 @@ public class ExecutorService2Test { result.put(Constants.STATUS, Status.SUCCESS); return result; } -} \ No newline at end of file +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index c5f5e3a02d..a759c24e50 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -563,23 +563,8 @@ public class ProcessService { processInstance.setLocations(processDefinition.getLocations()); processInstance.setConnects(processDefinition.getConnects()); - // get start params from command param - Map startParamMap = null; - if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) { - String startParamJson = cmdParam.get(Constants.CMD_PARAM_START_PARAMS); - startParamMap = JSONUtils.toMap(startParamJson); - } - - // set start param into global params - if (startParamMap != null && startParamMap.size() > 0 - && processDefinition.getGlobalParamMap() != null) { - for (Map.Entry param : processDefinition.getGlobalParamMap().entrySet()) { - String val = startParamMap.get(param.getKey()); - if (val != null) { - param.setValue(val); - } - } - } + // reset global params while there are start parameters + setGlobalParamIfCommanded(processDefinition,cmdParam); // curing global params processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( @@ -599,6 +584,26 @@ public class ProcessService { return processInstance; } + private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map cmdParam) { + // get start params from command param + Map startParamMap = null; + if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) { + String startParamJson = cmdParam.get(Constants.CMD_PARAM_START_PARAMS); + startParamMap = JSONUtils.toMap(startParamJson); + } + + // set start param into global params + if (startParamMap != null && startParamMap.size() > 0 + && processDefinition.getGlobalParamMap() != null) { + for (Map.Entry param : processDefinition.getGlobalParamMap().entrySet()) { + String val = startParamMap.get(param.getKey()); + if (val != null) { + param.setValue(val); + } + } + } + } + /** * get process tenant * there is tenant id in definition, use the tenant of the definition. @@ -690,7 +695,20 @@ public class ProcessService { processInstance = generateNewProcessInstance(processDefinition, command, cmdParam); } else { processInstance = this.findProcessInstanceDetailById(processInstanceId); + CommandType commandTypeIfComplement = getCommandTypeIfComplement(processInstance, command); + + // reset global params while repeat running is needed by cmdParam + if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) { + setGlobalParamIfCommanded(processDefinition, cmdParam); + } + // Recalculate global parameters after rerun. + + processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + commandTypeIfComplement, + processInstance.getScheduleTime())); } processDefinition = processDefineMapper.selectById(processInstance.getProcessDefinitionId()); processInstance.setProcessDefinition(processDefinition);