Browse Source

[DSIP-53] Suggest add process lineage parsing for dependent task (#16288)

* add dsip 53
dev
xiangzihao 4 months ago committed by GitHub
parent
commit
761323f3e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 9
      docs/docs/en/guide/upgrade/upgrade.md
  2. 9
      docs/docs/zh/guide/upgrade/upgrade.md
  3. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  4. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
  5. 63
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessLineageController.java
  6. 29
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCodeVersionDto.java
  7. 13
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  8. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  9. 40
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessLineageService.java
  10. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  11. 131
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  12. 7
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  13. 307
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessLineageServiceImpl.java
  14. 306
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
  15. 31
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessTaskLineageControllerTest.java
  16. 7
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
  17. 22
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  18. 88
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskLineageServiceTest.java
  19. 8
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
  20. 34
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentLineageTask.java
  21. 56
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
  22. 115
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java
  23. 48
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskLineage.java
  24. 15
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowLineage.java
  25. 31
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
  26. 36
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelationDetail.java
  27. 48
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskLineageMapper.java
  28. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
  29. 148
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
  30. 45
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskLineageDao.java
  31. 97
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskLineageDaoImpl.java
  32. 133
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskLineageMapper.xml
  33. 8
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
  34. 249
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
  35. 22
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
  36. 21
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
  37. 22
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
  38. 35
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql
  39. 16
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_dml.sql
  40. 35
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql
  41. 16
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_dml.sql
  42. 51
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskLineageMapperTest.java
  43. 3
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  44. 16
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  45. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskTypeUtils.java
  46. 31
      dolphinscheduler-tools/src/main/bin/migrate-lineage.sh
  47. 56
      dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/lineage/MigrateLineage.java
  48. 120
      dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/lineage/MigrateLineageService.java
  49. 6
      dolphinscheduler-ui/src/service/modules/lineages/types.ts
  50. 39
      dolphinscheduler-ui/src/views/projects/components/dependencies/use-dependencies.ts
  51. 6
      dolphinscheduler-ui/src/views/projects/workflow/relation/use-relation.ts

9
docs/docs/en/guide/upgrade/upgrade.md

@ -54,6 +54,15 @@ Execution result:
- The original UDF resource `x/y.jar` migrates to `/dolphinscheduler/abc/udf/.migrate/x/y.jar`.
- Update UDF function's bound resource info.
### Upgrade Lineage
Execute script: `sh ./tools/bin/migrate-lineage.sh`.
Execution result:
- Migrate lineage data to new table `t_ds_process_task_lineage`.
- This script only performs upsert operations, not deletes. You can delete it manually if you need to.
### Upgrade Service
#### Change Configuration `bin/env/install_env.sh`

9
docs/docs/zh/guide/upgrade/upgrade.md

@ -53,6 +53,15 @@ jar 包 并添加到 `./tools/libs` 目录下,设置以下环境变量
- 原 UDF 资源 `x/y.jar` 迁移至 `/dolphinscheduler/abc/udf/.migrate/x/y.jar`
- 更新 UDF 函数绑定资源信息。
### 血缘升级
执行脚本:`sh ./tools/bin/migrate-lineage.sh`。
执行结果:
- 原血缘数据迁移至新血缘表 `t_ds_process_task_lineage`
- 此脚本仅执行 upsert 操作,不执行删除操作,如果需要删除,您可以手动删除。
### 服务升级
#### 修改 `bin/env/install_env.sh` 配置内容

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -221,7 +221,6 @@ public class ProcessDefinitionController extends BaseController {
* @param timeout timeout
* @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @param otherParamsJson otherParamsJson handle other params
* @return update result code
*/
@Operation(summary = "update", description = "UPDATE_PROCESS_DEFINITION_NOTES")
@ -230,8 +229,7 @@ public class ProcessDefinitionController extends BaseController {
@Parameter(name = "code", description = "PROCESS_DEFINITION_CODE", required = true, schema = @Schema(implementation = long.class, example = "123456789")),
@Parameter(name = "locations", description = "PROCESS_DEFINITION_LOCATIONS", required = true, schema = @Schema(implementation = String.class)),
@Parameter(name = "description", description = "PROCESS_DEFINITION_DESC", required = false, schema = @Schema(implementation = String.class)),
@Parameter(name = "releaseState", description = "RELEASE_PROCESS_DEFINITION_NOTES", required = false, schema = @Schema(implementation = ReleaseState.class)),
@Parameter(name = "otherParamsJson", description = "OTHER_PARAMS_JSON", required = false, schema = @Schema(implementation = String.class))
@Parameter(name = "releaseState", description = "RELEASE_PROCESS_DEFINITION_NOTES", required = false, schema = @Schema(implementation = ReleaseState.class))
})
@PutMapping(value = "/{code}")
@ResponseStatus(HttpStatus.OK)

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java

@ -158,7 +158,6 @@ public class ProcessInstanceController extends BaseController {
* @param scheduleTime schedule time
* @param syncDefine sync define
* @param locations locations
* @param tenantCode tenantCode
* @return update result code
*/
@Operation(summary = "updateProcessInstance", description = "UPDATE_PROCESS_INSTANCE_NOTES")

63
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java → dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessLineageController.java

@ -24,13 +24,16 @@ import static org.apache.dolphinscheduler.common.constants.Constants.SESSION_USE
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
import org.apache.dolphinscheduler.api.service.ProcessLineageService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.DependentLineageTask;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -61,21 +64,21 @@ import io.swagger.v3.oas.annotations.tags.Tag;
@RestController
@RequestMapping("projects/{projectCode}/lineages")
@Slf4j
public class WorkFlowLineageController extends BaseController {
public class ProcessLineageController extends BaseController {
@Autowired
private WorkFlowLineageService workFlowLineageService;
private ProcessLineageService processLineageService;
@Operation(summary = "queryLineageByWorkFlowName", description = "QUERY_WORKFLOW_LINEAGE_BY_NAME_NOTES")
@GetMapping(value = "/query-by-name")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_WORKFLOW_LINEAGE_ERROR)
public Result<List<WorkFlowLineage>> queryWorkFlowLineageByName(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
public Result<List<WorkFlowRelationDetail>> queryWorkFlowLineageByName(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "workFlowName", required = false) String workFlowName) {
workFlowName = ParameterUtils.handleEscapes(workFlowName);
List<WorkFlowLineage> workFlowLineages =
workFlowLineageService.queryWorkFlowLineageByName(projectCode, workFlowName);
@RequestParam(value = "processDefinitionName", required = false) String processDefinitionName) {
processDefinitionName = ParameterUtils.handleEscapes(processDefinitionName);
List<WorkFlowRelationDetail> workFlowLineages =
processLineageService.queryWorkFlowLineageByName(projectCode, processDefinitionName);
return Result.success(workFlowLineages);
}
@ -85,9 +88,12 @@ public class WorkFlowLineageController extends BaseController {
@ApiException(QUERY_WORKFLOW_LINEAGE_ERROR)
public Result<Map<String, Object>> queryWorkFlowLineageByCode(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable(value = "workFlowCode", required = true) long workFlowCode) {
Map<String, Object> result = workFlowLineageService.queryWorkFlowLineageByCode(projectCode, workFlowCode);
@PathVariable(value = "workFlowCode") long workFlowCode) {
WorkFlowLineage workFlowLineage = processLineageService.queryWorkFlowLineageByCode(projectCode, workFlowCode);
Map<String, Object> result = new HashMap<>();
result.put(Constants.DATA_LIST, workFlowLineage);
return Result.success(result);
}
@Operation(summary = "queryWorkFlowList", description = "QUERY_WORKFLOW_LINEAGE_NOTES")
@ -96,8 +102,10 @@ public class WorkFlowLineageController extends BaseController {
public Result<Map<String, Object>> queryWorkFlowLineage(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode) {
try {
Map<String, Object> result = workFlowLineageService.queryWorkFlowLineage(projectCode);
return returnDataList(result);
Map<String, Object> result = new HashMap<>();
WorkFlowLineage workFlowLineage = processLineageService.queryWorkFlowLineage(projectCode);
result.put(Constants.DATA_LIST, workFlowLineage);
return Result.success(result);
} catch (Exception e) {
log.error(QUERY_WORKFLOW_LINEAGE_ERROR.getMsg(), e);
return error(QUERY_WORKFLOW_LINEAGE_ERROR.getCode(), QUERY_WORKFLOW_LINEAGE_ERROR.getMsg());
@ -122,13 +130,13 @@ public class WorkFlowLineageController extends BaseController {
@PostMapping(value = "/tasks/verify-delete")
@ResponseStatus(HttpStatus.OK)
@ApiException(TASK_WITH_DEPENDENT_ERROR)
public Result verifyTaskCanDelete(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
public Result<Map<String, Object>> verifyTaskCanDelete(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "processDefinitionCode", required = true) long processDefinitionCode,
@RequestParam(value = "taskCode", required = true) long taskCode) {
Result result = new Result();
Result<Map<String, Object>> result = new Result<>();
Optional<String> taskDepMsg =
workFlowLineageService.taskDepOnTaskMsg(projectCode, processDefinitionCode, taskCode);
processLineageService.taskDependentMsg(projectCode, processDefinitionCode, taskCode);
if (taskDepMsg.isPresent()) {
throw new ServiceException(taskDepMsg.get());
}
@ -136,19 +144,26 @@ public class WorkFlowLineageController extends BaseController {
return result;
}
@Operation(summary = "queryDownstreamDependentTaskList", description = "QUERY_DOWNSTREAM_DEPENDENT_TASK_NOTES")
/**
* Whether task can be deleted or not, avoiding task depend on other task of process definition delete by accident.
*/
@Operation(summary = "verifyTaskCanDelete", description = "VERIFY_TASK_CAN_DELETE")
@Parameters({
@Parameter(name = "workFlowCode", description = "PROCESS_DEFINITION_CODE", required = true, schema = @Schema(implementation = Long.class)),
@Parameter(name = "taskCode", description = "TASK_DEFINITION_CODE", required = false, schema = @Schema(implementation = Long.class, example = "123456789")),
@Parameter(name = "projectCode", description = "PROCESS_DEFINITION_NAME", required = true, schema = @Schema(implementation = long.class)),
@Parameter(name = "workFlowCode", description = "PROCESS_DEFINITION_CODE", required = true, schema = @Schema(implementation = long.class)),
})
@GetMapping(value = "/query-dependent-tasks")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_WORKFLOW_LINEAGE_ERROR)
public Result<Map<String, Object>> queryDownstreamDependentTaskList(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
@RequestParam(value = "workFlowCode") Long workFlowCode,
@RequestParam(value = "taskCode", required = false, defaultValue = "0") Long taskCode) {
Map<String, Object> result =
workFlowLineageService.queryDownstreamDependentTasks(workFlowCode, taskCode);
return returnDataList(result);
public Result<Map<String, Object>> queryDependentTasks(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "workFlowCode") long workFlowCode,
@RequestParam(value = "taskCode", required = false) Long taskCode) {
Map<String, Object> result = new HashMap<>();
List<DependentLineageTask> dependentLineageTaskList =
processLineageService.queryDependentProcessDefinitions(projectCode, workFlowCode, taskCode);
result.put(Constants.DATA_LIST, dependentLineageTaskList);
return Result.success(result);
}
}

29
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCodeVersionDto.java

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class TaskCodeVersionDto {
private long code;
private int version;
}

13
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -20,6 +20,8 @@ package org.apache.dolphinscheduler.api.enums;
import java.util.Locale;
import java.util.Optional;
import lombok.Getter;
import org.springframework.context.i18n.LocaleContextHolder;
/**
@ -238,7 +240,7 @@ public enum Status {
QUERY_EXECUTING_WORKFLOW_ERROR(10192, "query executing workflow error", "查询运行的工作流实例错误"),
DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL(10193, "delete process definition fail, cause used by other tasks: {0}",
"删除工作流定时失败,被其他任务引用:{0}"),
DELETE_TASK_USE_BY_OTHER_FAIL(10194, "delete task {0} fail, cause used by other tasks: {1}",
DELETE_TASK_USE_BY_OTHER_FAIL(10194, "delete task {0} fail, the reason is that used by other tasks: {1}",
"删除任务 {0} 失败,被其他任务引用:{1}"),
TASK_WITH_DEPENDENT_ERROR(10195, "task used in other tasks", "删除被其他任务引用"),
TASK_SAVEPOINT_ERROR(10196, "task savepoint error", "任务实例savepoint错误"),
@ -585,7 +587,12 @@ public enum Status {
USED_WORKER_GROUP_EXISTS(1402004,
"You can not reassign worker groups to the project, cause these worker groups {0} are already used.",
"Worker组{0}被项目中任务或定时引用,无法重新分配"),
CREATE_PROCESS_LINEAGE_ERROR(1403001, "create process lineage error", "创建工作流血缘错误"),
UPDATE_PROCESS_LINEAGE_ERROR(1403002, "update process lineage error", "更新工作流血缘错误"),
DELETE_PROCESS_LINEAGE_ERROR(1403003, "delete process lineage error", "删除工作流血缘错误"),
;
@Getter
private final int code;
private final String enMsg;
private final String zhMsg;
@ -596,10 +603,6 @@ public enum Status {
this.zhMsg = zhMsg;
}
public int getCode() {
return this.code;
}
public String getMsg() {
if (Locale.SIMPLIFIED_CHINESE.getLanguage().equals(LocaleContextHolder.getLocale().getLanguage())) {
return this.zhMsg;

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -427,4 +427,9 @@ public interface ProcessDefinitionService {
* @return variables data
*/
Map<String, Object> viewVariables(User loginUser, long projectCode, long code);
void saveProcessLineage(long projectCode,
long processDefinitionCode,
int processDefinitionVersion,
List<TaskDefinitionLog> taskDefinitionLogList);
}

40
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java → dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessLineageService.java

@ -17,42 +17,33 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.DependentLineageTask;
import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* work flow lineage service
*/
public interface WorkFlowLineageService {
public interface ProcessLineageService {
List<WorkFlowLineage> queryWorkFlowLineageByName(long projectCode, String workFlowName);
List<WorkFlowRelationDetail> queryWorkFlowLineageByName(long projectCode, String processDefinitionName);
Map<String, Object> queryWorkFlowLineageByCode(long projectCode, long workFlowCode);
WorkFlowLineage queryWorkFlowLineageByCode(long projectCode, long processDefinitionCode);
Map<String, Object> queryWorkFlowLineage(long projectCode);
/**
* Query tasks depend on process definition, include upstream or downstream
*
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @return Set of TaskMainInfo
*/
Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long processDefinitionCode);
WorkFlowLineage queryWorkFlowLineage(long projectCode);
/**
* Query downstream tasks depend on a process definition or a task
*
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return downstream dependent tasks
* @return downstream dependent process definition list
*/
Map<String, Object> queryDownstreamDependentTasks(Long processDefinitionCode, Long taskCode);
List<DependentProcessDefinition> queryDownstreamDependentProcessDefinitions(Long processDefinitionCode);
/**
* Query and return tasks dependence with string format, is a wrapper of queryTaskDepOnTask and task query method.
@ -62,5 +53,14 @@ public interface WorkFlowLineageService {
* @param taskCode Task code want to query tasks dependence
* @return dependent process definition
*/
Optional<String> taskDepOnTaskMsg(long projectCode, long processDefinitionCode, long taskCode);
Optional<String> taskDependentMsg(long projectCode, long processDefinitionCode, long taskCode);
List<DependentLineageTask> queryDependentProcessDefinitions(long projectCode, long processDefinitionCode,
Long taskCode);
int createProcessLineage(List<ProcessTaskLineage> processTaskLineages);
int updateProcessLineage(List<ProcessTaskLineage> processTaskLineages);
int deleteProcessLineage(List<Long> processDefinitionCodes);
}

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.api.executor.ExecuteContext;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessLineageService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.common.constants.Constants;
@ -186,6 +187,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
@Autowired
private TenantMapper tenantMapper;
@Autowired
private ProcessLineageService processLineageService;
/**
* execute process instance
*
@ -999,7 +1003,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
boolean allLevelDependent) {
List<DependentProcessDefinition> dependentProcessDefinitionList =
checkDependentProcessDefinitionValid(
processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode),
processLineageService.queryDownstreamDependentProcessDefinitions(processDefinitionCode),
processDefinitionCycle, workerGroup,
processDefinitionCode);
@ -1013,7 +1017,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
List<DependentProcessDefinition> childDependentList = childList
.stream()
.flatMap(dependentProcessDefinition -> checkDependentProcessDefinitionValid(
processService.queryDependentProcessDefinitionByProcessDefinitionCode(
processLineageService.queryDownstreamDependentProcessDefinitions(
dependentProcessDefinition.getProcessDefinitionCode()),
processDefinitionCycle,
workerGroup,

131
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -42,6 +42,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYP
import static org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager.checkTaskParameters;
import org.apache.dolphinscheduler.api.dto.DagDataSchedule;
import org.apache.dolphinscheduler.api.dto.TaskCodeVersionDto;
import org.apache.dolphinscheduler.api.dto.treeview.Instance;
import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto;
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowCreateRequest;
@ -52,11 +53,11 @@ import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.MetricsCleanUpService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProcessLineageService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionLogService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.FileUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@ -83,6 +84,7 @@ import org.apache.dolphinscheduler.dao.entity.DependentSimplifyDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
@ -90,7 +92,6 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.UserWithProcessDefinitionCode;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
@ -111,7 +112,10 @@ import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
import org.apache.dolphinscheduler.plugin.task.sql.SqlTaskChannelFactory;
@ -144,6 +148,7 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
@ -153,6 +158,7 @@ import javax.servlet.http.HttpServletResponse;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.http.MediaType;
@ -239,7 +245,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
private DataSourceMapper dataSourceMapper;
@Autowired
private WorkFlowLineageService workFlowLineageService;
private ProcessLineageService processLineageService;
@Autowired
private MetricsCleanUpService metricsCleanUpService;
@ -407,11 +413,66 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
}
saveProcessLineage(processDefinition.getProjectCode(), processDefinition.getCode(),
insertVersion, taskDefinitionLogs);
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
return result;
}
@Override
public void saveProcessLineage(long projectCode,
long processDefinitionCode,
int processDefinitionVersion,
List<TaskDefinitionLog> taskDefinitionLogList) {
List<ProcessTaskLineage> processTaskLineageList =
generateProcessLineageList(taskDefinitionLogList, processDefinitionCode, processDefinitionVersion);
if (processTaskLineageList.isEmpty()) {
return;
}
int insertProcessLineageResult = processLineageService.updateProcessLineage(processTaskLineageList);
if (insertProcessLineageResult <= 0) {
log.error(
"Save process lineage error, projectCode: {}, processDefinitionCode: {}, processDefinitionVersion: {}",
projectCode, processDefinitionCode, processDefinitionVersion);
throw new ServiceException(Status.CREATE_PROCESS_LINEAGE_ERROR);
} else {
log.info(
"Save process lineage complete, projectCode: {}, processDefinitionCode: {}, processDefinitionVersion: {}",
projectCode, processDefinitionCode, processDefinitionVersion);
}
}
private List<ProcessTaskLineage> generateProcessLineageList(List<TaskDefinitionLog> taskDefinitionLogList,
long processDefinitionCode,
int processDefinitionVersion) {
List<ProcessTaskLineage> processTaskLineageList = new ArrayList<>();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogList) {
if (!TaskTypeUtils.isDependentTask(taskDefinitionLog.getTaskType())) {
continue;
}
for (DependentTaskModel dependentTaskModel : JSONUtils
.parseObject(taskDefinitionLog.getTaskParams(), DependentParameters.class)
.getDependence().getDependTaskList()) {
for (DependentItem dependentItem : dependentTaskModel.getDependItemList()) {
ProcessTaskLineage processTaskLineage = new ProcessTaskLineage();
processTaskLineage.setProcessDefinitionCode(processDefinitionCode);
processTaskLineage.setProcessDefinitionVersion(processDefinitionVersion);
processTaskLineage.setTaskDefinitionCode(taskDefinitionLog.getCode());
processTaskLineage.setTaskDefinitionVersion(taskDefinitionLog.getVersion());
processTaskLineage.setDeptProjectCode(taskDefinitionLog.getProjectCode());
processTaskLineage.setDeptProcessDefinitionCode(dependentItem.getDefinitionCode());
processTaskLineage.setDeptTaskDefinitionCode(dependentItem.getDepTaskCode());
processTaskLineageList.add(processTaskLineage);
}
}
}
return processTaskLineageList;
}
private List<TaskDefinitionLog> generateTaskDefinitionList(String taskDefinitionJson) {
try {
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
@ -828,7 +889,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
boolean oldTaskExists = taskRelationList.stream()
.anyMatch(relation -> oldProcessTaskRelation.getPostTaskCode() == relation.getPostTaskCode());
if (!oldTaskExists) {
Optional<String> taskDepMsg = workFlowLineageService.taskDepOnTaskMsg(
Optional<String> taskDepMsg = processLineageService.taskDependentMsg(
processDefinition.getProjectCode(), oldProcessTaskRelation.getProcessDefinitionCode(),
oldProcessTaskRelation.getPostTaskCode());
taskDepMsg.ifPresent(sb::append);
@ -908,6 +969,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
saveProcessLineage(processDefinition.getProjectCode(), processDefinition.getCode(),
insertVersion, taskDefinitionLogs);
} else {
log.info(
"Process definition does not need to be updated because there is no change, projectCode:{}, processCode:{}, processVersion:{}.",
@ -1013,14 +1077,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
// check process used by other task, including subprocess and dependent task type
Set<TaskMainInfo> taskDepOnProcess = workFlowLineageService
.queryTaskDepOnProcess(processDefinition.getProjectCode(), processDefinition.getCode());
if (CollectionUtils.isNotEmpty(taskDepOnProcess)) {
String taskDepDetail = taskDepOnProcess.stream()
.map(task -> String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(),
task.getTaskName()))
.collect(Collectors.joining(Constants.COMMA));
throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL, taskDepDetail);
Optional<String> taskDepMsg = processLineageService.taskDependentMsg(processDefinition.getProjectCode(),
processDefinition.getCode(), 0);
if (taskDepMsg.isPresent()) {
String errorMeg = "Process definition cannot be deleted because it has dependent, " + taskDepMsg.get();
log.error(errorMeg);
throw new ServiceException(errorMeg);
}
}
@ -2219,12 +2282,47 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
throw new ServiceException(Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
}
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper
.queryProcessTaskRelationsByProcessDefinitionCode(processDefinitionLog.getCode(),
processDefinitionLog.getVersion());
List<TaskCodeVersionDto> taskDefinitionList = getTaskCodeVersionDtos(processTaskRelationList);
List<TaskDefinitionLog> taskDefinitionLogList =
taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionList.stream()
.flatMap(taskCodeVersionDto -> {
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
taskDefinitionLog.setCode(taskCodeVersionDto.getCode());
taskDefinitionLog.setVersion(taskCodeVersionDto.getVersion());
return Stream.of(taskDefinitionLog);
}).collect(Collectors.toList()));
saveProcessLineage(processDefinitionLog.getProjectCode(), processDefinitionLog.getCode(),
processDefinitionLog.getVersion(), taskDefinitionLogList);
log.info("Switch process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.",
projectCode, code, version);
putMsg(result, Status.SUCCESS);
return result;
}
private static @NotNull List<TaskCodeVersionDto> getTaskCodeVersionDtos(List<ProcessTaskRelation> processTaskRelationList) {
List<TaskCodeVersionDto> taskDefinitionList = new ArrayList<>();
for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
if (processTaskRelation.getPreTaskCode() != 0) {
TaskCodeVersionDto taskCodeVersionDto = new TaskCodeVersionDto();
taskCodeVersionDto.setCode(processTaskRelation.getPreTaskCode());
taskCodeVersionDto.setVersion(processTaskRelation.getPreTaskVersion());
taskDefinitionList.add(taskCodeVersionDto);
}
if (processTaskRelation.getPostTaskCode() != 0) {
TaskCodeVersionDto taskCodeVersionDto = new TaskCodeVersionDto();
taskCodeVersionDto.setCode(processTaskRelation.getPostTaskCode());
taskCodeVersionDto.setVersion(processTaskRelation.getPostTaskVersion());
taskDefinitionList.add(taskCodeVersionDto);
}
}
return taskDefinitionList;
}
/**
* check batch operate result
*
@ -2330,7 +2428,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (deleteLog == 0 || deleteRelationLog == 0) {
throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
}
log.info("Delete version: {} of workflow: {}, projectCode:{}.", version, code, projectCode);
log.info("Delete version: {} of workflow: {}, projectCode: {}", version, code, projectCode);
// delete process lineage
int deleteProcessLineageResult = processLineageService.deleteProcessLineage(Collections.singletonList(code));
if (deleteProcessLineageResult <= 0) {
log.error("Delete process lineage by process definition code error, processDefinitionCode: {}", code);
throw new ServiceException(Status.DELETE_PROCESS_LINEAGE_ERROR);
}
}
private void updateWorkflowValid(User user, ProcessDefinition oldProcessDefinition,

7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -684,6 +684,13 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
} else
log.info("Update process definition complete, projectCode:{}, processDefinitionName:{}.", projectCode,
processDefinition.getName());
// save process lineage
if (syncDefine) {
processDefinitionService.saveProcessLineage(projectCode, processDefinition.getCode(),
insertVersion, taskDefinitionLogs);
}
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, syncDefine);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {

307
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessLineageServiceImpl.java

@ -0,0 +1,307 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProcessLineageService;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.DependentLineageTask;
import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessTaskLineageDao;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
/**
* work flow lineage service impl
*/
@Slf4j
@Service
public class ProcessLineageServiceImpl extends BaseServiceImpl implements ProcessLineageService {
@Autowired
private ProjectMapper projectMapper;
@Autowired
private TaskDefinitionLogMapper taskDefinitionLogMapper;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
@Autowired
private ProcessTaskLineageDao processTaskLineageDao;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@Override
public List<WorkFlowRelationDetail> queryWorkFlowLineageByName(long projectCode, String processDefinitionName) {
Project project = projectMapper.queryByCode(projectCode);
if (project == null) {
throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode);
}
return processTaskLineageDao.queryWorkFlowLineageByName(projectCode, processDefinitionName);
}
@Override
public WorkFlowLineage queryWorkFlowLineageByCode(long projectCode, long processDefinitionCode) {
Project project = projectMapper.queryByCode(projectCode);
if (project == null) {
throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode);
}
List<ProcessTaskLineage> upstreamProcessTaskLineageList =
processTaskLineageDao.queryByProcessDefinitionCode(processDefinitionCode);
List<ProcessTaskLineage> downstreamProcessTaskLineageList =
processTaskLineageDao.queryWorkFlowLineageByDept(projectCode, processDefinitionCode,
Constants.DEPENDENT_ALL_TASK);
List<ProcessTaskLineage> totalProcessTaskLineageList =
Stream.of(upstreamProcessTaskLineageList, downstreamProcessTaskLineageList)
.flatMap(List::stream)
.collect(Collectors.toList());
List<WorkFlowRelation> workFlowRelationList = getWorkFlowRelations(totalProcessTaskLineageList);
List<WorkFlowRelationDetail> workFlowRelationDetailList =
getWorkflowRelationDetails(totalProcessTaskLineageList.stream()
.flatMap(pl -> {
List<Long> processDefinitionCodes = new ArrayList<>();
processDefinitionCodes.add(pl.getProcessDefinitionCode());
processDefinitionCodes.add(pl.getDeptProcessDefinitionCode());
return processDefinitionCodes.stream();
}).distinct().collect(Collectors.toList()));
WorkFlowLineage workFlowLineage = new WorkFlowLineage();
workFlowLineage.setWorkFlowRelationDetailList(workFlowRelationDetailList);
workFlowLineage.setWorkFlowRelationList(workFlowRelationList);
return workFlowLineage;
}
@Override
public WorkFlowLineage queryWorkFlowLineage(long projectCode) {
Project project = projectMapper.queryByCode(projectCode);
if (project == null) {
throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode);
}
List<ProcessTaskLineage> processTaskLineageList = processTaskLineageDao.queryByProjectCode(projectCode);
List<WorkFlowRelation> workFlowRelationList = getWorkFlowRelations(processTaskLineageList);
List<WorkFlowRelationDetail> workFlowRelationDetailList =
getWorkflowRelationDetails(processTaskLineageList.stream()
.flatMap(pl -> {
List<Long> processDefinitionCodes = new ArrayList<>();
processDefinitionCodes.add(pl.getProcessDefinitionCode());
processDefinitionCodes.add(pl.getDeptProcessDefinitionCode());
return processDefinitionCodes.stream();
}).distinct().collect(Collectors.toList()));
WorkFlowLineage workFlowLineage = new WorkFlowLineage();
workFlowLineage.setWorkFlowRelationList(workFlowRelationList);
workFlowLineage.setWorkFlowRelationDetailList(workFlowRelationDetailList);
return workFlowLineage;
}
private List<WorkFlowRelation> getWorkFlowRelations(List<ProcessTaskLineage> processTaskLineageList) {
List<WorkFlowRelation> workFlowRelations = new ArrayList<>();
List<Long> processDefinitionCodes = processTaskLineageList.stream()
.map(ProcessTaskLineage::getProcessDefinitionCode).distinct().collect(Collectors.toList());
for (ProcessTaskLineage processTaskLineage : processTaskLineageList) {
workFlowRelations.add(new WorkFlowRelation(processTaskLineage.getDeptProcessDefinitionCode(),
processTaskLineage.getProcessDefinitionCode()));
if (!processDefinitionCodes.contains(processTaskLineage.getDeptProcessDefinitionCode())) {
workFlowRelations.add(new WorkFlowRelation(0, processTaskLineage.getProcessDefinitionCode()));
}
}
return workFlowRelations;
}
private List<WorkFlowRelationDetail> getWorkflowRelationDetails(List<Long> processDefinitionCodes) {
List<WorkFlowRelationDetail> workFlowRelationDetails = new ArrayList<>();
for (Long processDefinitionCode : processDefinitionCodes) {
List<WorkFlowRelationDetail> workFlowRelationDetailList =
processTaskLineageDao.queryWorkFlowLineageByCode(processDefinitionCode);
workFlowRelationDetails.addAll(workFlowRelationDetailList);
}
return workFlowRelationDetails;
}
/**
* Query tasks depend on process definition, include upstream or downstream
* and return tasks dependence with string format.
*
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return Optional of formatter message
*/
@Override
public Optional<String> taskDependentMsg(long projectCode, long processDefinitionCode, long taskCode) {
long queryTaskCode = 0;
if (taskCode != 0) {
queryTaskCode = taskCode;
}
List<ProcessTaskLineage> dependentProcessList =
processTaskLineageDao.queryWorkFlowLineageByDept(projectCode, processDefinitionCode, queryTaskCode);
if (CollectionUtils.isEmpty(dependentProcessList)) {
return Optional.empty();
}
List<String> taskDepStrList = new ArrayList<>();
for (ProcessTaskLineage processTaskLineage : dependentProcessList) {
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(processTaskLineage.getDeptProcessDefinitionCode());
String taskName = "";
if (processTaskLineage.getTaskDefinitionCode() != 0) {
TaskDefinition taskDefinition =
taskDefinitionMapper.queryByCode(processTaskLineage.getTaskDefinitionCode());
taskName = taskDefinition.getName();
}
taskDepStrList.add(String.format(Constants.FORMAT_S_S_COLON, processDefinition.getName(), taskName));
}
String taskDepStr = String.join(Constants.COMMA, taskDepStrList);
if (taskCode != 0) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
return Optional
.of(MessageFormat.format(Status.DELETE_TASK_USE_BY_OTHER_FAIL.getMsg(), taskDefinition.getName(),
taskDepStr));
} else {
return Optional.of(MessageFormat.format(Status.DELETE_TASK_USE_BY_OTHER_FAIL.getMsg(), "",
taskDepStr));
}
}
/**
* Query downstream tasks depend on a process definition or a task
*
* @param processDefinitionCode Process definition code want to query tasks dependence
* @return downstream dependent process definition list
*/
@Override
public List<DependentProcessDefinition> queryDownstreamDependentProcessDefinitions(Long processDefinitionCode) {
List<DependentProcessDefinition> dependentProcessDefinitionList = new ArrayList<>();
List<ProcessTaskLineage> processTaskLineageList =
processTaskLineageDao.queryWorkFlowLineageByDept(Constants.DEFAULT_PROJECT_CODE, processDefinitionCode,
Constants.DEPENDENT_ALL_TASK);
if (processTaskLineageList.isEmpty()) {
return dependentProcessDefinitionList;
}
List<ProcessDefinition> processDefinitionList =
processDefinitionMapper.queryByCodes(processTaskLineageList.stream()
.map(ProcessTaskLineage::getDeptProcessDefinitionCode).distinct().collect(Collectors.toList()));
List<TaskDefinition> taskDefinitionList = taskDefinitionMapper.queryByCodeList(processTaskLineageList.stream()
.map(ProcessTaskLineage::getDeptTaskDefinitionCode).distinct().collect(Collectors.toList()));
for (TaskDefinition taskDefinition : taskDefinitionList) {
DependentProcessDefinition dependentProcessDefinition = new DependentProcessDefinition();
processTaskLineageList.stream()
.filter(processLineage -> processLineage.getDeptTaskDefinitionCode() == taskDefinition.getCode())
.findFirst()
.ifPresent(processLineage -> {
dependentProcessDefinition
.setProcessDefinitionCode(processLineage.getDeptProcessDefinitionCode());
dependentProcessDefinition.setTaskDefinitionCode(taskDefinition.getCode());
dependentProcessDefinition.setTaskParams(taskDefinition.getTaskParams());
dependentProcessDefinition.setWorkerGroup(taskDefinition.getWorkerGroup());
});
processDefinitionList.stream()
.filter(processDefinition -> processDefinition.getCode() == dependentProcessDefinition
.getProcessDefinitionCode())
.findFirst()
.ifPresent(processDefinition -> {
dependentProcessDefinition.setProcessDefinitionVersion(processDefinition.getVersion());
});
}
return dependentProcessDefinitionList;
}
@Override
public List<DependentLineageTask> queryDependentProcessDefinitions(long projectCode, long processDefinitionCode,
Long taskCode) {
Project project = projectMapper.queryByCode(projectCode);
if (project == null) {
throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode);
}
List<ProcessTaskLineage> processTaskLineageList = processTaskLineageDao.queryWorkFlowLineageByDept(projectCode,
processDefinitionCode, taskCode == null ? 0 : taskCode);
List<ProcessDefinition> processDefinitionList =
processDefinitionMapper.queryByCodes(processTaskLineageList.stream()
.map(ProcessTaskLineage::getProcessDefinitionCode).distinct().collect(Collectors.toList()));
List<TaskDefinition> taskDefinitionList = taskDefinitionMapper.queryByCodeList(processTaskLineageList.stream()
.map(ProcessTaskLineage::getTaskDefinitionCode).filter(code -> code != 0).distinct()
.collect(Collectors.toList()));
List<DependentLineageTask> dependentLineageTaskList = new ArrayList<>();
for (ProcessTaskLineage processTaskLineage : processTaskLineageList) {
DependentLineageTask dependentLineageTask = new DependentLineageTask();
taskDefinitionList.stream()
.filter(taskDefinition -> taskDefinition.getCode() == processTaskLineage.getTaskDefinitionCode())
.findFirst()
.ifPresent(taskDefinition -> {
dependentLineageTask.setTaskDefinitionCode(taskDefinition.getCode());
dependentLineageTask.setTaskDefinitionName(taskDefinition.getName());
});
processDefinitionList.stream()
.filter(processDefinition -> processDefinition.getCode() == processTaskLineage
.getProcessDefinitionCode())
.findFirst()
.ifPresent(processDefinition -> {
dependentLineageTask.setProcessDefinitionCode(processDefinition.getCode());
dependentLineageTask.setProcessDefinitionName(processDefinition.getName());
dependentLineageTask.setProjectCode(processDefinition.getProjectCode());
});
dependentLineageTaskList.add(dependentLineageTask);
}
return dependentLineageTaskList;
}
@Override
public int createProcessLineage(List<ProcessTaskLineage> processTaskLineages) {
return processTaskLineageDao.batchInsert(processTaskLineages);
}
@Override
public int updateProcessLineage(List<ProcessTaskLineage> processTaskLineages) {
return processTaskLineageDao.updateProcessTaskLineage(processTaskLineages);
}
@Override
public int deleteProcessLineage(List<Long> processDefinitionCodes) {
return processTaskLineageDao.batchDeleteByProcessDefinitionCode(processDefinitionCodes);
}
}

306
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java

@ -1,306 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
import org.apache.commons.lang3.StringUtils;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
/**
* work flow lineage service impl
*/
@Slf4j
@Service
public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkFlowLineageService {
@Autowired
private WorkFlowLineageMapper workFlowLineageMapper;
@Autowired
private ProjectMapper projectMapper;
@Autowired
private TaskDefinitionLogMapper taskDefinitionLogMapper;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
@Override
public List<WorkFlowLineage> queryWorkFlowLineageByName(long projectCode, String workFlowName) {
Project project = projectMapper.queryByCode(projectCode);
if (project == null) {
throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode);
}
return workFlowLineageMapper.queryWorkFlowLineageByName(projectCode, workFlowName);
}
@Override
public Map<String, Object> queryWorkFlowLineageByCode(long projectCode, long sourceWorkFlowCode) {
Project project = projectMapper.queryByCode(projectCode);
if (project == null) {
throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode);
}
List<WorkFlowLineage> workFlowLineages = new ArrayList<>();
Set<WorkFlowRelation> workFlowRelations = new HashSet<>();
recursiveWorkFlow(projectCode, sourceWorkFlowCode, workFlowLineages, workFlowRelations);
Map<String, Object> workFlowLists = new HashMap<>();
// todo: use vo
workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineages);
workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations);
return workFlowLists;
}
private void recursiveWorkFlow(long projectCode,
long sourceWorkFlowCode,
List<WorkFlowLineage> workFlowLineages,
Set<WorkFlowRelation> workFlowRelations) {
workFlowLineages.add(workFlowLineageMapper.queryWorkFlowLineageByCode(projectCode, sourceWorkFlowCode));
List<WorkFlowLineage> downStreamWorkFlowLineages =
workFlowLineageMapper.queryDownstreamLineageByProcessDefinitionCode(sourceWorkFlowCode, "DEPENDENT");
workFlowLineages.addAll(downStreamWorkFlowLineages);
downStreamWorkFlowLineages.forEach(workFlowLineage -> workFlowRelations
.add(new WorkFlowRelation(sourceWorkFlowCode, workFlowLineage.getWorkFlowCode())));
List<WorkFlowLineage> upstreamWorkFlowLineages = new ArrayList<>();
getUpstreamLineages(sourceWorkFlowCode, upstreamWorkFlowLineages);
workFlowLineages.addAll(upstreamWorkFlowLineages);
upstreamWorkFlowLineages.forEach(workFlowLineage -> workFlowRelations
.add(new WorkFlowRelation(workFlowLineage.getWorkFlowCode(), sourceWorkFlowCode)));
}
private void getUpstreamLineages(long sourceWorkFlowCode,
List<WorkFlowLineage> upstreamWorkFlowLineages) {
List<DependentProcessDefinition> workFlowDependentDefinitionList =
workFlowLineageMapper.queryUpstreamDependentParamsByProcessDefinitionCode(sourceWorkFlowCode,
"DEPENDENT");
List<Long> upstreamProcessDefinitionCodes = new ArrayList<>();
getProcessDefinitionCodeByDependentDefinitionList(workFlowDependentDefinitionList,
upstreamProcessDefinitionCodes);
if (!upstreamProcessDefinitionCodes.isEmpty()) {
upstreamWorkFlowLineages.addAll(
workFlowLineageMapper.queryWorkFlowLineageByProcessDefinitionCodes(upstreamProcessDefinitionCodes));
}
}
/**
* get dependent process definition code by dependent process definition list
*/
private void getProcessDefinitionCodeByDependentDefinitionList(List<DependentProcessDefinition> dependentDefinitionList,
List<Long> processDefinitionCodes) {
for (DependentProcessDefinition dependentProcessDefinition : dependentDefinitionList) {
for (DependentTaskModel dependentTaskModel : dependentProcessDefinition.getDependentParameters()
.getDependence().getDependTaskList()) {
for (DependentItem dependentItem : dependentTaskModel.getDependItemList()) {
if (!processDefinitionCodes.contains(dependentItem.getDefinitionCode())) {
processDefinitionCodes.add(dependentItem.getDefinitionCode());
}
}
}
}
}
@Override
public Map<String, Object> queryWorkFlowLineage(long projectCode) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByCode(projectCode);
if (project == null) {
log.error("Project does not exist, projectCode:{}.", projectCode);
putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
return result;
}
List<ProcessLineage> processLineages = workFlowLineageMapper.queryProcessLineage(projectCode);
Map<Long, WorkFlowLineage> workFlowLineagesMap = new HashMap<>();
Set<WorkFlowRelation> workFlowRelations = new HashSet<>();
if (!processLineages.isEmpty()) {
List<WorkFlowLineage> workFlowLineages =
workFlowLineageMapper.queryWorkFlowLineageByLineage(processLineages);
workFlowLineagesMap = workFlowLineages.stream()
.collect(Collectors.toMap(WorkFlowLineage::getWorkFlowCode, workFlowLineage -> workFlowLineage));
Map<Long, List<TaskDefinition>> workFlowMap = new HashMap<>();
for (ProcessLineage processLineage : processLineages) {
workFlowMap.compute(processLineage.getProcessDefinitionCode(), (k, v) -> {
if (v == null) {
v = new ArrayList<>();
}
if (processLineage.getPreTaskCode() > 0) {
v.add(new TaskDefinition(processLineage.getPreTaskCode(), processLineage.getPreTaskVersion()));
}
if (processLineage.getPostTaskCode() > 0) {
v.add(new TaskDefinition(processLineage.getPostTaskCode(),
processLineage.getPostTaskVersion()));
}
return v;
});
}
for (Entry<Long, List<TaskDefinition>> workFlow : workFlowMap.entrySet()) {
Set<Long> sourceWorkFlowCodes =
querySourceWorkFlowCodes(projectCode, workFlow.getKey(), workFlow.getValue());
if (sourceWorkFlowCodes.isEmpty()) {
workFlowRelations.add(new WorkFlowRelation(0L, workFlow.getKey()));
} else {
workFlowLineagesMap.get(workFlow.getKey())
.setSourceWorkFlowCode(StringUtils.join(sourceWorkFlowCodes, Constants.COMMA));
sourceWorkFlowCodes
.forEach(code -> workFlowRelations.add(new WorkFlowRelation(code, workFlow.getKey())));
}
}
}
Map<String, Object> workFlowLists = new HashMap<>();
workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineagesMap.values());
workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations);
result.put(Constants.DATA_LIST, workFlowLists);
putMsg(result, Status.SUCCESS);
return result;
}
private Set<Long> querySourceWorkFlowCodes(long projectCode, long workFlowCode,
List<TaskDefinition> taskDefinitionList) {
Set<Long> sourceWorkFlowCodes = new HashSet<>();
if (taskDefinitionList == null || taskDefinitionList.isEmpty()) {
return sourceWorkFlowCodes;
}
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionList);
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (taskDefinitionLog.getProjectCode() == projectCode) {
if (TaskTypeUtils.isDependentTask(taskDefinitionLog.getTaskType())) {
DependentParameters dependentParameters =
JSONUtils.parseObject(taskDefinitionLog.getDependence(), DependentParameters.class);
if (dependentParameters != null) {
List<DependentTaskModel> dependTaskList =
dependentParameters.getDependence().getDependTaskList();
if (!CollectionUtils.isEmpty(dependTaskList)) {
for (DependentTaskModel taskModel : dependTaskList) {
List<DependentItem> dependItemList = taskModel.getDependItemList();
for (DependentItem dependentItem : dependItemList) {
if (dependentItem.getProjectCode() == projectCode
&& dependentItem.getDefinitionCode() != workFlowCode) {
sourceWorkFlowCodes.add(dependentItem.getDefinitionCode());
}
}
}
}
}
}
}
}
return sourceWorkFlowCodes;
}
/**
* Query and return tasks dependence with string format, is a wrapper of queryTaskDepOnTask and task query method.
*
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return Optional of formatter message
*/
@Override
public Optional<String> taskDepOnTaskMsg(long projectCode, long processDefinitionCode, long taskCode) {
List<TaskMainInfo> tasksDep =
workFlowLineageMapper.queryTaskDepOnTask(projectCode, processDefinitionCode, taskCode);
if (CollectionUtils.isEmpty(tasksDep)) {
return Optional.empty();
}
String taskDepStr = tasksDep.stream().map(
task -> String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(), task.getTaskName()))
.collect(Collectors.joining(Constants.COMMA));
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
return Optional.of(MessageFormat.format(Status.DELETE_TASK_USE_BY_OTHER_FAIL.getMsg(), taskDefinition.getName(),
taskDepStr));
}
/**
* Query tasks depend on process definition, include upstream or downstream
*
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @return Set of TaskMainInfo
*/
@Override
public Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long processDefinitionCode) {
Set<TaskMainInfo> taskMainInfos = new HashSet<>();
List<TaskMainInfo> taskDependents =
workFlowLineageMapper.queryTaskDependentOnProcess(processDefinitionCode, 0);
List<TaskMainInfo> taskSubProcess =
workFlowLineageMapper.queryTaskSubProcessDepOnProcess(projectCode, processDefinitionCode);
taskMainInfos.addAll(taskDependents);
taskMainInfos.addAll(taskSubProcess);
return taskMainInfos;
}
/**
* Query downstream tasks depend on a process definition or a task
*
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return downstream dependent tasks
*/
@Override
public Map<String, Object> queryDownstreamDependentTasks(Long processDefinitionCode, Long taskCode) {
Map<String, Object> result = new HashMap<>();
List<TaskMainInfo> taskDependents =
workFlowLineageMapper.queryTaskDependentOnProcess(processDefinitionCode,
Objects.isNull(taskCode) ? 0 : taskCode.longValue());
result.put(Constants.DATA_LIST, taskDependents);
putMsg(result, Status.SUCCESS);
return result;
}
}

31
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageControllerTest.java → dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessTaskLineageControllerTest.java

@ -20,14 +20,13 @@ package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.AssertionsHelper.assertDoesNotThrow;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.WorkFlowLineageServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProcessLineageServiceImpl;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.User;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
@ -42,13 +41,13 @@ import org.mockito.junit.jupiter.MockitoExtension;
* work flow lineage controller test
*/
@ExtendWith(MockitoExtension.class)
public class WorkFlowLineageControllerTest {
public class ProcessTaskLineageControllerTest {
@InjectMocks
private WorkFlowLineageController workFlowLineageController;
private ProcessLineageController processLineageController;
@Mock
private WorkFlowLineageServiceImpl workFlowLineageService;
private ProcessLineageServiceImpl workFlowLineageService;
protected User user;
@ -76,27 +75,7 @@ public class WorkFlowLineageControllerTest {
String searchVal = "test";
Mockito.when(workFlowLineageService.queryWorkFlowLineageByName(projectCode, searchVal))
.thenReturn(Collections.emptyList());
assertDoesNotThrow(() -> workFlowLineageController.queryWorkFlowLineageByName(user, projectCode, searchVal));
assertDoesNotThrow(() -> processLineageController.queryWorkFlowLineageByName(user, projectCode, searchVal));
}
@Test
public void testQueryWorkFlowLineageByCode() {
long projectCode = 1L;
long code = 1L;
Mockito.when(workFlowLineageService.queryWorkFlowLineageByCode(projectCode, code)).thenReturn(new HashMap<>());
assertDoesNotThrow(() -> workFlowLineageController.queryWorkFlowLineageByCode(user, projectCode, code));
}
@Test
public void testQueryDownstreamDependentTaskList() {
long code = 1L;
long taskCode = 1L;
Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, Status.SUCCESS);
Mockito.when(workFlowLineageService.queryDownstreamDependentTasks(code, taskCode))
.thenReturn(result);
assertDoesNotThrow(
() -> workFlowLineageController.queryDownstreamDependentTaskList(user, code, taskCode));
}
}

7
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java

@ -162,6 +162,9 @@ public class ExecuteFunctionServiceTest {
@Mock
private ProcessDefinitionService processDefinitionService;
@Mock
private ProcessLineageService processLineageService;
private int processDefinitionId = 1;
private int processDefinitionVersion = 1;
@ -358,7 +361,7 @@ public class ExecuteFunctionServiceTest {
dependentProcessDefinition.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
dependentProcessDefinition.setTaskParams(
"{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":2,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}");
Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode))
Mockito.when(processLineageService.queryDownstreamDependentProcessDefinitions(processDefinitionCode))
.thenReturn(Lists.newArrayList(dependentProcessDefinition));
Map<Long, String> processDefinitionWorkerGroupMap = new HashMap<>();
@ -387,7 +390,7 @@ public class ExecuteFunctionServiceTest {
childDependent.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
childDependent.setTaskParams(
"{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":3,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}");
Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode(
Mockito.when(processLineageService.queryDownstreamDependentProcessDefinitions(
dependentProcessDefinition.getProcessDefinitionCode())).thenReturn(Lists.newArrayList(childDependent))
.thenReturn(Lists.newArrayList());
int allLevelDependentCount = executorService.createComplementDependentCommand(schedules, command, true);

22
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -84,7 +84,6 @@ import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@ -108,7 +107,6 @@ import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.mock.web.MockMultipartFile;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
@ExtendWith(MockitoExtension.class)
@ -171,7 +169,7 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
private DataSourceMapper dataSourceMapper;
@Mock
private WorkFlowLineageService workFlowLineageService;
private ProcessLineageService processLineageService;
@Mock
private MetricsCleanUpService metricsCleanUpService;
@ -524,8 +522,8 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
when(processDefinitionDao.queryByCode(46L)).thenReturn(Optional.of(processDefinition));
when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
when(scheduleMapper.deleteById(46)).thenReturn(1);
when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode()))
.thenReturn(Collections.emptySet());
when(processLineageService.taskDependentMsg(project.getCode(), processDefinition.getCode(), 0))
.thenReturn(Optional.empty());
processDefinitionService.deleteProcessDefinitionByCode(user, 46L);
Mockito.verify(metricsCleanUpService, times(1)).cleanUpWorkflowMetricsByDefinitionCode(46L);
@ -540,19 +538,17 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
// process used by other task, sub process
user.setUserType(UserType.ADMIN_USER);
TaskMainInfo taskMainInfo = getTaskMainInfo().get(0);
when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode()))
.thenReturn(ImmutableSet.copyOf(getTaskMainInfo()));
when(processLineageService.taskDependentMsg(project.getCode(), processDefinition.getCode(), 0))
.thenReturn(Optional.of(taskMainInfo.getTaskName()));
exception = Assertions.assertThrows(ServiceException.class,
() -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L));
Assertions.assertEquals(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL.getCode(),
((ServiceException) exception).getCode());
// delete success
schedule.setReleaseState(ReleaseState.OFFLINE);
when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1);
when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode()))
.thenReturn(Collections.emptySet());
when(processLineageService.taskDependentMsg(project.getCode(), processDefinition.getCode(), 0))
.thenReturn(Optional.empty());
Assertions.assertDoesNotThrow(() -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L));
Mockito.verify(metricsCleanUpService, times(2)).cleanUpWorkflowMetricsByDefinitionCode(46L);
}
@ -600,8 +596,8 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
// delete success
process.setReleaseState(ReleaseState.OFFLINE);
when(processDefinitionDao.queryByCode(processDefinitionCode)).thenReturn(Optional.of(process));
when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), process.getCode()))
.thenReturn(Collections.emptySet());
when(processLineageService.taskDependentMsg(project.getCode(), process.getCode(), 0))
.thenReturn(Optional.empty());
putMsg(result, Status.SUCCESS, projectCode);
doNothing().when(metricsCleanUpService).cleanUpWorkflowMetricsByDefinitionCode(11L);
Map<String, Object> deleteSuccess =

88
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java → dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskLineageServiceTest.java

@ -19,23 +19,20 @@ package org.apache.dolphinscheduler.api.service;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.service.impl.WorkFlowLineageServiceImpl;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
import org.apache.dolphinscheduler.api.service.impl.ProcessLineageServiceImpl;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessTaskLineageDao;
import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -49,13 +46,13 @@ import org.mockito.junit.jupiter.MockitoExtension;
* work flow lineage service test
*/
@ExtendWith(MockitoExtension.class)
public class WorkFlowLineageServiceTest {
public class ProcessTaskLineageServiceTest {
@InjectMocks
private WorkFlowLineageServiceImpl workFlowLineageService;
private ProcessLineageServiceImpl processLineageService;
@Mock
private WorkFlowLineageMapper workFlowLineageMapper;
private ProcessTaskLineageDao processTaskLineageDao;
@Mock
private ProjectMapper projectMapper;
@ -83,9 +80,9 @@ public class WorkFlowLineageServiceTest {
Project project = getProject("test");
String name = "test";
when(projectMapper.queryByCode(1L)).thenReturn(project);
when(workFlowLineageMapper.queryWorkFlowLineageByName(Mockito.anyLong(), Mockito.any()))
when(processTaskLineageDao.queryWorkFlowLineageByName(Mockito.anyLong(), Mockito.any()))
.thenReturn(getWorkFlowLineages());
List<WorkFlowLineage> workFlowLineages = workFlowLineageService.queryWorkFlowLineageByName(1L, name);
List<WorkFlowRelationDetail> workFlowLineages = processLineageService.queryWorkFlowLineageByName(1L, name);
Assertions.assertTrue(CollectionUtils.isNotEmpty(workFlowLineages));
}
@ -93,43 +90,44 @@ public class WorkFlowLineageServiceTest {
public void testQueryWorkFlowLineage() {
Project project = getProject("test");
List<ProcessLineage> processLineages = new ArrayList<>();
ProcessLineage processLineage = new ProcessLineage();
processLineage.setPreTaskVersion(1);
processLineage.setPreTaskCode(1L);
processLineage.setPostTaskCode(2L);
processLineage.setPostTaskVersion(1);
processLineage.setProcessDefinitionCode(1111L);
processLineage.setProcessDefinitionVersion(1);
processLineage.setProjectCode(1111L);
processLineages.add(processLineage);
WorkFlowLineage workFlowLineage = new WorkFlowLineage();
workFlowLineage.setSourceWorkFlowCode("");
workFlowLineage.setWorkFlowCode(1111L);
List<WorkFlowLineage> workFlowLineages = new ArrayList<>();
workFlowLineages.add(workFlowLineage);
List<ProcessTaskLineage> processTaskLineages = new ArrayList<>();
ProcessTaskLineage processTaskLineage = new ProcessTaskLineage();
processTaskLineage.setProcessDefinitionCode(1);
processTaskLineage.setProcessDefinitionVersion(1);
processTaskLineage.setTaskDefinitionCode(2L);
processTaskLineage.setTaskDefinitionVersion(1);
processTaskLineage.setDeptProjectCode(1111L);
processTaskLineage.setDeptProcessDefinitionCode(1);
processTaskLineage.setDeptTaskDefinitionCode(1111L);
processTaskLineages.add(processTaskLineage);
List<WorkFlowRelationDetail> workFlowRelationDetailList = new ArrayList<>();
WorkFlowRelationDetail workFlowRelationDetail = new WorkFlowRelationDetail();
workFlowRelationDetail.setWorkFlowCode(processTaskLineage.getProcessDefinitionCode());
workFlowRelationDetail.setWorkFlowName("testProcessDefinitionName");
workFlowRelationDetailList.add(workFlowRelationDetail);
when(projectMapper.queryByCode(1L)).thenReturn(project);
when(workFlowLineageMapper.queryProcessLineage(project.getCode())).thenReturn(processLineages);
when(workFlowLineageMapper.queryWorkFlowLineageByLineage(processLineages)).thenReturn(workFlowLineages);
Map<String, Object> result = workFlowLineageService.queryWorkFlowLineage(1L);
Map<String, Object> workFlowLists = (Map<String, Object>) result.get(Constants.DATA_LIST);
Collection<WorkFlowLineage> workFlowLineageList =
(Collection<WorkFlowLineage>) workFlowLists.get(Constants.WORKFLOW_LIST);
Set<WorkFlowRelation> workFlowRelations =
(Set<WorkFlowRelation>) workFlowLists.get(Constants.WORKFLOW_RELATION_LIST);
Assertions.assertTrue(workFlowLineageList.size() > 0);
Assertions.assertTrue(workFlowRelations.size() > 0);
when(processTaskLineageDao.queryByProjectCode(project.getCode())).thenReturn(processTaskLineages);
when(processTaskLineageDao.queryWorkFlowLineageByCode(processTaskLineage.getProcessDefinitionCode()))
.thenReturn(workFlowRelationDetailList);
WorkFlowLineage workFlowLineage = processLineageService.queryWorkFlowLineage(1L);
List<WorkFlowRelationDetail> workFlowLineageList =
workFlowLineage.getWorkFlowRelationDetailList();
List<WorkFlowRelation> workFlowRelations =
workFlowLineage.getWorkFlowRelationList();
Assertions.assertTrue(!workFlowLineageList.isEmpty());
Assertions.assertTrue(!workFlowRelations.isEmpty());
}
private List<WorkFlowLineage> getWorkFlowLineages() {
List<WorkFlowLineage> workFlowLineages = new ArrayList<>();
WorkFlowLineage workFlowLineage = new WorkFlowLineage();
workFlowLineage.setWorkFlowCode(1);
workFlowLineage.setWorkFlowName("testdag");
workFlowLineages.add(workFlowLineage);
private List<WorkFlowRelationDetail> getWorkFlowLineages() {
List<WorkFlowRelationDetail> workFlowLineages = new ArrayList<>();
WorkFlowRelationDetail workFlowRelationDetail = new WorkFlowRelationDetail();
workFlowRelationDetail.setWorkFlowCode(1);
workFlowRelationDetail.setWorkFlowName("testdag");
workFlowLineages.add(workFlowRelationDetail);
return workFlowLineages;
}

8
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java

@ -482,11 +482,9 @@ public final class Constants {
public static final String TOTAL = "total";
/**
* workflow
*/
public static final String WORKFLOW_LIST = "workFlowList";
public static final String WORKFLOW_RELATION_LIST = "workFlowRelationList";
public static final long DEFAULT_PROJECT_CODE = 0;
public static final long DEPENDENT_ALL_TASK = 0;
/**
* session user

34
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentLineageTask.java

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DependentLineageTask {
private long projectCode;
private long processDefinitionCode;
private String processDefinitionName;
private long taskDefinitionCode;
private String taskDefinitionName;
}

56
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java

@ -25,9 +25,12 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameter
import java.util.List;
import lombok.Data;
/**
* dependent process definition
*/
@Data
public class DependentProcessDefinition {
/**
@ -35,11 +38,6 @@ public class DependentProcessDefinition {
*/
private long processDefinitionCode;
/**
* process definition name
*/
private String processDefinitionName;
/**
* process definition version
**/
@ -106,52 +104,4 @@ public class DependentProcessDefinition {
return JSONUtils.parseObject(taskParams, DependentParameters.class);
}
public String getProcessDefinitionName() {
return this.processDefinitionName;
}
public void setProcessDefinitionName(String name) {
this.processDefinitionName = name;
}
public long getProcessDefinitionCode() {
return this.processDefinitionCode;
}
public void setProcessDefinitionCode(long code) {
this.processDefinitionCode = code;
}
public int getProcessDefinitionVersion() {
return processDefinitionVersion;
}
public void setProcessDefinitionVersion(int processDefinitionVersion) {
this.processDefinitionVersion = processDefinitionVersion;
}
public long getTaskDefinitionCode() {
return this.taskDefinitionCode;
}
public void setTaskDefinitionCode(long code) {
this.taskDefinitionCode = code;
}
public String getTaskParams() {
return this.taskParams;
}
public void setTaskParams(String taskParams) {
this.taskParams = taskParams;
}
public String getWorkerGroup() {
return this.workerGroup;
}
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
}

115
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java

@ -1,115 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
/**
* Process lineage
*/
public class ProcessLineage {
/**
* project code
*/
private long projectCode;
/**
* post task code
*/
private long postTaskCode;
/**
* post task version
*/
private int postTaskVersion;
/**
* pre task code
*/
private long preTaskCode;
/**
* pre task version
*/
private int preTaskVersion;
/**
* process definition code
*/
private long processDefinitionCode;
/**
* process definition version
*/
private int processDefinitionVersion;
public long getProjectCode() {
return projectCode;
}
public void setProjectCode(long projectCode) {
this.projectCode = projectCode;
}
public long getPostTaskCode() {
return postTaskCode;
}
public void setPostTaskCode(long postTaskCode) {
this.postTaskCode = postTaskCode;
}
public int getPostTaskVersion() {
return postTaskVersion;
}
public void setPostTaskVersion(int postTaskVersion) {
this.postTaskVersion = postTaskVersion;
}
public long getPreTaskCode() {
return preTaskCode;
}
public void setPreTaskCode(long preTaskCode) {
this.preTaskCode = preTaskCode;
}
public int getPreTaskVersion() {
return preTaskVersion;
}
public void setPreTaskVersion(int preTaskVersion) {
this.preTaskVersion = preTaskVersion;
}
public long getProcessDefinitionCode() {
return processDefinitionCode;
}
public void setProcessDefinitionCode(long processDefinitionCode) {
this.processDefinitionCode = processDefinitionCode;
}
public int getProcessDefinitionVersion() {
return processDefinitionVersion;
}
public void setProcessDefinitionVersion(int processDefinitionVersion) {
this.processDefinitionVersion = processDefinitionVersion;
}
}

48
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskLineage.java

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
import java.util.Date;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("t_ds_process_task_lineage")
public class ProcessTaskLineage {
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
private long processDefinitionCode;
private int processDefinitionVersion;
private long taskDefinitionCode;
private int taskDefinitionVersion;
private long deptProjectCode;
private long deptProcessDefinitionCode;
private long deptTaskDefinitionCode;
private Date createTime;
private Date updateTime;
}

15
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowLineage.java

@ -17,20 +17,15 @@
package org.apache.dolphinscheduler.dao.entity;
import java.util.Date;
import java.util.List;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class WorkFlowLineage {
private long workFlowCode;
private String workFlowName;
private String workFlowPublishStatus;
private Date scheduleStartTime;
private Date scheduleEndTime;
private String crontab;
private int schedulePublishStatus;
private String sourceWorkFlowCode;
private List<WorkFlowRelation> workFlowRelationList;
private List<WorkFlowRelationDetail> workFlowRelationDetailList;
}

31
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java

@ -19,35 +19,18 @@ package org.apache.dolphinscheduler.dao.entity;
import java.util.Objects;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WorkFlowRelation {
private long sourceWorkFlowCode;
private long targetWorkFlowCode;
public long getSourceWorkFlowCode() {
return sourceWorkFlowCode;
}
public void setSourceWorkFlowCode(long sourceWorkFlowCode) {
this.sourceWorkFlowCode = sourceWorkFlowCode;
}
public long getTargetWorkFlowCode() {
return targetWorkFlowCode;
}
public void setTargetWorkFlowCode(long targetWorkFlowCode) {
this.targetWorkFlowCode = targetWorkFlowCode;
}
public WorkFlowRelation() {
}
public WorkFlowRelation(long sourceWorkFlowCode, long targetWorkFlowCode) {
this.sourceWorkFlowCode = sourceWorkFlowCode;
this.targetWorkFlowCode = targetWorkFlowCode;
}
@Override
public boolean equals(Object o) {
if (this == o) {

36
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelationDetail.java

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
import java.util.Date;
import lombok.Data;
@Data
public class WorkFlowRelationDetail {
private long workFlowCode;
private String workFlowName;
private String workFlowPublishStatus;
private Date scheduleStartTime;
private Date scheduleEndTime;
private String crontab;
private int schedulePublishStatus;
private String sourceWorkFlowCode;
}

48
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskLineageMapper.java

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface ProcessTaskLineageMapper extends BaseMapper<ProcessTaskLineage> {
int batchDeleteByProcessDefinitionCode(@Param("processDefinitionCodes") List<Long> processDefinitionCodes);
int batchInsert(@Param("processLineages") List<ProcessTaskLineage> processTaskLineages);
List<ProcessTaskLineage> queryByProjectCode(@Param("projectCode") long projectCode);
List<WorkFlowRelationDetail> queryWorkFlowLineageByCode(@Param("processDefinitionCode") long processDefinitionCode);
List<WorkFlowRelationDetail> queryWorkFlowLineageByName(@Param("projectCode") long projectCode,
@Param("processDefinitionName") String processDefinitionName);
List<ProcessTaskLineage> queryWorkFlowLineageByDept(@Param("deptProjectCode") long deptProjectCode,
@Param("deptProcessDefinitionCode") long deptProcessDefinitionCode,
@Param("deptTaskDefinitionCode") long deptTaskDefinitionCode);
List<ProcessTaskLineage> queryByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
}

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java

@ -140,4 +140,6 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
int deleteByBatchCodes(@Param("taskCodeList") List<Long> taskCodeList);
void deleteByWorkflowDefinitionCodeAndVersion(long workflowDefinitionCode, int workflowDefinitionVersion);
List<TaskDefinition> queryDefinitionsByTaskType(@Param("taskType") String taskType);
}

148
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java

@ -1,148 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.ibatis.annotations.Param;
import java.util.List;
public interface WorkFlowLineageMapper {
/**
* queryByName
*
* @param projectCode projectCode
* @param workFlowName workFlowName
* @return WorkFlowLineage list
*/
List<WorkFlowLineage> queryWorkFlowLineageByName(@Param("projectCode") long projectCode,
@Param("workFlowName") String workFlowName);
/**
* queryWorkFlowLineageByCode
*
* @param projectCode projectCode
* @param workFlowCode workFlowCode
* @return WorkFlowLineage
*/
WorkFlowLineage queryWorkFlowLineageByCode(@Param("projectCode") long projectCode,
@Param("workFlowCode") long workFlowCode);
/**
* queryWorkFlowLineageByProcessDefinitionCodes
*
* @param workFlowCodes workFlowCodes
* @return WorkFlowLineage
*/
List<WorkFlowLineage> queryWorkFlowLineageByProcessDefinitionCodes(@Param("workFlowCodes") List<Long> workFlowCodes);
/**
* queryWorkFlowLineageByCode
*
* @param processLineages processLineages
* @return WorkFlowLineage list
*/
List<WorkFlowLineage> queryWorkFlowLineageByLineage(@Param("processLineages") List<ProcessLineage> processLineages);
/**
* queryProcessLineage
*
* @param projectCode projectCode
* @return ProcessLineage list
*/
List<ProcessLineage> queryProcessLineage(@Param("projectCode") long projectCode);
/**
* queryCodeRelation
*
* @param projectCode projectCode
* @param processDefinitionCode processDefinitionCode
* @return ProcessLineage list
*/
List<ProcessLineage> queryProcessLineageByCode(@Param("projectCode") long projectCode,
@Param("processDefinitionCode") long processDefinitionCode);
/**
* query process definition by name
*
* @return dependent process definition
*/
List<DependentProcessDefinition> queryDependentProcessDefinitionByProcessDefinitionCode(@Param("code") long code);
/**
* query downstream work flow lineage by process definition code
*
* @return dependent process definition
*/
List<WorkFlowLineage> queryDownstreamLineageByProcessDefinitionCode(@Param("code") long code,
@Param("taskType") String taskType);
/**
* query upstream work flow dependent task params by process definition code
*
* @return task_params
*/
List<DependentProcessDefinition> queryUpstreamDependentParamsByProcessDefinitionCode(@Param("code") long code,
@Param("taskType") String taskType);
/**
* Query all tasks type sub process depend on process definition.
*
* Query all upstream tasks from task type sub process.
*
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @return List of TaskMainInfo
*/
List<TaskMainInfo> queryTaskSubProcessDepOnProcess(@Param("projectCode") long projectCode,
@Param("processDefinitionCode") long processDefinitionCode);
/**
* Query all tasks type dependent depend on process definition.
*
* Query all downstream tasks from task type dependent, method `queryTaskDepOnTask` is a proper subset of
* current method `queryTaskDepOnProcess`. Which mean with the same parameter processDefinitionCode, all tasks in
* `queryTaskDepOnTask` are in the result of method `queryTaskDepOnProcess`.
*
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return List of TaskMainInfo
*/
List<TaskMainInfo> queryTaskDependentOnProcess(@Param("processDefinitionCode") long processDefinitionCode,
@Param("taskCode") long taskCode);
/**
* Query all tasks depend on task, only downstream task support currently(from dependent task type).
*
* In case of dependent task type, method `queryTaskDepOnTask` is a proper subset of `queryTaskDepOnProcess`. Which
* mean with the same processDefinitionCode, all tasks in `queryTaskDepOnTask` are in method `queryTaskDepOnProcess`.
*
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return dependent process definition
*/
List<TaskMainInfo> queryTaskDepOnTask(@Param("projectCode") long projectCode,
@Param("processDefinitionCode") long processDefinitionCode,
@Param("taskCode") long taskCode);
}

45
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskLineageDao.java

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
import java.util.List;
public interface ProcessTaskLineageDao extends IDao<ProcessTaskLineage> {
int batchDeleteByProcessDefinitionCode(List<Long> processDefinitionCodes);
int batchInsert(List<ProcessTaskLineage> processTaskLineages);
List<ProcessTaskLineage> queryByProjectCode(long projectCode);
List<WorkFlowRelationDetail> queryWorkFlowLineageByCode(long processDefinitionCode);
List<WorkFlowRelationDetail> queryWorkFlowLineageByName(long projectCode,
String processDefinitionName);
List<ProcessTaskLineage> queryWorkFlowLineageByDept(long deptProjectCode,
long deptProcessDefinitionCode,
long deptTaskDefinitionCode);
List<ProcessTaskLineage> queryByProcessDefinitionCode(long processDefinitionCode);
int updateProcessTaskLineage(List<ProcessTaskLineage> processTaskLineages);
}

97
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskLineageDaoImpl.java

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskLineageMapper;
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.ProcessTaskLineageDao;
import org.apache.commons.collections4.CollectionUtils;
import java.util.List;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.springframework.stereotype.Repository;
@Repository
public class ProcessTaskLineageDaoImpl extends BaseDao<ProcessTaskLineage, ProcessTaskLineageMapper>
implements
ProcessTaskLineageDao {
public ProcessTaskLineageDaoImpl(@NonNull ProcessTaskLineageMapper processTaskLineageMapper) {
super(processTaskLineageMapper);
}
@Override
public int batchDeleteByProcessDefinitionCode(List<Long> processDefinitionCodes) {
if (CollectionUtils.isEmpty(processDefinitionCodes)) {
return 0;
}
return mybatisMapper.batchDeleteByProcessDefinitionCode(processDefinitionCodes);
}
@Override
public int batchInsert(List<ProcessTaskLineage> processTaskLineages) {
if (CollectionUtils.isEmpty(processTaskLineages)) {
return 0;
}
return mybatisMapper.batchInsert(processTaskLineages);
}
@Override
public List<ProcessTaskLineage> queryByProjectCode(long projectCode) {
return mybatisMapper.queryByProjectCode(projectCode);
}
@Override
public List<WorkFlowRelationDetail> queryWorkFlowLineageByCode(long processDefinitionCode) {
return mybatisMapper.queryWorkFlowLineageByCode(processDefinitionCode);
}
@Override
public List<WorkFlowRelationDetail> queryWorkFlowLineageByName(long projectCode, String processDefinitionName) {
return mybatisMapper.queryWorkFlowLineageByName(projectCode, processDefinitionName);
}
@Override
public List<ProcessTaskLineage> queryWorkFlowLineageByDept(long deptProjectCode, long deptProcessDefinitionCode,
long deptTaskDefinitionCode) {
return mybatisMapper.queryWorkFlowLineageByDept(deptProjectCode, deptProcessDefinitionCode,
deptTaskDefinitionCode);
}
@Override
public List<ProcessTaskLineage> queryByProcessDefinitionCode(long processDefinitionCode) {
return mybatisMapper.queryByProcessDefinitionCode(processDefinitionCode);
}
@Override
public int updateProcessTaskLineage(List<ProcessTaskLineage> processTaskLineages) {
if (CollectionUtils.isEmpty(processTaskLineages)) {
return 0;
}
this.batchDeleteByProcessDefinitionCode(
processTaskLineages.stream().map(ProcessTaskLineage::getProcessDefinitionCode)
.distinct().collect(Collectors.toList()));
return mybatisMapper.batchInsert(processTaskLineages);
}
}

133
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskLineageMapper.xml

@ -0,0 +1,133 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessTaskLineageMapper">
<sql id="baseSql">
id
, process_definition_code
, process_definition_version
, task_definition_code
, task_definition_version
, dept_project_code
, dept_process_definition_code
, dept_task_definition_code
, create_time
, update_time
</sql>
<delete id="batchDeleteByProcessDefinitionCode">
delete from t_ds_process_task_lineage
where process_definition_code in
<foreach collection="processDefinitionCodes" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</delete>
<insert id="batchInsert">
insert into t_ds_process_task_lineage (process_definition_code, process_definition_version, task_definition_code,
task_definition_version, dept_project_code, dept_process_definition_code, dept_task_definition_code)
values
<foreach collection="processTaskLineages" item="processTaskLineage" separator=",">
(#{processTaskLineage.processDefinitionCode},#{processTaskLineage.processDefinitionVersion},
#{processTaskLineage.taskDefinitionCode},#{processTaskLineage.taskDefinitionVersion},
#{processTaskLineage.deptProjectCode},#{processTaskLineage.deptProcessDefinitionCode},
#{processTaskLineage.deptTaskDefinitionCode})
</foreach>
</insert>
<select id="queryByProjectCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage">
select
<include refid="baseSql"/>
from
t_ds_process_task_lineage
where process_definition_code in (select code from t_ds_process_definition where project_code = #{projectCode})
</select>
<select id="queryWorkFlowLineageByCode" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail">
select
pd.code as work_flow_code
,pd.name as work_flow_name
,pd.release_state as work_flow_publish_status
,schd.start_time as schedule_start_time
,schd.end_time as schedule_end_time
,schd.crontab as crontab
,schd.release_state as schedule_publish_status
from t_ds_process_definition pd
left join t_ds_schedules schd on pd.code = schd.process_definition_code
where pd.code = #{processDefinitionCode}
</select>
<select id="queryWorkFlowLineageByName" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail">
select
pd.code as work_flow_code
,pd.name as work_flow_name
,pd.release_state as work_flow_publish_status
,schd.start_time as schedule_start_time
,schd.end_time as schedule_end_time
,schd.crontab as crontab
,schd.release_state as schedule_publish_status
from t_ds_process_definition pd
left join t_ds_schedules schd on pd.code = schd.process_definition_code
where 1=1
<if test="processDefinitionName != null and processDefinitionName != ''">
and pd.name = #{processDefinitionName}
</if>
and pd.project_code = #{projectCode}
</select>
<select id="queryWorkFlowLineageByCode" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail">
select
pd.code as work_flow_code
,pd.name as work_flow_name
,pd.release_state as work_flow_publish_status
,schd.start_time as schedule_start_time
,schd.end_time as schedule_end_time
,schd.crontab as crontab
,schd.release_state as schedule_publish_status
from t_ds_process_definition pd
left join t_ds_schedules schd on pd.code = schd.process_definition_code
where 1=1
and pd.code = #{processDefinitionCode}
and pd.project_code = #{projectCode}
</select>
<select id="queryWorkFlowLineageByDept" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage">
select
<include refid="baseSql"/>
from
t_ds_process_task_lineage
where 1=1
<if test="deptProjectCode != null and deptProjectCode != 0">
and dept_project_code = #{deptProjectCode}
</if>
and dept_process_definition_code = #{deptProcessDefinitionCode}
<if test="deptTaskDefinitionCode != null and deptTaskDefinitionCode != 0">
and dept_task_definition_code = #{deptTaskDefinitionCode}
</if>
</select>
<select id="queryByProcessDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage">
select
<include refid="baseSql"/>
from
t_ds_process_task_lineage
where process_definition_code = #{processDefinitionCode}
</select>
</mapper>

8
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

@ -160,4 +160,12 @@
</where>
order by update_time desc, id asc
</select>
<select id="queryDefinitionsByTaskType" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select
<include refid="baseSql"/>
from t_ds_task_definition
where task_type = #{taskType}
</select>
</mapper>

249
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml

@ -1,249 +0,0 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper">
<select id="queryWorkFlowLineageByName" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
select tepd.code as work_flow_code,tepd.name as work_flow_name
from t_ds_process_definition tepd
left join t_ds_schedules tes on tepd.code = tes.process_definition_code
where tepd.project_code = #{projectCode}
<if test="workFlowName != null and workFlowName != ''">
and tepd.name like concat('%', #{workFlowName}, '%')
</if>
</select>
<select id="queryWorkFlowLineageByCode" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
select tepd.code as work_flow_code,tepd.name as work_flow_name,
'' as source_work_flow_code,
tepd.release_state as work_flow_publish_status,
tes.start_time as schedule_start_time,
tes.end_time as schedule_end_time,
tes.crontab as crontab,
tes.release_state as schedule_publish_status
from t_ds_process_definition tepd
left join t_ds_schedules tes on tepd.code = tes.process_definition_code
where tepd.project_code = #{projectCode} and tepd.code = #{workFlowCode}
</select>
<select id="queryWorkFlowLineageByProcessDefinitionCodes" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
select tepd.code as work_flow_code,
tepd.name as work_flow_name,
tepd.release_state as work_flow_publish_status,
tes.start_time as schedule_start_time,
tes.end_time as schedule_end_time,
tes.crontab as crontab,
tes.release_state as schedule_publish_status
from t_ds_process_definition tepd
left join t_ds_schedules tes on tepd.code = tes.process_definition_code
where 1=1
<if test="workFlowCodes != null and workFlowCodes.size() != 0 ">
and tepd.code in
<foreach collection="workFlowCodes" item="code" index="index" open="(" close=")" separator=",">
#{code}
</foreach>
</if>
</select>
<select id="queryWorkFlowLineageByLineage" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
select tepd.code as work_flow_code,tepd.name as work_flow_name,
'' as source_work_flow_code,
tepd.release_state as work_flow_publish_status,
tes.start_time as schedule_start_time,
tes.end_time as schedule_end_time,
tes.crontab as crontab,
tes.release_state as schedule_publish_status
from t_ds_process_definition tepd
left join t_ds_schedules tes on tepd.code = tes.process_definition_code
where 1=1
<if test="processLineages != null and processLineages.size != 0">
and
<foreach collection="processLineages" index="index" item="item" open="(" separator=" or " close=")">
(tepd.project_code = #{item.projectCode}
and tepd.code = #{item.processDefinitionCode})
</foreach>
</if>
</select>
<select id="queryProcessLineage" resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
select ptr.project_code,
ptr.post_task_code,
ptr.post_task_version,
ptr.pre_task_code,
ptr.pre_task_version,
ptr.process_definition_code,
ptr.process_definition_version
from t_ds_process_definition pd
join t_ds_process_task_relation ptr on pd.code = ptr.process_definition_code
and pd.version = ptr.process_definition_version
where pd.project_code = #{projectCode}
</select>
<select id="queryProcessLineageByCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
select project_code,
post_task_code,
post_task_version,
pre_task_code,
pre_task_version,
process_definition_code,
process_definition_version
from t_ds_process_task_relation
where project_code = #{projectCode}
and process_definition_code = #{processDefinitionCode}
</select>
<select id="queryUpstreamDependentParamsByProcessDefinitionCode"
resultType="org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition">
SELECT
DISTINCT c.task_params
FROM
t_ds_process_definition a
JOIN t_ds_process_task_relation b ON a.code = b.process_definition_code AND a.version = b.process_definition_version AND a.project_code = b.project_code
JOIN t_ds_task_definition c ON c.code = b.pre_task_code and c.version = b.pre_task_version
WHERE 1=1
AND a.code = #{code}
AND c.task_type = #{taskType}
;
</select>
<select id="queryDownstreamLineageByProcessDefinitionCode"
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
SELECT
c.code AS work_flow_code
,c.name AS work_flow_name
,c.release_state AS work_flow_publish_status
,d.start_time AS schedule_start_time
,d.end_time AS schedule_end_time
,d.crontab AS crontab
,d.release_state AS schedule_publish_status
,'' AS source_work_flow_code
FROM
t_ds_task_definition a
JOIN t_ds_process_task_relation b ON a.code = b.pre_task_code and a.version = b.pre_task_version
JOIN t_ds_process_definition c ON c.code = b.process_definition_code AND c.version = b.process_definition_version AND c.project_code = b.project_code
LEFT JOIN t_ds_schedules d ON d.process_definition_code = c.code
WHERE 1=1
<if test="code != null and code != ''">
AND a.task_params LIKE concat('%', #{code}, '%')
</if>
AND a.task_type = #{taskType}
</select>
<select id="queryDependentProcessDefinitionByProcessDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition">
SELECT
c.code AS process_definition_code
,c.name AS process_definition_name
,c.version as process_definition_version
,a.code AS task_definition_code
,a.task_params
FROM
t_ds_task_definition a
JOIN t_ds_process_task_relation b ON a.code = b.pre_task_code and a.version = b.pre_task_version
JOIN t_ds_process_definition c ON c.code = b.process_definition_code AND c.version = b.process_definition_version AND c.project_code = b.project_code
WHERE 1=1
<if test="code != null and code != ''">
AND a.task_params LIKE concat('%', #{code}, '%')
</if>
AND a.task_type = 'DEPENDENT'
AND c.release_state = 1
</select>
<select id="queryTaskSubProcessDepOnProcess" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
select td.id
, td.name as taskName
, td.code as taskCode
, td.version as taskVersion
, td.task_type as taskType
, ptr.process_definition_code as processDefinitionCode
, pd.name as processDefinitionName
, pd.version as processDefinitionVersion
, pd.release_state as processReleaseState
from t_ds_task_definition td
join t_ds_process_task_relation ptr on ptr.post_task_code = td.code and td.version = ptr.post_task_version
join t_ds_process_definition pd on pd.code = ptr.process_definition_code and pd.version = ptr.process_definition_version
<where>
<if test="projectCode != 0">
and ptr.project_code = #{projectCode}
</if>
<!-- ptr.process_definition_code != #{processDefinitionCode} query task not in current workflow -->
<!-- For subprocess task type, using `concat('%"processDefinitionCode":', #{processDefinitionCode}, '%')` -->
<if test="processDefinitionCode != 0">
and td.task_type = 'SUB_PROCESS'
and ptr.process_definition_code != #{processDefinitionCode}
and td.task_params like concat('%"processDefinitionCode":', #{processDefinitionCode}, '%')
</if>
</where>
</select>
<select id="queryTaskDependentOnProcess" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
select td.id
, td.name as taskName
, td.code as taskCode
, td.version as taskVersion
, td.task_type as taskType
, pd.project_code as projectCode
, ptr.process_definition_code as processDefinitionCode
, pd.name as processDefinitionName
, pd.version as processDefinitionVersion
, pd.release_state as processReleaseState
from t_ds_task_definition td
join t_ds_process_task_relation ptr on ptr.post_task_code = td.code and td.version = ptr.post_task_version
join t_ds_process_definition pd on pd.code = ptr.process_definition_code and pd.version = ptr.process_definition_version
<where>
<!-- ptr.process_definition_code != #{processDefinitionCode} query task not in current workflow -->
<!-- For dependnet task type, using `like concat('%"definitionCode":', #{processDefinitionCode}, '%')` -->
<if test="processDefinitionCode != 0">
and td.task_type = 'DEPENDENT'
and ptr.process_definition_code != #{processDefinitionCode}
and td.task_params like concat('%"definitionCode":', #{processDefinitionCode}, '%')
</if>
<if test="taskCode != 0">
and (td.task_params like concat('%"depTaskCode":', #{taskCode}, '%') or td.task_params like concat('%"depTaskCode":-1%'))
</if>
</where>
</select>
<select id="queryTaskDepOnTask" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
select td.id
, td.name as taskName
, td.code as taskCode
, td.version as taskVersion
, td.task_type as taskType
, ptr.process_definition_code as processDefinitionCode
, pd.name as processDefinitionName
, pd.version as processDefinitionVersion
, pd.release_state as processReleaseState
from t_ds_task_definition td
join t_ds_process_task_relation ptr on ptr.post_task_code = td.code and td.version = ptr.post_task_version
join t_ds_process_definition pd on pd.code = ptr.process_definition_code and pd.version = ptr.process_definition_version
<where>
<if test="projectCode != 0">
and ptr.project_code = #{projectCode}
</if>
<!-- ptr.process_definition_code != #{processDefinitionCode} query task not in current workflow -->
<if test="processDefinitionCode != 0">
and ptr.process_definition_code != #{processDefinitionCode}
and td.task_params like concat('%"definitionCode":', #{processDefinitionCode}, '%')
</if>
<if test="taskCode != 0">
and td.task_params like concat('%"depTaskCode":', #{taskCode}, '%')
</if>
</where>
</select>
</mapper>

22
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql

@ -2185,3 +2185,25 @@ CREATE TABLE t_ds_listener_event
KEY idx_status (post_status),
KEY idx_event_sign (sign)
);
-- ----------------------------
-- Table structure for t_ds_process_task_lineage
-- ----------------------------
DROP TABLE IF EXISTS t_ds_process_task_lineage;
CREATE TABLE t_ds_process_task_lineage
(
`id` int NOT NULL AUTO_INCREMENT,
`process_definition_code` bigint(20) NOT NULL DEFAULT 0,
`process_definition_version` int NOT NULL DEFAULT 0,
`task_definition_code` bigint(20) NOT NULL DEFAULT 0,
`task_definition_version` int NOT NULL DEFAULT 0,
`dept_project_code` bigint(20) NOT NULL DEFAULT 0,
`dept_process_definition_code` bigint(20) NOT NULL DEFAULT 0,
`dept_task_definition_code` bigint(20) NOT NULL DEFAULT 0,
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `idx_process_code_version` (`process_definition_code`,`process_definition_version`),
KEY `idx_task_code_version` (`task_definition_code`,`task_definition_version`),
KEY `idx_dept_code` (`dept_project_code`,`dept_process_definition_code`,`dept_task_definition_code`)
);

21
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

@ -2157,3 +2157,24 @@ CREATE TABLE `t_ds_listener_event` (
KEY `idx_status` (`post_status`) USING BTREE,
KEY `idx_sign` (`sign`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE = utf8_bin;
-- ----------------------------
-- Table structure for t_ds_process_task_lineage
-- ----------------------------
DROP TABLE IF EXISTS `t_ds_process_task_lineage`;
CREATE TABLE `t_ds_process_task_lineage` (
`id` int NOT NULL AUTO_INCREMENT,
`process_definition_code` bigint NOT NULL DEFAULT 0,
`process_definition_version` int NOT NULL DEFAULT 0,
`task_definition_code` bigint NOT NULL DEFAULT 0,
`task_definition_version` int NOT NULL DEFAULT 0,
`dept_project_code` bigint NOT NULL DEFAULT 0 COMMENT 'dependent project code',
`dept_process_definition_code` bigint NOT NULL DEFAULT 0 COMMENT 'dependent process definition code',
`dept_task_definition_code` bigint NOT NULL DEFAULT 0 COMMENT 'dependent task definition code',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
PRIMARY KEY (`id`),
KEY `idx_process_code_version` (`process_definition_code`,`process_definition_version`),
KEY `idx_task_code_version` (`task_definition_code`,`task_definition_version`),
KEY `idx_dept_code` (`dept_project_code`,`dept_process_definition_code`,`dept_task_definition_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

22
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

@ -2149,3 +2149,25 @@ comment on column t_ds_listener_event.sign is 'sign=sha1(content)';
create index idx_listener_event_post_status on t_ds_listener_event (post_status);
create index idx_listener_event_sign on t_ds_listener_event (sign);
-- ----------------------------
-- Table structure for t_ds_process_task_lineage
-- ----------------------------
DROP TABLE IF EXISTS t_ds_process_task_lineage;
CREATE TABLE t_ds_process_task_lineage (
id int NOT NULL,
process_definition_code bigint NOT NULL DEFAULT 0,
process_definition_version int NOT NULL DEFAULT 0,
task_definition_code bigint NOT NULL DEFAULT 0,
task_definition_version int NOT NULL DEFAULT 0,
dept_project_code bigint NOT NULL DEFAULT 0,
dept_process_definition_code bigint NOT NULL DEFAULT 0,
dept_task_definition_code bigint NOT NULL DEFAULT 0,
create_time timestamp NOT NULL DEFAULT current_timestamp,
update_time timestamp NOT NULL DEFAULT current_timestamp,
PRIMARY KEY (id)
);
create index idx_process_code_version on t_ds_process_task_lineage (process_definition_code,process_definition_version);
create index idx_task_code_version on t_ds_process_task_lineage (task_definition_code,task_definition_version);
create index idx_dept_code on t_ds_process_task_lineage (dept_project_code,dept_process_definition_code,dept_task_definition_code);

35
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
DROP TABLE IF EXISTS `t_ds_process_task_lineage`;
CREATE TABLE `t_ds_process_task_lineage`
(
`id` int NOT NULL AUTO_INCREMENT,
`process_definition_code` bigint NOT NULL DEFAULT 0,
`process_definition_version` int NOT NULL DEFAULT 0,
`task_definition_code` bigint NOT NULL DEFAULT 0,
`task_definition_version` int NOT NULL DEFAULT 0,
`dept_project_code` bigint NOT NULL DEFAULT 0 COMMENT 'dependent project code',
`dept_process_definition_code` bigint NOT NULL DEFAULT 0 COMMENT 'dependent process definition code',
`dept_task_definition_code` bigint NOT NULL DEFAULT 0 COMMENT 'dependent task definition code',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
PRIMARY KEY (`id`),
KEY `idx_process_code_version` (`process_definition_code`,`process_definition_version`),
KEY `idx_task_code_version` (`task_definition_code`,`task_definition_version`),
KEY `idx_dept_code` (`dept_project_code`,`dept_process_definition_code`,`dept_task_definition_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

16
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_dml.sql

@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

35
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
DROP TABLE IF EXISTS t_ds_process_task_lineage;
CREATE TABLE t_ds_process_task_lineage (
id int NOT NULL,
process_definition_code bigint NOT NULL DEFAULT 0,
process_definition_version int NOT NULL DEFAULT 0,
task_definition_code bigint NOT NULL DEFAULT 0,
task_definition_version int NOT NULL DEFAULT 0,
dept_project_code bigint NOT NULL DEFAULT 0,
dept_process_definition_code bigint NOT NULL DEFAULT 0,
dept_task_definition_code bigint NOT NULL DEFAULT 0,
create_time timestamp NOT NULL DEFAULT current_timestamp,
update_time timestamp NOT NULL DEFAULT current_timestamp,
PRIMARY KEY (id)
);
create index idx_process_code_version on t_ds_process_task_lineage (process_definition_code,process_definition_version);
create index idx_task_code_version on t_ds_process_task_lineage (task_definition_code,task_definition_version);
create index idx_dept_code on t_ds_process_task_lineage (dept_project_code,dept_process_definition_code,dept_task_definition_code);

16
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_dml.sql

@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

51
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java → dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskLineageMapperTest.java

@ -22,10 +22,10 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
import java.util.Date;
import java.util.List;
@ -34,10 +34,10 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
public class WorkFlowLineageMapperTest extends BaseDaoTest {
public class ProcessTaskLineageMapperTest extends BaseDaoTest {
@Autowired
private WorkFlowLineageMapper workFlowLineageMapper;
private ProcessTaskLineageMapper processTaskLineageMapper;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@ -50,10 +50,8 @@ public class WorkFlowLineageMapperTest extends BaseDaoTest {
/**
* insert
*
* @return ProcessDefinition
*/
private ProcessTaskRelation insertOneProcessTaskRelation() {
private void insertOneProcessTaskRelation() {
// insertOne
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setName("def 1");
@ -67,7 +65,6 @@ public class WorkFlowLineageMapperTest extends BaseDaoTest {
processTaskRelation.setUpdateTime(new Date());
processTaskRelation.setCreateTime(new Date());
processTaskRelationMapper.insert(processTaskRelation);
return processTaskRelation;
}
/**
@ -86,6 +83,21 @@ public class WorkFlowLineageMapperTest extends BaseDaoTest {
processDefinitionMapper.insert(processDefinition);
}
private void insertOneProcessLineage() {
// insertOne
ProcessTaskLineage processTaskLineage = new ProcessTaskLineage();
processTaskLineage.setProcessDefinitionCode(1L);
processTaskLineage.setProcessDefinitionVersion(1);
processTaskLineage.setTaskDefinitionCode(1L);
processTaskLineage.setTaskDefinitionVersion(1);
processTaskLineage.setDeptProjectCode(1L);
processTaskLineage.setDeptProcessDefinitionCode(1L);
processTaskLineage.setDeptTaskDefinitionCode(1L);
processTaskLineage.setUpdateTime(new Date());
processTaskLineage.setCreateTime(new Date());
processTaskLineageMapper.insert(processTaskLineage);
}
/**
* insert
*
@ -110,29 +122,20 @@ public class WorkFlowLineageMapperTest extends BaseDaoTest {
insertOneProcessDefinition();
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(1L);
insertOneSchedule(processDefinition.getId());
List<WorkFlowLineage> workFlowLineages = workFlowLineageMapper
List<WorkFlowRelationDetail> workFlowLineages = processTaskLineageMapper
.queryWorkFlowLineageByName(processDefinition.getProjectCode(), processDefinition.getName());
Assertions.assertNotEquals(0, workFlowLineages.size());
}
@Test
public void testQueryCodeRelation() {
ProcessTaskRelation processTaskRelation = insertOneProcessTaskRelation();
List<ProcessLineage> workFlowLineages =
workFlowLineageMapper.queryProcessLineageByCode(processTaskRelation.getProjectCode(),
processTaskRelation.getProcessDefinitionCode());
Assertions.assertNotEquals(0, workFlowLineages.size());
}
@Test
public void testQueryWorkFlowLineage() {
insertOneProcessDefinition();
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(1L);
insertOneProcessTaskRelation();
List<ProcessLineage> workFlowLineages =
workFlowLineageMapper.queryProcessLineage(processDefinition.getProjectCode());
Assertions.assertNotEquals(0, workFlowLineages.size());
insertOneProcessLineage();
List<ProcessTaskLineage> processTaskLineages =
processTaskLineageMapper.queryByProjectCode(processDefinition.getProjectCode());
Assertions.assertNotEquals(0, processTaskLineages.size());
}
@Test
@ -140,8 +143,8 @@ public class WorkFlowLineageMapperTest extends BaseDaoTest {
insertOneProcessDefinition();
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(1L);
insertOneSchedule(processDefinition.getId());
WorkFlowLineage workFlowLineages = workFlowLineageMapper
.queryWorkFlowLineageByCode(processDefinition.getProjectCode(), processDefinition.getCode());
List<WorkFlowRelationDetail> workFlowLineages = processTaskLineageMapper
.queryWorkFlowLineageByCode(processDefinition.getCode());
Assertions.assertNotNull(workFlowLineages);
}

3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -25,7 +25,6 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.DqComparisonType;
import org.apache.dolphinscheduler.dao.entity.DqExecuteResult;
import org.apache.dolphinscheduler.dao.entity.DqRule;
@ -108,8 +107,6 @@ public interface ProcessService {
List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(long processDefinitionCode);
List<DependentProcessDefinition> queryDependentProcessDefinitionByProcessDefinitionCode(long processDefinitionCode);
List<ProcessInstance> queryNeedFailoverProcessInstances(String host);
List<String> queryNeedFailoverProcessInstanceHost();

16
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -51,7 +51,6 @@ import org.apache.dolphinscheduler.dao.entity.Cluster;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.DqComparisonType;
import org.apache.dolphinscheduler.dao.entity.DqExecuteResult;
import org.apache.dolphinscheduler.dao.entity.DqRule;
@ -98,7 +97,6 @@ import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
@ -256,9 +254,6 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired
private TaskGroupMapper taskGroupMapper;
@Autowired
private WorkFlowLineageMapper workFlowLineageMapper;
@Autowired
private ClusterMapper clusterMapper;
@ -1430,17 +1425,6 @@ public class ProcessServiceImpl implements ProcessService {
return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode);
}
/**
* query dependent process definition by process definition code
*
* @param processDefinitionCode processDefinitionCode
* @see DependentProcessDefinition
*/
@Override
public List<DependentProcessDefinition> queryDependentProcessDefinitionByProcessDefinitionCode(long processDefinitionCode) {
return workFlowLineageMapper.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
}
/**
* query need failover process instance
*

3
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskTypeUtils.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.plugin.task.api.utils;
import org.apache.dolphinscheduler.plugin.task.api.ILogicTaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.task.ConditionsLogicTaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.task.DependentLogicTaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.task.DynamicLogicTaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.task.SubWorkflowLogicTaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.task.SwitchLogicTaskChannelFactory;
@ -41,7 +42,7 @@ public class TaskTypeUtils {
}
public boolean isDependentTask(String taskType) {
return SubWorkflowLogicTaskChannelFactory.NAME.equals(taskType);
return DependentLogicTaskChannelFactory.NAME.equals(taskType);
}
public boolean isDynamicTask(String taskType) {

31
dolphinscheduler-tools/src/main/bin/migrate-lineage.sh

@ -0,0 +1,31 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
BIN_DIR=$(dirname $0)
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/../..; pwd)}
if [ "$DOCKER" != "true" ]; then
source "$DOLPHINSCHEDULER_HOME/bin/env/dolphinscheduler_env.sh"
fi
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms4g -Xmx4g -Xmn512m -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}
$JAVA_HOME/bin/java $JAVA_OPTS \
-cp "$DOLPHINSCHEDULER_HOME/tools/conf":"$DOLPHINSCHEDULER_HOME/tools/libs/*":"$DOLPHINSCHEDULER_HOME/tools/sql" \
-Dspring.profiles.active=lineage,${DATABASE} \
org.apache.dolphinscheduler.tools.lineage.MigrateLineage

56
dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/lineage/MigrateLineage.java

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.tools.lineage;
import org.apache.dolphinscheduler.tools.resource.MigrateResource;
import java.sql.SQLException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
@SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler")
@Slf4j
public class MigrateLineage {
public static void main(String[] args) {
SpringApplication.run(MigrateResource.class, args);
}
@Component
@Profile("lineage")
static class MigrateLineageRunner implements CommandLineRunner {
@Autowired
private MigrateLineageService migrateLineageService;
@Override
public void run(String... args) throws SQLException {
log.info("Starting Migrate lineage data...");
migrateLineageService.migrateLineageOnce();
}
}
}

120
dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/lineage/MigrateLineageService.java

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.tools.lineage;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessTaskLineageDao;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.task.DependentLogicTaskChannelFactory;
import org.apache.commons.collections.CollectionUtils;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class MigrateLineageService {
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
@Autowired
private ProcessTaskLineageDao processTaskLineageDao;
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
public void migrateLineageOnce() {
try {
List<ProcessTaskLineage> processTaskLineageList = getAllProcessLineages();
int insertResult = processTaskLineageDao.batchInsert(processTaskLineageList);
if (insertResult > 0) {
log.info("Migrate lineage successfully, insert count: {}", insertResult);
} else {
log.info("No lineage to migrate.");
}
} catch (Exception e) {
log.error("Failed to migrate lineage:", e);
}
}
private List<ProcessTaskLineage> getAllProcessLineages() throws SQLException {
List<TaskDefinition> taskDefinitionList =
taskDefinitionMapper.queryDefinitionsByTaskType(DependentLogicTaskChannelFactory.NAME);
List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByTaskCodes(taskDefinitionList.stream()
.map(TaskDefinition::getCode).toArray(Long[]::new));
List<ProcessTaskLineage> processTaskLineageList = new ArrayList<>();
for (TaskDefinition taskDefinition : taskDefinitionList) {
parseDependentTaskParams(taskDefinition, processTaskLineageList);
for (ProcessTaskLineage processTaskLineage : processTaskLineageList) {
processTaskLineage.setProcessDefinitionCode(processTaskRelationList.stream()
.filter(processTaskRelation -> processTaskRelation.getPreTaskCode() == taskDefinition.getCode()
|| processTaskRelation.getPostTaskCode() == taskDefinition.getCode())
.findFirst().get().getProcessDefinitionCode());
processTaskLineage.setProcessDefinitionVersion(processTaskRelationList.stream()
.filter(processTaskRelation -> processTaskRelation.getPreTaskCode() == taskDefinition.getCode()
|| processTaskRelation.getPostTaskCode() == taskDefinition.getCode())
.findFirst().get().getProcessDefinitionVersion());
}
}
return processTaskLineageList;
}
private void parseDependentTaskParams(TaskDefinition taskDefinition, List<ProcessTaskLineage> taskLineageList) {
DependentParameters dependentParameters =
JSONUtils.parseObject(taskDefinition.getTaskParams(), DependentParameters.class);
if (dependentParameters != null) {
List<DependentTaskModel> dependTaskList =
dependentParameters.getDependence().getDependTaskList();
if (!CollectionUtils.isEmpty(dependTaskList)) {
for (DependentTaskModel taskModel : dependTaskList) {
List<DependentItem> dependItemList = taskModel.getDependItemList();
for (DependentItem dependentItem : dependItemList) {
ProcessTaskLineage processTaskLineage = new ProcessTaskLineage();
processTaskLineage.setDeptProjectCode(dependentItem.getProjectCode());
processTaskLineage.setDeptProcessDefinitionCode(dependentItem.getDefinitionCode());
processTaskLineage.setDeptTaskDefinitionCode(dependentItem.getDepTaskCode());
taskLineageList.add(processTaskLineage);
}
}
}
}
}
}

6
dolphinscheduler-ui/src/service/modules/lineages/types.ts

@ -27,7 +27,7 @@ interface WorkFlowNameReq {
workFlowName: string
}
interface WorkFlowListRes extends WorkflowCodeReq {
interface WorkFlowRelationDetailListRes extends WorkflowCodeReq {
workFlowName: string
workFlowPublishStatus: string
scheduleStartTime?: any
@ -43,7 +43,7 @@ interface WorkFlowRelationList {
}
interface WorkflowRes {
workFlowList: WorkFlowListRes[]
workFlowRelationDetailList: WorkFlowRelationDetailListRes[]
workFlowRelationList: WorkFlowRelationList[]
}
@ -57,5 +57,5 @@ export {
WorkFlowNameReq,
DependentTaskReq,
WorkflowRes,
WorkFlowListRes
WorkFlowRelationDetailListRes
}

39
dolphinscheduler-ui/src/views/projects/components/dependencies/use-dependencies.ts

@ -17,7 +17,6 @@
import { DependentTaskReq } from '@/service/modules/lineages/types'
import { queryDependentTasks } from '@/service/modules/lineages'
import { TASK_TYPES_MAP } from '@/store/project'
export function useDependencies() {
const getDependentTasksBySingleTask = async (
@ -33,13 +32,11 @@ export function useDependencies() {
} as DependentTaskReq
const res = await queryDependentTasks(projectCode, dependentTaskReq)
res
.filter(
(item: any) =>
item.processDefinitionCode !== workflowCode &&
item.taskType === TASK_TYPES_MAP.DEPENDENT.alias
)
.filter((item: any) => item.processDefinitionCode !== workflowCode)
.forEach((item: any) => {
tasks.push(item.processDefinitionName + '->' + item.taskName)
tasks.push(
item.processDefinitionName + '->' + item.taskDefinitionName
)
})
}
return tasks
@ -56,13 +53,11 @@ export function useDependencies() {
} as DependentTaskReq
const res = await queryDependentTasks(projectCode, dependentTaskReq)
res
.filter(
(item: any) =>
item.processDefinitionCode !== workflowCode &&
item.taskType === TASK_TYPES_MAP.DEPENDENT.alias
)
.filter((item: any) => item.processDefinitionCode !== workflowCode)
.forEach((item: any) => {
tasks.push(item.processDefinitionName + '->' + item.taskName)
tasks.push(
item.processDefinitionName + '->' + item.taskDefinitionName
)
})
}
return tasks
@ -116,14 +111,11 @@ export function useDependencies() {
await queryDependentTasks(projectCode, dependentTaskReq).then(
(res: any) => {
res
.filter(
(item: any) =>
item.processDefinitionCode !== workflowCode &&
item.taskType === TASK_TYPES_MAP.DEPENDENT.alias
)
.filter((item: any) => item.processDefinitionCode !== workflowCode)
.forEach((item: any) => {
dependentTaskLinks.push({
text: item.processDefinitionName + '->' + item.taskName,
text:
item.processDefinitionName + '->' + item.taskDefinitionName,
show: true,
action: () => {
const url = `/projects/${item.projectCode}/workflow/definitions/${item.processDefinitionCode}`
@ -151,14 +143,11 @@ export function useDependencies() {
await queryDependentTasks(projectCode, dependentTaskReq).then(
(res: any) => {
res
.filter(
(item: any) =>
item.processDefinitionCode !== workflowCode &&
item.taskType === TASK_TYPES_MAP.DEPENDENT.alias
)
.filter((item: any) => item.processDefinitionCode !== workflowCode)
.forEach((item: any) => {
dependentTaskLinks.push({
text: item.processDefinitionName + '->' + item.taskName,
text:
item.processDefinitionName + '->' + item.taskDefinitionName,
show: true,
action: () => {
const url = `/projects/${item.projectCode}/workflow/definitions/${item.processDefinitionCode}`

6
dolphinscheduler-ui/src/views/projects/workflow/relation/use-relation.ts

@ -24,7 +24,7 @@ import {
} from '@/service/modules/lineages'
import type {
WorkflowRes,
WorkFlowListRes
WorkFlowRelationDetailListRes
} from '@/service/modules/lineages/types'
export function useRelation() {
@ -40,7 +40,7 @@ export function useRelation() {
variables.seriesData = []
variables.links = []
variables.seriesData = obj.workFlowList.map((item) => {
variables.seriesData = obj.workFlowRelationDetailList.map((item) => {
return {
name: item.workFlowName,
id: item.workFlowCode,
@ -59,7 +59,7 @@ export function useRelation() {
const getWorkflowName = (projectCode: number) => {
const { state } = useAsyncState(
queryLineageByWorkFlowName({ projectCode }).then(
(res: Array<WorkFlowListRes>) => {
(res: Array<WorkFlowRelationDetailListRes>) => {
variables.workflowOptions = res.map((item) => {
return {
label: item.workFlowName,

Loading…
Cancel
Save