From 01eb8f834ff4abf662b704379f1bcd019f4d2d74 Mon Sep 17 00:00:00 2001 From: Jay Chung Date: Mon, 5 Feb 2024 16:17:30 +0800 Subject: [PATCH] fix: start param for wf not work (#15544) * fix: start param for wf not work fix: #15280 * fix test --- .../runner/WorkflowExecuteRunnable.java | 39 +--------------- .../service/expand/CuringParamsService.java | 12 +++++ .../expand/CuringParamsServiceImpl.java | 27 +++++++++++- .../service/process/ProcessService.java | 3 ++ .../service/process/ProcessServiceImpl.java | 31 ++++++------- .../expand/CuringParamsServiceTest.java | 44 +++++++++++++++++++ 6 files changed, 99 insertions(+), 57 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 2db49c6f09..fa658c04f1 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.master.runner; import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_START_DATE; -import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_FATHER_PARAMS; import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING; import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_NODES; @@ -29,8 +28,6 @@ import static org.apache.dolphinscheduler.common.constants.Constants.COMMA; import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.constants.DateConstants.YYYY_MM_DD_HH_MM_SS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING; -import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR; -import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -860,7 +857,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { Map cmdParam = JSONUtils.toMap(workflowInstance.getCommandParam()); if (cmdParam != null) { // reset global params while there are start parameters - setGlobalParamIfCommanded(workflowDefinition, cmdParam); + processService.setGlobalParamIfCommanded(workflowDefinition, cmdParam); Date start = null; Date end = null; @@ -2057,40 +2054,6 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { return waitToRetryTaskInstanceMap; } - private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map cmdParam) { - // get start params from command param - Map startParamMap = new HashMap<>(); - if (cmdParam.containsKey(CMD_PARAM_START_PARAMS)) { - String startParamJson = cmdParam.get(CMD_PARAM_START_PARAMS); - startParamMap = JSONUtils.toMap(startParamJson); - } - Map fatherParamMap = new HashMap<>(); - if (cmdParam.containsKey(CMD_PARAM_FATHER_PARAMS)) { - String fatherParamJson = cmdParam.get(CMD_PARAM_FATHER_PARAMS); - fatherParamMap = JSONUtils.toMap(fatherParamJson); - } - startParamMap.putAll(fatherParamMap); - // set start param into global params - Map globalMap = processDefinition.getGlobalParamMap(); - List globalParamList = processDefinition.getGlobalParamList(); - if (startParamMap.size() > 0 && globalMap != null) { - // start param to overwrite global param - for (Map.Entry param : globalMap.entrySet()) { - String val = startParamMap.get(param.getKey()); - if (val != null) { - param.setValue(val); - } - } - // start param to create new global param if global not exist - for (Map.Entry startParam : startParamMap.entrySet()) { - if (!globalMap.containsKey(startParam.getKey())) { - globalMap.put(startParam.getKey(), startParam.getValue()); - globalParamList.add(new Property(startParam.getKey(), IN, VARCHAR, startParam.getValue())); - } - } - } - } - /** * clear related data if command of process instance is EXECUTE_TASK * 1. find all task code from sub dag (only contains related task) diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsService.java index d6d50ace80..eff0a3071f 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsService.java @@ -27,6 +27,8 @@ import java.util.Date; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; + import lombok.NonNull; public interface CuringParamsService { @@ -80,6 +82,16 @@ public interface CuringParamsService { @NonNull AbstractParameters parameters, @NonNull ProcessInstance processInstance); + /** + * Parse workflow star parameter + */ + Map parseWorkflowStartParam(@Nullable Map cmdParam); + + /** + * Parse workflow father parameter + */ + Map parseWorkflowFatherParam(@Nullable Map cmdParam); + /** * preBuildBusinessParams * @param processInstance diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java index e23c07b1b7..afcfae3fd6 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java @@ -28,6 +28,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETE import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_WORKFLOW_DEFINITION_NAME; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_WORKFLOW_INSTANCE_ID; +import org.apache.dolphinscheduler.common.constants.CommandKeyConstants; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.constants.DateConstants; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -55,6 +56,8 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import javax.annotation.Nullable; + import lombok.NonNull; import org.springframework.beans.factory.annotation.Autowired; @@ -141,6 +144,28 @@ public class CuringParamsServiceImpl implements CuringParamsService { return JSONUtils.toJsonString(globalParamList); } + @Override + public Map parseWorkflowStartParam(@Nullable Map cmdParam) { + if (cmdParam == null || !cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_START_PARAMS)) { + return new HashMap<>(); + } + String startParamJson = cmdParam.get(CommandKeyConstants.CMD_PARAM_START_PARAMS); + Map startParamMap = JSONUtils.toMap(startParamJson); + return startParamMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, + entry -> new Property(entry.getKey(), Direct.IN, DataType.VARCHAR, entry.getValue()))); + } + + @Override + public Map parseWorkflowFatherParam(@Nullable Map cmdParam) { + if (cmdParam == null || !cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_FATHER_PARAMS)) { + return new HashMap<>(); + } + String startParamJson = cmdParam.get(CommandKeyConstants.CMD_PARAM_FATHER_PARAMS); + Map startParamMap = JSONUtils.toMap(startParamJson); + return startParamMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, + entry -> new Property(entry.getKey(), Direct.IN, DataType.VARCHAR, entry.getValue()))); + } + /** * the global parameters and local parameters used in the worker will be prepared here, and built-in parameters. * @@ -199,7 +224,7 @@ public class CuringParamsServiceImpl implements CuringParamsService { } if (MapUtils.isNotEmpty(cmdParam)) { - prepareParamsMap.putAll(ParameterUtils.getUserDefParamsMap(cmdParam)); + prepareParamsMap.putAll(parseWorkflowStartParam(cmdParam)); } Iterator> iter = prepareParamsMap.entrySet().iterator(); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 091d6353fb..ba4def7e06 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -50,6 +50,7 @@ import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.model.TaskNode; import java.util.List; +import java.util.Map; import java.util.Optional; import javax.annotation.Nullable; @@ -194,4 +195,6 @@ public interface ProcessService { void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId); void saveCommandTrigger(Integer commandId, Integer processInstanceId); + + void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map cmdParam); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 93b257ba27..130a1c0993 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -28,8 +28,6 @@ import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.C import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; import static org.apache.dolphinscheduler.common.constants.Constants.LOCAL_PARAMS; -import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR; -import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; import org.apache.dolphinscheduler.common.constants.CommandKeyConstants; @@ -587,35 +585,32 @@ public class ProcessServiceImpl implements ProcessService { return processInstance; } - private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map cmdParam) { + @Override + public void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map cmdParam) { + // get start params from command param - Map startParamMap = new HashMap<>(); - if (cmdParam != null && cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_START_PARAMS)) { - String startParamJson = cmdParam.get(CommandKeyConstants.CMD_PARAM_START_PARAMS); - startParamMap = JSONUtils.toMap(startParamJson); - } - Map fatherParamMap = new HashMap<>(); - if (cmdParam != null && cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_FATHER_PARAMS)) { - String fatherParamJson = cmdParam.get(CommandKeyConstants.CMD_PARAM_FATHER_PARAMS); - fatherParamMap = JSONUtils.toMap(fatherParamJson); - } - startParamMap.putAll(fatherParamMap); + Map fatherParam = curingGlobalParamsService.parseWorkflowFatherParam(cmdParam); + Map startParamMap = new HashMap<>(fatherParam); + + Map currentStartParamMap = curingGlobalParamsService.parseWorkflowStartParam(cmdParam); + startParamMap.putAll(currentStartParamMap); + // set start param into global params Map globalMap = processDefinition.getGlobalParamMap(); List globalParamList = processDefinition.getGlobalParamList(); if (MapUtils.isNotEmpty(startParamMap) && globalMap != null) { // start param to overwrite global param for (Map.Entry param : globalMap.entrySet()) { - String val = startParamMap.get(param.getKey()); + String val = startParamMap.get(param.getKey()).getValue(); if (val != null) { param.setValue(val); } } // start param to create new global param if global not exist - for (Entry startParam : startParamMap.entrySet()) { + for (Entry startParam : startParamMap.entrySet()) { if (!globalMap.containsKey(startParam.getKey())) { - globalMap.put(startParam.getKey(), startParam.getValue()); - globalParamList.add(new Property(startParam.getKey(), IN, VARCHAR, startParam.getValue())); + globalMap.put(startParam.getKey(), startParam.getValue().getValue()); + globalParamList.add(startParam.getValue()); } } } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceTest.java index 29b3ed58c1..96c9503249 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceTest.java @@ -33,6 +33,8 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SubProcessParameters; +import org.apache.commons.collections4.MapUtils; + import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -234,4 +236,46 @@ public class CuringParamsServiceTest { Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_WORKFLOW_DEFINITION_CODE).getValue(), String.valueOf(processDefinition.getCode())); } + + @Test + public void testParseWorkflowStartParam() { + Map result = new HashMap<>(); + // empty cmd param + Map startParamMap = new HashMap<>(); + result = dolphinSchedulerCuringGlobalParams.parseWorkflowStartParam(startParamMap); + Assertions.assertTrue(MapUtils.isEmpty(result)); + + // without key + startParamMap.put("testStartParam", "$[yyyyMMdd]"); + result = dolphinSchedulerCuringGlobalParams.parseWorkflowStartParam(startParamMap); + Assertions.assertTrue(MapUtils.isEmpty(result)); + + startParamMap.put("StartParams", "{\"param1\":\"11111\", \"param2\":\"22222\"}"); + result = dolphinSchedulerCuringGlobalParams.parseWorkflowStartParam(startParamMap); + Assertions.assertTrue(MapUtils.isNotEmpty(result)); + Assertions.assertEquals(2, result.keySet().size()); + Assertions.assertEquals("11111", result.get("param1").getValue()); + Assertions.assertEquals("22222", result.get("param2").getValue()); + } + + @Test + public void testParseWorkflowFatherParam() { + Map result = new HashMap<>(); + // empty cmd param + Map startParamMap = new HashMap<>(); + result = dolphinSchedulerCuringGlobalParams.parseWorkflowFatherParam(startParamMap); + Assertions.assertTrue(MapUtils.isEmpty(result)); + + // without key + startParamMap.put("testfatherParams", "$[yyyyMMdd]"); + result = dolphinSchedulerCuringGlobalParams.parseWorkflowFatherParam(startParamMap); + Assertions.assertTrue(MapUtils.isEmpty(result)); + + startParamMap.put("fatherParams", "{\"param1\":\"11111\", \"param2\":\"22222\"}"); + result = dolphinSchedulerCuringGlobalParams.parseWorkflowFatherParam(startParamMap); + Assertions.assertTrue(MapUtils.isNotEmpty(result)); + Assertions.assertEquals(2, result.keySet().size()); + Assertions.assertEquals("11111", result.get("param1").getValue()); + Assertions.assertEquals("22222", result.get("param2").getValue()); + } }