Browse Source

Check the status of the child process when the parent process is running (#9567)

Co-authored-by: WangJPLeo <wangjipeng@whaleops.com>
3.0.0/version-upgrade
WangJPLeo 3 years ago committed by GitHub
parent
commit
b1d57dbce4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  2. 7
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  3. 57
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  4. 8
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
  5. 8
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
  6. 7
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -242,6 +242,7 @@ public enum Status {
PROCESS_INSTANCE_EXIST(50002, "process instance {0} already exists", "工作流实例[{0}]已存在"),
PROCESS_DEFINE_NOT_EXIST(50003, "process definition {0} does not exist", "工作流定义[{0}]不存在"),
PROCESS_DEFINE_NOT_RELEASE(50004, "process definition {0} not on line", "工作流定义[{0}]不是上线状态"),
SUB_PROCESS_DEFINE_NOT_RELEASE(50004, "exist sub process definition not on line", "存在子工作流定义不是上线状态"),
PROCESS_INSTANCE_ALREADY_CHANGED(50005, "the status of process instance {0} is already {1}", "工作流实例[{0}]的状态已经是[{1}]"),
PROCESS_INSTANCE_STATE_OPERATION_ERROR(50006, "the status of process instance {0} is {1},Cannot perform {2} operation", "工作流实例[{0}]的状态是[{1}],无法执行[{2}]操作"),
SUB_PROCESS_INSTANCE_NOT_EXIST(50007, "the task belong to process instance does not exist", "子工作流实例不存在"),

7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -95,4 +95,11 @@ public interface ExecutorService {
* @return check result code
*/
Map<String, Object> startCheckByProcessDefinedCode(long processDefinitionCode);
/**
* check if the current process has subprocesses and all subprocesses are valid
* @param processDefinition
* @return check result
*/
boolean checkSubProcessDefinitionValid(ProcessDefinition processDefinition);
}

57
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -44,18 +44,9 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
@ -67,11 +58,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@ -110,6 +97,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
@Autowired
StateEventCallbackService stateEventCallbackService;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
/**
* execute process instance
*
@ -226,12 +219,42 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
} else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
// check process definition online
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefineCode);
} else if (!checkSubProcessDefinitionValid(processDefinition)){
// check sub process definition online
putMsg(result, Status.SUB_PROCESS_DEFINE_NOT_RELEASE);
} else {
result.put(Constants.STATUS, Status.SUCCESS);
}
return result;
}
/**
* check if the current process has subprocesses and all subprocesses are valid
* @param processDefinition
* @return check result
*/
@Override
public boolean checkSubProcessDefinitionValid(ProcessDefinition processDefinition) {
// query all subprocesses under the current process
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryDownstreamByProcessDefinitionCode(processDefinition.getCode());
if (processTaskRelations.isEmpty()){
return true;
}
Set<Long> relationCodes = processTaskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toSet());
List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryByCodeList(relationCodes);
// find out the process definition code
Set<Long> processDefinitionCodeSet = new HashSet<>();
taskDefinitions.stream()
.filter(task -> TaskConstants.TASK_TYPE_SUB_PROCESS.equalsIgnoreCase(task.getTaskType()))
.forEach(taskDefinition -> processDefinitionCodeSet.add(Long.valueOf(JSONUtils.getNodeString(taskDefinition.getTaskParams(), Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE))));
// check sub releaseState
List<ProcessDefinition> processDefinitions = processDefinitionMapper.queryByCodes(processDefinitionCodeSet);
return processDefinitions.stream().filter(definition -> definition.getReleaseState().equals(ReleaseState.OFFLINE)).collect(Collectors.toSet()).isEmpty();
}
/**
* do action to process instancepause, stop, repeat, recover from pause, recover from stop
*

8
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java

@ -40,7 +40,9 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -78,6 +80,12 @@ public class ExecutorServiceTest {
@Mock
private ProcessDefinitionMapper processDefinitionMapper;
@Mock
private ProcessTaskRelationMapper processTaskRelationMapper;
@Mock
private TaskDefinitionMapper taskDefinitionMapper;
@Mock
private ProjectMapper projectMapper;

8
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java

@ -180,4 +180,12 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
@Param("preTaskCode") long preTaskCode,
@Param("postTaskCode") long postTaskCode);
/**
* query downstream process task relation by processDefinitionCode
* @param processDefinitionCode
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryDownstreamByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
}

7
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

@ -69,6 +69,13 @@
#{relation.conditionType},#{relation.conditionParams},#{relation.createTime},#{relation.updateTime})
</foreach>
</insert>
<select id="queryDownstreamByProcessDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>
from t_ds_process_task_relation
where process_definition_code = #{processDefinitionCode}
and pre_task_version >= 1
</select>
<select id="queryDownstreamByTaskCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>

Loading…
Cancel
Save