Browse Source

[fix] delete workflow or task used by other tasks (#10873)

currently, users can delete process definitions used
in other sub-process tasks or in other dependent tasks.
but this change will break the dependence of those task
and failed DAG, this patch add validation of those
delete behavior to avoid this error
k8s/config
Jiajie Zhong 2 years ago committed by GitHub
parent
commit
0db9bbd538
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  2. 41
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
  3. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  4. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  5. 23
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java
  6. 183
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  7. 44
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
  8. 117
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  9. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  10. 109
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
  11. 257
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskRecord.java
  12. 41
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
  13. 82
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -208,7 +208,7 @@ public class ProcessDefinitionController extends BaseController {
}
/**
* update process definition
* update process definition, with whole process definition object including task definition, task relation and location.
*
* @param loginUser login user
* @param projectCode project code
@ -791,7 +791,7 @@ public class ProcessDefinitionController extends BaseController {
}
/**
* update process definition basic info
* update process definition basic info, not including task definition, task relation and location.
*
* @param loginUser login user
* @param projectCode project code

41
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java

@ -18,17 +18,23 @@
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKFLOW_LINEAGE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.TASK_WITH_DEPENDENT_ERROR;
import static org.apache.dolphinscheduler.common.Constants.SESSION_USER;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,6 +42,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@ -43,6 +50,8 @@ import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore;
@ -106,4 +115,36 @@ public class WorkFlowLineageController extends BaseController {
return error(QUERY_WORKFLOW_LINEAGE_ERROR.getCode(), QUERY_WORKFLOW_LINEAGE_ERROR.getMsg());
}
}
/**
* Whether task can be deleted or not, avoiding task depend on other task of process definition delete by accident.
*
* @param loginUser login user
* @param projectCode project codes which taskCode belong
* @param processDefinitionCode project code which taskCode belong
* @param taskCode task definition code
* @return Result of task can be delete or not
*/
@ApiOperation(value = "verifyTaskCanDelete", notes = "VERIFY_TASK_CAN_DELETE")
@ApiImplicitParams({
@ApiImplicitParam(name = "projectCode", value = "PROCESS_DEFINITION_NAME", required = true, type = "Long"),
@ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, type = "processDefinitionCode"),
@ApiImplicitParam(name = "taskCode", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "123456789"),
})
@PostMapping(value = "/tasks/verify-delete")
@ResponseStatus(HttpStatus.OK)
@ApiException(TASK_WITH_DEPENDENT_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result verifyTaskCanDelete(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "processDefinitionCode", required = true) long processDefinitionCode,
@RequestParam(value = "taskCode", required = true) long taskCode) {
Result result = new Result();
Optional<String> taskDepMsg = workFlowLineageService.taskDepOnTaskMsg(projectCode, processDefinitionCode, taskCode);
if (taskDepMsg.isPresent()) {
throw new ServiceException(taskDepMsg.get());
}
putMsg(result, Status.SUCCESS);
return result;
}
}

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -193,7 +193,7 @@ public enum Status {
BATCH_MOVE_PROCESS_DEFINITION_ERROR(10160, "batch move process definition error", "移动工作流错误"),
QUERY_WORKFLOW_LINEAGE_ERROR(10161, "query workflow lineage error", "查询血缘失败"),
QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_ERROR(10162, "query authorized and user created project error error", "查询授权的和用户创建的项目错误"),
DELETE_PROCESS_DEFINITION_BY_CODE_FAIL(10163, "delete process definition by code fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
DELETE_PROCESS_DEFINITION_EXECUTING_FAIL(10163, "delete process definition by code fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
CHECK_OS_TENANT_CODE_ERROR(10164, "Tenant code invalid, should follow linux's users naming conventions", "非法的租户名,需要遵守 Linux 用户命名规范"),
FORCE_TASK_SUCCESS_ERROR(10165, "force task success error", "强制成功任务实例错误"),
TASK_INSTANCE_STATE_OPERATION_ERROR(10166, "the status of task instance {0} is {1},Cannot perform force success operation", "任务实例[{0}]的状态是[{1}],无法执行强制成功操作"),
@ -217,6 +217,9 @@ public enum Status {
PROJECT_NOT_EXIST(10190, "This project was not found. Please refresh page.", "该项目不存在,请刷新页面"),
TASK_INSTANCE_HOST_IS_NULL(10191, "task instance host is null", "任务实例host为空"),
QUERY_EXECUTING_WORKFLOW_ERROR(10192, "query executing workflow error", "查询运行的工作流实例错误"),
DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL(10193, "delete process definition fail, cause used by other tasks: {0}", "删除工作流定时失败,被其他任务引用:{0}"),
DELETE_TASK_USE_BY_OTHER_FAIL(10194, "delete task {0} fail, cause used by other tasks: {1}", "删除任务 {0} 失败,被其他任务引用:{1}"),
TASK_WITH_DEPENDENT_ERROR(10195, "task used in other tasks", "删除被其他任务引用"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -158,7 +158,7 @@ public interface ProcessDefinitionService {
long targetProjectCode);
/**
* update process definition
* update process definition, with whole process definition object including task definition, task relation and location.
*
* @param loginUser login user
* @param projectCode project code
@ -398,7 +398,7 @@ public interface ProcessDefinitionService {
ProcessExecutionTypeEnum executionType);
/**
* update process definition basic info
* update process definition basic info, not including task definition, task relation and location.
*
* @param loginUser login user
* @param projectCode project code

23
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java

@ -17,7 +17,11 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* work flow lineage service
@ -29,4 +33,23 @@ public interface WorkFlowLineageService {
Map<String, Object> queryWorkFlowLineageByCode(long projectCode, long workFlowCode);
Map<String, Object> queryWorkFlowLineage(long projectCode);
/**
* Query tasks depend on process definition, include upstream or downstream
*
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @return Set of TaskMainInfo
*/
Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long processDefinitionCode);
/**
* Query and return tasks dependence with string format, is a wrapper of queryTaskDepOnTask and task query method.
*
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return dependent process definition
*/
Optional<String> taskDepOnTaskMsg(long projectCode, long processDefinitionCode, long taskCode);
}

183
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -17,40 +17,26 @@
package org.apache.dolphinscheduler.api.service.impl;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_MOVE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_LIST;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_BATCH_COPY;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_CREATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_EXPORT;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_EXPORT;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_IMPORT;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_ONLINE_OFFLINE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.EMPTY_STRING;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.COMPLEX_TASK_TYPES;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SQL;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.dto.DagDataSchedule;
import org.apache.dolphinscheduler.api.dto.ScheduleParam;
import org.apache.dolphinscheduler.api.dto.treeview.Instance;
@ -61,6 +47,7 @@ import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.FileUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@ -96,6 +83,7 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
@ -116,6 +104,36 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -123,25 +141,13 @@ import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_MOVE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_LIST;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_BATCH_COPY;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_CREATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_EXPORT;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_EXPORT;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_IMPORT;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_ONLINE_OFFLINE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.EMPTY_STRING;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.COMPLEX_TASK_TYPES;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SQL;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
/**
* process definition service impl
@ -204,6 +210,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private WorkFlowLineageService workFlowLineageService;
/**
* create process definition
*
@ -614,6 +623,31 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return updateDagDefine(loginUser, taskRelationList, processDefinition, processDefinitionDeepCopy, taskDefinitionLogs, otherParamsJson);
}
/**
* Task want to delete whether used in other task, should throw exception when have be used.
*
* This function avoid delete task already dependencies by other tasks by accident.
*
* @param processDefinition ProcessDefinition you change task definition and task relation
* @param taskRelationList All the latest task relation list from process definition
*/
private void taskUsedInOtherTaskValid(ProcessDefinition processDefinition, List<ProcessTaskRelationLog> taskRelationList) {
List<ProcessTaskRelation> oldProcessTaskRelationList = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
Set<ProcessTaskRelationLog> oldProcessTaskRelationSet = oldProcessTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toSet());
StringBuilder sb = new StringBuilder();
for (ProcessTaskRelationLog oldProcessTaskRelation: oldProcessTaskRelationSet) {
boolean oldTaskExists = taskRelationList.stream().anyMatch(relation -> oldProcessTaskRelation.getPostTaskCode() == relation.getPostTaskCode());
if (!oldTaskExists) {
Optional<String> taskDepMsg = workFlowLineageService.taskDepOnTaskMsg(
processDefinition.getProjectCode(), oldProcessTaskRelation.getProcessDefinitionCode(), oldProcessTaskRelation.getPostTaskCode());
taskDepMsg.ifPresent(sb::append);
}
if (sb.length() != 0) {
throw new ServiceException(sb.toString());
}
}
}
protected Map<String, Object> updateDagDefine(User loginUser,
List<ProcessTaskRelationLog> taskRelationList,
ProcessDefinition processDefinition,
@ -656,6 +690,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
taskUsedInOtherTaskValid(processDefinition, taskRelationList);
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
@ -698,6 +734,35 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
/**
* Process definition want to delete whether used in other task, should throw exception when have be used.
*
* This function avoid delete process definition already dependencies by other tasks by accident.
*
* @param processDefinition ProcessDefinition you change task definition and task relation
*/
private void processDefinitionUsedInOtherTaskValid(ProcessDefinition processDefinition) {
// check process definition is already online
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
throw new ServiceException(Status.PROCESS_DEFINE_STATE_ONLINE, processDefinition.getName());
}
// check process instances is already running
List<ProcessInstance> processInstances = processInstanceService.queryByProcessDefineCodeAndStatus(processDefinition.getCode(), Constants.NOT_TERMINATED_STATES);
if (CollectionUtils.isNotEmpty(processInstances)) {
throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_EXECUTING_FAIL, processInstances.size());
}
// check process used by other task, including subprocess and dependent task type
Set<TaskMainInfo> taskDepOnProcess = workFlowLineageService.queryTaskDepOnProcess(processDefinition.getProjectCode(), processDefinition.getCode());
if (CollectionUtils.isNotEmpty(taskDepOnProcess)) {
String taskDepDetail = taskDepOnProcess.stream()
.map(task -> String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(), task.getTaskName()))
.collect(Collectors.joining(Constants.COMMA));
throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL, taskDepDetail);
}
}
/**
* delete process definition by code
*
@ -727,17 +792,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
// check process definition is already online
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinition.getName());
return result;
}
// check process instances is already running
List<ProcessInstance> processInstances = processInstanceService.queryByProcessDefineCodeAndStatus(processDefinition.getCode(), Constants.NOT_TERMINATED_STATES);
if (CollectionUtils.isNotEmpty(processInstances)) {
putMsg(result, Status.DELETE_PROCESS_DEFINITION_BY_CODE_FAIL, processInstances.size());
return result;
}
processDefinitionUsedInOtherTaskValid(processDefinition);
// get the timing according to the process definition
Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(code);
@ -946,7 +1001,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
public Map<String, Object> importSqlProcessDefinition(User loginUser, long projectCode, MultipartFile file) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByCode(projectCode);
result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_IMPORT);
result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_IMPORT);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}

44
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java

@ -28,22 +28,26 @@ import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -66,6 +70,9 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
@Autowired
private TaskDefinitionLogMapper taskDefinitionLogMapper;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
@Override
public Map<String, Object> queryWorkFlowLineageByName(long projectCode, String workFlowName) {
Map<String, Object> result = new HashMap<>();
@ -223,4 +230,41 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
}
return sourceWorkFlowCodes;
}
/**
* Query and return tasks dependence with string format, is a wrapper of queryTaskDepOnTask and task query method.
*
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return Optional of formatter message
*/
@Override
public Optional<String> taskDepOnTaskMsg(long projectCode, long processDefinitionCode, long taskCode) {
List<TaskMainInfo> tasksDep = workFlowLineageMapper.queryTaskDepOnTask(projectCode, processDefinitionCode, taskCode);
if (CollectionUtils.isEmpty(tasksDep)) {
return Optional.empty();
}
String taskDepStr = tasksDep.stream().map(task -> String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(), task.getTaskName())).collect(Collectors.joining(Constants.COMMA));
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
return Optional.of(MessageFormat.format(Status.DELETE_TASK_USE_BY_OTHER_FAIL.getMsg(), taskDefinition.getName(), taskDepStr));
}
/**
* Query tasks depend on process definition, include upstream or downstream
*
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @return Set of TaskMainInfo
*/
@Override
public Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long processDefinitionCode) {
Set<TaskMainInfo> taskMainInfos = new HashSet<>();
List<TaskMainInfo> taskDependents = workFlowLineageMapper.queryTaskDependentDepOnProcess(projectCode, processDefinitionCode);
List<TaskMainInfo> taskSubProcess = workFlowLineageMapper.queryTaskSubProcessDepOnProcess(projectCode, processDefinitionCode);
taskMainInfos.addAll(taskDependents);
taskMainInfos.addAll(taskSubProcess);
return taskMainInfos;
}
}

117
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -17,28 +17,19 @@
package org.apache.dolphinscheduler.api.service;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import javax.servlet.http.HttpServletResponse;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_MOVE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_BATCH_COPY;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_CREATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_IMPORT;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
import static org.powermock.api.mockito.PowerMockito.mock;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
@ -58,6 +49,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
@ -65,28 +57,44 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.lang3.StringUtils;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import javax.servlet.http.HttpServletResponse;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.mock.web.MockMultipartFile;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_MOVE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_BATCH_COPY;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_CREATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_IMPORT;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
import static org.powermock.api.mockito.PowerMockito.mock;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
/**
* process definition service test
@ -98,13 +106,13 @@ public class ProcessDefinitionServiceTest {
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
+ "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
private static final String taskDefinitionJson = "[{\"code\":123456789,\"name\":\"test1\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\"," +
"\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{}," +
"\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\"," +
"\"timeoutNotifyStrategy\":null,\"timeout\":0,\"environmentCode\":-1},{\"code\":123451234,\"name\":\"test2\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\"," +
"\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 2\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{}," +
"\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\"," +
"\"timeoutNotifyStrategy\":\"WARN\",\"timeout\":0,\"environmentCode\":-1}]";
private static final String taskDefinitionJson = "[{\"code\":123456789,\"name\":\"test1\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\","
+ "\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},"
+ "\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
+ "\"timeoutNotifyStrategy\":null,\"timeout\":0,\"environmentCode\":-1},{\"code\":123451234,\"name\":\"test2\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\","
+ "\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 2\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},"
+ "\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
+ "\"timeoutNotifyStrategy\":\"WARN\",\"timeout\":0,\"environmentCode\":-1}]";
@InjectMocks
private ProcessDefinitionServiceImpl processDefinitionService;
@ -130,14 +138,15 @@ public class ProcessDefinitionServiceTest {
@Mock
private ProcessInstanceService processInstanceService;
@Mock
private TaskInstanceMapper taskInstanceMapper;
@Mock
private TenantMapper tenantMapper;
@Mock
private DataSourceMapper dataSourceMapper;
@Mock
private WorkFlowLineageService workFlowLineageService;
@Test
public void testQueryProcessDefinitionList() {
long projectCode = 1L;
@ -392,8 +401,9 @@ public class ProcessDefinitionServiceTest {
putMsg(result, Status.SUCCESS, projectCode);
processDefinition.setReleaseState(ReleaseState.ONLINE);
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Map<String, Object> dfOnlineRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.PROCESS_DEFINE_STATE_ONLINE, dfOnlineRes.get(Constants.STATUS));
Throwable exception = Assertions.assertThrows(ServiceException.class, () -> processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L));
String formatter = MessageFormat.format(Status.PROCESS_DEFINE_STATE_ONLINE.getMsg(), processDefinition.getName());
Assertions.assertEquals(formatter, exception.getMessage());
//scheduler list elements > 1
processDefinition.setReleaseState(ReleaseState.OFFLINE);
@ -403,6 +413,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(scheduleMapper.deleteById(46)).thenReturn(1);
Mockito.when(processDefineMapper.deleteById(processDefinition.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())).thenReturn(1);
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode())).thenReturn(Collections.emptySet());
Map<String, Object> schedulerGreaterThanOneRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.SUCCESS, schedulerGreaterThanOneRes.get(Constants.STATUS));
@ -411,15 +422,26 @@ public class ProcessDefinitionServiceTest {
schedule.setReleaseState(ReleaseState.ONLINE);
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedule);
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode())).thenReturn(Collections.emptySet());
Map<String, Object> schedulerOnlineRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.SCHEDULE_CRON_STATE_ONLINE, schedulerOnlineRes.get(Constants.STATUS));
//process used by other task, sub process
loginUser.setUserType(UserType.ADMIN_USER);
putMsg(result, Status.SUCCESS, projectCode);
TaskMainInfo taskMainInfo = getTaskMainInfo().get(0);
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode())).thenReturn(ImmutableSet.copyOf(getTaskMainInfo()));
exception = Assertions.assertThrows(ServiceException.class, () -> processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L));
formatter = MessageFormat.format(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL.getMsg(), String.format("%s:%s", taskMainInfo.getProcessDefinitionName(), taskMainInfo.getTaskName()));
Assertions.assertEquals(formatter, exception.getMessage());
//delete success
schedule.setReleaseState(ReleaseState.OFFLINE);
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())).thenReturn(1);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode())).thenReturn(Collections.emptySet());
putMsg(result, Status.SUCCESS, projectCode);
Map<String, Object> deleteSuccess = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.SUCCESS, deleteSuccess.get(Constants.STATUS));
@ -812,4 +834,19 @@ public class ProcessDefinitionServiceTest {
result.put(Constants.MSG, status.getMsg());
}
}
/**
* get mock task main info
*
* @return schedule
*/
private List<TaskMainInfo> getTaskMainInfo() {
List<TaskMainInfo> taskMainInfos = new ArrayList<>();
TaskMainInfo taskMainInfo = new TaskMainInfo();
taskMainInfo.setId(1);
taskMainInfo.setProcessDefinitionName("process");
taskMainInfo.setTaskName("task");
taskMainInfos.add(taskMainInfo);
return taskMainInfos;
}
}

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -52,6 +52,7 @@ public final class Constants {
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters";
public static final String FORMAT_SS = "%s%s";
public static final String FORMAT_S_S = "%s/%s";
public static final String FORMAT_S_S_COLON = "%s:%s";
public static final String FOLDER_SEPARATOR = "/";
public static final String RESOURCE_TYPE_FILE = "resources";

109
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java

@ -22,11 +22,16 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState;
import java.util.Date;
import java.util.Map;
import lombok.Data;
/**
* task main info
*/
@Data
public class TaskMainInfo {
private long id;
/**
* task name
*/
@ -91,108 +96,4 @@ public class TaskMainInfo {
* upstreamTaskName
*/
private String upstreamTaskName;
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public long getTaskCode() {
return taskCode;
}
public void setTaskCode(long taskCode) {
this.taskCode = taskCode;
}
public int getTaskVersion() {
return taskVersion;
}
public void setTaskVersion(int taskVersion) {
this.taskVersion = taskVersion;
}
public String getTaskType() {
return taskType;
}
public void setTaskType(String taskType) {
this.taskType = taskType;
}
public Date getTaskCreateTime() {
return taskCreateTime;
}
public void setTaskCreateTime(Date taskCreateTime) {
this.taskCreateTime = taskCreateTime;
}
public Date getTaskUpdateTime() {
return taskUpdateTime;
}
public void setTaskUpdateTime(Date taskUpdateTime) {
this.taskUpdateTime = taskUpdateTime;
}
public long getProcessDefinitionCode() {
return processDefinitionCode;
}
public void setProcessDefinitionCode(long processDefinitionCode) {
this.processDefinitionCode = processDefinitionCode;
}
public int getProcessDefinitionVersion() {
return processDefinitionVersion;
}
public void setProcessDefinitionVersion(int processDefinitionVersion) {
this.processDefinitionVersion = processDefinitionVersion;
}
public String getProcessDefinitionName() {
return processDefinitionName;
}
public void setProcessDefinitionName(String processDefinitionName) {
this.processDefinitionName = processDefinitionName;
}
public ReleaseState getProcessReleaseState() {
return processReleaseState;
}
public void setProcessReleaseState(ReleaseState processReleaseState) {
this.processReleaseState = processReleaseState;
}
public Map<Long, String> getUpstreamTaskMap() {
return upstreamTaskMap;
}
public void setUpstreamTaskMap(Map<Long, String> upstreamTaskMap) {
this.upstreamTaskMap = upstreamTaskMap;
}
public long getUpstreamTaskCode() {
return upstreamTaskCode;
}
public void setUpstreamTaskCode(long upstreamTaskCode) {
this.upstreamTaskCode = upstreamTaskCode;
}
public String getUpstreamTaskName() {
return upstreamTaskName;
}
public void setUpstreamTaskName(String upstreamTaskName) {
this.upstreamTaskName = upstreamTaskName;
}
}

257
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskRecord.java

@ -1,257 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
import java.util.Date;
/**
* task record for qianfan
*/
public class TaskRecord {
/**
* id
*/
private int id;
/**
* process id
*/
private int procId;
/**
* procedure name
*/
private String procName;
/**
* procedure date
*/
private String procDate;
/**
* start date
*/
private Date startTime;
/**
* end date
*/
private Date endTime;
/**
* result
*/
private String result;
/**
* duration unit: second
*/
private int duration;
/**
* note
*/
private String note;
/**
* schema
*/
private String schema;
/**
* job id
*/
private String jobId;
/**
* source tab
*/
private String sourceTab;
/**
* source row count
*/
private Long sourceRowCount;
/**
* target tab
*/
private String targetTab;
/**
* target row count
*/
private Long targetRowCount;
/**
* error code
*/
private String errorCode;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getProcId() {
return procId;
}
public void setProcId(int procId) {
this.procId = procId;
}
public String getProcName() {
return procName;
}
public void setProcName(String procName) {
this.procName = procName;
}
public String getProcDate() {
return procDate;
}
public void setProcDate(String procDate) {
this.procDate = procDate;
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
public int getDuration() {
return duration;
}
public void setDuration(int duration) {
this.duration = duration;
}
public String getNote() {
return note;
}
public void setNote(String note) {
this.note = note;
}
public String getSchema() {
return schema;
}
public void setSchema(String schema) {
this.schema = schema;
}
public String getJobId() {
return jobId;
}
public void setJobId(String jobId) {
this.jobId = jobId;
}
public String getSourceTab() {
return sourceTab;
}
public void setSourceTab(String sourceTab) {
this.sourceTab = sourceTab;
}
public Long getSourceRowCount() {
return sourceRowCount;
}
public void setSourceRowCount(Long sourceRowCount) {
this.sourceRowCount = sourceRowCount;
}
public String getTargetTab() {
return targetTab;
}
public void setTargetTab(String targetTab) {
this.targetTab = targetTab;
}
public Long getTargetRowCount() {
return targetRowCount;
}
public void setTargetRowCount(Long targetRowCount) {
this.targetRowCount = targetRowCount;
}
public String getErrorCode() {
return errorCode;
}
public void setErrorCode(String errorCode) {
this.errorCode = errorCode;
}
@Override
public String toString() {
return "task record, id:" + id
+ " proc id:" + procId
+ " proc name:" + procName
+ " proc date: " + procDate
+ " start date:" + startTime
+ " end date:" + endTime
+ " result : " + result
+ " duration : " + duration
+ " note : " + note
+ " schema : " + schema
+ " job id : " + jobId
+ " source table : " + sourceTab
+ " source row count: " + sourceRowCount
+ " target table : " + targetTab
+ " target row count: " + targetRowCount
+ " error code: " + errorCode
;
}
}

41
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.ibatis.annotations.Param;
@ -103,4 +104,44 @@ public interface WorkFlowLineageMapper {
List<DependentProcessDefinition> queryUpstreamDependentParamsByProcessDefinitionCode(@Param("code") long code,
@Param("taskType") String taskType);
/**
* Query all tasks type sub process depend on process definition.
*
* Query all upstream tasks from task type sub process.
*
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @return List of TaskMainInfo
*/
List<TaskMainInfo> queryTaskSubProcessDepOnProcess(@Param("projectCode") long projectCode,
@Param("processDefinitionCode") long processDefinitionCode);
/**
* Query all tasks type dependent depend on process definition.
*
* Query all downstream tasks from task type dependent, method `queryTaskDepOnTask` is a proper subset of
* current method `queryTaskDepOnProcess`. Which mean with the same parameter processDefinitionCode, all tasks in
* `queryTaskDepOnTask` are in the result of method `queryTaskDepOnProcess`.
*
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @return List of TaskMainInfo
*/
List<TaskMainInfo> queryTaskDependentDepOnProcess(@Param("projectCode") long projectCode,
@Param("processDefinitionCode") long processDefinitionCode);
/**
* Query all tasks depend on task, only downstream task support currently(from dependent task type).
*
* In case of dependent task type, method `queryTaskDepOnTask` is a proper subset of `queryTaskDepOnProcess`. Which
* mean with the same processDefinitionCode, all tasks in `queryTaskDepOnTask` are in method `queryTaskDepOnProcess`.
*
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return dependent process definition
*/
List<TaskMainInfo> queryTaskDepOnTask(@Param("projectCode") long projectCode,
@Param("processDefinitionCode") long processDefinitionCode,
@Param("taskCode") long taskCode);
}

82
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml

@ -161,4 +161,86 @@
</if>
AND a.task_type = 'DEPENDENT'
</select>
<select id="queryTaskSubProcessDepOnProcess" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
select td.id
, td.name as taskName
, td.code as taskCode
, td.version as taskVersion
, td.task_type as taskType
, ptr.process_definition_code as processDefinitionCode
, pd.name as processDefinitionName
, pd.version as processDefinitionVersion
, pd.release_state as processReleaseState
from t_ds_task_definition td
join t_ds_process_task_relation ptr on ptr.post_task_code = td.code and td.version = ptr.post_task_version
join t_ds_process_definition pd on pd.code = ptr.process_definition_code and pd.version = ptr.process_definition_version
<where>
<if test="projectCode != 0">
and ptr.project_code = #{projectCode}
</if>
<!-- ptr.process_definition_code != #{processDefinitionCode} query task not in current workflow -->
<!-- For subprocess task type, using `concat('%"processDefinitionCode":', #{processDefinitionCode}, '%')` -->
<if test="processDefinitionCode != 0">
and td.task_type = 'SUB_PROCESS'
and ptr.process_definition_code != #{processDefinitionCode}
and td.task_params like concat('%"processDefinitionCode":', #{processDefinitionCode}, '%')
</if>
</where>
</select>
<select id="queryTaskDependentDepOnProcess" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
select td.id
, td.name as taskName
, td.code as taskCode
, td.version as taskVersion
, td.task_type as taskType
, ptr.process_definition_code as processDefinitionCode
, pd.name as processDefinitionName
, pd.version as processDefinitionVersion
, pd.release_state as processReleaseState
from t_ds_task_definition td
join t_ds_process_task_relation ptr on ptr.post_task_code = td.code and td.version = ptr.post_task_version
join t_ds_process_definition pd on pd.code = ptr.process_definition_code and pd.version = ptr.process_definition_version
<where>
<if test="projectCode != 0">
and ptr.project_code = #{projectCode}
</if>
<!-- ptr.process_definition_code != #{processDefinitionCode} query task not in current workflow -->
<!-- For dependnet task type, using `like concat('%"definitionCode":', #{processDefinitionCode}, '%')` -->
<if test="processDefinitionCode != 0">
and td.task_type = 'DEPENDENT'
and ptr.process_definition_code != #{processDefinitionCode}
and td.task_params like concat('%"definitionCode":', #{processDefinitionCode}, '%')
</if>
</where>
</select>
<select id="queryTaskDepOnTask" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
select td.id
, td.name as taskName
, td.code as taskCode
, td.version as taskVersion
, td.task_type as taskType
, ptr.process_definition_code as processDefinitionCode
, pd.name as processDefinitionName
, pd.version as processDefinitionVersion
, pd.release_state as processReleaseState
from t_ds_task_definition td
join t_ds_process_task_relation ptr on ptr.post_task_code = td.code and td.version = ptr.post_task_version
join t_ds_process_definition pd on pd.code = ptr.process_definition_code and pd.version = ptr.process_definition_version
<where>
<if test="projectCode != 0">
and ptr.project_code = #{projectCode}
</if>
<!-- ptr.process_definition_code != #{processDefinitionCode} query task not in current workflow -->
<if test="processDefinitionCode != 0">
and ptr.process_definition_code != #{processDefinitionCode}
and td.task_params like concat('%"definitionCode":', #{processDefinitionCode}, '%')
</if>
<if test="taskCode != 0">
and td.task_params like concat('%"depTaskCode":', #{taskCode}, '%')
</if>
</where>
</select>
</mapper>

Loading…
Cancel
Save