Browse Source

fix: start param for wf not work (#15544)

* fix: start param for wf not work

fix: #15280

* fix test
3.2.2-prepare
Jay Chung 11 months ago committed by GitHub
parent
commit
01eb8f834f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 39
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  2. 12
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsService.java
  3. 27
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java
  4. 3
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  5. 31
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  6. 44
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceTest.java

39
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.master.runner;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_FATHER_PARAMS;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_NODES;
@ -29,8 +28,6 @@ import static org.apache.dolphinscheduler.common.constants.Constants.COMMA;
import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.constants.DateConstants.YYYY_MM_DD_HH_MM_SS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
@ -860,7 +857,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
Map<String, String> cmdParam = JSONUtils.toMap(workflowInstance.getCommandParam());
if (cmdParam != null) {
// reset global params while there are start parameters
setGlobalParamIfCommanded(workflowDefinition, cmdParam);
processService.setGlobalParamIfCommanded(workflowDefinition, cmdParam);
Date start = null;
Date end = null;
@ -2057,40 +2054,6 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
return waitToRetryTaskInstanceMap;
}
private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
// get start params from command param
Map<String, String> startParamMap = new HashMap<>();
if (cmdParam.containsKey(CMD_PARAM_START_PARAMS)) {
String startParamJson = cmdParam.get(CMD_PARAM_START_PARAMS);
startParamMap = JSONUtils.toMap(startParamJson);
}
Map<String, String> fatherParamMap = new HashMap<>();
if (cmdParam.containsKey(CMD_PARAM_FATHER_PARAMS)) {
String fatherParamJson = cmdParam.get(CMD_PARAM_FATHER_PARAMS);
fatherParamMap = JSONUtils.toMap(fatherParamJson);
}
startParamMap.putAll(fatherParamMap);
// set start param into global params
Map<String, String> globalMap = processDefinition.getGlobalParamMap();
List<Property> globalParamList = processDefinition.getGlobalParamList();
if (startParamMap.size() > 0 && globalMap != null) {
// start param to overwrite global param
for (Map.Entry<String, String> param : globalMap.entrySet()) {
String val = startParamMap.get(param.getKey());
if (val != null) {
param.setValue(val);
}
}
// start param to create new global param if global not exist
for (Map.Entry<String, String> startParam : startParamMap.entrySet()) {
if (!globalMap.containsKey(startParam.getKey())) {
globalMap.put(startParam.getKey(), startParam.getValue());
globalParamList.add(new Property(startParam.getKey(), IN, VARCHAR, startParam.getValue()));
}
}
}
}
/**
* clear related data if command of process instance is EXECUTE_TASK
* 1. find all task code from sub dag (only contains related task)

12
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsService.java

@ -27,6 +27,8 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.NonNull;
public interface CuringParamsService {
@ -80,6 +82,16 @@ public interface CuringParamsService {
@NonNull AbstractParameters parameters,
@NonNull ProcessInstance processInstance);
/**
* Parse workflow star parameter
*/
Map<String, Property> parseWorkflowStartParam(@Nullable Map<String, String> cmdParam);
/**
* Parse workflow father parameter
*/
Map<String, Property> parseWorkflowFatherParam(@Nullable Map<String, String> cmdParam);
/**
* preBuildBusinessParams
* @param processInstance

27
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java

@ -28,6 +28,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETE
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_WORKFLOW_DEFINITION_NAME;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_WORKFLOW_INSTANCE_ID;
import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.constants.DateConstants;
import org.apache.dolphinscheduler.common.enums.CommandType;
@ -55,6 +56,8 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.NonNull;
import org.springframework.beans.factory.annotation.Autowired;
@ -141,6 +144,28 @@ public class CuringParamsServiceImpl implements CuringParamsService {
return JSONUtils.toJsonString(globalParamList);
}
@Override
public Map<String, Property> parseWorkflowStartParam(@Nullable Map<String, String> cmdParam) {
if (cmdParam == null || !cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_START_PARAMS)) {
return new HashMap<>();
}
String startParamJson = cmdParam.get(CommandKeyConstants.CMD_PARAM_START_PARAMS);
Map<String, String> startParamMap = JSONUtils.toMap(startParamJson);
return startParamMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
entry -> new Property(entry.getKey(), Direct.IN, DataType.VARCHAR, entry.getValue())));
}
@Override
public Map<String, Property> parseWorkflowFatherParam(@Nullable Map<String, String> cmdParam) {
if (cmdParam == null || !cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_FATHER_PARAMS)) {
return new HashMap<>();
}
String startParamJson = cmdParam.get(CommandKeyConstants.CMD_PARAM_FATHER_PARAMS);
Map<String, String> startParamMap = JSONUtils.toMap(startParamJson);
return startParamMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
entry -> new Property(entry.getKey(), Direct.IN, DataType.VARCHAR, entry.getValue())));
}
/**
* the global parameters and local parameters used in the worker will be prepared here, and built-in parameters.
*
@ -199,7 +224,7 @@ public class CuringParamsServiceImpl implements CuringParamsService {
}
if (MapUtils.isNotEmpty(cmdParam)) {
prepareParamsMap.putAll(ParameterUtils.getUserDefParamsMap(cmdParam));
prepareParamsMap.putAll(parseWorkflowStartParam(cmdParam));
}
Iterator<Map.Entry<String, Property>> iter = prepareParamsMap.entrySet().iterator();

3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -50,6 +50,7 @@ import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.model.TaskNode;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
@ -194,4 +195,6 @@ public interface ProcessService {
void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId);
void saveCommandTrigger(Integer commandId, Integer processInstanceId);
void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam);
}

31
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -28,8 +28,6 @@ import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.C
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.constants.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
@ -587,35 +585,32 @@ public class ProcessServiceImpl implements ProcessService {
return processInstance;
}
private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
@Override
public void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
// get start params from command param
Map<String, String> startParamMap = new HashMap<>();
if (cmdParam != null && cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_START_PARAMS)) {
String startParamJson = cmdParam.get(CommandKeyConstants.CMD_PARAM_START_PARAMS);
startParamMap = JSONUtils.toMap(startParamJson);
}
Map<String, String> fatherParamMap = new HashMap<>();
if (cmdParam != null && cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_FATHER_PARAMS)) {
String fatherParamJson = cmdParam.get(CommandKeyConstants.CMD_PARAM_FATHER_PARAMS);
fatherParamMap = JSONUtils.toMap(fatherParamJson);
}
startParamMap.putAll(fatherParamMap);
Map<String, Property> fatherParam = curingGlobalParamsService.parseWorkflowFatherParam(cmdParam);
Map<String, Property> startParamMap = new HashMap<>(fatherParam);
Map<String, Property> currentStartParamMap = curingGlobalParamsService.parseWorkflowStartParam(cmdParam);
startParamMap.putAll(currentStartParamMap);
// set start param into global params
Map<String, String> globalMap = processDefinition.getGlobalParamMap();
List<Property> globalParamList = processDefinition.getGlobalParamList();
if (MapUtils.isNotEmpty(startParamMap) && globalMap != null) {
// start param to overwrite global param
for (Map.Entry<String, String> param : globalMap.entrySet()) {
String val = startParamMap.get(param.getKey());
String val = startParamMap.get(param.getKey()).getValue();
if (val != null) {
param.setValue(val);
}
}
// start param to create new global param if global not exist
for (Entry<String, String> startParam : startParamMap.entrySet()) {
for (Entry<String, Property> startParam : startParamMap.entrySet()) {
if (!globalMap.containsKey(startParam.getKey())) {
globalMap.put(startParam.getKey(), startParam.getValue());
globalParamList.add(new Property(startParam.getKey(), IN, VARCHAR, startParam.getValue()));
globalMap.put(startParam.getKey(), startParam.getValue().getValue());
globalParamList.add(startParam.getValue());
}
}
}

44
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceTest.java

@ -33,6 +33,8 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SubProcessParameters;
import org.apache.commons.collections4.MapUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
@ -234,4 +236,46 @@ public class CuringParamsServiceTest {
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_WORKFLOW_DEFINITION_CODE).getValue(),
String.valueOf(processDefinition.getCode()));
}
@Test
public void testParseWorkflowStartParam() {
Map<String, Property> result = new HashMap<>();
// empty cmd param
Map<String, String> startParamMap = new HashMap<>();
result = dolphinSchedulerCuringGlobalParams.parseWorkflowStartParam(startParamMap);
Assertions.assertTrue(MapUtils.isEmpty(result));
// without key
startParamMap.put("testStartParam", "$[yyyyMMdd]");
result = dolphinSchedulerCuringGlobalParams.parseWorkflowStartParam(startParamMap);
Assertions.assertTrue(MapUtils.isEmpty(result));
startParamMap.put("StartParams", "{\"param1\":\"11111\", \"param2\":\"22222\"}");
result = dolphinSchedulerCuringGlobalParams.parseWorkflowStartParam(startParamMap);
Assertions.assertTrue(MapUtils.isNotEmpty(result));
Assertions.assertEquals(2, result.keySet().size());
Assertions.assertEquals("11111", result.get("param1").getValue());
Assertions.assertEquals("22222", result.get("param2").getValue());
}
@Test
public void testParseWorkflowFatherParam() {
Map<String, Property> result = new HashMap<>();
// empty cmd param
Map<String, String> startParamMap = new HashMap<>();
result = dolphinSchedulerCuringGlobalParams.parseWorkflowFatherParam(startParamMap);
Assertions.assertTrue(MapUtils.isEmpty(result));
// without key
startParamMap.put("testfatherParams", "$[yyyyMMdd]");
result = dolphinSchedulerCuringGlobalParams.parseWorkflowFatherParam(startParamMap);
Assertions.assertTrue(MapUtils.isEmpty(result));
startParamMap.put("fatherParams", "{\"param1\":\"11111\", \"param2\":\"22222\"}");
result = dolphinSchedulerCuringGlobalParams.parseWorkflowFatherParam(startParamMap);
Assertions.assertTrue(MapUtils.isNotEmpty(result));
Assertions.assertEquals(2, result.keySet().size());
Assertions.assertEquals("11111", result.get("param1").getValue());
Assertions.assertEquals("22222", result.get("param2").getValue());
}
}

Loading…
Cancel
Save