diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 56072e2c60..b0e149ae2b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -495,7 +495,7 @@ public class WorkflowExecuteThread implements Runnable { processInstance.getProcessDefinitionVersion()); recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam()); List taskNodeList = - processService.genTaskNodeList(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), new HashMap<>()); + processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()), Lists.newArrayList()); forbiddenTaskList.clear(); taskNodeList.forEach(taskNode -> { if (taskNode.isForbidden()) { diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 3c2e0dd3e5..2abce01923 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -2144,26 +2144,6 @@ public class ProcessService { return result; } - /** - * update task definition - */ - public int updateTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) { - Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinition.getCode()); - Date now = new Date(); - taskDefinition.setProjectCode(projectCode); - taskDefinition.setUserId(operator.getId()); - taskDefinition.setVersion(version == null || version == 0 ? 1 : version + 1); - taskDefinition.setUpdateTime(now); - setTaskFromTaskNode(taskNode, taskDefinition); - int update = taskDefinitionMapper.updateById(taskDefinition); - // save task definition log - TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition); - taskDefinitionLog.setOperator(operator.getId()); - taskDefinitionLog.setOperateTime(now); - int insert = taskDefinitionLogMapper.insert(taskDefinitionLog); - return insert & update; - } - private void setTaskFromTaskNode(TaskNode taskNode, TaskDefinition taskDefinition) { taskDefinition.setName(taskNode.getName()); taskDefinition.setDescription(taskNode.getDesc()); @@ -2382,67 +2362,6 @@ public class ProcessService { return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet); } - @Deprecated - public List genTaskNodeList(Long processCode, int processVersion, Map locationMap) { - List processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion); - Set taskDefinitionSet = new HashSet<>(); - Map taskNodeMap = new HashMap<>(); - for (ProcessTaskRelationLog processTaskRelation : processTaskRelations) { - if (processTaskRelation.getPreTaskCode() > 0) { - taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion())); - } - if (processTaskRelation.getPostTaskCode() > 0) { - taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion())); - } - taskNodeMap.compute(processTaskRelation.getPostTaskCode(), (k, v) -> { - if (v == null) { - v = new TaskNode(); - v.setCode(processTaskRelation.getPostTaskCode()); - v.setVersion(processTaskRelation.getPostTaskVersion()); - List preTaskNodeList = new ArrayList<>(); - if (processTaskRelation.getPreTaskCode() > 0) { - preTaskNodeList.add(new PreviousTaskNode(processTaskRelation.getPreTaskCode(), "", processTaskRelation.getPreTaskVersion())); - } - v.setPreTaskNodeList(preTaskNodeList); - } else { - List preTaskDefinitionList = v.getPreTaskNodeList(); - preTaskDefinitionList.add(new PreviousTaskNode(processTaskRelation.getPreTaskCode(), "", processTaskRelation.getPreTaskVersion())); - } - return v; - }); - } - List taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet); - Map taskDefinitionLogMap = taskDefinitionLogs.stream().collect(Collectors.toMap(TaskDefinitionLog::getCode, log -> log)); - taskNodeMap.forEach((k, v) -> { - TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMap.get(k); - v.setId(locationMap.get(taskDefinitionLog.getName())); - v.setCode(taskDefinitionLog.getCode()); - v.setName(taskDefinitionLog.getName()); - v.setDesc(taskDefinitionLog.getDescription()); - v.setType(taskDefinitionLog.getTaskType().toUpperCase()); - v.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_NORMAL : Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); - v.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes()); - v.setRetryInterval(taskDefinitionLog.getFailRetryInterval()); - Map taskParamsMap = v.taskParamsToJsonObj(taskDefinitionLog.getTaskParams()); - v.setConditionResult(JSONUtils.toJsonString(taskParamsMap.get(Constants.CONDITION_RESULT))); - v.setSwitchResult(JSONUtils.toJsonString(taskParamsMap.get(Constants.SWITCH_RESULT))); - v.setDependence(JSONUtils.toJsonString(taskParamsMap.get(Constants.DEPENDENCE))); - taskParamsMap.remove(Constants.CONDITION_RESULT); - taskParamsMap.remove(Constants.DEPENDENCE); - v.setParams(JSONUtils.toJsonString(taskParamsMap)); - v.setTaskInstancePriority(taskDefinitionLog.getTaskPriority()); - v.setWorkerGroup(taskDefinitionLog.getWorkerGroup()); - v.setEnvironmentCode(taskDefinitionLog.getEnvironmentCode()); - v.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN, - taskDefinitionLog.getTimeoutNotifyStrategy(), - taskDefinitionLog.getTimeout()))); - v.setDelayTime(taskDefinitionLog.getDelayTime()); - v.getPreTaskNodeList().forEach(task -> task.setName(taskDefinitionLogMap.get(task.getCode()).getName())); - v.setPreTasks(JSONUtils.toJsonString(v.getPreTaskNodeList().stream().map(PreviousTaskNode::getName).collect(Collectors.toList()))); - }); - return new ArrayList<>(taskNodeMap.values()); - } - /** * find task definition by code and version */ @@ -2450,6 +2369,13 @@ public class ProcessService { return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion); } + /** + * find process task relation list by projectCode and processDefinitionCode + */ + public List findRelationByCode(long projectCode, long processDefinitionCode) { + return processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + } + /** * add authorized resources * @@ -2498,12 +2424,14 @@ public class ProcessService { taskNode.setRetryInterval(taskDefinitionLog.getFailRetryInterval()); Map taskParamsMap = taskNode.taskParamsToJsonObj(taskDefinitionLog.getTaskParams()); taskNode.setConditionResult(JSONUtils.toJsonString(taskParamsMap.get(Constants.CONDITION_RESULT))); + taskNode.setSwitchResult(JSONUtils.toJsonString(taskParamsMap.get(Constants.SWITCH_RESULT))); taskNode.setDependence(JSONUtils.toJsonString(taskParamsMap.get(Constants.DEPENDENCE))); taskParamsMap.remove(Constants.CONDITION_RESULT); taskParamsMap.remove(Constants.DEPENDENCE); taskNode.setParams(JSONUtils.toJsonString(taskParamsMap)); taskNode.setTaskInstancePriority(taskDefinitionLog.getTaskPriority()); taskNode.setWorkerGroup(taskDefinitionLog.getWorkerGroup()); + taskNode.setEnvironmentCode(taskDefinitionLog.getEnvironmentCode()); taskNode.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN, taskDefinitionLog.getTimeoutNotifyStrategy(), taskDefinitionLog.getTimeout())));