From 9ca3962bf129a8f112605191e2c36b7ab235faf5 Mon Sep 17 00:00:00 2001 From: zwZjut Date: Tue, 20 Sep 2022 20:06:38 +0800 Subject: [PATCH] to #10692: fix Parameter transfer problem when recover failed task and output of subprogress (#10694) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 宏豁 --- .../master/runner/WorkflowExecuteThread.java | 21 +++-- .../master/runner/task/SubTaskProcessor.java | 91 +++++++++++++++++++ 2 files changed, 106 insertions(+), 6 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 411c07b2a1..ae837dfe86 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -78,6 +78,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -461,7 +462,7 @@ public class WorkflowExecuteThread implements Runnable { taskRetryCheckList.remove(task.getId()); depStateCheckList.remove(task.getId()); if (task.getState().typeIsSuccess()) { - processInstance.setVarPool(task.getVarPool()); + // processInstance.setVarPool(task.getVarPool()); processService.saveProcessInstance(processInstance); submitPostNode(Long.toString(task.getTaskCode())); } else if (task.getState().typeIsFailure()) { @@ -925,11 +926,12 @@ public class WorkflowExecuteThread implements Runnable { if (allProperty.size() > 0) { taskInstance.setVarPool(JSONUtils.toJsonString(allProperty.values())); } - } else { - if (StringUtils.isNotEmpty(processInstance.getVarPool())) { - taskInstance.setVarPool(processInstance.getVarPool()); - } } +// else { +// if (StringUtils.isNotEmpty(processInstance.getVarPool())) { +// taskInstance.setVarPool(processInstance.getVarPool()); +// } +// } } private void setVarPoolValue(Map allProperty, Map allTaskInstance, TaskInstance preTaskInstance, Property thisProperty) { @@ -1466,7 +1468,14 @@ public class WorkflowExecuteThread implements Runnable { //init varPool only this task is the first time running if (task.isFirstRun()) { //get pre task ,get all the task varPool to this task - Set preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode())); + Set preTask = new HashSet<>(); + preTask.addAll(dag.getPreviousNodes(Long.toString(task.getTaskCode()))); + TaskNode taskNode = dag.getNode(Long.toString(task.getTaskCode())); + if (null != taskNode && null != taskNode.getDepList() && !taskNode.getDepList().isEmpty()) { + logger.debug("in submitStandByTask: taskCode:{}, taskType: {}, preTasks: {}, depList:{}", + task.getTaskCode(), taskNode.getType(), taskNode.getPreTasks(), taskNode.getDepList()); + preTask.addAll(taskNode.getDepList()); + } getPreVarPool(task, preTask); } DependResult dependResult = getDependResultForTask(task); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index 29ecf7d1a4..280ef0fe27 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -17,9 +17,18 @@ package org.apache.dolphinscheduler.server.master.runner.task; +import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; + +import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.graph.DAG; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.model.TaskNodeRelation; +import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -28,9 +37,19 @@ import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.commons.lang3.StringUtils; + +import java.util.Comparator; import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.core.type.TypeReference; /** * @@ -110,6 +129,77 @@ public class SubTaskProcessor extends BaseTaskProcessor { } } + private Map mergeEndNodeTaskInstanceVarPool(Set taskCodes) { + List taskInstanceList = processService.findValidTaskListByProcessId(subProcessInstance.getId()); + logger.info("in dealFinish1, mergeEndNodeTaskInstanceVarPool, taskInstanceList.size:{}, subProcessInstance.getId:{}", taskInstanceList.size(),subProcessInstance.getId()); + // filter end nodes and sort by end time reversed + List endTaskInstancesSortedByEndTimeReversed = taskInstanceList.stream() + .filter(o -> taskCodes.contains(Long.toString(o.getTaskCode()))). + sorted(Comparator.comparing(TaskInstance::getEndTime).reversed()).collect(Collectors.toList()); + logger.info("in dealFinish1, mergeEndNodeTaskInstanceVarPool, endTaskInstancesSortedByEndTimeReversed.size:{}", endTaskInstancesSortedByEndTimeReversed.size()); + Map allProperties = new HashMap<>(); + for (TaskInstance taskInstance : endTaskInstancesSortedByEndTimeReversed) { + String varPool = taskInstance.getVarPool(); + if (org.apache.commons.lang.StringUtils.isNotEmpty(varPool)) { + List properties = JSONUtils.toList(varPool, Property.class); + properties.forEach(o -> { + allProperties.put(o.getProp(), o); + }); + } + } + return allProperties; + } + + private void dealFinish1() { + // build dag + ProcessDefinition processDefinition = processService.findProcessDefinition(subProcessInstance.getProcessDefinitionCode(), subProcessInstance.getProcessDefinitionVersion()); + if (null == processDefinition) { + logger.error("process definition not found in meta data, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}", + subProcessInstance.getProcessDefinitionCode(), subProcessInstance.getProcessDefinitionVersion(), subProcessInstance.getId()); + throw new RuntimeException(String.format("process definition code %s, version %s does not exist", subProcessInstance.getProcessDefinitionCode(), subProcessInstance.getProcessDefinitionVersion())); + } + subProcessInstance.setProcessDefinition(processDefinition); + DAG dag = processService.genDagGraph(subProcessInstance.getProcessDefinition()); + // get end nodes + Set endTaskCodes = dag.getEndNode().stream().collect(Collectors.toSet()); + logger.info("in dealFinish1, endTaskCodes:{}", endTaskCodes); + if (endTaskCodes == null || endTaskCodes.isEmpty()) { + return; + } + // get var pool of sub progress instance; + Map varPoolPropertiesMap = mergeEndNodeTaskInstanceVarPool(endTaskCodes); + logger.debug("in dealFinish1, varPoolPropertiesMap:{}", varPoolPropertiesMap); + // merge var pool: 1. task instance var pool from pre task ; 2. var pool from sub progress + // filter by localParams + String taskVarPool = taskInstance.getVarPool(); + Map taskVarPoolProperties = new HashMap<>(); + if (StringUtils.isNotEmpty(taskVarPool)) { + taskVarPoolProperties = JSONUtils.toList(taskVarPool, Property.class).stream().collect(Collectors.toMap(Property::getProp, (p) -> p)); + } + Map taskParams = JSONUtils.parseObject(taskInstance.getTaskParams(), new TypeReference>() { + }); + Object localParams = taskParams.get(LOCAL_PARAMS); + Map outProperties = new HashMap<>(); + if (localParams != null) { + List properties = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); + outProperties = properties.stream().filter(r -> Direct.OUT == r.getDirect()).collect(Collectors.toMap(Property::getProp, (p) -> p)); + // put all task instance var pool from pre task + outProperties.putAll(taskVarPoolProperties); + for (Map.Entry o : outProperties.entrySet()) { + if (varPoolPropertiesMap.containsKey(o.getKey())) { + o.getValue().setValue(varPoolPropertiesMap.get(o.getKey()).getValue()); + } + } + } else { + outProperties.putAll(taskVarPoolProperties); + outProperties.putAll(varPoolPropertiesMap); + } + taskInstance.setVarPool(JSONUtils.toJsonString(outProperties.values())); + logger.debug("in dealFinish1, varPool:{}", taskInstance.getVarPool()); + //deal with localParam for show in the page + processService.changeOutParam(taskInstance); + } + @Override protected boolean persistTask(TaskAction taskAction) { switch (taskAction) { @@ -175,6 +265,7 @@ public class SubTaskProcessor extends BaseTaskProcessor { sendToSubProcess(); this.taskInstance.setState(ExecutionStatus.KILL); this.taskInstance.setEndTime(new Date()); + dealFinish1(); processService.saveTaskInstance(taskInstance); return true; }