diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 5fcf581b6a..a49a08f6dc 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -324,14 +324,5 @@ public interface ProcessDefinitionService { long projectCode, int processDefinitionId, long version); - - /** - * check has associated process definition - * - * @param processDefinitionId process definition id - * @param version version - * @return The query result has a specific process definition return true - */ - boolean checkHasAssociatedProcessDefinition(int processDefinitionId, long version); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 65e2b10da2..f0fcec3632 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -255,12 +255,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } - // TODO check has cycle - // if (graphHasCycle(taskRelationList)) { - // logger.error("process DAG has cycle"); - // putMsg(result, Status.PROCESS_NODE_HAS_CYCLE); - // return result; - // } + if (graphHasCycle(processService.transformTask(taskRelationList))) { + logger.error("process DAG has cycle"); + putMsg(result, Status.PROCESS_NODE_HAS_CYCLE); + return result; + } // check whether the task relation json is normal for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) { @@ -1323,19 +1322,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } } - /** - * check has associated process definition - * - * @param processDefinitionId process definition id - * @param version version - * @return The query result has a specific process definition return true - */ - @Override - public boolean checkHasAssociatedProcessDefinition(int processDefinitionId, long version) { - Integer hasAssociatedDefinitionId = processDefinitionMapper.queryHasAssociatedDefinitionByIdAndVersion(processDefinitionId, version); - return Objects.nonNull(hasAssociatedDefinitionId); - } - /** * query the pagination versions info by one certain process definition code * diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index bd5e4df5c5..ac91b0685d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -139,10 +139,10 @@ public class ProcessService { private final Logger logger = LoggerFactory.getLogger(getClass()); private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), - ExecutionStatus.RUNNING_EXECUTION.ordinal(), - ExecutionStatus.DELAY_EXECUTION.ordinal(), - ExecutionStatus.READY_PAUSE.ordinal(), - ExecutionStatus.READY_STOP.ordinal()}; + ExecutionStatus.RUNNING_EXECUTION.ordinal(), + ExecutionStatus.DELAY_EXECUTION.ordinal(), + ExecutionStatus.READY_PAUSE.ordinal(), + ExecutionStatus.READY_STOP.ordinal()}; @Autowired private UserMapper userMapper; @@ -526,17 +526,17 @@ public class ProcessService { // process instance quit by "waiting thread" state if (originCommand == null) { Command command = new Command( - CommandType.RECOVER_WAITING_THREAD, - processInstance.getTaskDependType(), - processInstance.getFailureStrategy(), - processInstance.getExecutorId(), - processInstance.getProcessDefinition().getCode(), - JSONUtils.toJsonString(cmdParam), - processInstance.getWarningType(), - processInstance.getWarningGroupId(), - processInstance.getScheduleTime(), - processInstance.getWorkerGroup(), - processInstance.getProcessInstancePriority() + CommandType.RECOVER_WAITING_THREAD, + processInstance.getTaskDependType(), + processInstance.getFailureStrategy(), + processInstance.getExecutorId(), + processInstance.getProcessDefinition().getCode(), + JSONUtils.toJsonString(cmdParam), + processInstance.getWarningType(), + processInstance.getWarningGroupId(), + processInstance.getScheduleTime(), + processInstance.getWorkerGroup(), + processInstance.getProcessInstancePriority() ); saveCommand(command); return; @@ -617,10 +617,10 @@ public class ProcessService { // curing global params processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - getCommandTypeIfComplement(processInstance, command), - processInstance.getScheduleTime())); + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + getCommandTypeIfComplement(processInstance, command), + processInstance.getScheduleTime())); // set process instance priority processInstance.setProcessInstancePriority(command.getProcessInstancePriority()); @@ -646,7 +646,7 @@ public class ProcessService { startParamMap.putAll(fatherParamMap); // set start param into global params if (startParamMap.size() > 0 - && processDefinition.getGlobalParamMap() != null) { + && processDefinition.getGlobalParamMap() != null) { for (Map.Entry param : processDefinition.getGlobalParamMap().entrySet()) { String val = startParamMap.get(param.getKey()); if (val != null) { @@ -693,8 +693,8 @@ public class ProcessService { private Boolean checkCmdParam(Command command, Map cmdParam) { if (command.getTaskDependType() == TaskDependType.TASK_ONLY || command.getTaskDependType() == TaskDependType.TASK_PRE) { if (cmdParam == null - || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODE_NAMES) - || cmdParam.get(Constants.CMD_PARAM_START_NODE_NAMES).isEmpty()) { + || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODE_NAMES) + || cmdParam.get(Constants.CMD_PARAM_START_NODE_NAMES).isEmpty()) { logger.error("command node depend type is {}, but start nodes is null ", command.getTaskDependType()); return false; } @@ -753,10 +753,10 @@ public class ProcessService { // Recalculate global parameters after rerun. processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - commandTypeIfComplement, - processInstance.getScheduleTime())); + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + commandTypeIfComplement, + processInstance.getScheduleTime())); processInstance.setProcessDefinition(processDefinition); } //reset command parameter @@ -804,7 +804,7 @@ public class ProcessService { initTaskInstance(this.findTaskInstanceById(taskId)); } cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING, - String.join(Constants.COMMA, convertIntListToString(failedList))); + String.join(Constants.COMMA, convertIntListToString(failedList))); processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); processInstance.setRunTimes(runTime + 1); break; @@ -817,7 +817,7 @@ public class ProcessService { cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING); List suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE); List stopNodeList = findTaskIdByInstanceState(processInstance.getId(), - ExecutionStatus.KILL); + ExecutionStatus.KILL); suspendedNodeList.addAll(stopNodeList); for (Integer taskId : suspendedNodeList) { // initialize the pause state @@ -872,8 +872,6 @@ public class ProcessService { * If it is a fault-tolerant command, get the specified version of ProcessDefinition through ProcessInstance * Otherwise, get the latest version of ProcessDefinition * - * @param processDefinitionCode - * @param cmdParam * @return ProcessDefinition */ private ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode, Map cmdParam) { @@ -894,7 +892,7 @@ public class ProcessService { } return processDefineLogMapper.queryByDefinitionCodeAndVersion( - processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); + processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); } } @@ -931,14 +929,14 @@ public class ProcessService { } Date startComplementTime = DateUtils.parse(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE), - YYYY_MM_DD_HH_MM_SS); + YYYY_MM_DD_HH_MM_SS); if (Flag.NO == processInstance.getIsSubProcess()) { processInstance.setScheduleTime(startComplementTime); } processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); } @@ -958,7 +956,7 @@ public class ProcessService { Map paramMap = JSONUtils.toMap(cmdParam); // write sub process id into cmd param. if (paramMap.containsKey(CMD_PARAM_SUB_PROCESS) - && CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) { + && CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) { paramMap.remove(CMD_PARAM_SUB_PROCESS); paramMap.put(CMD_PARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId())); subProcessInstance.setCommandParam(JSONUtils.toJsonString(paramMap)); @@ -971,7 +969,7 @@ public class ProcessService { ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)); if (parentInstance != null) { subProcessInstance.setGlobalParams( - joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams())); + joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams())); this.saveProcessInstance(subProcessInstance); } else { logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam); @@ -1019,7 +1017,7 @@ public class ProcessService { private void initTaskInstance(TaskInstance taskInstance) { if (!taskInstance.isSubProcess() - && (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure())) { + && (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure())) { taskInstance.setFlag(Flag.NO); updateTaskInstance(taskInstance); return; @@ -1039,12 +1037,12 @@ public class ProcessService { public TaskInstance submitTask(TaskInstance taskInstance) { ProcessInstance processInstance = this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); logger.info("start submit task : {}, instance id:{}, state: {}", - taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState()); + taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState()); //submit to db TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance); if (task == null) { logger.error("end submit task to db error, task name:{}, process id:{} state: {} ", - taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState()); + taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState()); return task; } if (!task.getState().typeIsFinished()) { @@ -1052,7 +1050,7 @@ public class ProcessService { } logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {} ", - taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState()); + taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState()); return task; } @@ -1110,7 +1108,7 @@ public class ProcessService { } } logger.info("sub process instance is not found,parent task:{},parent instance:{}", - parentTask.getId(), parentProcessInstance.getId()); + parentTask.getId(), parentProcessInstance.getId()); return null; } @@ -1199,17 +1197,17 @@ public class ProcessService { String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams); return new Command( - commandType, - TaskDependType.TASK_POST, - parentProcessInstance.getFailureStrategy(), - parentProcessInstance.getExecutorId(), - processDefinition.getCode(), - processParam, - parentProcessInstance.getWarningType(), - parentProcessInstance.getWarningGroupId(), - parentProcessInstance.getScheduleTime(), - task.getWorkerGroup(), - parentProcessInstance.getProcessInstancePriority() + commandType, + TaskDependType.TASK_POST, + parentProcessInstance.getFailureStrategy(), + parentProcessInstance.getExecutorId(), + processDefinition.getCode(), + processParam, + parentProcessInstance.getWarningType(), + parentProcessInstance.getWarningGroupId(), + parentProcessInstance.getScheduleTime(), + task.getWorkerGroup(), + parentProcessInstance.getProcessInstancePriority() ); } @@ -1246,7 +1244,7 @@ public class ProcessService { */ private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, long childDefinitionCode) { ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(), - parentProcessInstance.getProcessDefinitionVersion()); + parentProcessInstance.getProcessDefinitionVersion()); ProcessDefinition childDefinition = this.findProcessDefinitionByCode(childDefinitionCode); if (childDefinition != null && fatherDefinition != null) { childDefinition.setWarningGroupId(fatherDefinition.getWarningGroupId()); @@ -1269,7 +1267,7 @@ public class ProcessService { taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1); } else { if (processInstanceState != ExecutionStatus.READY_STOP - && processInstanceState != ExecutionStatus.READY_PAUSE) { + && processInstanceState != ExecutionStatus.READY_PAUSE) { // failure task set invalid taskInstance.setFlag(Flag.NO); updateTaskInstance(taskInstance); @@ -1320,9 +1318,9 @@ public class ProcessService { // the task already exists in task queue // return state if ( - state == ExecutionStatus.RUNNING_EXECUTION - || state == ExecutionStatus.DELAY_EXECUTION - || state == ExecutionStatus.KILL + state == ExecutionStatus.RUNNING_EXECUTION + || state == ExecutionStatus.DELAY_EXECUTION + || state == ExecutionStatus.KILL ) { return state; } @@ -1331,7 +1329,7 @@ public class ProcessService { if (processInstanceState == ExecutionStatus.READY_PAUSE) { state = ExecutionStatus.PAUSE; } else if (processInstanceState == ExecutionStatus.READY_STOP - || !checkProcessStrategy(taskInstance)) { + || !checkProcessStrategy(taskInstance)) { state = ExecutionStatus.KILL; } else { state = ExecutionStatus.SUBMITTED_SUCCESS; @@ -1355,7 +1353,7 @@ public class ProcessService { for (TaskInstance task : taskInstances) { if (task.getState() == ExecutionStatus.FAILURE - && task.getRetryTimes() >= task.getMaxRetryTimes()) { + && task.getRetryTimes() >= task.getMaxRetryTimes()) { return false; } } @@ -1455,12 +1453,12 @@ public class ProcessService { ProcessInstance processInstance = findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); // get process define ProcessDefinition processDefine = findProcessDefinition(processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion()); + processInstance.getProcessDefinitionVersion()); taskInstance.setProcessInstance(processInstance); taskInstance.setProcessDefine(processDefine); TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion( - taskInstance.getTaskCode(), - taskInstance.getTaskDefinitionVersion()); + taskInstance.getTaskCode(), + taskInstance.getTaskDefinitionVersion()); taskInstance.setTaskDefine(taskDefinition); return taskInstance; } @@ -1632,7 +1630,6 @@ public class ProcessService { /** * for show in page of taskInstance - * @param taskInstance */ public void changeOutParam(TaskInstance taskInstance) { if (StringUtils.isEmpty(taskInstance.getVarPool())) { @@ -1742,7 +1739,7 @@ public class ProcessService { */ public List queryNeedFailoverTaskInstances(String host) { return taskInstanceMapper.queryByHostAndStatus(host, - stateArray); + stateArray); } /** @@ -1838,8 +1835,8 @@ public class ProcessService { */ public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) { return processInstanceMapper.queryLastSchedulerProcess(definitionCode, - dateInterval.getStartTime(), - dateInterval.getEndTime()); + dateInterval.getStartTime(), + dateInterval.getEndTime()); } /** @@ -1851,8 +1848,8 @@ public class ProcessService { */ public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) { return processInstanceMapper.queryLastManualProcess(definitionCode, - dateInterval.getStartTime(), - dateInterval.getEndTime()); + dateInterval.getStartTime(), + dateInterval.getEndTime()); } /** @@ -1865,9 +1862,9 @@ public class ProcessService { */ public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) { return processInstanceMapper.queryLastRunningProcess(definitionCode, - startTime, - endTime, - stateArray); + startTime, + endTime, + stateArray); } /** @@ -2125,10 +2122,10 @@ public class ProcessService { AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType(), taskDefinition.getTaskParams()); if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) { resourceIds = params.getResourceFilesList(). - stream() - .filter(t -> t.getId() != 0) - .map(ResourceInfo::getId) - .collect(Collectors.toSet()); + stream() + .filter(t -> t.getId() != 0) + .map(ResourceInfo::getId) + .collect(Collectors.toSet()); } if (CollectionUtils.isEmpty(resourceIds)) { return StringUtils.EMPTY; @@ -2189,7 +2186,7 @@ public class ProcessService { public int saveProcessDefinition(User operator, Project project, String name, String desc, String locations, ProcessData processData, ProcessDefinition processDefinition, Boolean isFromProcessDefine) { ProcessDefinitionLog processDefinitionLog = insertProcessDefinitionLog(operator, processDefinition.getCode(), - name, processData, project, desc, locations); + name, processData, project, desc, locations); Map taskDefinitionMap = handleTaskDefinition(operator, project.getCode(), processData.getTasks(), isFromProcessDefine); if (Constants.DEFINITION_FAILURE == handleTaskRelation(operator, project.getCode(), processDefinitionLog, processData.getTasks(), taskDefinitionMap)) { return Constants.DEFINITION_FAILURE; @@ -2289,33 +2286,33 @@ public class ProcessService { if (CollectionUtils.isNotEmpty(depList)) { for (String preTaskName : depList) { builderRelationList.add(new ProcessTaskRelation( - StringUtils.EMPTY, - processDefinition.getVersion(), - projectCode, - processDefinition.getCode(), - taskDefinitionMap.get(preTaskName).getCode(), - taskDefinitionMap.get(preTaskName).getVersion(), - taskDefinitionMap.get(taskNode.getName()).getCode(), - taskDefinitionMap.get(taskNode.getName()).getVersion(), - ConditionType.NONE, - StringUtils.EMPTY, - now, - now)); - } - } else { - builderRelationList.add(new ProcessTaskRelation( StringUtils.EMPTY, processDefinition.getVersion(), projectCode, processDefinition.getCode(), - 0L, // this isn't previous task node, set zero - 0, + taskDefinitionMap.get(preTaskName).getCode(), + taskDefinitionMap.get(preTaskName).getVersion(), taskDefinitionMap.get(taskNode.getName()).getCode(), taskDefinitionMap.get(taskNode.getName()).getVersion(), ConditionType.NONE, StringUtils.EMPTY, now, now)); + } + } else { + builderRelationList.add(new ProcessTaskRelation( + StringUtils.EMPTY, + processDefinition.getVersion(), + projectCode, + processDefinition.getCode(), + 0L, // this isn't previous task node, set zero + 0, + taskDefinitionMap.get(taskNode.getName()).getCode(), + taskDefinitionMap.get(taskNode.getName()).getVersion(), + ConditionType.NONE, + StringUtils.EMPTY, + now, + now)); } } for (ProcessTaskRelation processTaskRelation : builderRelationList) { @@ -2354,9 +2351,9 @@ public class ProcessService { List processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode); if (!processTaskRelationList.isEmpty()) { Set processDefinitionCodes = processTaskRelationList - .stream() - .map(ProcessTaskRelation::getProcessDefinitionCode) - .collect(Collectors.toSet()); + .stream() + .map(ProcessTaskRelation::getProcessDefinitionCode) + .collect(Collectors.toSet()); List processDefinitionList = processDefineMapper.queryByCodes(processDefinitionCodes); // check process definition is already online for (ProcessDefinition processDefinition : processDefinitionList) { @@ -2389,6 +2386,10 @@ public class ProcessService { */ public DagData genDagData(ProcessDefinition processDefinition) { List processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); + return new DagData(processDefinition, processTaskRelations, genTaskDefineList(processTaskRelations)); + } + + private List genTaskDefineList(List processTaskRelations) { Set taskDefinitionSet = new HashSet<>(); for (ProcessTaskRelationLog processTaskRelation : processTaskRelations) { if (processTaskRelation.getPreTaskCode() > 0) { @@ -2398,8 +2399,7 @@ public class ProcessService { taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion())); } } - List taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet); - return new DagData(processDefinition, processTaskRelations, taskDefinitionLogs); + return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet); } /** @@ -2467,8 +2467,8 @@ public class ProcessService { v.setTaskInstancePriority(taskDefinitionLog.getTaskPriority()); v.setWorkerGroup(taskDefinitionLog.getWorkerGroup()); v.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN, - taskDefinitionLog.getTimeoutNotifyStrategy(), - taskDefinitionLog.getTimeout()))); + taskDefinitionLog.getTimeoutNotifyStrategy(), + taskDefinitionLog.getTimeout()))); v.setDelayTime(taskDefinitionLog.getDelayTime()); v.getPreTaskNodeList().forEach(task -> task.setName(taskDefinitionLogMap.get(task.getCode()).getName())); v.setPreTasks(JSONUtils.toJsonString(v.getPreTaskNodeList().stream().map(PreviousTaskNode::getName).collect(Collectors.toList()))); @@ -2488,7 +2488,7 @@ public class ProcessService { */ public List queryTaskDefinitionListByProcess(long processCode, int processVersion) { List processTaskRelationLogs = - processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion); + processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion); Set taskDefinitionSet = new HashSet<>(); for (ProcessTaskRelationLog processTaskRelationLog : processTaskRelationLogs) { if (processTaskRelationLog.getPreTaskCode() > 0) { @@ -2532,4 +2532,55 @@ public class ProcessService { List relationResources = CollectionUtils.isNotEmpty(relationResourceIds) ? resourceMapper.queryResourceListById(relationResourceIds) : new ArrayList<>(); ownResources.addAll(relationResources); } + + /** + * Use temporarily before refactoring taskNode + */ + public List transformTask(List taskRelationList) { + Map> taskCodeMap = new HashMap<>(); + for (ProcessTaskRelationLog processTaskRelation : taskRelationList) { + taskCodeMap.compute(processTaskRelation.getPostTaskCode(), (k, v) -> { + if (v == null) { + v = new ArrayList<>(); + } + if (processTaskRelation.getPreTaskCode() != 0L) { + v.add(processTaskRelation.getPreTaskCode()); + } + return v; + }); + } + List taskDefinitionLogs = genTaskDefineList(taskRelationList); + Map taskDefinitionLogMap = taskDefinitionLogs.stream() + .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); + List taskNodeList = new ArrayList<>(); + for (Entry> code : taskCodeMap.entrySet()) { + TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMap.get(code.getKey()); + if (taskDefinitionLog != null) { + TaskNode taskNode = new TaskNode(); + taskNode.setCode(taskDefinitionLog.getCode()); + taskNode.setVersion(taskDefinitionLog.getVersion()); + taskNode.setName(taskDefinitionLog.getName()); + taskNode.setDesc(taskDefinitionLog.getDescription()); + taskNode.setType(taskDefinitionLog.getTaskType().toUpperCase()); + taskNode.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_NORMAL : Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); + taskNode.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes()); + taskNode.setRetryInterval(taskDefinitionLog.getFailRetryInterval()); + Map taskParamsMap = taskNode.taskParamsToJsonObj(taskDefinitionLog.getTaskParams()); + taskNode.setConditionResult((String) taskParamsMap.get(Constants.CONDITION_RESULT)); + taskNode.setDependence((String) taskParamsMap.get(Constants.DEPENDENCE)); + taskParamsMap.remove(Constants.CONDITION_RESULT); + taskParamsMap.remove(Constants.DEPENDENCE); + taskNode.setParams(JSONUtils.toJsonString(taskParamsMap)); + taskNode.setTaskInstancePriority(taskDefinitionLog.getTaskPriority()); + taskNode.setWorkerGroup(taskDefinitionLog.getWorkerGroup()); + taskNode.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN, + taskDefinitionLog.getTimeoutNotifyStrategy(), + taskDefinitionLog.getTimeout()))); + taskNode.setDelayTime(taskDefinitionLog.getDelayTime()); + taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getName).collect(Collectors.toList()))); + taskNodeList.add(taskNode); + } + } + return taskNodeList; + } }