From b285ccf9306cc5e402a184dd24da82ca7a08689c Mon Sep 17 00:00:00 2001 From: caishunfeng Date: Mon, 11 Apr 2022 20:03:16 +0800 Subject: [PATCH] [Future-9396]Support output parameters transfer from parent workflow to child work flow (#9410) * [Future-9396]Support output parameters transfer from parent workflow to child work flow * fix note --- .../master/runner/WorkflowExecuteThread.java | 54 +-- .../service/process/ProcessServiceImpl.java | 313 ++++++++++-------- .../api/parameters/AbstractParameters.java | 16 +- 3 files changed, 209 insertions(+), 174 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 9c1aa90c79..75a621bd43 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -17,10 +17,14 @@ package org.apache.dolphinscheduler.server.master.runner; -import com.google.common.collect.Lists; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang.math.NumberUtils; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; +import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.FailureStrategy; @@ -67,8 +71,10 @@ import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.math.NumberUtils; import java.util.ArrayList; import java.util.Arrays; @@ -86,13 +92,10 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; -import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; /** * master exec thread,split dag @@ -447,6 +450,7 @@ public class WorkflowExecuteThread { if (taskInstance.getState().typeIsSuccess()) { completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); + processInstance.setVarPool(taskInstance.getVarPool()); processService.saveProcessInstance(processInstance); if (!processInstance.isBlocked()) { submitPostNode(Long.toString(taskInstance.getTaskCode())); @@ -1210,6 +1214,10 @@ public class WorkflowExecuteThread { if (allProperty.size() > 0) { taskInstance.setVarPool(JSONUtils.toJsonString(allProperty.values())); } + } else { + if (StringUtils.isNotEmpty(processInstance.getVarPool())) { + taskInstance.setVarPool(processInstance.getVarPool()); + } } } @@ -1279,19 +1287,19 @@ public class WorkflowExecuteThread { taskInstances.add(task); } //the end node of the branch of the dag - if (StringUtils.isNotEmpty(parentNodeCode) && dag.getEndNode().contains(parentNodeCode)){ + if (StringUtils.isNotEmpty(parentNodeCode) && dag.getEndNode().contains(parentNodeCode)) { TaskInstance endTaskInstance = taskInstanceMap.get(completeTaskMap.get(NumberUtils.toLong(parentNodeCode))); String taskInstanceVarPool = endTaskInstance.getVarPool(); - if(StringUtils.isNotEmpty(taskInstanceVarPool)) { + if (StringUtils.isNotEmpty(taskInstanceVarPool)) { Set taskProperties = JSONUtils.toList(taskInstanceVarPool, Property.class) - .stream().collect(Collectors.toSet()); + .stream().collect(Collectors.toSet()); String processInstanceVarPool = processInstance.getVarPool(); if (StringUtils.isNotEmpty(processInstanceVarPool)) { Set properties = JSONUtils.toList(processInstanceVarPool, Property.class) - .stream().collect(Collectors.toSet()); + .stream().collect(Collectors.toSet()); properties.addAll(taskProperties); processInstance.setVarPool(JSONUtils.toJsonString(properties)); - }else{ + } else { processInstance.setVarPool(JSONUtils.toJsonString(taskProperties)); } } @@ -1637,7 +1645,7 @@ public class WorkflowExecuteThread { stateEvent.setExecutionStatus(processInstance.getState()); stateEvent.setProcessInstanceId(this.processInstance.getId()); stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE); -// this.processStateChangeHandler(stateEvent); + // this.processStateChangeHandler(stateEvent); // replace with `stateEvents`, make sure `WorkflowExecuteThread` can be deleted to avoid memory leaks this.stateEvents.add(stateEvent); } @@ -1650,10 +1658,10 @@ public class WorkflowExecuteThread { ExecutionStatus state = stateEvent.getExecutionStatus(); if (processInstance.getState() != state) { logger.info( - "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", - processInstance.getId(), processInstance.getName(), - processInstance.getState(), state, - processInstance.getCommandType()); + "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", + processInstance.getId(), processInstance.getName(), + processInstance.getState(), state, + processInstance.getCommandType()); processInstance.setState(state); if (state.typeIsFinished()) { diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index e6d03a8aaa..7fee2416df 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -17,13 +17,18 @@ package org.apache.dolphinscheduler.service.process; -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang.math.NumberUtils; +import static java.util.stream.Collectors.toSet; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; +import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; +import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -124,11 +129,10 @@ import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.spi.enums.ResourceType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.math.NumberUtils; import java.util.ArrayList; import java.util.Arrays; @@ -143,17 +147,16 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; -import static java.util.stream.Collectors.toSet; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; -import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; -import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; /** * process relative dao that some mappers in this. @@ -163,12 +166,12 @@ public class ProcessServiceImpl implements ProcessService { private final Logger logger = LoggerFactory.getLogger(getClass()); - private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), - ExecutionStatus.DISPATCH.ordinal(), - ExecutionStatus.RUNNING_EXECUTION.ordinal(), - ExecutionStatus.DELAY_EXECUTION.ordinal(), - ExecutionStatus.READY_PAUSE.ordinal(), - ExecutionStatus.READY_STOP.ordinal()}; + private final int[] stateArray = new int[] {ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), + ExecutionStatus.DISPATCH.ordinal(), + ExecutionStatus.RUNNING_EXECUTION.ordinal(), + ExecutionStatus.DELAY_EXECUTION.ordinal(), + ExecutionStatus.READY_PAUSE.ordinal(), + ExecutionStatus.READY_STOP.ordinal()}; @Autowired private UserMapper userMapper; @@ -309,7 +312,7 @@ public class ProcessServiceImpl implements ProcessService { if (processDefinition.getExecutionType().typeIsSerialWait()) { while (true) { List runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(), - Constants.RUNNING_PROCESS_STATE, processInstance.getId()); + Constants.RUNNING_PROCESS_STATE, processInstance.getId()); if (CollectionUtils.isEmpty(runningProcessInstances)) { processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); saveProcessInstance(processInstance); @@ -322,14 +325,14 @@ public class ProcessServiceImpl implements ProcessService { } } else if (processDefinition.getExecutionType().typeIsSerialDiscard()) { List runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(), - Constants.RUNNING_PROCESS_STATE, processInstance.getId()); + Constants.RUNNING_PROCESS_STATE, processInstance.getId()); if (CollectionUtils.isEmpty(runningProcessInstances)) { processInstance.setState(ExecutionStatus.STOP); saveProcessInstance(processInstance); } } else if (processDefinition.getExecutionType().typeIsSerialPriority()) { List runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(), - Constants.RUNNING_PROCESS_STATE, processInstance.getId()); + Constants.RUNNING_PROCESS_STATE, processInstance.getId()); if (CollectionUtils.isNotEmpty(runningProcessInstances)) { for (ProcessInstance info : runningProcessInstances) { info.setCommandType(CommandType.STOP); @@ -342,7 +345,7 @@ public class ProcessServiceImpl implements ProcessService { String address = host.split(":")[0]; int port = Integer.parseInt(host.split(":")[1]); StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( - info.getId(), 0, info.getState(), info.getId(), 0 + info.getId(), 0, info.getState(), info.getId(), 0 ); try { stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); @@ -662,21 +665,21 @@ public class ProcessServiceImpl implements ProcessService { // process instance quit by "waiting thread" state if (originCommand == null) { Command command = new Command( - CommandType.RECOVER_WAITING_THREAD, - processInstance.getTaskDependType(), - processInstance.getFailureStrategy(), - processInstance.getExecutorId(), - processInstance.getProcessDefinition().getCode(), - JSONUtils.toJsonString(cmdParam), - processInstance.getWarningType(), - processInstance.getWarningGroupId(), - processInstance.getScheduleTime(), - processInstance.getWorkerGroup(), - processInstance.getEnvironmentCode(), - processInstance.getProcessInstancePriority(), - processInstance.getDryRun(), - processInstance.getId(), - processInstance.getProcessDefinitionVersion() + CommandType.RECOVER_WAITING_THREAD, + processInstance.getTaskDependType(), + processInstance.getFailureStrategy(), + processInstance.getExecutorId(), + processInstance.getProcessDefinition().getCode(), + JSONUtils.toJsonString(cmdParam), + processInstance.getWarningType(), + processInstance.getWarningGroupId(), + processInstance.getScheduleTime(), + processInstance.getWorkerGroup(), + processInstance.getEnvironmentCode(), + processInstance.getProcessInstancePriority(), + processInstance.getDryRun(), + processInstance.getId(), + processInstance.getProcessDefinitionVersion() ); saveCommand(command); return; @@ -708,8 +711,8 @@ public class ProcessServiceImpl implements ProcessService { private Date getScheduleTime(Command command, Map cmdParam) { Date scheduleTime = command.getScheduleTime(); if (scheduleTime == null - && cmdParam != null - && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) { + && cmdParam != null + && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) { Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); @@ -720,7 +723,7 @@ public class ProcessServiceImpl implements ProcessService { scheduleTime = complementDateList.get(0); } else { logger.error("set scheduler time error: complement date list is empty, command: {}", - command.toString()); + command.toString()); } } return scheduleTime; @@ -769,10 +772,10 @@ public class ProcessServiceImpl implements ProcessService { // curing global params processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - getCommandTypeIfComplement(processInstance, command), - processInstance.getScheduleTime())); + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + getCommandTypeIfComplement(processInstance, command), + processInstance.getScheduleTime())); // set process instance priority processInstance.setProcessInstancePriority(command.getProcessInstancePriority()); @@ -799,7 +802,7 @@ public class ProcessServiceImpl implements ProcessService { startParamMap.putAll(fatherParamMap); // set start param into global params if (startParamMap.size() > 0 - && processDefinition.getGlobalParamMap() != null) { + && processDefinition.getGlobalParamMap() != null) { for (Map.Entry param : processDefinition.getGlobalParamMap().entrySet()) { String val = startParamMap.get(param.getKey()); if (val != null) { @@ -863,8 +866,8 @@ public class ProcessServiceImpl implements ProcessService { private Boolean checkCmdParam(Command command, Map cmdParam) { if (command.getTaskDependType() == TaskDependType.TASK_ONLY || command.getTaskDependType() == TaskDependType.TASK_PRE) { if (cmdParam == null - || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODES) - || cmdParam.get(Constants.CMD_PARAM_START_NODES).isEmpty()) { + || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODES) + || cmdParam.get(Constants.CMD_PARAM_START_NODES).isEmpty()) { logger.error("command node depend type is {}, but start nodes is null ", command.getTaskDependType()); return false; } @@ -908,10 +911,10 @@ public class ProcessServiceImpl implements ProcessService { // Recalculate global parameters after rerun. processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - commandTypeIfComplement, - processInstance.getScheduleTime())); + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + commandTypeIfComplement, + processInstance.getScheduleTime())); processInstance.setProcessDefinition(processDefinition); } //reset command parameter @@ -954,7 +957,7 @@ public class ProcessServiceImpl implements ProcessService { initTaskInstance(this.findTaskInstanceById(taskId)); } cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING, - String.join(Constants.COMMA, convertIntListToString(failedList))); + String.join(Constants.COMMA, convertIntListToString(failedList))); processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); processInstance.setRunTimes(runTime + 1); break; @@ -967,7 +970,7 @@ public class ProcessServiceImpl implements ProcessService { cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING); List suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE); List stopNodeList = findTaskIdByInstanceState(processInstance.getId(), - ExecutionStatus.KILL); + ExecutionStatus.KILL); suspendedNodeList.addAll(stopNodeList); for (Integer taskId : suspendedNodeList) { // initialize the pause state @@ -1044,7 +1047,7 @@ public class ProcessServiceImpl implements ProcessService { } return processDefineLogMapper.queryByDefinitionCodeAndVersion( - processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); + processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); } } @@ -1086,13 +1089,13 @@ public class ProcessServiceImpl implements ProcessService { List complementDate = CronUtils.getSelfFireDateList(start, end, listSchedules); if (complementDate.size() > 0 - && Flag.NO == processInstance.getIsSubProcess()) { + && Flag.NO == processInstance.getIsSubProcess()) { processInstance.setScheduleTime(complementDate.get(0)); } processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); } /** @@ -1111,7 +1114,7 @@ public class ProcessServiceImpl implements ProcessService { Map paramMap = JSONUtils.toMap(cmdParam); // write sub process id into cmd param. if (paramMap.containsKey(CMD_PARAM_SUB_PROCESS) - && CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) { + && CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) { paramMap.remove(CMD_PARAM_SUB_PROCESS); paramMap.put(CMD_PARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId())); subProcessInstance.setCommandParam(JSONUtils.toJsonString(paramMap)); @@ -1123,8 +1126,8 @@ public class ProcessServiceImpl implements ProcessService { if (StringUtils.isNotEmpty(parentInstanceId)) { ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)); if (parentInstance != null) { - subProcessInstance.setGlobalParams( - joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams())); + subProcessInstance.setGlobalParams(joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams())); + subProcessInstance.setVarPool(joinVarPool(parentInstance.getVarPool(), subProcessInstance.getVarPool())); this.saveProcessInstance(subProcessInstance); } else { logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam); @@ -1162,11 +1165,31 @@ public class ProcessServiceImpl implements ProcessService { // e.g. the subProp's type is not equals with parent, or subProp's direct is not equals with parent // It's suggested to add node name in property, this kind of problem can be solved. List extraSubParams = subParams.stream() - .filter(subProp -> !parentParamKeys.contains(subProp.getProp())).collect(Collectors.toList()); + .filter(subProp -> !parentParamKeys.contains(subProp.getProp())).collect(Collectors.toList()); parentParams.addAll(extraSubParams); return JSONUtils.toJsonString(parentParams); } + /** + * join parent var pool params into sub process. + * only the keys doesn't in sub process global would be joined. + * + * @param parentValPool + * @param subValPool + * @return + */ + private String joinVarPool(String parentValPool, String subValPool) { + List parentValPools = Lists.newArrayList(JSONUtils.toList(parentValPool, Property.class)); + parentValPools = parentValPools.stream().filter(valPool -> valPool.getDirect() == Direct.OUT).collect(Collectors.toList()); + + List subValPools = Lists.newArrayList(JSONUtils.toList(subValPool, Property.class)); + + Set parentValPoolKeys = parentValPools.stream().map(Property::getProp).collect(toSet()); + List extraSubValPools = subValPools.stream().filter(sub -> !parentValPoolKeys.contains(sub.getProp())).collect(Collectors.toList()); + parentValPools.addAll(extraSubValPools); + return JSONUtils.toJsonString(parentValPools); + } + /** * initialize task instance * @@ -1175,7 +1198,7 @@ public class ProcessServiceImpl implements ProcessService { private void initTaskInstance(TaskInstance taskInstance) { if (!taskInstance.isSubProcess() - && (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure())) { + && (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure())) { taskInstance.setFlag(Flag.NO); updateTaskInstance(taskInstance); return; @@ -1220,12 +1243,12 @@ public class ProcessServiceImpl implements ProcessService { @Transactional(rollbackFor = Exception.class) public TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance) { logger.info("start submit task : {}, instance id:{}, state: {}", - taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState()); + taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState()); //submit to db TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance); if (task == null) { logger.error("end submit task to db error, task name:{}, process id:{} state: {} ", - taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState()); + taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState()); return null; } @@ -1234,7 +1257,7 @@ public class ProcessServiceImpl implements ProcessService { } logger.info("end submit task to db successfully:{} {} state:{} complete, instance id:{} state: {} ", - taskInstance.getId(), taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState()); + taskInstance.getId(), taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState()); return task; } @@ -1292,7 +1315,7 @@ public class ProcessServiceImpl implements ProcessService { } } logger.info("sub process instance is not found,parent task:{},parent instance:{}", - parentTask.getId(), parentProcessInstance.getId()); + parentTask.getId(), parentProcessInstance.getId()); return null; } @@ -1390,21 +1413,21 @@ public class ProcessServiceImpl implements ProcessService { String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams); int subProcessInstanceId = childInstance == null ? 0 : childInstance.getId(); return new Command( - commandType, - TaskDependType.TASK_POST, - parentProcessInstance.getFailureStrategy(), - parentProcessInstance.getExecutorId(), - subProcessDefinition.getCode(), - processParam, - parentProcessInstance.getWarningType(), - parentProcessInstance.getWarningGroupId(), - parentProcessInstance.getScheduleTime(), - task.getWorkerGroup(), - task.getEnvironmentCode(), - parentProcessInstance.getProcessInstancePriority(), - parentProcessInstance.getDryRun(), - subProcessInstanceId, - subProcessDefinition.getVersion() + commandType, + TaskDependType.TASK_POST, + parentProcessInstance.getFailureStrategy(), + parentProcessInstance.getExecutorId(), + subProcessDefinition.getCode(), + processParam, + parentProcessInstance.getWarningType(), + parentProcessInstance.getWarningGroupId(), + parentProcessInstance.getScheduleTime(), + task.getWorkerGroup(), + task.getEnvironmentCode(), + parentProcessInstance.getProcessInstancePriority(), + parentProcessInstance.getDryRun(), + subProcessInstanceId, + subProcessDefinition.getVersion() ); } @@ -1441,7 +1464,7 @@ public class ProcessServiceImpl implements ProcessService { */ private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, long childDefinitionCode) { ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(), - parentProcessInstance.getProcessDefinitionVersion()); + parentProcessInstance.getProcessDefinitionVersion()); ProcessDefinition childDefinition = this.findProcessDefinitionByCode(childDefinitionCode); if (childDefinition != null && fatherDefinition != null) { childDefinition.setWarningGroupId(fatherDefinition.getWarningGroupId()); @@ -1460,8 +1483,8 @@ public class ProcessServiceImpl implements ProcessService { public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) { ExecutionStatus processInstanceState = processInstance.getState(); if (processInstanceState.typeIsFinished() - || processInstanceState == ExecutionStatus.READY_PAUSE - || processInstanceState == ExecutionStatus.READY_STOP) { + || processInstanceState == ExecutionStatus.READY_PAUSE + || processInstanceState == ExecutionStatus.READY_STOP) { logger.warn("processInstance {} was {}, skip submit task", processInstance.getProcessDefinitionCode(), processInstanceState); return null; } @@ -1500,10 +1523,10 @@ public class ProcessServiceImpl implements ProcessService { // the task already exists in task queue // return state if ( - state == ExecutionStatus.RUNNING_EXECUTION - || state == ExecutionStatus.DELAY_EXECUTION - || state == ExecutionStatus.KILL - || state == ExecutionStatus.DISPATCH + state == ExecutionStatus.RUNNING_EXECUTION + || state == ExecutionStatus.DELAY_EXECUTION + || state == ExecutionStatus.KILL + || state == ExecutionStatus.DISPATCH ) { return state; } @@ -1512,7 +1535,7 @@ public class ProcessServiceImpl implements ProcessService { if (processInstance.getState() == ExecutionStatus.READY_PAUSE) { state = ExecutionStatus.PAUSE; } else if (processInstance.getState() == ExecutionStatus.READY_STOP - || !checkProcessStrategy(taskInstance, processInstance)) { + || !checkProcessStrategy(taskInstance, processInstance)) { state = ExecutionStatus.KILL; } else { state = ExecutionStatus.SUBMITTED_SUCCESS; @@ -1535,7 +1558,7 @@ public class ProcessServiceImpl implements ProcessService { for (TaskInstance task : taskInstances) { if (task.getState() == ExecutionStatus.FAILURE - && task.getRetryTimes() >= task.getMaxRetryTimes()) { + && task.getRetryTimes() >= task.getMaxRetryTimes()) { return false; } } @@ -1647,8 +1670,8 @@ public class ProcessServiceImpl implements ProcessService { taskInstance.setProcessInstance(processInstance); taskInstance.setProcessDefine(processInstance.getProcessDefinition()); TaskDefinition taskDefinition = this.findTaskDefinition( - taskInstance.getTaskCode(), - taskInstance.getTaskDefinitionVersion()); + taskInstance.getTaskCode(), + taskInstance.getTaskDefinitionVersion()); this.updateTaskDefinitionResources(taskDefinition); taskInstance.setTaskDefine(taskDefinition); } @@ -1661,17 +1684,17 @@ public class ProcessServiceImpl implements ProcessService { @Override public void updateTaskDefinitionResources(TaskDefinition taskDefinition) { Map taskParameters = JSONUtils.parseObject( - taskDefinition.getTaskParams(), - new TypeReference>() { - }); + taskDefinition.getTaskParams(), + new TypeReference>() { + }); if (taskParameters != null) { // if contains mainJar field, query resource from database // Flink, Spark, MR if (taskParameters.containsKey("mainJar")) { Object mainJarObj = taskParameters.get("mainJar"); ResourceInfo mainJar = JSONUtils.parseObject( - JSONUtils.toJsonString(mainJarObj), - ResourceInfo.class); + JSONUtils.toJsonString(mainJarObj), + ResourceInfo.class); ResourceInfo resourceInfo = updateResourceInfo(mainJar); if (resourceInfo != null) { taskParameters.put("mainJar", resourceInfo); @@ -1682,10 +1705,10 @@ public class ProcessServiceImpl implements ProcessService { String resourceListStr = JSONUtils.toJsonString(taskParameters.get("resourceList")); List resourceInfos = JSONUtils.toList(resourceListStr, ResourceInfo.class); List updatedResourceInfos = resourceInfos - .stream() - .map(this::updateResourceInfo) - .filter(Objects::nonNull) - .collect(Collectors.toList()); + .stream() + .map(this::updateResourceInfo) + .filter(Objects::nonNull) + .collect(Collectors.toList()); taskParameters.put("resourceList", updatedResourceInfos); } // set task parameters @@ -1716,7 +1739,7 @@ public class ProcessServiceImpl implements ProcessService { resourceInfo.setResourceName(resource.getFullName()); if (logger.isInfoEnabled()) { logger.info("updated resource info {}", - JSONUtils.toJsonString(resourceInfo)); + JSONUtils.toJsonString(resourceInfo)); } } return resourceInfo; @@ -1938,7 +1961,7 @@ public class ProcessServiceImpl implements ProcessService { public Map queryWorkerGroupByProcessDefinitionCodes(List processDefinitionCodeList) { List processDefinitionScheduleList = scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodeList); return processDefinitionScheduleList.stream().collect(Collectors.toMap(Schedule::getProcessDefinitionCode, - Schedule::getWorkerGroup)); + Schedule::getWorkerGroup)); } /** @@ -2002,7 +2025,7 @@ public class ProcessServiceImpl implements ProcessService { @Override public List queryNeedFailoverTaskInstances(String host) { return taskInstanceMapper.queryByHostAndStatus(host, - stateArray); + stateArray); } /** @@ -2105,8 +2128,8 @@ public class ProcessServiceImpl implements ProcessService { @Override public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) { return processInstanceMapper.queryLastSchedulerProcess(definitionCode, - dateInterval.getStartTime(), - dateInterval.getEndTime()); + dateInterval.getStartTime(), + dateInterval.getEndTime()); } /** @@ -2119,8 +2142,8 @@ public class ProcessServiceImpl implements ProcessService { @Override public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) { return processInstanceMapper.queryLastManualProcess(definitionCode, - dateInterval.getStartTime(), - dateInterval.getEndTime()); + dateInterval.getStartTime(), + dateInterval.getEndTime()); } /** @@ -2134,9 +2157,9 @@ public class ProcessServiceImpl implements ProcessService { @Override public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) { return processInstanceMapper.queryLastRunningProcess(definitionCode, - startTime, - endTime, - stateArray); + startTime, + endTime, + stateArray); } /** @@ -2384,10 +2407,10 @@ public class ProcessServiceImpl implements ProcessService { if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) { resourceIds = params.getResourceFilesList(). - stream() - .filter(t -> t.getId() != 0) - .map(ResourceInfo::getId) - .collect(Collectors.toSet()); + stream() + .filter(t -> t.getId() != 0) + .map(ResourceInfo::getId) + .collect(toSet()); } if (CollectionUtils.isEmpty(resourceIds)) { return StringUtils.EMPTY; @@ -2420,7 +2443,7 @@ public class ProcessServiceImpl implements ProcessService { } TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper - .queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion()); + .queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion()); if (definitionCodeAndVersion == null) { taskDefinitionLog.setUserId(operator.getId()); taskDefinitionLog.setCreateTime(now); @@ -2502,7 +2525,7 @@ public class ProcessServiceImpl implements ProcessService { Map taskDefinitionLogMap = null; if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) { taskDefinitionLogMap = taskDefinitionLogs.stream() - .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog)); + .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog)); } Date now = new Date(); for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) { @@ -2547,9 +2570,9 @@ public class ProcessServiceImpl implements ProcessService { List processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode); if (!processTaskRelationList.isEmpty()) { Set processDefinitionCodes = processTaskRelationList - .stream() - .map(ProcessTaskRelation::getProcessDefinitionCode) - .collect(Collectors.toSet()); + .stream() + .map(ProcessTaskRelation::getProcessDefinitionCode) + .collect(toSet()); List processDefinitionList = processDefineMapper.queryByCodes(processDefinitionCodes); // check process definition is already online for (ProcessDefinition processDefinition : processDefinitionList) { @@ -2673,7 +2696,7 @@ public class ProcessServiceImpl implements ProcessService { taskDefinitionLogs = genTaskDefineList(taskRelationList); } Map taskDefinitionLogMap = taskDefinitionLogs.stream() - .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); + .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); List taskNodeList = new ArrayList<>(); for (Entry> code : taskCodeMap.entrySet()) { TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMap.get(code.getKey()); @@ -2698,8 +2721,8 @@ public class ProcessServiceImpl implements ProcessService { taskNode.setWorkerGroup(taskDefinitionLog.getWorkerGroup()); taskNode.setEnvironmentCode(taskDefinitionLog.getEnvironmentCode()); taskNode.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN, - taskDefinitionLog.getTimeoutNotifyStrategy(), - taskDefinitionLog.getTimeout()))); + taskDefinitionLog.getTimeoutNotifyStrategy(), + taskDefinitionLog.getTimeout()))); taskNode.setDelayTime(taskDefinitionLog.getDelayTime()); taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getCode).collect(Collectors.toList()))); taskNode.setTaskGroupId(taskDefinitionLog.getTaskGroupId()); @@ -2735,7 +2758,7 @@ public class ProcessServiceImpl implements ProcessService { @Override public int updateDqExecuteResultUserId(int taskInstanceId) { DqExecuteResult dqExecuteResult = - dqExecuteResultMapper.selectOne(new QueryWrapper().eq(TASK_INSTANCE_ID, taskInstanceId)); + dqExecuteResultMapper.selectOne(new QueryWrapper().eq(TASK_INSTANCE_ID, taskInstanceId)); if (dqExecuteResult == null) { return -1; } @@ -2764,15 +2787,15 @@ public class ProcessServiceImpl implements ProcessService { @Override public int deleteDqExecuteResultByTaskInstanceId(int taskInstanceId) { return dqExecuteResultMapper.delete( - new QueryWrapper() - .eq(TASK_INSTANCE_ID, taskInstanceId)); + new QueryWrapper() + .eq(TASK_INSTANCE_ID, taskInstanceId)); } @Override public int deleteTaskStatisticsValueByTaskInstanceId(int taskInstanceId) { return dqTaskStatisticsValueMapper.delete( - new QueryWrapper() - .eq(TASK_INSTANCE_ID, taskInstanceId)); + new QueryWrapper() + .eq(TASK_INSTANCE_ID, taskInstanceId)); } @Override @@ -2845,7 +2868,7 @@ public class ProcessServiceImpl implements ProcessService { public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) { TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId()); int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(), taskGroupQueue.getId(), - TaskGroupQueueStatus.WAIT_QUEUE.getCode()); + TaskGroupQueueStatus.WAIT_QUEUE.getCode()); if (affectedCount > 0) { taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); this.taskGroupQueueMapper.updateById(taskGroupQueue); @@ -2886,7 +2909,7 @@ public class ProcessServiceImpl implements ProcessService { } try { while (taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), taskGroup.getUseSize() - , thisTaskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1) { + , thisTaskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1) { thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId()); if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) { return null; @@ -2899,13 +2922,13 @@ public class ProcessServiceImpl implements ProcessService { logger.info("updateTask:{}", taskInstance.getName()); changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE); TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(), - TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode()); + TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode()); if (taskGroupQueue == null) { return null; } while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(), Flag.YES.getCode(), taskGroupQueue.getId()) != 1) { taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(), - TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode()); + TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode()); if (taskGroupQueue == null) { return null; } @@ -2971,7 +2994,7 @@ public class ProcessServiceImpl implements ProcessService { String address = host.split(":")[0]; int port = Integer.parseInt(host.split(":")[1]); TaskEventChangeCommand taskEventChangeCommand = new TaskEventChangeCommand( - processInstance.getId(), taskId + processInstance.getId(), taskId ); stateEventCallbackService.sendResult(address, port, taskEventChangeCommand.convert2Command(taskType)); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java index 743fe1b945..c556925f90 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java @@ -77,7 +77,6 @@ public abstract class AbstractParameters implements IParameters { public Map getLocalParametersMap() { Map localParametersMaps = new LinkedHashMap<>(); if (localParams != null) { - for (Property property : localParams) { localParametersMaps.put(property.getProp(),property); } @@ -113,14 +112,14 @@ public abstract class AbstractParameters implements IParameters { } public void dealOutParam(String result) { - if (org.apache.commons.collections4.CollectionUtils.isEmpty(localParams)) { + if (CollectionUtils.isEmpty(localParams)) { return; } List outProperty = getOutProperty(localParams); - if (org.apache.commons.collections4.CollectionUtils.isEmpty(outProperty)) { + if (CollectionUtils.isEmpty(outProperty)) { return; } - if (org.apache.dolphinscheduler.spi.utils.StringUtils.isEmpty(result)) { + if (StringUtils.isEmpty(result)) { varPool.addAll(outProperty); return; } @@ -130,9 +129,9 @@ public abstract class AbstractParameters implements IParameters { } for (Property info : outProperty) { String propValue = taskResult.get(info.getProp()); - if (org.apache.dolphinscheduler.spi.utils.StringUtils.isNotEmpty(propValue)) { + if (StringUtils.isNotEmpty(propValue)) { info.setValue(propValue); - varPool.add(info); + addPropertyToValPool(info); } } } @@ -180,4 +179,9 @@ public abstract class AbstractParameters implements IParameters { public ResourceParametersHelper getResources() { return new ResourceParametersHelper(); } + + private void addPropertyToValPool(Property property) { + varPool.removeIf(p -> p.getProp().equals(property.getProp())); + varPool.add(property); + } }