Browse Source

[Future-9396]Support output parameters transfer from parent workflow to child work flow (#9410)

* [Future-9396]Support output parameters transfer from parent workflow to child work flow

* fix note
3.0.0/version-upgrade
caishunfeng 3 years ago committed by GitHub
parent
commit
b285ccf930
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 42
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  2. 79
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  3. 16
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java

42
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -17,10 +17,14 @@
package org.apache.dolphinscheduler.server.master.runner; package org.apache.dolphinscheduler.server.master.runner;
import com.google.common.collect.Lists; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import org.apache.commons.collections.CollectionUtils; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import org.apache.commons.lang.StringUtils; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import org.apache.commons.lang.math.NumberUtils; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@ -67,8 +71,10 @@ import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -86,13 +92,10 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import org.slf4j.Logger;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; import com.google.common.collect.Lists;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
/** /**
* master exec thread,split dag * master exec thread,split dag
@ -447,6 +450,7 @@ public class WorkflowExecuteThread {
if (taskInstance.getState().typeIsSuccess()) { if (taskInstance.getState().typeIsSuccess()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
processInstance.setVarPool(taskInstance.getVarPool());
processService.saveProcessInstance(processInstance); processService.saveProcessInstance(processInstance);
if (!processInstance.isBlocked()) { if (!processInstance.isBlocked()) {
submitPostNode(Long.toString(taskInstance.getTaskCode())); submitPostNode(Long.toString(taskInstance.getTaskCode()));
@ -1210,6 +1214,10 @@ public class WorkflowExecuteThread {
if (allProperty.size() > 0) { if (allProperty.size() > 0) {
taskInstance.setVarPool(JSONUtils.toJsonString(allProperty.values())); taskInstance.setVarPool(JSONUtils.toJsonString(allProperty.values()));
} }
} else {
if (StringUtils.isNotEmpty(processInstance.getVarPool())) {
taskInstance.setVarPool(processInstance.getVarPool());
}
} }
} }
@ -1279,10 +1287,10 @@ public class WorkflowExecuteThread {
taskInstances.add(task); taskInstances.add(task);
} }
//the end node of the branch of the dag //the end node of the branch of the dag
if (StringUtils.isNotEmpty(parentNodeCode) && dag.getEndNode().contains(parentNodeCode)){ if (StringUtils.isNotEmpty(parentNodeCode) && dag.getEndNode().contains(parentNodeCode)) {
TaskInstance endTaskInstance = taskInstanceMap.get(completeTaskMap.get(NumberUtils.toLong(parentNodeCode))); TaskInstance endTaskInstance = taskInstanceMap.get(completeTaskMap.get(NumberUtils.toLong(parentNodeCode)));
String taskInstanceVarPool = endTaskInstance.getVarPool(); String taskInstanceVarPool = endTaskInstance.getVarPool();
if(StringUtils.isNotEmpty(taskInstanceVarPool)) { if (StringUtils.isNotEmpty(taskInstanceVarPool)) {
Set<Property> taskProperties = JSONUtils.toList(taskInstanceVarPool, Property.class) Set<Property> taskProperties = JSONUtils.toList(taskInstanceVarPool, Property.class)
.stream().collect(Collectors.toSet()); .stream().collect(Collectors.toSet());
String processInstanceVarPool = processInstance.getVarPool(); String processInstanceVarPool = processInstance.getVarPool();
@ -1291,7 +1299,7 @@ public class WorkflowExecuteThread {
.stream().collect(Collectors.toSet()); .stream().collect(Collectors.toSet());
properties.addAll(taskProperties); properties.addAll(taskProperties);
processInstance.setVarPool(JSONUtils.toJsonString(properties)); processInstance.setVarPool(JSONUtils.toJsonString(properties));
}else{ } else {
processInstance.setVarPool(JSONUtils.toJsonString(taskProperties)); processInstance.setVarPool(JSONUtils.toJsonString(taskProperties));
} }
} }
@ -1637,7 +1645,7 @@ public class WorkflowExecuteThread {
stateEvent.setExecutionStatus(processInstance.getState()); stateEvent.setExecutionStatus(processInstance.getState());
stateEvent.setProcessInstanceId(this.processInstance.getId()); stateEvent.setProcessInstanceId(this.processInstance.getId());
stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE); stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE);
// this.processStateChangeHandler(stateEvent); // this.processStateChangeHandler(stateEvent);
// replace with `stateEvents`, make sure `WorkflowExecuteThread` can be deleted to avoid memory leaks // replace with `stateEvents`, make sure `WorkflowExecuteThread` can be deleted to avoid memory leaks
this.stateEvents.add(stateEvent); this.stateEvents.add(stateEvent);
} }

79
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -17,13 +17,18 @@
package org.apache.dolphinscheduler.service.process; package org.apache.dolphinscheduler.service.process;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import static java.util.stream.Collectors.toSet;
import com.fasterxml.jackson.core.type.TypeReference; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import com.fasterxml.jackson.databind.node.ObjectNode; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import com.google.common.collect.Lists; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
import org.apache.commons.collections.CollectionUtils; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
import org.apache.commons.lang.StringUtils; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import org.apache.commons.lang.math.NumberUtils; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
@ -124,11 +129,10 @@ import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component; import org.apache.commons.lang.math.NumberUtils;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -143,17 +147,16 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static java.util.stream.Collectors.toSet; import org.slf4j.Logger;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import org.springframework.beans.factory.annotation.Autowired;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; import org.springframework.stereotype.Component;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS; import org.springframework.transaction.annotation.Transactional;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; import com.fasterxml.jackson.core.type.TypeReference;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; import com.fasterxml.jackson.databind.node.ObjectNode;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; import com.google.common.collect.Lists;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
/** /**
* process relative dao that some mappers in this. * process relative dao that some mappers in this.
@ -163,7 +166,7 @@ public class ProcessServiceImpl implements ProcessService {
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), private final int[] stateArray = new int[] {ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.DISPATCH.ordinal(), ExecutionStatus.DISPATCH.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(), ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(), ExecutionStatus.DELAY_EXECUTION.ordinal(),
@ -1123,8 +1126,8 @@ public class ProcessServiceImpl implements ProcessService {
if (StringUtils.isNotEmpty(parentInstanceId)) { if (StringUtils.isNotEmpty(parentInstanceId)) {
ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)); ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
if (parentInstance != null) { if (parentInstance != null) {
subProcessInstance.setGlobalParams( subProcessInstance.setGlobalParams(joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams())); subProcessInstance.setVarPool(joinVarPool(parentInstance.getVarPool(), subProcessInstance.getVarPool()));
this.saveProcessInstance(subProcessInstance); this.saveProcessInstance(subProcessInstance);
} else { } else {
logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam); logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam);
@ -1167,6 +1170,26 @@ public class ProcessServiceImpl implements ProcessService {
return JSONUtils.toJsonString(parentParams); return JSONUtils.toJsonString(parentParams);
} }
/**
* join parent var pool params into sub process.
* only the keys doesn't in sub process global would be joined.
*
* @param parentValPool
* @param subValPool
* @return
*/
private String joinVarPool(String parentValPool, String subValPool) {
List<Property> parentValPools = Lists.newArrayList(JSONUtils.toList(parentValPool, Property.class));
parentValPools = parentValPools.stream().filter(valPool -> valPool.getDirect() == Direct.OUT).collect(Collectors.toList());
List<Property> subValPools = Lists.newArrayList(JSONUtils.toList(subValPool, Property.class));
Set<String> parentValPoolKeys = parentValPools.stream().map(Property::getProp).collect(toSet());
List<Property> extraSubValPools = subValPools.stream().filter(sub -> !parentValPoolKeys.contains(sub.getProp())).collect(Collectors.toList());
parentValPools.addAll(extraSubValPools);
return JSONUtils.toJsonString(parentValPools);
}
/** /**
* initialize task instance * initialize task instance
* *
@ -2387,7 +2410,7 @@ public class ProcessServiceImpl implements ProcessService {
stream() stream()
.filter(t -> t.getId() != 0) .filter(t -> t.getId() != 0)
.map(ResourceInfo::getId) .map(ResourceInfo::getId)
.collect(Collectors.toSet()); .collect(toSet());
} }
if (CollectionUtils.isEmpty(resourceIds)) { if (CollectionUtils.isEmpty(resourceIds)) {
return StringUtils.EMPTY; return StringUtils.EMPTY;
@ -2549,7 +2572,7 @@ public class ProcessServiceImpl implements ProcessService {
Set<Long> processDefinitionCodes = processTaskRelationList Set<Long> processDefinitionCodes = processTaskRelationList
.stream() .stream()
.map(ProcessTaskRelation::getProcessDefinitionCode) .map(ProcessTaskRelation::getProcessDefinitionCode)
.collect(Collectors.toSet()); .collect(toSet());
List<ProcessDefinition> processDefinitionList = processDefineMapper.queryByCodes(processDefinitionCodes); List<ProcessDefinition> processDefinitionList = processDefineMapper.queryByCodes(processDefinitionCodes);
// check process definition is already online // check process definition is already online
for (ProcessDefinition processDefinition : processDefinitionList) { for (ProcessDefinition processDefinition : processDefinitionList) {

16
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java

@ -77,7 +77,6 @@ public abstract class AbstractParameters implements IParameters {
public Map<String, Property> getLocalParametersMap() { public Map<String, Property> getLocalParametersMap() {
Map<String, Property> localParametersMaps = new LinkedHashMap<>(); Map<String, Property> localParametersMaps = new LinkedHashMap<>();
if (localParams != null) { if (localParams != null) {
for (Property property : localParams) { for (Property property : localParams) {
localParametersMaps.put(property.getProp(),property); localParametersMaps.put(property.getProp(),property);
} }
@ -113,14 +112,14 @@ public abstract class AbstractParameters implements IParameters {
} }
public void dealOutParam(String result) { public void dealOutParam(String result) {
if (org.apache.commons.collections4.CollectionUtils.isEmpty(localParams)) { if (CollectionUtils.isEmpty(localParams)) {
return; return;
} }
List<Property> outProperty = getOutProperty(localParams); List<Property> outProperty = getOutProperty(localParams);
if (org.apache.commons.collections4.CollectionUtils.isEmpty(outProperty)) { if (CollectionUtils.isEmpty(outProperty)) {
return; return;
} }
if (org.apache.dolphinscheduler.spi.utils.StringUtils.isEmpty(result)) { if (StringUtils.isEmpty(result)) {
varPool.addAll(outProperty); varPool.addAll(outProperty);
return; return;
} }
@ -130,9 +129,9 @@ public abstract class AbstractParameters implements IParameters {
} }
for (Property info : outProperty) { for (Property info : outProperty) {
String propValue = taskResult.get(info.getProp()); String propValue = taskResult.get(info.getProp());
if (org.apache.dolphinscheduler.spi.utils.StringUtils.isNotEmpty(propValue)) { if (StringUtils.isNotEmpty(propValue)) {
info.setValue(propValue); info.setValue(propValue);
varPool.add(info); addPropertyToValPool(info);
} }
} }
} }
@ -180,4 +179,9 @@ public abstract class AbstractParameters implements IParameters {
public ResourceParametersHelper getResources() { public ResourceParametersHelper getResources() {
return new ResourceParametersHelper(); return new ResourceParametersHelper();
} }
private void addPropertyToValPool(Property property) {
varPool.removeIf(p -> p.getProp().equals(property.getProp()));
varPool.add(property);
}
} }

Loading…
Cancel
Save