From b1d57dbce491d13d29ca44ebab776c49fa3656c7 Mon Sep 17 00:00:00 2001 From: WangJPLeo <103574007+WangJPLeo@users.noreply.github.com> Date: Mon, 18 Apr 2022 20:27:11 +0800 Subject: [PATCH] Check the status of the child process when the parent process is running (#9567) Co-authored-by: WangJPLeo --- .../dolphinscheduler/api/enums/Status.java | 1 + .../api/service/ExecutorService.java | 7 +++ .../api/service/impl/ExecutorServiceImpl.java | 57 +++++++++++++------ .../api/service/ExecutorServiceTest.java | 8 +++ .../dao/mapper/ProcessTaskRelationMapper.java | 8 +++ .../dao/mapper/ProcessTaskRelationMapper.xml | 7 +++ 6 files changed, 71 insertions(+), 17 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 81802d5f9f..4954adf431 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -242,6 +242,7 @@ public enum Status { PROCESS_INSTANCE_EXIST(50002, "process instance {0} already exists", "工作流实例[{0}]已存在"), PROCESS_DEFINE_NOT_EXIST(50003, "process definition {0} does not exist", "工作流定义[{0}]不存在"), PROCESS_DEFINE_NOT_RELEASE(50004, "process definition {0} not on line", "工作流定义[{0}]不是上线状态"), + SUB_PROCESS_DEFINE_NOT_RELEASE(50004, "exist sub process definition not on line", "存在子工作流定义不是上线状态"), PROCESS_INSTANCE_ALREADY_CHANGED(50005, "the status of process instance {0} is already {1}", "工作流实例[{0}]的状态已经是[{1}]"), PROCESS_INSTANCE_STATE_OPERATION_ERROR(50006, "the status of process instance {0} is {1},Cannot perform {2} operation", "工作流实例[{0}]的状态是[{1}],无法执行[{2}]操作"), SUB_PROCESS_INSTANCE_NOT_EXIST(50007, "the task belong to process instance does not exist", "子工作流实例不存在"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 1087d595eb..2c9fcff7a9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -95,4 +95,11 @@ public interface ExecutorService { * @return check result code */ Map startCheckByProcessDefinedCode(long processDefinitionCode); + + /** + * check if the current process has subprocesses and all subprocesses are valid + * @param processDefinition + * @return check result + */ + boolean checkSubProcessDefinitionValid(ProcessDefinition processDefinition); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 4486824314..33cc6a2fbe 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -44,18 +44,9 @@ import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.Command; -import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.Project; -import org.apache.dolphinscheduler.dao.entity.Schedule; -import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; -import org.apache.dolphinscheduler.dao.entity.Tenant; -import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; -import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.entity.*; +import org.apache.dolphinscheduler.dao.mapper.*; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; @@ -67,11 +58,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -110,6 +97,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ @Autowired StateEventCallbackService stateEventCallbackService; + @Autowired + private TaskDefinitionMapper taskDefinitionMapper; + + @Autowired + private ProcessTaskRelationMapper processTaskRelationMapper; + /** * execute process instance * @@ -226,12 +219,42 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ } else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) { // check process definition online putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefineCode); + } else if (!checkSubProcessDefinitionValid(processDefinition)){ + // check sub process definition online + putMsg(result, Status.SUB_PROCESS_DEFINE_NOT_RELEASE); } else { result.put(Constants.STATUS, Status.SUCCESS); } return result; } + /** + * check if the current process has subprocesses and all subprocesses are valid + * @param processDefinition + * @return check result + */ + @Override + public boolean checkSubProcessDefinitionValid(ProcessDefinition processDefinition) { + // query all subprocesses under the current process + List processTaskRelations = processTaskRelationMapper.queryDownstreamByProcessDefinitionCode(processDefinition.getCode()); + if (processTaskRelations.isEmpty()){ + return true; + } + Set relationCodes = processTaskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toSet()); + List taskDefinitions = taskDefinitionMapper.queryByCodeList(relationCodes); + + // find out the process definition code + Set processDefinitionCodeSet = new HashSet<>(); + taskDefinitions.stream() + .filter(task -> TaskConstants.TASK_TYPE_SUB_PROCESS.equalsIgnoreCase(task.getTaskType())) + .forEach(taskDefinition -> processDefinitionCodeSet.add(Long.valueOf(JSONUtils.getNodeString(taskDefinition.getTaskParams(), Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)))); + + // check sub releaseState + List processDefinitions = processDefinitionMapper.queryByCodes(processDefinitionCodeSet); + return processDefinitions.stream().filter(definition -> definition.getReleaseState().equals(ReleaseState.OFFLINE)).collect(Collectors.toSet()).isEmpty(); + } + + /** * do action to process instance:pause, stop, repeat, recover from pause, recover from stop * diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index cc6fc86e25..3f2bd02575 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -40,7 +40,9 @@ import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -78,6 +80,12 @@ public class ExecutorServiceTest { @Mock private ProcessDefinitionMapper processDefinitionMapper; + @Mock + private ProcessTaskRelationMapper processTaskRelationMapper; + + @Mock + private TaskDefinitionMapper taskDefinitionMapper; + @Mock private ProjectMapper projectMapper; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java index 264c40e080..c14f4b4db8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java @@ -180,4 +180,12 @@ public interface ProcessTaskRelationMapper extends BaseMapper queryDownstreamByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode); + + } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml index 4598ec0ead..e25fedefbb 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml @@ -69,6 +69,13 @@ #{relation.conditionType},#{relation.conditionParams},#{relation.createTime},#{relation.updateTime}) +