diff --git a/docs/docs/en/guide/upgrade/upgrade.md b/docs/docs/en/guide/upgrade/upgrade.md index b4935b29e5..0c8fd347df 100644 --- a/docs/docs/en/guide/upgrade/upgrade.md +++ b/docs/docs/en/guide/upgrade/upgrade.md @@ -54,6 +54,15 @@ Execution result: - The original UDF resource `x/y.jar` migrates to `/dolphinscheduler/abc/udf/.migrate/x/y.jar`. - Update UDF function's bound resource info. +### Upgrade Lineage + +Execute script: `sh ./tools/bin/migrate-lineage.sh`. + +Execution result: + +- Migrate lineage data to new table `t_ds_process_task_lineage`. +- This script only performs upsert operations, not deletes. You can delete it manually if you need to. + ### Upgrade Service #### Change Configuration `bin/env/install_env.sh` diff --git a/docs/docs/zh/guide/upgrade/upgrade.md b/docs/docs/zh/guide/upgrade/upgrade.md index 1e2301c84f..4140f11829 100644 --- a/docs/docs/zh/guide/upgrade/upgrade.md +++ b/docs/docs/zh/guide/upgrade/upgrade.md @@ -53,6 +53,15 @@ jar 包 并添加到 `./tools/libs` 目录下,设置以下环境变量 - 原 UDF 资源 `x/y.jar` 迁移至 `/dolphinscheduler/abc/udf/.migrate/x/y.jar`。 - 更新 UDF 函数绑定资源信息。 +### 血缘升级 + +执行脚本:`sh ./tools/bin/migrate-lineage.sh`。 + +执行结果: + +- 原血缘数据迁移至新血缘表 `t_ds_process_task_lineage`。 +- 此脚本仅执行 upsert 操作,不执行删除操作,如果需要删除,您可以手动删除。 + ### 服务升级 #### 修改 `bin/env/install_env.sh` 配置内容 diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java index d7ed288ef6..1c216155f8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java @@ -221,7 +221,6 @@ public class ProcessDefinitionController extends BaseController { * @param timeout timeout * @param taskRelationJson relation json for nodes * @param taskDefinitionJson taskDefinitionJson - * @param otherParamsJson otherParamsJson handle other params * @return update result code */ @Operation(summary = "update", description = "UPDATE_PROCESS_DEFINITION_NOTES") @@ -230,8 +229,7 @@ public class ProcessDefinitionController extends BaseController { @Parameter(name = "code", description = "PROCESS_DEFINITION_CODE", required = true, schema = @Schema(implementation = long.class, example = "123456789")), @Parameter(name = "locations", description = "PROCESS_DEFINITION_LOCATIONS", required = true, schema = @Schema(implementation = String.class)), @Parameter(name = "description", description = "PROCESS_DEFINITION_DESC", required = false, schema = @Schema(implementation = String.class)), - @Parameter(name = "releaseState", description = "RELEASE_PROCESS_DEFINITION_NOTES", required = false, schema = @Schema(implementation = ReleaseState.class)), - @Parameter(name = "otherParamsJson", description = "OTHER_PARAMS_JSON", required = false, schema = @Schema(implementation = String.class)) + @Parameter(name = "releaseState", description = "RELEASE_PROCESS_DEFINITION_NOTES", required = false, schema = @Schema(implementation = ReleaseState.class)) }) @PutMapping(value = "/{code}") @ResponseStatus(HttpStatus.OK) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java index 6d7b70918f..546a3644ad 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java @@ -158,7 +158,6 @@ public class ProcessInstanceController extends BaseController { * @param scheduleTime schedule time * @param syncDefine sync define * @param locations locations - * @param tenantCode tenantCode * @return update result code */ @Operation(summary = "updateProcessInstance", description = "UPDATE_PROCESS_INSTANCE_NOTES") diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessLineageController.java similarity index 61% rename from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java rename to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessLineageController.java index 6a6b110d6e..dc27c4a147 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessLineageController.java @@ -24,13 +24,16 @@ import static org.apache.dolphinscheduler.common.constants.Constants.SESSION_USE import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.exceptions.ServiceException; -import org.apache.dolphinscheduler.api.service.WorkFlowLineageService; +import org.apache.dolphinscheduler.api.service.ProcessLineageService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.dao.entity.DependentLineageTask; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; +import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -61,21 +64,21 @@ import io.swagger.v3.oas.annotations.tags.Tag; @RestController @RequestMapping("projects/{projectCode}/lineages") @Slf4j -public class WorkFlowLineageController extends BaseController { +public class ProcessLineageController extends BaseController { @Autowired - private WorkFlowLineageService workFlowLineageService; + private ProcessLineageService processLineageService; @Operation(summary = "queryLineageByWorkFlowName", description = "QUERY_WORKFLOW_LINEAGE_BY_NAME_NOTES") @GetMapping(value = "/query-by-name") @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_WORKFLOW_LINEAGE_ERROR) - public Result> queryWorkFlowLineageByName(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam(value = "workFlowName", required = false) String workFlowName) { - workFlowName = ParameterUtils.handleEscapes(workFlowName); - List workFlowLineages = - workFlowLineageService.queryWorkFlowLineageByName(projectCode, workFlowName); + public Result> queryWorkFlowLineageByName(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser, + @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @RequestParam(value = "processDefinitionName", required = false) String processDefinitionName) { + processDefinitionName = ParameterUtils.handleEscapes(processDefinitionName); + List workFlowLineages = + processLineageService.queryWorkFlowLineageByName(projectCode, processDefinitionName); return Result.success(workFlowLineages); } @@ -85,9 +88,12 @@ public class WorkFlowLineageController extends BaseController { @ApiException(QUERY_WORKFLOW_LINEAGE_ERROR) public Result> queryWorkFlowLineageByCode(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser, @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @PathVariable(value = "workFlowCode", required = true) long workFlowCode) { - Map result = workFlowLineageService.queryWorkFlowLineageByCode(projectCode, workFlowCode); + @PathVariable(value = "workFlowCode") long workFlowCode) { + WorkFlowLineage workFlowLineage = processLineageService.queryWorkFlowLineageByCode(projectCode, workFlowCode); + Map result = new HashMap<>(); + result.put(Constants.DATA_LIST, workFlowLineage); return Result.success(result); + } @Operation(summary = "queryWorkFlowList", description = "QUERY_WORKFLOW_LINEAGE_NOTES") @@ -96,8 +102,10 @@ public class WorkFlowLineageController extends BaseController { public Result> queryWorkFlowLineage(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser, @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode) { try { - Map result = workFlowLineageService.queryWorkFlowLineage(projectCode); - return returnDataList(result); + Map result = new HashMap<>(); + WorkFlowLineage workFlowLineage = processLineageService.queryWorkFlowLineage(projectCode); + result.put(Constants.DATA_LIST, workFlowLineage); + return Result.success(result); } catch (Exception e) { log.error(QUERY_WORKFLOW_LINEAGE_ERROR.getMsg(), e); return error(QUERY_WORKFLOW_LINEAGE_ERROR.getCode(), QUERY_WORKFLOW_LINEAGE_ERROR.getMsg()); @@ -107,10 +115,10 @@ public class WorkFlowLineageController extends BaseController { /** * Whether task can be deleted or not, avoiding task depend on other task of process definition delete by accident. * - * @param loginUser login user - * @param projectCode project codes which taskCode belong + * @param loginUser login user + * @param projectCode project codes which taskCode belong * @param processDefinitionCode project code which taskCode belong - * @param taskCode task definition code + * @param taskCode task definition code * @return Result of task can be deleted or not */ @Operation(summary = "verifyTaskCanDelete", description = "VERIFY_TASK_CAN_DELETE") @@ -122,13 +130,13 @@ public class WorkFlowLineageController extends BaseController { @PostMapping(value = "/tasks/verify-delete") @ResponseStatus(HttpStatus.OK) @ApiException(TASK_WITH_DEPENDENT_ERROR) - public Result verifyTaskCanDelete(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam(value = "processDefinitionCode", required = true) long processDefinitionCode, - @RequestParam(value = "taskCode", required = true) long taskCode) { - Result result = new Result(); + public Result> verifyTaskCanDelete(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @RequestParam(value = "processDefinitionCode", required = true) long processDefinitionCode, + @RequestParam(value = "taskCode", required = true) long taskCode) { + Result> result = new Result<>(); Optional taskDepMsg = - workFlowLineageService.taskDepOnTaskMsg(projectCode, processDefinitionCode, taskCode); + processLineageService.taskDependentMsg(projectCode, processDefinitionCode, taskCode); if (taskDepMsg.isPresent()) { throw new ServiceException(taskDepMsg.get()); } @@ -136,19 +144,26 @@ public class WorkFlowLineageController extends BaseController { return result; } - @Operation(summary = "queryDownstreamDependentTaskList", description = "QUERY_DOWNSTREAM_DEPENDENT_TASK_NOTES") + /** + * Whether task can be deleted or not, avoiding task depend on other task of process definition delete by accident. + */ + @Operation(summary = "verifyTaskCanDelete", description = "VERIFY_TASK_CAN_DELETE") @Parameters({ - @Parameter(name = "workFlowCode", description = "PROCESS_DEFINITION_CODE", required = true, schema = @Schema(implementation = Long.class)), - @Parameter(name = "taskCode", description = "TASK_DEFINITION_CODE", required = false, schema = @Schema(implementation = Long.class, example = "123456789")), + @Parameter(name = "projectCode", description = "PROCESS_DEFINITION_NAME", required = true, schema = @Schema(implementation = long.class)), + @Parameter(name = "workFlowCode", description = "PROCESS_DEFINITION_CODE", required = true, schema = @Schema(implementation = long.class)), }) @GetMapping(value = "/query-dependent-tasks") @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_WORKFLOW_LINEAGE_ERROR) - public Result> queryDownstreamDependentTaskList(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser, - @RequestParam(value = "workFlowCode") Long workFlowCode, - @RequestParam(value = "taskCode", required = false, defaultValue = "0") Long taskCode) { - Map result = - workFlowLineageService.queryDownstreamDependentTasks(workFlowCode, taskCode); - return returnDataList(result); + public Result> queryDependentTasks(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @RequestParam(value = "workFlowCode") long workFlowCode, + @RequestParam(value = "taskCode", required = false) Long taskCode) { + Map result = new HashMap<>(); + List dependentLineageTaskList = + processLineageService.queryDependentProcessDefinitions(projectCode, workFlowCode, taskCode); + result.put(Constants.DATA_LIST, dependentLineageTaskList); + return Result.success(result); } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCodeVersionDto.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCodeVersionDto.java new file mode 100644 index 0000000000..89a7d99381 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCodeVersionDto.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.dto; + +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +public class TaskCodeVersionDto { + + private long code; + private int version; +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index a2208b602b..5355650725 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -20,6 +20,8 @@ package org.apache.dolphinscheduler.api.enums; import java.util.Locale; import java.util.Optional; +import lombok.Getter; + import org.springframework.context.i18n.LocaleContextHolder; /** @@ -238,7 +240,7 @@ public enum Status { QUERY_EXECUTING_WORKFLOW_ERROR(10192, "query executing workflow error", "查询运行的工作流实例错误"), DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL(10193, "delete process definition fail, cause used by other tasks: {0}", "删除工作流定时失败,被其他任务引用:{0}"), - DELETE_TASK_USE_BY_OTHER_FAIL(10194, "delete task {0} fail, cause used by other tasks: {1}", + DELETE_TASK_USE_BY_OTHER_FAIL(10194, "delete task {0} fail, the reason is that used by other tasks: {1}", "删除任务 {0} 失败,被其他任务引用:{1}"), TASK_WITH_DEPENDENT_ERROR(10195, "task used in other tasks", "删除被其他任务引用"), TASK_SAVEPOINT_ERROR(10196, "task savepoint error", "任务实例savepoint错误"), @@ -585,7 +587,12 @@ public enum Status { USED_WORKER_GROUP_EXISTS(1402004, "You can not reassign worker groups to the project, cause these worker groups {0} are already used.", "Worker组{0}被项目中任务或定时引用,无法重新分配"), - ; + CREATE_PROCESS_LINEAGE_ERROR(1403001, "create process lineage error", "创建工作流血缘错误"), + UPDATE_PROCESS_LINEAGE_ERROR(1403002, "update process lineage error", "更新工作流血缘错误"), + DELETE_PROCESS_LINEAGE_ERROR(1403003, "delete process lineage error", "删除工作流血缘错误"), + ; + + @Getter private final int code; private final String enMsg; private final String zhMsg; @@ -596,10 +603,6 @@ public enum Status { this.zhMsg = zhMsg; } - public int getCode() { - return this.code; - } - public String getMsg() { if (Locale.SIMPLIFIED_CHINESE.getLanguage().equals(LocaleContextHolder.getLocale().getLanguage())) { return this.zhMsg; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 52a95839ba..850ac5b0c6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -427,4 +427,9 @@ public interface ProcessDefinitionService { * @return variables data */ Map viewVariables(User loginUser, long projectCode, long code); + + void saveProcessLineage(long projectCode, + long processDefinitionCode, + int processDefinitionVersion, + List taskDefinitionLogList); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessLineageService.java similarity index 52% rename from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java rename to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessLineageService.java index 2e535f3307..6cab4482aa 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessLineageService.java @@ -17,50 +17,50 @@ package org.apache.dolphinscheduler.api.service; -import org.apache.dolphinscheduler.dao.entity.TaskMainInfo; +import org.apache.dolphinscheduler.dao.entity.DependentLineageTask; +import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage; import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; +import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail; import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.Set; /** * work flow lineage service */ -public interface WorkFlowLineageService { +public interface ProcessLineageService { - List queryWorkFlowLineageByName(long projectCode, String workFlowName); + List queryWorkFlowLineageByName(long projectCode, String processDefinitionName); - Map queryWorkFlowLineageByCode(long projectCode, long workFlowCode); + WorkFlowLineage queryWorkFlowLineageByCode(long projectCode, long processDefinitionCode); - Map queryWorkFlowLineage(long projectCode); - - /** - * Query tasks depend on process definition, include upstream or downstream - * - * @param projectCode Project code want to query tasks dependence - * @param processDefinitionCode Process definition code want to query tasks dependence - * @return Set of TaskMainInfo - */ - Set queryTaskDepOnProcess(long projectCode, long processDefinitionCode); + WorkFlowLineage queryWorkFlowLineage(long projectCode); /** * Query downstream tasks depend on a process definition or a task * * @param processDefinitionCode Process definition code want to query tasks dependence - * @param taskCode Task code want to query tasks dependence - * @return downstream dependent tasks + * @return downstream dependent process definition list */ - Map queryDownstreamDependentTasks(Long processDefinitionCode, Long taskCode); + List queryDownstreamDependentProcessDefinitions(Long processDefinitionCode); /** * Query and return tasks dependence with string format, is a wrapper of queryTaskDepOnTask and task query method. * - * @param projectCode Project code want to query tasks dependence + * @param projectCode Project code want to query tasks dependence * @param processDefinitionCode Process definition code want to query tasks dependence - * @param taskCode Task code want to query tasks dependence + * @param taskCode Task code want to query tasks dependence * @return dependent process definition */ - Optional taskDepOnTaskMsg(long projectCode, long processDefinitionCode, long taskCode); + Optional taskDependentMsg(long projectCode, long processDefinitionCode, long taskCode); + + List queryDependentProcessDefinitions(long projectCode, long processDefinitionCode, + Long taskCode); + + int createProcessLineage(List processTaskLineages); + + int updateProcessLineage(List processTaskLineages); + + int deleteProcessLineage(List processDefinitionCodes); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 6bb7d228ff..64f43e5d81 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.api.executor.ExecuteContext; import org.apache.dolphinscheduler.api.service.ExecutorService; import org.apache.dolphinscheduler.api.service.MonitorService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; +import org.apache.dolphinscheduler.api.service.ProcessLineageService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.WorkerGroupService; import org.apache.dolphinscheduler.common.constants.Constants; @@ -186,6 +187,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ @Autowired private TenantMapper tenantMapper; + @Autowired + private ProcessLineageService processLineageService; + /** * execute process instance * @@ -999,7 +1003,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ boolean allLevelDependent) { List dependentProcessDefinitionList = checkDependentProcessDefinitionValid( - processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode), + processLineageService.queryDownstreamDependentProcessDefinitions(processDefinitionCode), processDefinitionCycle, workerGroup, processDefinitionCode); @@ -1013,7 +1017,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ List childDependentList = childList .stream() .flatMap(dependentProcessDefinition -> checkDependentProcessDefinitionValid( - processService.queryDependentProcessDefinitionByProcessDefinitionCode( + processLineageService.queryDownstreamDependentProcessDefinitions( dependentProcessDefinition.getProcessDefinitionCode()), processDefinitionCycle, workerGroup, diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 820d35b3b1..fc0c273ea6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -42,6 +42,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYP import static org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager.checkTaskParameters; import org.apache.dolphinscheduler.api.dto.DagDataSchedule; +import org.apache.dolphinscheduler.api.dto.TaskCodeVersionDto; import org.apache.dolphinscheduler.api.dto.treeview.Instance; import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto; import org.apache.dolphinscheduler.api.dto.workflow.WorkflowCreateRequest; @@ -52,11 +53,11 @@ import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.MetricsCleanUpService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; import org.apache.dolphinscheduler.api.service.ProcessInstanceService; +import org.apache.dolphinscheduler.api.service.ProcessLineageService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.SchedulerService; import org.apache.dolphinscheduler.api.service.TaskDefinitionLogService; import org.apache.dolphinscheduler.api.service.TaskDefinitionService; -import org.apache.dolphinscheduler.api.service.WorkFlowLineageService; import org.apache.dolphinscheduler.api.utils.CheckUtils; import org.apache.dolphinscheduler.api.utils.FileUtils; import org.apache.dolphinscheduler.api.utils.PageInfo; @@ -83,6 +84,7 @@ import org.apache.dolphinscheduler.dao.entity.DependentSimplifyDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.Project; @@ -90,7 +92,6 @@ import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.dao.entity.TaskMainInfo; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.UserWithProcessDefinitionCode; import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; @@ -111,7 +112,10 @@ import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; +import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils; import org.apache.dolphinscheduler.plugin.task.sql.SqlTaskChannelFactory; @@ -144,6 +148,7 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; @@ -153,6 +158,7 @@ import javax.servlet.http.HttpServletResponse; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.http.MediaType; @@ -239,7 +245,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro private DataSourceMapper dataSourceMapper; @Autowired - private WorkFlowLineageService workFlowLineageService; + private ProcessLineageService processLineageService; @Autowired private MetricsCleanUpService metricsCleanUpService; @@ -407,11 +413,66 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion); } + saveProcessLineage(processDefinition.getProjectCode(), processDefinition.getCode(), + insertVersion, taskDefinitionLogs); + putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, processDefinition); return result; } + @Override + public void saveProcessLineage(long projectCode, + long processDefinitionCode, + int processDefinitionVersion, + List taskDefinitionLogList) { + List processTaskLineageList = + generateProcessLineageList(taskDefinitionLogList, processDefinitionCode, processDefinitionVersion); + if (processTaskLineageList.isEmpty()) { + return; + } + + int insertProcessLineageResult = processLineageService.updateProcessLineage(processTaskLineageList); + if (insertProcessLineageResult <= 0) { + log.error( + "Save process lineage error, projectCode: {}, processDefinitionCode: {}, processDefinitionVersion: {}", + projectCode, processDefinitionCode, processDefinitionVersion); + throw new ServiceException(Status.CREATE_PROCESS_LINEAGE_ERROR); + } else { + log.info( + "Save process lineage complete, projectCode: {}, processDefinitionCode: {}, processDefinitionVersion: {}", + projectCode, processDefinitionCode, processDefinitionVersion); + } + } + + private List generateProcessLineageList(List taskDefinitionLogList, + long processDefinitionCode, + int processDefinitionVersion) { + List processTaskLineageList = new ArrayList<>(); + for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogList) { + if (!TaskTypeUtils.isDependentTask(taskDefinitionLog.getTaskType())) { + continue; + } + + for (DependentTaskModel dependentTaskModel : JSONUtils + .parseObject(taskDefinitionLog.getTaskParams(), DependentParameters.class) + .getDependence().getDependTaskList()) { + for (DependentItem dependentItem : dependentTaskModel.getDependItemList()) { + ProcessTaskLineage processTaskLineage = new ProcessTaskLineage(); + processTaskLineage.setProcessDefinitionCode(processDefinitionCode); + processTaskLineage.setProcessDefinitionVersion(processDefinitionVersion); + processTaskLineage.setTaskDefinitionCode(taskDefinitionLog.getCode()); + processTaskLineage.setTaskDefinitionVersion(taskDefinitionLog.getVersion()); + processTaskLineage.setDeptProjectCode(taskDefinitionLog.getProjectCode()); + processTaskLineage.setDeptProcessDefinitionCode(dependentItem.getDefinitionCode()); + processTaskLineage.setDeptTaskDefinitionCode(dependentItem.getDepTaskCode()); + processTaskLineageList.add(processTaskLineage); + } + } + } + return processTaskLineageList; + } + private List generateTaskDefinitionList(String taskDefinitionJson) { try { List taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); @@ -828,7 +889,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro boolean oldTaskExists = taskRelationList.stream() .anyMatch(relation -> oldProcessTaskRelation.getPostTaskCode() == relation.getPostTaskCode()); if (!oldTaskExists) { - Optional taskDepMsg = workFlowLineageService.taskDepOnTaskMsg( + Optional taskDepMsg = processLineageService.taskDependentMsg( processDefinition.getProjectCode(), oldProcessTaskRelation.getProcessDefinitionCode(), oldProcessTaskRelation.getPostTaskCode()); taskDepMsg.ifPresent(sb::append); @@ -908,6 +969,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } + + saveProcessLineage(processDefinition.getProjectCode(), processDefinition.getCode(), + insertVersion, taskDefinitionLogs); } else { log.info( "Process definition does not need to be updated because there is no change, projectCode:{}, processCode:{}, processVersion:{}.", @@ -1013,14 +1077,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } // check process used by other task, including subprocess and dependent task type - Set taskDepOnProcess = workFlowLineageService - .queryTaskDepOnProcess(processDefinition.getProjectCode(), processDefinition.getCode()); - if (CollectionUtils.isNotEmpty(taskDepOnProcess)) { - String taskDepDetail = taskDepOnProcess.stream() - .map(task -> String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(), - task.getTaskName())) - .collect(Collectors.joining(Constants.COMMA)); - throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL, taskDepDetail); + Optional taskDepMsg = processLineageService.taskDependentMsg(processDefinition.getProjectCode(), + processDefinition.getCode(), 0); + + if (taskDepMsg.isPresent()) { + String errorMeg = "Process definition cannot be deleted because it has dependent, " + taskDepMsg.get(); + log.error(errorMeg); + throw new ServiceException(errorMeg); } } @@ -2219,12 +2282,47 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR); throw new ServiceException(Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR); } + + List processTaskRelationList = processTaskRelationMapper + .queryProcessTaskRelationsByProcessDefinitionCode(processDefinitionLog.getCode(), + processDefinitionLog.getVersion()); + List taskDefinitionList = getTaskCodeVersionDtos(processTaskRelationList); + List taskDefinitionLogList = + taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionList.stream() + .flatMap(taskCodeVersionDto -> { + TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(); + taskDefinitionLog.setCode(taskCodeVersionDto.getCode()); + taskDefinitionLog.setVersion(taskCodeVersionDto.getVersion()); + return Stream.of(taskDefinitionLog); + }).collect(Collectors.toList())); + saveProcessLineage(processDefinitionLog.getProjectCode(), processDefinitionLog.getCode(), + processDefinitionLog.getVersion(), taskDefinitionLogList); + log.info("Switch process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version); putMsg(result, Status.SUCCESS); return result; } + private static @NotNull List getTaskCodeVersionDtos(List processTaskRelationList) { + List taskDefinitionList = new ArrayList<>(); + for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { + if (processTaskRelation.getPreTaskCode() != 0) { + TaskCodeVersionDto taskCodeVersionDto = new TaskCodeVersionDto(); + taskCodeVersionDto.setCode(processTaskRelation.getPreTaskCode()); + taskCodeVersionDto.setVersion(processTaskRelation.getPreTaskVersion()); + taskDefinitionList.add(taskCodeVersionDto); + } + if (processTaskRelation.getPostTaskCode() != 0) { + TaskCodeVersionDto taskCodeVersionDto = new TaskCodeVersionDto(); + taskCodeVersionDto.setCode(processTaskRelation.getPostTaskCode()); + taskCodeVersionDto.setVersion(processTaskRelation.getPostTaskVersion()); + taskDefinitionList.add(taskCodeVersionDto); + } + } + return taskDefinitionList; + } + /** * check batch operate result * @@ -2330,7 +2428,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro if (deleteLog == 0 || deleteRelationLog == 0) { throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR); } - log.info("Delete version: {} of workflow: {}, projectCode:{}.", version, code, projectCode); + log.info("Delete version: {} of workflow: {}, projectCode: {}", version, code, projectCode); + + // delete process lineage + int deleteProcessLineageResult = processLineageService.deleteProcessLineage(Collections.singletonList(code)); + if (deleteProcessLineageResult <= 0) { + log.error("Delete process lineage by process definition code error, processDefinitionCode: {}", code); + throw new ServiceException(Status.DELETE_PROCESS_LINEAGE_ERROR); + } } private void updateWorkflowValid(User user, ProcessDefinition oldProcessDefinition, diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 11ff034bb6..3258fbc236 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -684,6 +684,13 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce } else log.info("Update process definition complete, projectCode:{}, processDefinitionName:{}.", projectCode, processDefinition.getName()); + + // save process lineage + if (syncDefine) { + processDefinitionService.saveProcessLineage(projectCode, processDefinition.getCode(), + insertVersion, taskDefinitionLogs); + } + int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, syncDefine); if (insertResult == Constants.EXIT_CODE_SUCCESS) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessLineageServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessLineageServiceImpl.java new file mode 100644 index 0000000000..a2f18714f4 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessLineageServiceImpl.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service.impl; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; +import org.apache.dolphinscheduler.api.service.ProcessLineageService; +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.dao.entity.DependentLineageTask; +import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; +import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; +import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessTaskLineageDao; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +/** + * work flow lineage service impl + */ +@Slf4j +@Service +public class ProcessLineageServiceImpl extends BaseServiceImpl implements ProcessLineageService { + + @Autowired + private ProjectMapper projectMapper; + + @Autowired + private TaskDefinitionLogMapper taskDefinitionLogMapper; + + @Autowired + private TaskDefinitionMapper taskDefinitionMapper; + + @Autowired + private ProcessTaskLineageDao processTaskLineageDao; + @Autowired + private ProcessDefinitionMapper processDefinitionMapper; + + @Override + public List queryWorkFlowLineageByName(long projectCode, String processDefinitionName) { + Project project = projectMapper.queryByCode(projectCode); + if (project == null) { + throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode); + } + return processTaskLineageDao.queryWorkFlowLineageByName(projectCode, processDefinitionName); + } + + @Override + public WorkFlowLineage queryWorkFlowLineageByCode(long projectCode, long processDefinitionCode) { + Project project = projectMapper.queryByCode(projectCode); + if (project == null) { + throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode); + } + List upstreamProcessTaskLineageList = + processTaskLineageDao.queryByProcessDefinitionCode(processDefinitionCode); + List downstreamProcessTaskLineageList = + processTaskLineageDao.queryWorkFlowLineageByDept(projectCode, processDefinitionCode, + Constants.DEPENDENT_ALL_TASK); + List totalProcessTaskLineageList = + Stream.of(upstreamProcessTaskLineageList, downstreamProcessTaskLineageList) + .flatMap(List::stream) + .collect(Collectors.toList()); + + List workFlowRelationList = getWorkFlowRelations(totalProcessTaskLineageList); + List workFlowRelationDetailList = + getWorkflowRelationDetails(totalProcessTaskLineageList.stream() + .flatMap(pl -> { + List processDefinitionCodes = new ArrayList<>(); + processDefinitionCodes.add(pl.getProcessDefinitionCode()); + processDefinitionCodes.add(pl.getDeptProcessDefinitionCode()); + return processDefinitionCodes.stream(); + }).distinct().collect(Collectors.toList())); + + WorkFlowLineage workFlowLineage = new WorkFlowLineage(); + workFlowLineage.setWorkFlowRelationDetailList(workFlowRelationDetailList); + workFlowLineage.setWorkFlowRelationList(workFlowRelationList); + return workFlowLineage; + } + + @Override + public WorkFlowLineage queryWorkFlowLineage(long projectCode) { + Project project = projectMapper.queryByCode(projectCode); + if (project == null) { + throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode); + } + List processTaskLineageList = processTaskLineageDao.queryByProjectCode(projectCode); + List workFlowRelationList = getWorkFlowRelations(processTaskLineageList); + List workFlowRelationDetailList = + getWorkflowRelationDetails(processTaskLineageList.stream() + .flatMap(pl -> { + List processDefinitionCodes = new ArrayList<>(); + processDefinitionCodes.add(pl.getProcessDefinitionCode()); + processDefinitionCodes.add(pl.getDeptProcessDefinitionCode()); + return processDefinitionCodes.stream(); + }).distinct().collect(Collectors.toList())); + + WorkFlowLineage workFlowLineage = new WorkFlowLineage(); + workFlowLineage.setWorkFlowRelationList(workFlowRelationList); + workFlowLineage.setWorkFlowRelationDetailList(workFlowRelationDetailList); + return workFlowLineage; + } + + private List getWorkFlowRelations(List processTaskLineageList) { + List workFlowRelations = new ArrayList<>(); + List processDefinitionCodes = processTaskLineageList.stream() + .map(ProcessTaskLineage::getProcessDefinitionCode).distinct().collect(Collectors.toList()); + for (ProcessTaskLineage processTaskLineage : processTaskLineageList) { + workFlowRelations.add(new WorkFlowRelation(processTaskLineage.getDeptProcessDefinitionCode(), + processTaskLineage.getProcessDefinitionCode())); + + if (!processDefinitionCodes.contains(processTaskLineage.getDeptProcessDefinitionCode())) { + workFlowRelations.add(new WorkFlowRelation(0, processTaskLineage.getProcessDefinitionCode())); + } + } + return workFlowRelations; + } + + private List getWorkflowRelationDetails(List processDefinitionCodes) { + List workFlowRelationDetails = new ArrayList<>(); + for (Long processDefinitionCode : processDefinitionCodes) { + List workFlowRelationDetailList = + processTaskLineageDao.queryWorkFlowLineageByCode(processDefinitionCode); + workFlowRelationDetails.addAll(workFlowRelationDetailList); + } + return workFlowRelationDetails; + } + + /** + * Query tasks depend on process definition, include upstream or downstream + * and return tasks dependence with string format. + * + * @param projectCode Project code want to query tasks dependence + * @param processDefinitionCode Process definition code want to query tasks dependence + * @param taskCode Task code want to query tasks dependence + * @return Optional of formatter message + */ + @Override + public Optional taskDependentMsg(long projectCode, long processDefinitionCode, long taskCode) { + long queryTaskCode = 0; + if (taskCode != 0) { + queryTaskCode = taskCode; + } + List dependentProcessList = + processTaskLineageDao.queryWorkFlowLineageByDept(projectCode, processDefinitionCode, queryTaskCode); + if (CollectionUtils.isEmpty(dependentProcessList)) { + return Optional.empty(); + } + + List taskDepStrList = new ArrayList<>(); + + for (ProcessTaskLineage processTaskLineage : dependentProcessList) { + ProcessDefinition processDefinition = + processDefinitionMapper.queryByCode(processTaskLineage.getDeptProcessDefinitionCode()); + String taskName = ""; + if (processTaskLineage.getTaskDefinitionCode() != 0) { + TaskDefinition taskDefinition = + taskDefinitionMapper.queryByCode(processTaskLineage.getTaskDefinitionCode()); + taskName = taskDefinition.getName(); + } + taskDepStrList.add(String.format(Constants.FORMAT_S_S_COLON, processDefinition.getName(), taskName)); + } + + String taskDepStr = String.join(Constants.COMMA, taskDepStrList); + if (taskCode != 0) { + TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); + return Optional + .of(MessageFormat.format(Status.DELETE_TASK_USE_BY_OTHER_FAIL.getMsg(), taskDefinition.getName(), + taskDepStr)); + } else { + return Optional.of(MessageFormat.format(Status.DELETE_TASK_USE_BY_OTHER_FAIL.getMsg(), "", + taskDepStr)); + } + } + + /** + * Query downstream tasks depend on a process definition or a task + * + * @param processDefinitionCode Process definition code want to query tasks dependence + * @return downstream dependent process definition list + */ + @Override + public List queryDownstreamDependentProcessDefinitions(Long processDefinitionCode) { + List dependentProcessDefinitionList = new ArrayList<>(); + List processTaskLineageList = + processTaskLineageDao.queryWorkFlowLineageByDept(Constants.DEFAULT_PROJECT_CODE, processDefinitionCode, + Constants.DEPENDENT_ALL_TASK); + if (processTaskLineageList.isEmpty()) { + return dependentProcessDefinitionList; + } + + List processDefinitionList = + processDefinitionMapper.queryByCodes(processTaskLineageList.stream() + .map(ProcessTaskLineage::getDeptProcessDefinitionCode).distinct().collect(Collectors.toList())); + List taskDefinitionList = taskDefinitionMapper.queryByCodeList(processTaskLineageList.stream() + .map(ProcessTaskLineage::getDeptTaskDefinitionCode).distinct().collect(Collectors.toList())); + for (TaskDefinition taskDefinition : taskDefinitionList) { + DependentProcessDefinition dependentProcessDefinition = new DependentProcessDefinition(); + processTaskLineageList.stream() + .filter(processLineage -> processLineage.getDeptTaskDefinitionCode() == taskDefinition.getCode()) + .findFirst() + .ifPresent(processLineage -> { + dependentProcessDefinition + .setProcessDefinitionCode(processLineage.getDeptProcessDefinitionCode()); + dependentProcessDefinition.setTaskDefinitionCode(taskDefinition.getCode()); + dependentProcessDefinition.setTaskParams(taskDefinition.getTaskParams()); + dependentProcessDefinition.setWorkerGroup(taskDefinition.getWorkerGroup()); + }); + processDefinitionList.stream() + .filter(processDefinition -> processDefinition.getCode() == dependentProcessDefinition + .getProcessDefinitionCode()) + .findFirst() + .ifPresent(processDefinition -> { + dependentProcessDefinition.setProcessDefinitionVersion(processDefinition.getVersion()); + }); + } + + return dependentProcessDefinitionList; + } + + @Override + public List queryDependentProcessDefinitions(long projectCode, long processDefinitionCode, + Long taskCode) { + Project project = projectMapper.queryByCode(projectCode); + if (project == null) { + throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode); + } + List processTaskLineageList = processTaskLineageDao.queryWorkFlowLineageByDept(projectCode, + processDefinitionCode, taskCode == null ? 0 : taskCode); + List processDefinitionList = + processDefinitionMapper.queryByCodes(processTaskLineageList.stream() + .map(ProcessTaskLineage::getProcessDefinitionCode).distinct().collect(Collectors.toList())); + List taskDefinitionList = taskDefinitionMapper.queryByCodeList(processTaskLineageList.stream() + .map(ProcessTaskLineage::getTaskDefinitionCode).filter(code -> code != 0).distinct() + .collect(Collectors.toList())); + List dependentLineageTaskList = new ArrayList<>(); + for (ProcessTaskLineage processTaskLineage : processTaskLineageList) { + DependentLineageTask dependentLineageTask = new DependentLineageTask(); + taskDefinitionList.stream() + .filter(taskDefinition -> taskDefinition.getCode() == processTaskLineage.getTaskDefinitionCode()) + .findFirst() + .ifPresent(taskDefinition -> { + dependentLineageTask.setTaskDefinitionCode(taskDefinition.getCode()); + dependentLineageTask.setTaskDefinitionName(taskDefinition.getName()); + }); + processDefinitionList.stream() + .filter(processDefinition -> processDefinition.getCode() == processTaskLineage + .getProcessDefinitionCode()) + .findFirst() + .ifPresent(processDefinition -> { + dependentLineageTask.setProcessDefinitionCode(processDefinition.getCode()); + dependentLineageTask.setProcessDefinitionName(processDefinition.getName()); + dependentLineageTask.setProjectCode(processDefinition.getProjectCode()); + }); + dependentLineageTaskList.add(dependentLineageTask); + } + return dependentLineageTaskList; + } + + @Override + public int createProcessLineage(List processTaskLineages) { + return processTaskLineageDao.batchInsert(processTaskLineages); + } + + @Override + public int updateProcessLineage(List processTaskLineages) { + return processTaskLineageDao.updateProcessTaskLineage(processTaskLineages); + } + + @Override + public int deleteProcessLineage(List processDefinitionCodes) { + return processTaskLineageDao.batchDeleteByProcessDefinitionCode(processDefinitionCodes); + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java deleted file mode 100644 index 9763f21b0f..0000000000 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java +++ /dev/null @@ -1,306 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.api.service.impl; - -import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.exceptions.ServiceException; -import org.apache.dolphinscheduler.api.service.WorkFlowLineageService; -import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.ProcessLineage; -import org.apache.dolphinscheduler.dao.entity.Project; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; -import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; -import org.apache.dolphinscheduler.dao.entity.TaskMainInfo; -import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; -import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; -import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; -import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; -import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; -import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; -import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; -import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; -import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils; - -import org.apache.commons.lang3.StringUtils; - -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; - -import lombok.extern.slf4j.Slf4j; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import org.springframework.util.CollectionUtils; - -/** - * work flow lineage service impl - */ -@Slf4j -@Service -public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkFlowLineageService { - - @Autowired - private WorkFlowLineageMapper workFlowLineageMapper; - - @Autowired - private ProjectMapper projectMapper; - - @Autowired - private TaskDefinitionLogMapper taskDefinitionLogMapper; - - @Autowired - private TaskDefinitionMapper taskDefinitionMapper; - - @Override - public List queryWorkFlowLineageByName(long projectCode, String workFlowName) { - Project project = projectMapper.queryByCode(projectCode); - if (project == null) { - throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode); - } - return workFlowLineageMapper.queryWorkFlowLineageByName(projectCode, workFlowName); - } - - @Override - public Map queryWorkFlowLineageByCode(long projectCode, long sourceWorkFlowCode) { - Project project = projectMapper.queryByCode(projectCode); - if (project == null) { - throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode); - } - List workFlowLineages = new ArrayList<>(); - Set workFlowRelations = new HashSet<>(); - recursiveWorkFlow(projectCode, sourceWorkFlowCode, workFlowLineages, workFlowRelations); - Map workFlowLists = new HashMap<>(); - // todo: use vo - workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineages); - workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations); - return workFlowLists; - } - - private void recursiveWorkFlow(long projectCode, - long sourceWorkFlowCode, - List workFlowLineages, - Set workFlowRelations) { - workFlowLineages.add(workFlowLineageMapper.queryWorkFlowLineageByCode(projectCode, sourceWorkFlowCode)); - - List downStreamWorkFlowLineages = - workFlowLineageMapper.queryDownstreamLineageByProcessDefinitionCode(sourceWorkFlowCode, "DEPENDENT"); - workFlowLineages.addAll(downStreamWorkFlowLineages); - downStreamWorkFlowLineages.forEach(workFlowLineage -> workFlowRelations - .add(new WorkFlowRelation(sourceWorkFlowCode, workFlowLineage.getWorkFlowCode()))); - - List upstreamWorkFlowLineages = new ArrayList<>(); - getUpstreamLineages(sourceWorkFlowCode, upstreamWorkFlowLineages); - workFlowLineages.addAll(upstreamWorkFlowLineages); - upstreamWorkFlowLineages.forEach(workFlowLineage -> workFlowRelations - .add(new WorkFlowRelation(workFlowLineage.getWorkFlowCode(), sourceWorkFlowCode))); - } - - private void getUpstreamLineages(long sourceWorkFlowCode, - List upstreamWorkFlowLineages) { - List workFlowDependentDefinitionList = - workFlowLineageMapper.queryUpstreamDependentParamsByProcessDefinitionCode(sourceWorkFlowCode, - "DEPENDENT"); - - List upstreamProcessDefinitionCodes = new ArrayList<>(); - - getProcessDefinitionCodeByDependentDefinitionList(workFlowDependentDefinitionList, - upstreamProcessDefinitionCodes); - - if (!upstreamProcessDefinitionCodes.isEmpty()) { - upstreamWorkFlowLineages.addAll( - workFlowLineageMapper.queryWorkFlowLineageByProcessDefinitionCodes(upstreamProcessDefinitionCodes)); - } - } - - /** - * get dependent process definition code by dependent process definition list - */ - private void getProcessDefinitionCodeByDependentDefinitionList(List dependentDefinitionList, - List processDefinitionCodes) { - for (DependentProcessDefinition dependentProcessDefinition : dependentDefinitionList) { - for (DependentTaskModel dependentTaskModel : dependentProcessDefinition.getDependentParameters() - .getDependence().getDependTaskList()) { - for (DependentItem dependentItem : dependentTaskModel.getDependItemList()) { - if (!processDefinitionCodes.contains(dependentItem.getDefinitionCode())) { - processDefinitionCodes.add(dependentItem.getDefinitionCode()); - } - } - } - } - } - - @Override - public Map queryWorkFlowLineage(long projectCode) { - Map result = new HashMap<>(); - Project project = projectMapper.queryByCode(projectCode); - if (project == null) { - log.error("Project does not exist, projectCode:{}.", projectCode); - putMsg(result, Status.PROJECT_NOT_FOUND, projectCode); - return result; - } - List processLineages = workFlowLineageMapper.queryProcessLineage(projectCode); - Map workFlowLineagesMap = new HashMap<>(); - Set workFlowRelations = new HashSet<>(); - if (!processLineages.isEmpty()) { - List workFlowLineages = - workFlowLineageMapper.queryWorkFlowLineageByLineage(processLineages); - workFlowLineagesMap = workFlowLineages.stream() - .collect(Collectors.toMap(WorkFlowLineage::getWorkFlowCode, workFlowLineage -> workFlowLineage)); - Map> workFlowMap = new HashMap<>(); - for (ProcessLineage processLineage : processLineages) { - workFlowMap.compute(processLineage.getProcessDefinitionCode(), (k, v) -> { - if (v == null) { - v = new ArrayList<>(); - } - if (processLineage.getPreTaskCode() > 0) { - v.add(new TaskDefinition(processLineage.getPreTaskCode(), processLineage.getPreTaskVersion())); - } - if (processLineage.getPostTaskCode() > 0) { - v.add(new TaskDefinition(processLineage.getPostTaskCode(), - processLineage.getPostTaskVersion())); - } - return v; - }); - } - for (Entry> workFlow : workFlowMap.entrySet()) { - Set sourceWorkFlowCodes = - querySourceWorkFlowCodes(projectCode, workFlow.getKey(), workFlow.getValue()); - if (sourceWorkFlowCodes.isEmpty()) { - workFlowRelations.add(new WorkFlowRelation(0L, workFlow.getKey())); - } else { - workFlowLineagesMap.get(workFlow.getKey()) - .setSourceWorkFlowCode(StringUtils.join(sourceWorkFlowCodes, Constants.COMMA)); - sourceWorkFlowCodes - .forEach(code -> workFlowRelations.add(new WorkFlowRelation(code, workFlow.getKey()))); - } - } - } - Map workFlowLists = new HashMap<>(); - workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineagesMap.values()); - workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations); - result.put(Constants.DATA_LIST, workFlowLists); - putMsg(result, Status.SUCCESS); - return result; - } - - private Set querySourceWorkFlowCodes(long projectCode, long workFlowCode, - List taskDefinitionList) { - Set sourceWorkFlowCodes = new HashSet<>(); - if (taskDefinitionList == null || taskDefinitionList.isEmpty()) { - return sourceWorkFlowCodes; - } - List taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionList); - for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { - if (taskDefinitionLog.getProjectCode() == projectCode) { - if (TaskTypeUtils.isDependentTask(taskDefinitionLog.getTaskType())) { - DependentParameters dependentParameters = - JSONUtils.parseObject(taskDefinitionLog.getDependence(), DependentParameters.class); - if (dependentParameters != null) { - List dependTaskList = - dependentParameters.getDependence().getDependTaskList(); - if (!CollectionUtils.isEmpty(dependTaskList)) { - for (DependentTaskModel taskModel : dependTaskList) { - List dependItemList = taskModel.getDependItemList(); - for (DependentItem dependentItem : dependItemList) { - if (dependentItem.getProjectCode() == projectCode - && dependentItem.getDefinitionCode() != workFlowCode) { - sourceWorkFlowCodes.add(dependentItem.getDefinitionCode()); - } - } - } - } - } - } - } - } - return sourceWorkFlowCodes; - } - - /** - * Query and return tasks dependence with string format, is a wrapper of queryTaskDepOnTask and task query method. - * - * @param projectCode Project code want to query tasks dependence - * @param processDefinitionCode Process definition code want to query tasks dependence - * @param taskCode Task code want to query tasks dependence - * @return Optional of formatter message - */ - @Override - public Optional taskDepOnTaskMsg(long projectCode, long processDefinitionCode, long taskCode) { - List tasksDep = - workFlowLineageMapper.queryTaskDepOnTask(projectCode, processDefinitionCode, taskCode); - if (CollectionUtils.isEmpty(tasksDep)) { - return Optional.empty(); - } - - String taskDepStr = tasksDep.stream().map( - task -> String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(), task.getTaskName())) - .collect(Collectors.joining(Constants.COMMA)); - TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); - return Optional.of(MessageFormat.format(Status.DELETE_TASK_USE_BY_OTHER_FAIL.getMsg(), taskDefinition.getName(), - taskDepStr)); - } - - /** - * Query tasks depend on process definition, include upstream or downstream - * - * @param projectCode Project code want to query tasks dependence - * @param processDefinitionCode Process definition code want to query tasks dependence - * @return Set of TaskMainInfo - */ - @Override - public Set queryTaskDepOnProcess(long projectCode, long processDefinitionCode) { - Set taskMainInfos = new HashSet<>(); - List taskDependents = - workFlowLineageMapper.queryTaskDependentOnProcess(processDefinitionCode, 0); - List taskSubProcess = - workFlowLineageMapper.queryTaskSubProcessDepOnProcess(projectCode, processDefinitionCode); - taskMainInfos.addAll(taskDependents); - taskMainInfos.addAll(taskSubProcess); - return taskMainInfos; - } - - /** - * Query downstream tasks depend on a process definition or a task - * - * @param processDefinitionCode Process definition code want to query tasks dependence - * @param taskCode Task code want to query tasks dependence - * @return downstream dependent tasks - */ - @Override - public Map queryDownstreamDependentTasks(Long processDefinitionCode, Long taskCode) { - Map result = new HashMap<>(); - List taskDependents = - workFlowLineageMapper.queryTaskDependentOnProcess(processDefinitionCode, - Objects.isNull(taskCode) ? 0 : taskCode.longValue()); - result.put(Constants.DATA_LIST, taskDependents); - putMsg(result, Status.SUCCESS); - return result; - } -} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessTaskLineageControllerTest.java similarity index 67% rename from dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageControllerTest.java rename to dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessTaskLineageControllerTest.java index 6329cc584a..0a3da8437c 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessTaskLineageControllerTest.java @@ -20,14 +20,13 @@ package org.apache.dolphinscheduler.api.controller; import static org.apache.dolphinscheduler.api.AssertionsHelper.assertDoesNotThrow; import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.service.impl.WorkFlowLineageServiceImpl; +import org.apache.dolphinscheduler.api.service.impl.ProcessLineageServiceImpl; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.dao.entity.User; import java.text.MessageFormat; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import org.junit.jupiter.api.BeforeEach; @@ -42,13 +41,13 @@ import org.mockito.junit.jupiter.MockitoExtension; * work flow lineage controller test */ @ExtendWith(MockitoExtension.class) -public class WorkFlowLineageControllerTest { +public class ProcessTaskLineageControllerTest { @InjectMocks - private WorkFlowLineageController workFlowLineageController; + private ProcessLineageController processLineageController; @Mock - private WorkFlowLineageServiceImpl workFlowLineageService; + private ProcessLineageServiceImpl workFlowLineageService; protected User user; @@ -76,27 +75,7 @@ public class WorkFlowLineageControllerTest { String searchVal = "test"; Mockito.when(workFlowLineageService.queryWorkFlowLineageByName(projectCode, searchVal)) .thenReturn(Collections.emptyList()); - assertDoesNotThrow(() -> workFlowLineageController.queryWorkFlowLineageByName(user, projectCode, searchVal)); + assertDoesNotThrow(() -> processLineageController.queryWorkFlowLineageByName(user, projectCode, searchVal)); } - @Test - public void testQueryWorkFlowLineageByCode() { - long projectCode = 1L; - long code = 1L; - Mockito.when(workFlowLineageService.queryWorkFlowLineageByCode(projectCode, code)).thenReturn(new HashMap<>()); - assertDoesNotThrow(() -> workFlowLineageController.queryWorkFlowLineageByCode(user, projectCode, code)); - } - - @Test - public void testQueryDownstreamDependentTaskList() { - long code = 1L; - long taskCode = 1L; - Map result = new HashMap<>(); - result.put(Constants.STATUS, Status.SUCCESS); - Mockito.when(workFlowLineageService.queryDownstreamDependentTasks(code, taskCode)) - .thenReturn(result); - - assertDoesNotThrow( - () -> workFlowLineageController.queryDownstreamDependentTaskList(user, code, taskCode)); - } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java index 04dabf5ead..c722fb6014 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java @@ -162,6 +162,9 @@ public class ExecuteFunctionServiceTest { @Mock private ProcessDefinitionService processDefinitionService; + @Mock + private ProcessLineageService processLineageService; + private int processDefinitionId = 1; private int processDefinitionVersion = 1; @@ -358,7 +361,7 @@ public class ExecuteFunctionServiceTest { dependentProcessDefinition.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup()); dependentProcessDefinition.setTaskParams( "{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":2,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}"); - Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode)) + Mockito.when(processLineageService.queryDownstreamDependentProcessDefinitions(processDefinitionCode)) .thenReturn(Lists.newArrayList(dependentProcessDefinition)); Map processDefinitionWorkerGroupMap = new HashMap<>(); @@ -387,7 +390,7 @@ public class ExecuteFunctionServiceTest { childDependent.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup()); childDependent.setTaskParams( "{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":3,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}"); - Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode( + Mockito.when(processLineageService.queryDownstreamDependentProcessDefinitions( dependentProcessDefinition.getProcessDefinitionCode())).thenReturn(Lists.newArrayList(childDependent)) .thenReturn(Lists.newArrayList()); int allLevelDependentCount = executorService.createComplementDependentCommand(schedules, command, true); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 37af53dbbd..b4b6dcb00d 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -84,7 +84,6 @@ import java.nio.charset.StandardCharsets; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -108,7 +107,6 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.mock.web.MockMultipartFile; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @ExtendWith(MockitoExtension.class) @@ -171,7 +169,7 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { private DataSourceMapper dataSourceMapper; @Mock - private WorkFlowLineageService workFlowLineageService; + private ProcessLineageService processLineageService; @Mock private MetricsCleanUpService metricsCleanUpService; @@ -524,8 +522,8 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { when(processDefinitionDao.queryByCode(46L)).thenReturn(Optional.of(processDefinition)); when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule()); when(scheduleMapper.deleteById(46)).thenReturn(1); - when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode())) - .thenReturn(Collections.emptySet()); + when(processLineageService.taskDependentMsg(project.getCode(), processDefinition.getCode(), 0)) + .thenReturn(Optional.empty()); processDefinitionService.deleteProcessDefinitionByCode(user, 46L); Mockito.verify(metricsCleanUpService, times(1)).cleanUpWorkflowMetricsByDefinitionCode(46L); @@ -540,19 +538,17 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { // process used by other task, sub process user.setUserType(UserType.ADMIN_USER); TaskMainInfo taskMainInfo = getTaskMainInfo().get(0); - when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode())) - .thenReturn(ImmutableSet.copyOf(getTaskMainInfo())); + when(processLineageService.taskDependentMsg(project.getCode(), processDefinition.getCode(), 0)) + .thenReturn(Optional.of(taskMainInfo.getTaskName())); exception = Assertions.assertThrows(ServiceException.class, () -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L)); - Assertions.assertEquals(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL.getCode(), - ((ServiceException) exception).getCode()); // delete success schedule.setReleaseState(ReleaseState.OFFLINE); when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule()); when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1); - when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode())) - .thenReturn(Collections.emptySet()); + when(processLineageService.taskDependentMsg(project.getCode(), processDefinition.getCode(), 0)) + .thenReturn(Optional.empty()); Assertions.assertDoesNotThrow(() -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L)); Mockito.verify(metricsCleanUpService, times(2)).cleanUpWorkflowMetricsByDefinitionCode(46L); } @@ -600,8 +596,8 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { // delete success process.setReleaseState(ReleaseState.OFFLINE); when(processDefinitionDao.queryByCode(processDefinitionCode)).thenReturn(Optional.of(process)); - when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), process.getCode())) - .thenReturn(Collections.emptySet()); + when(processLineageService.taskDependentMsg(project.getCode(), process.getCode(), 0)) + .thenReturn(Optional.empty()); putMsg(result, Status.SUCCESS, projectCode); doNothing().when(metricsCleanUpService).cleanUpWorkflowMetricsByDefinitionCode(11L); Map deleteSuccess = diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskLineageServiceTest.java similarity index 50% rename from dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java rename to dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskLineageServiceTest.java index 316ed5d18e..2d8a28cad6 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskLineageServiceTest.java @@ -19,23 +19,20 @@ package org.apache.dolphinscheduler.api.service; import static org.mockito.Mockito.when; -import org.apache.dolphinscheduler.api.service.impl.WorkFlowLineageServiceImpl; -import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.dao.entity.ProcessLineage; +import org.apache.dolphinscheduler.api.service.impl.ProcessLineageServiceImpl; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; +import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; -import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessTaskLineageDao; import org.apache.commons.collections4.CollectionUtils; import java.util.ArrayList; -import java.util.Collection; import java.util.List; -import java.util.Map; -import java.util.Set; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -49,13 +46,13 @@ import org.mockito.junit.jupiter.MockitoExtension; * work flow lineage service test */ @ExtendWith(MockitoExtension.class) -public class WorkFlowLineageServiceTest { +public class ProcessTaskLineageServiceTest { @InjectMocks - private WorkFlowLineageServiceImpl workFlowLineageService; + private ProcessLineageServiceImpl processLineageService; @Mock - private WorkFlowLineageMapper workFlowLineageMapper; + private ProcessTaskLineageDao processTaskLineageDao; @Mock private ProjectMapper projectMapper; @@ -83,9 +80,9 @@ public class WorkFlowLineageServiceTest { Project project = getProject("test"); String name = "test"; when(projectMapper.queryByCode(1L)).thenReturn(project); - when(workFlowLineageMapper.queryWorkFlowLineageByName(Mockito.anyLong(), Mockito.any())) + when(processTaskLineageDao.queryWorkFlowLineageByName(Mockito.anyLong(), Mockito.any())) .thenReturn(getWorkFlowLineages()); - List workFlowLineages = workFlowLineageService.queryWorkFlowLineageByName(1L, name); + List workFlowLineages = processLineageService.queryWorkFlowLineageByName(1L, name); Assertions.assertTrue(CollectionUtils.isNotEmpty(workFlowLineages)); } @@ -93,43 +90,44 @@ public class WorkFlowLineageServiceTest { public void testQueryWorkFlowLineage() { Project project = getProject("test"); - List processLineages = new ArrayList<>(); - ProcessLineage processLineage = new ProcessLineage(); - processLineage.setPreTaskVersion(1); - processLineage.setPreTaskCode(1L); - processLineage.setPostTaskCode(2L); - processLineage.setPostTaskVersion(1); - processLineage.setProcessDefinitionCode(1111L); - processLineage.setProcessDefinitionVersion(1); - processLineage.setProjectCode(1111L); - processLineages.add(processLineage); - WorkFlowLineage workFlowLineage = new WorkFlowLineage(); - workFlowLineage.setSourceWorkFlowCode(""); - workFlowLineage.setWorkFlowCode(1111L); - List workFlowLineages = new ArrayList<>(); - workFlowLineages.add(workFlowLineage); + List processTaskLineages = new ArrayList<>(); + ProcessTaskLineage processTaskLineage = new ProcessTaskLineage(); + processTaskLineage.setProcessDefinitionCode(1); + processTaskLineage.setProcessDefinitionVersion(1); + processTaskLineage.setTaskDefinitionCode(2L); + processTaskLineage.setTaskDefinitionVersion(1); + processTaskLineage.setDeptProjectCode(1111L); + processTaskLineage.setDeptProcessDefinitionCode(1); + processTaskLineage.setDeptTaskDefinitionCode(1111L); + processTaskLineages.add(processTaskLineage); + + List workFlowRelationDetailList = new ArrayList<>(); + WorkFlowRelationDetail workFlowRelationDetail = new WorkFlowRelationDetail(); + workFlowRelationDetail.setWorkFlowCode(processTaskLineage.getProcessDefinitionCode()); + workFlowRelationDetail.setWorkFlowName("testProcessDefinitionName"); + workFlowRelationDetailList.add(workFlowRelationDetail); when(projectMapper.queryByCode(1L)).thenReturn(project); - when(workFlowLineageMapper.queryProcessLineage(project.getCode())).thenReturn(processLineages); - when(workFlowLineageMapper.queryWorkFlowLineageByLineage(processLineages)).thenReturn(workFlowLineages); - - Map result = workFlowLineageService.queryWorkFlowLineage(1L); - - Map workFlowLists = (Map) result.get(Constants.DATA_LIST); - Collection workFlowLineageList = - (Collection) workFlowLists.get(Constants.WORKFLOW_LIST); - Set workFlowRelations = - (Set) workFlowLists.get(Constants.WORKFLOW_RELATION_LIST); - Assertions.assertTrue(workFlowLineageList.size() > 0); - Assertions.assertTrue(workFlowRelations.size() > 0); + when(processTaskLineageDao.queryByProjectCode(project.getCode())).thenReturn(processTaskLineages); + when(processTaskLineageDao.queryWorkFlowLineageByCode(processTaskLineage.getProcessDefinitionCode())) + .thenReturn(workFlowRelationDetailList); + + WorkFlowLineage workFlowLineage = processLineageService.queryWorkFlowLineage(1L); + + List workFlowLineageList = + workFlowLineage.getWorkFlowRelationDetailList(); + List workFlowRelations = + workFlowLineage.getWorkFlowRelationList(); + Assertions.assertTrue(!workFlowLineageList.isEmpty()); + Assertions.assertTrue(!workFlowRelations.isEmpty()); } - private List getWorkFlowLineages() { - List workFlowLineages = new ArrayList<>(); - WorkFlowLineage workFlowLineage = new WorkFlowLineage(); - workFlowLineage.setWorkFlowCode(1); - workFlowLineage.setWorkFlowName("testdag"); - workFlowLineages.add(workFlowLineage); + private List getWorkFlowLineages() { + List workFlowLineages = new ArrayList<>(); + WorkFlowRelationDetail workFlowRelationDetail = new WorkFlowRelationDetail(); + workFlowRelationDetail.setWorkFlowCode(1); + workFlowRelationDetail.setWorkFlowName("testdag"); + workFlowLineages.add(workFlowRelationDetail); return workFlowLineages; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java index c7d771796f..03a00e67d2 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java @@ -482,11 +482,9 @@ public final class Constants { public static final String TOTAL = "total"; - /** - * workflow - */ - public static final String WORKFLOW_LIST = "workFlowList"; - public static final String WORKFLOW_RELATION_LIST = "workFlowRelationList"; + public static final long DEFAULT_PROJECT_CODE = 0; + + public static final long DEPENDENT_ALL_TASK = 0; /** * session user diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentLineageTask.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentLineageTask.java new file mode 100644 index 0000000000..2f3ffdff56 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentLineageTask.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.entity; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class DependentLineageTask { + + private long projectCode; + private long processDefinitionCode; + private String processDefinitionName; + private long taskDefinitionCode; + private String taskDefinitionName; +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java index fcce2a33d3..111b07aaba 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java @@ -25,9 +25,12 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameter import java.util.List; +import lombok.Data; + /** * dependent process definition */ +@Data public class DependentProcessDefinition { /** @@ -35,11 +38,6 @@ public class DependentProcessDefinition { */ private long processDefinitionCode; - /** - * process definition name - */ - private String processDefinitionName; - /** * process definition version **/ @@ -106,52 +104,4 @@ public class DependentProcessDefinition { return JSONUtils.parseObject(taskParams, DependentParameters.class); } - public String getProcessDefinitionName() { - return this.processDefinitionName; - } - - public void setProcessDefinitionName(String name) { - this.processDefinitionName = name; - } - - public long getProcessDefinitionCode() { - return this.processDefinitionCode; - } - - public void setProcessDefinitionCode(long code) { - this.processDefinitionCode = code; - } - - public int getProcessDefinitionVersion() { - return processDefinitionVersion; - } - - public void setProcessDefinitionVersion(int processDefinitionVersion) { - this.processDefinitionVersion = processDefinitionVersion; - } - - public long getTaskDefinitionCode() { - return this.taskDefinitionCode; - } - - public void setTaskDefinitionCode(long code) { - this.taskDefinitionCode = code; - } - - public String getTaskParams() { - return this.taskParams; - } - - public void setTaskParams(String taskParams) { - this.taskParams = taskParams; - } - - public String getWorkerGroup() { - return this.workerGroup; - } - - public void setWorkerGroup(String workerGroup) { - this.workerGroup = workerGroup; - } - } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java deleted file mode 100644 index 27d0f8cac0..0000000000 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.dao.entity; - -/** - * Process lineage - */ -public class ProcessLineage { - - /** - * project code - */ - private long projectCode; - - /** - * post task code - */ - private long postTaskCode; - - /** - * post task version - */ - private int postTaskVersion; - - /** - * pre task code - */ - private long preTaskCode; - - /** - * pre task version - */ - private int preTaskVersion; - - /** - * process definition code - */ - private long processDefinitionCode; - - /** - * process definition version - */ - private int processDefinitionVersion; - - public long getProjectCode() { - return projectCode; - } - - public void setProjectCode(long projectCode) { - this.projectCode = projectCode; - } - - public long getPostTaskCode() { - return postTaskCode; - } - - public void setPostTaskCode(long postTaskCode) { - this.postTaskCode = postTaskCode; - } - - public int getPostTaskVersion() { - return postTaskVersion; - } - - public void setPostTaskVersion(int postTaskVersion) { - this.postTaskVersion = postTaskVersion; - } - - public long getPreTaskCode() { - return preTaskCode; - } - - public void setPreTaskCode(long preTaskCode) { - this.preTaskCode = preTaskCode; - } - - public int getPreTaskVersion() { - return preTaskVersion; - } - - public void setPreTaskVersion(int preTaskVersion) { - this.preTaskVersion = preTaskVersion; - } - - public long getProcessDefinitionCode() { - return processDefinitionCode; - } - - public void setProcessDefinitionCode(long processDefinitionCode) { - this.processDefinitionCode = processDefinitionCode; - } - - public int getProcessDefinitionVersion() { - return processDefinitionVersion; - } - - public void setProcessDefinitionVersion(int processDefinitionVersion) { - this.processDefinitionVersion = processDefinitionVersion; - } -} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskLineage.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskLineage.java new file mode 100644 index 0000000000..8e45fde680 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskLineage.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.entity; + +import java.util.Date; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; + +@Data +@NoArgsConstructor +@AllArgsConstructor +@TableName("t_ds_process_task_lineage") +public class ProcessTaskLineage { + + @TableId(value = "id", type = IdType.AUTO) + private Integer id; + + private long processDefinitionCode; + private int processDefinitionVersion; + private long taskDefinitionCode; + private int taskDefinitionVersion; + private long deptProjectCode; + private long deptProcessDefinitionCode; + private long deptTaskDefinitionCode; + private Date createTime; + private Date updateTime; +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowLineage.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowLineage.java index 0f2e325da5..ffeffcc9be 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowLineage.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowLineage.java @@ -17,20 +17,15 @@ package org.apache.dolphinscheduler.dao.entity; -import java.util.Date; +import java.util.List; import lombok.Data; +import lombok.NoArgsConstructor; @Data +@NoArgsConstructor public class WorkFlowLineage { - private long workFlowCode; - private String workFlowName; - private String workFlowPublishStatus; - private Date scheduleStartTime; - private Date scheduleEndTime; - private String crontab; - private int schedulePublishStatus; - private String sourceWorkFlowCode; - + private List workFlowRelationList; + private List workFlowRelationDetailList; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java index 72dbf8790d..1327fae6ef 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java @@ -19,35 +19,18 @@ package org.apache.dolphinscheduler.dao.entity; import java.util.Objects; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor public class WorkFlowRelation { private long sourceWorkFlowCode; private long targetWorkFlowCode; - public long getSourceWorkFlowCode() { - return sourceWorkFlowCode; - } - - public void setSourceWorkFlowCode(long sourceWorkFlowCode) { - this.sourceWorkFlowCode = sourceWorkFlowCode; - } - - public long getTargetWorkFlowCode() { - return targetWorkFlowCode; - } - - public void setTargetWorkFlowCode(long targetWorkFlowCode) { - this.targetWorkFlowCode = targetWorkFlowCode; - } - - public WorkFlowRelation() { - } - - public WorkFlowRelation(long sourceWorkFlowCode, long targetWorkFlowCode) { - this.sourceWorkFlowCode = sourceWorkFlowCode; - this.targetWorkFlowCode = targetWorkFlowCode; - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelationDetail.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelationDetail.java new file mode 100644 index 0000000000..fc2b09e824 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelationDetail.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.entity; + +import java.util.Date; + +import lombok.Data; + +@Data +public class WorkFlowRelationDetail { + + private long workFlowCode; + private String workFlowName; + private String workFlowPublishStatus; + private Date scheduleStartTime; + private Date scheduleEndTime; + private String crontab; + private int schedulePublishStatus; + private String sourceWorkFlowCode; + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskLineageMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskLineageMapper.java new file mode 100644 index 0000000000..08920b89d3 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskLineageMapper.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.mapper; + +import org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage; +import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail; + +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +public interface ProcessTaskLineageMapper extends BaseMapper { + + int batchDeleteByProcessDefinitionCode(@Param("processDefinitionCodes") List processDefinitionCodes); + + int batchInsert(@Param("processLineages") List processTaskLineages); + + List queryByProjectCode(@Param("projectCode") long projectCode); + + List queryWorkFlowLineageByCode(@Param("processDefinitionCode") long processDefinitionCode); + + List queryWorkFlowLineageByName(@Param("projectCode") long projectCode, + @Param("processDefinitionName") String processDefinitionName); + + List queryWorkFlowLineageByDept(@Param("deptProjectCode") long deptProjectCode, + @Param("deptProcessDefinitionCode") long deptProcessDefinitionCode, + @Param("deptTaskDefinitionCode") long deptTaskDefinitionCode); + + List queryByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode); + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java index 4caadac540..cfbe6e94e0 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java @@ -140,4 +140,6 @@ public interface TaskDefinitionMapper extends BaseMapper { int deleteByBatchCodes(@Param("taskCodeList") List taskCodeList); void deleteByWorkflowDefinitionCodeAndVersion(long workflowDefinitionCode, int workflowDefinitionVersion); + + List queryDefinitionsByTaskType(@Param("taskType") String taskType); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java deleted file mode 100644 index 649e188e9c..0000000000 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.dao.mapper; - -import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.ProcessLineage; -import org.apache.dolphinscheduler.dao.entity.TaskMainInfo; -import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; - -import org.apache.ibatis.annotations.Param; - -import java.util.List; - -public interface WorkFlowLineageMapper { - - /** - * queryByName - * - * @param projectCode projectCode - * @param workFlowName workFlowName - * @return WorkFlowLineage list - */ - List queryWorkFlowLineageByName(@Param("projectCode") long projectCode, - @Param("workFlowName") String workFlowName); - - /** - * queryWorkFlowLineageByCode - * - * @param projectCode projectCode - * @param workFlowCode workFlowCode - * @return WorkFlowLineage - */ - WorkFlowLineage queryWorkFlowLineageByCode(@Param("projectCode") long projectCode, - @Param("workFlowCode") long workFlowCode); - - /** - * queryWorkFlowLineageByProcessDefinitionCodes - * - * @param workFlowCodes workFlowCodes - * @return WorkFlowLineage - */ - List queryWorkFlowLineageByProcessDefinitionCodes(@Param("workFlowCodes") List workFlowCodes); - - /** - * queryWorkFlowLineageByCode - * - * @param processLineages processLineages - * @return WorkFlowLineage list - */ - List queryWorkFlowLineageByLineage(@Param("processLineages") List processLineages); - - /** - * queryProcessLineage - * - * @param projectCode projectCode - * @return ProcessLineage list - */ - List queryProcessLineage(@Param("projectCode") long projectCode); - - /** - * queryCodeRelation - * - * @param projectCode projectCode - * @param processDefinitionCode processDefinitionCode - * @return ProcessLineage list - */ - List queryProcessLineageByCode(@Param("projectCode") long projectCode, - @Param("processDefinitionCode") long processDefinitionCode); - - /** - * query process definition by name - * - * @return dependent process definition - */ - List queryDependentProcessDefinitionByProcessDefinitionCode(@Param("code") long code); - - /** - * query downstream work flow lineage by process definition code - * - * @return dependent process definition - */ - List queryDownstreamLineageByProcessDefinitionCode(@Param("code") long code, - @Param("taskType") String taskType); - - /** - * query upstream work flow dependent task params by process definition code - * - * @return task_params - */ - List queryUpstreamDependentParamsByProcessDefinitionCode(@Param("code") long code, - @Param("taskType") String taskType); - - /** - * Query all tasks type sub process depend on process definition. - * - * Query all upstream tasks from task type sub process. - * - * @param projectCode Project code want to query tasks dependence - * @param processDefinitionCode Process definition code want to query tasks dependence - * @return List of TaskMainInfo - */ - List queryTaskSubProcessDepOnProcess(@Param("projectCode") long projectCode, - @Param("processDefinitionCode") long processDefinitionCode); - - /** - * Query all tasks type dependent depend on process definition. - * - * Query all downstream tasks from task type dependent, method `queryTaskDepOnTask` is a proper subset of - * current method `queryTaskDepOnProcess`. Which mean with the same parameter processDefinitionCode, all tasks in - * `queryTaskDepOnTask` are in the result of method `queryTaskDepOnProcess`. - * - * @param processDefinitionCode Process definition code want to query tasks dependence - * @param taskCode Task code want to query tasks dependence - * @return List of TaskMainInfo - */ - List queryTaskDependentOnProcess(@Param("processDefinitionCode") long processDefinitionCode, - @Param("taskCode") long taskCode); - - /** - * Query all tasks depend on task, only downstream task support currently(from dependent task type). - * - * In case of dependent task type, method `queryTaskDepOnTask` is a proper subset of `queryTaskDepOnProcess`. Which - * mean with the same processDefinitionCode, all tasks in `queryTaskDepOnTask` are in method `queryTaskDepOnProcess`. - * - * @param projectCode Project code want to query tasks dependence - * @param processDefinitionCode Process definition code want to query tasks dependence - * @param taskCode Task code want to query tasks dependence - * @return dependent process definition - */ - List queryTaskDepOnTask(@Param("projectCode") long projectCode, - @Param("processDefinitionCode") long processDefinitionCode, - @Param("taskCode") long taskCode); -} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskLineageDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskLineageDao.java new file mode 100644 index 0000000000..953db08701 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskLineageDao.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository; + +import org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage; +import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail; + +import java.util.List; + +public interface ProcessTaskLineageDao extends IDao { + + int batchDeleteByProcessDefinitionCode(List processDefinitionCodes); + + int batchInsert(List processTaskLineages); + + List queryByProjectCode(long projectCode); + + List queryWorkFlowLineageByCode(long processDefinitionCode); + + List queryWorkFlowLineageByName(long projectCode, + String processDefinitionName); + + List queryWorkFlowLineageByDept(long deptProjectCode, + long deptProcessDefinitionCode, + long deptTaskDefinitionCode); + + List queryByProcessDefinitionCode(long processDefinitionCode); + + int updateProcessTaskLineage(List processTaskLineages); +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskLineageDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskLineageDaoImpl.java new file mode 100644 index 0000000000..e7fe04f4c7 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskLineageDaoImpl.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage; +import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskLineageMapper; +import org.apache.dolphinscheduler.dao.repository.BaseDao; +import org.apache.dolphinscheduler.dao.repository.ProcessTaskLineageDao; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.List; +import java.util.stream.Collectors; + +import lombok.NonNull; + +import org.springframework.stereotype.Repository; + +@Repository +public class ProcessTaskLineageDaoImpl extends BaseDao + implements + ProcessTaskLineageDao { + + public ProcessTaskLineageDaoImpl(@NonNull ProcessTaskLineageMapper processTaskLineageMapper) { + super(processTaskLineageMapper); + } + + @Override + public int batchDeleteByProcessDefinitionCode(List processDefinitionCodes) { + if (CollectionUtils.isEmpty(processDefinitionCodes)) { + return 0; + } + return mybatisMapper.batchDeleteByProcessDefinitionCode(processDefinitionCodes); + } + + @Override + public int batchInsert(List processTaskLineages) { + if (CollectionUtils.isEmpty(processTaskLineages)) { + return 0; + } + return mybatisMapper.batchInsert(processTaskLineages); + } + + @Override + public List queryByProjectCode(long projectCode) { + return mybatisMapper.queryByProjectCode(projectCode); + } + + @Override + public List queryWorkFlowLineageByCode(long processDefinitionCode) { + return mybatisMapper.queryWorkFlowLineageByCode(processDefinitionCode); + } + + @Override + public List queryWorkFlowLineageByName(long projectCode, String processDefinitionName) { + return mybatisMapper.queryWorkFlowLineageByName(projectCode, processDefinitionName); + } + + @Override + public List queryWorkFlowLineageByDept(long deptProjectCode, long deptProcessDefinitionCode, + long deptTaskDefinitionCode) { + return mybatisMapper.queryWorkFlowLineageByDept(deptProjectCode, deptProcessDefinitionCode, + deptTaskDefinitionCode); + } + + @Override + public List queryByProcessDefinitionCode(long processDefinitionCode) { + return mybatisMapper.queryByProcessDefinitionCode(processDefinitionCode); + } + + @Override + public int updateProcessTaskLineage(List processTaskLineages) { + if (CollectionUtils.isEmpty(processTaskLineages)) { + return 0; + } + this.batchDeleteByProcessDefinitionCode( + processTaskLineages.stream().map(ProcessTaskLineage::getProcessDefinitionCode) + .distinct().collect(Collectors.toList())); + return mybatisMapper.batchInsert(processTaskLineages); + } +} diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml index 874db82ccc..bcd21647b3 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml @@ -92,10 +92,10 @@ and project_code = #{pd.projectCode} - + and name like concat('%', #{pd.name}, '%') - + and release_state = #{pd.releaseState} diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskLineageMapper.xml new file mode 100644 index 0000000000..0b11d730ac --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskLineageMapper.xml @@ -0,0 +1,133 @@ + + + + + + + id + , process_definition_code + , process_definition_version + , task_definition_code + , task_definition_version + , dept_project_code + , dept_process_definition_code + , dept_task_definition_code + , create_time + , update_time + + + + delete from t_ds_process_task_lineage + where process_definition_code in + + #{i} + + + + + insert into t_ds_process_task_lineage (process_definition_code, process_definition_version, task_definition_code, + task_definition_version, dept_project_code, dept_process_definition_code, dept_task_definition_code) + values + + (#{processTaskLineage.processDefinitionCode},#{processTaskLineage.processDefinitionVersion}, + #{processTaskLineage.taskDefinitionCode},#{processTaskLineage.taskDefinitionVersion}, + #{processTaskLineage.deptProjectCode},#{processTaskLineage.deptProcessDefinitionCode}, + #{processTaskLineage.deptTaskDefinitionCode}) + + + + + + + + + + + + + + + + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml index c4e912cab6..ba06d5a9bd 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml @@ -160,4 +160,12 @@ order by update_time desc, id asc + + + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml deleted file mode 100644 index 51c60394be..0000000000 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml +++ /dev/null @@ -1,249 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index a907ec69a1..6069284bbd 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -2185,3 +2185,25 @@ CREATE TABLE t_ds_listener_event KEY idx_status (post_status), KEY idx_event_sign (sign) ); + +-- ---------------------------- +-- Table structure for t_ds_process_task_lineage +-- ---------------------------- +DROP TABLE IF EXISTS t_ds_process_task_lineage; +CREATE TABLE t_ds_process_task_lineage +( + `id` int NOT NULL AUTO_INCREMENT, + `process_definition_code` bigint(20) NOT NULL DEFAULT 0, + `process_definition_version` int NOT NULL DEFAULT 0, + `task_definition_code` bigint(20) NOT NULL DEFAULT 0, + `task_definition_version` int NOT NULL DEFAULT 0, + `dept_project_code` bigint(20) NOT NULL DEFAULT 0, + `dept_process_definition_code` bigint(20) NOT NULL DEFAULT 0, + `dept_task_definition_code` bigint(20) NOT NULL DEFAULT 0, + `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, + `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + KEY `idx_process_code_version` (`process_definition_code`,`process_definition_version`), + KEY `idx_task_code_version` (`task_definition_code`,`task_definition_version`), + KEY `idx_dept_code` (`dept_project_code`,`dept_process_definition_code`,`dept_task_definition_code`) +); diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 3a4f17a357..f60d78bf5f 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -2157,3 +2157,24 @@ CREATE TABLE `t_ds_listener_event` ( KEY `idx_status` (`post_status`) USING BTREE, KEY `idx_sign` (`sign`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE = utf8_bin; + +-- ---------------------------- +-- Table structure for t_ds_process_task_lineage +-- ---------------------------- +DROP TABLE IF EXISTS `t_ds_process_task_lineage`; +CREATE TABLE `t_ds_process_task_lineage` ( + `id` int NOT NULL AUTO_INCREMENT, + `process_definition_code` bigint NOT NULL DEFAULT 0, + `process_definition_version` int NOT NULL DEFAULT 0, + `task_definition_code` bigint NOT NULL DEFAULT 0, + `task_definition_version` int NOT NULL DEFAULT 0, + `dept_project_code` bigint NOT NULL DEFAULT 0 COMMENT 'dependent project code', + `dept_process_definition_code` bigint NOT NULL DEFAULT 0 COMMENT 'dependent process definition code', + `dept_task_definition_code` bigint NOT NULL DEFAULT 0 COMMENT 'dependent task definition code', + `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time', + `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time', + PRIMARY KEY (`id`), + KEY `idx_process_code_version` (`process_definition_code`,`process_definition_version`), + KEY `idx_task_code_version` (`task_definition_code`,`task_definition_version`), + KEY `idx_dept_code` (`dept_project_code`,`dept_process_definition_code`,`dept_task_definition_code`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index d314c6ef09..cbaa38f039 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -2149,3 +2149,25 @@ comment on column t_ds_listener_event.sign is 'sign=sha1(content)'; create index idx_listener_event_post_status on t_ds_listener_event (post_status); create index idx_listener_event_sign on t_ds_listener_event (sign); + +-- ---------------------------- +-- Table structure for t_ds_process_task_lineage +-- ---------------------------- +DROP TABLE IF EXISTS t_ds_process_task_lineage; +CREATE TABLE t_ds_process_task_lineage ( + id int NOT NULL, + process_definition_code bigint NOT NULL DEFAULT 0, + process_definition_version int NOT NULL DEFAULT 0, + task_definition_code bigint NOT NULL DEFAULT 0, + task_definition_version int NOT NULL DEFAULT 0, + dept_project_code bigint NOT NULL DEFAULT 0, + dept_process_definition_code bigint NOT NULL DEFAULT 0, + dept_task_definition_code bigint NOT NULL DEFAULT 0, + create_time timestamp NOT NULL DEFAULT current_timestamp, + update_time timestamp NOT NULL DEFAULT current_timestamp, + PRIMARY KEY (id) +); + +create index idx_process_code_version on t_ds_process_task_lineage (process_definition_code,process_definition_version); +create index idx_task_code_version on t_ds_process_task_lineage (task_definition_code,task_definition_version); +create index idx_dept_code on t_ds_process_task_lineage (dept_project_code,dept_process_definition_code,dept_task_definition_code); diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql new file mode 100644 index 0000000000..e90459d209 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +DROP TABLE IF EXISTS `t_ds_process_task_lineage`; +CREATE TABLE `t_ds_process_task_lineage` +( + `id` int NOT NULL AUTO_INCREMENT, + `process_definition_code` bigint NOT NULL DEFAULT 0, + `process_definition_version` int NOT NULL DEFAULT 0, + `task_definition_code` bigint NOT NULL DEFAULT 0, + `task_definition_version` int NOT NULL DEFAULT 0, + `dept_project_code` bigint NOT NULL DEFAULT 0 COMMENT 'dependent project code', + `dept_process_definition_code` bigint NOT NULL DEFAULT 0 COMMENT 'dependent process definition code', + `dept_task_definition_code` bigint NOT NULL DEFAULT 0 COMMENT 'dependent task definition code', + `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time', + `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time', + PRIMARY KEY (`id`), + KEY `idx_process_code_version` (`process_definition_code`,`process_definition_version`), + KEY `idx_task_code_version` (`task_definition_code`,`task_definition_version`), + KEY `idx_dept_code` (`dept_project_code`,`dept_process_definition_code`,`dept_task_definition_code`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_dml.sql new file mode 100644 index 0000000000..4a14f326b9 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_dml.sql @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql new file mode 100644 index 0000000000..593d9998fd --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +DROP TABLE IF EXISTS t_ds_process_task_lineage; +CREATE TABLE t_ds_process_task_lineage ( + id int NOT NULL, + process_definition_code bigint NOT NULL DEFAULT 0, + process_definition_version int NOT NULL DEFAULT 0, + task_definition_code bigint NOT NULL DEFAULT 0, + task_definition_version int NOT NULL DEFAULT 0, + dept_project_code bigint NOT NULL DEFAULT 0, + dept_process_definition_code bigint NOT NULL DEFAULT 0, + dept_task_definition_code bigint NOT NULL DEFAULT 0, + create_time timestamp NOT NULL DEFAULT current_timestamp, + update_time timestamp NOT NULL DEFAULT current_timestamp, + PRIMARY KEY (id) +); + +create index idx_process_code_version on t_ds_process_task_lineage (process_definition_code,process_definition_version); +create index idx_task_code_version on t_ds_process_task_lineage (task_definition_code,task_definition_version); +create index idx_dept_code on t_ds_process_task_lineage (dept_project_code,dept_process_definition_code,dept_task_definition_code); diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_dml.sql new file mode 100644 index 0000000000..4a14f326b9 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_dml.sql @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskLineageMapperTest.java similarity index 75% rename from dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java rename to dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskLineageMapperTest.java index 7b86bd2170..fb95c18352 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskLineageMapperTest.java @@ -22,10 +22,10 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.dao.BaseDaoTest; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.ProcessLineage; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.Schedule; -import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; +import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail; import java.util.Date; import java.util.List; @@ -34,10 +34,10 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; -public class WorkFlowLineageMapperTest extends BaseDaoTest { +public class ProcessTaskLineageMapperTest extends BaseDaoTest { @Autowired - private WorkFlowLineageMapper workFlowLineageMapper; + private ProcessTaskLineageMapper processTaskLineageMapper; @Autowired private ProcessDefinitionMapper processDefinitionMapper; @@ -50,10 +50,8 @@ public class WorkFlowLineageMapperTest extends BaseDaoTest { /** * insert - * - * @return ProcessDefinition */ - private ProcessTaskRelation insertOneProcessTaskRelation() { + private void insertOneProcessTaskRelation() { // insertOne ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); processTaskRelation.setName("def 1"); @@ -67,7 +65,6 @@ public class WorkFlowLineageMapperTest extends BaseDaoTest { processTaskRelation.setUpdateTime(new Date()); processTaskRelation.setCreateTime(new Date()); processTaskRelationMapper.insert(processTaskRelation); - return processTaskRelation; } /** @@ -86,6 +83,21 @@ public class WorkFlowLineageMapperTest extends BaseDaoTest { processDefinitionMapper.insert(processDefinition); } + private void insertOneProcessLineage() { + // insertOne + ProcessTaskLineage processTaskLineage = new ProcessTaskLineage(); + processTaskLineage.setProcessDefinitionCode(1L); + processTaskLineage.setProcessDefinitionVersion(1); + processTaskLineage.setTaskDefinitionCode(1L); + processTaskLineage.setTaskDefinitionVersion(1); + processTaskLineage.setDeptProjectCode(1L); + processTaskLineage.setDeptProcessDefinitionCode(1L); + processTaskLineage.setDeptTaskDefinitionCode(1L); + processTaskLineage.setUpdateTime(new Date()); + processTaskLineage.setCreateTime(new Date()); + processTaskLineageMapper.insert(processTaskLineage); + } + /** * insert * @@ -110,29 +122,20 @@ public class WorkFlowLineageMapperTest extends BaseDaoTest { insertOneProcessDefinition(); ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(1L); insertOneSchedule(processDefinition.getId()); - - List workFlowLineages = workFlowLineageMapper + List workFlowLineages = processTaskLineageMapper .queryWorkFlowLineageByName(processDefinition.getProjectCode(), processDefinition.getName()); Assertions.assertNotEquals(0, workFlowLineages.size()); } - @Test - public void testQueryCodeRelation() { - ProcessTaskRelation processTaskRelation = insertOneProcessTaskRelation(); - List workFlowLineages = - workFlowLineageMapper.queryProcessLineageByCode(processTaskRelation.getProjectCode(), - processTaskRelation.getProcessDefinitionCode()); - Assertions.assertNotEquals(0, workFlowLineages.size()); - } - @Test public void testQueryWorkFlowLineage() { insertOneProcessDefinition(); ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(1L); insertOneProcessTaskRelation(); - List workFlowLineages = - workFlowLineageMapper.queryProcessLineage(processDefinition.getProjectCode()); - Assertions.assertNotEquals(0, workFlowLineages.size()); + insertOneProcessLineage(); + List processTaskLineages = + processTaskLineageMapper.queryByProjectCode(processDefinition.getProjectCode()); + Assertions.assertNotEquals(0, processTaskLineages.size()); } @Test @@ -140,8 +143,8 @@ public class WorkFlowLineageMapperTest extends BaseDaoTest { insertOneProcessDefinition(); ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(1L); insertOneSchedule(processDefinition.getId()); - WorkFlowLineage workFlowLineages = workFlowLineageMapper - .queryWorkFlowLineageByCode(processDefinition.getProjectCode(), processDefinition.getCode()); + List workFlowLineages = processTaskLineageMapper + .queryWorkFlowLineageByCode(processDefinition.getCode()); Assertions.assertNotNull(workFlowLineages); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index b9e4d811f3..8a09e3bb98 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -25,7 +25,6 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.DagData; import org.apache.dolphinscheduler.dao.entity.DataSource; -import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition; import org.apache.dolphinscheduler.dao.entity.DqComparisonType; import org.apache.dolphinscheduler.dao.entity.DqExecuteResult; import org.apache.dolphinscheduler.dao.entity.DqRule; @@ -108,8 +107,6 @@ public interface ProcessService { List queryReleaseSchedulerListByProcessDefinitionCode(long processDefinitionCode); - List queryDependentProcessDefinitionByProcessDefinitionCode(long processDefinitionCode); - List queryNeedFailoverProcessInstances(String host); List queryNeedFailoverProcessInstanceHost(); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 2b0a8e073c..5553024701 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -51,7 +51,6 @@ import org.apache.dolphinscheduler.dao.entity.Cluster; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.DagData; import org.apache.dolphinscheduler.dao.entity.DataSource; -import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition; import org.apache.dolphinscheduler.dao.entity.DqComparisonType; import org.apache.dolphinscheduler.dao.entity.DqExecuteResult; import org.apache.dolphinscheduler.dao.entity.DqRule; @@ -98,7 +97,6 @@ import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; -import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; @@ -256,9 +254,6 @@ public class ProcessServiceImpl implements ProcessService { @Autowired private TaskGroupMapper taskGroupMapper; - @Autowired - private WorkFlowLineageMapper workFlowLineageMapper; - @Autowired private ClusterMapper clusterMapper; @@ -1430,17 +1425,6 @@ public class ProcessServiceImpl implements ProcessService { return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode); } - /** - * query dependent process definition by process definition code - * - * @param processDefinitionCode processDefinitionCode - * @see DependentProcessDefinition - */ - @Override - public List queryDependentProcessDefinitionByProcessDefinitionCode(long processDefinitionCode) { - return workFlowLineageMapper.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode); - } - /** * query need failover process instance * diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskTypeUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskTypeUtils.java index cf1f2022ea..14ee1c40b6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskTypeUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskTypeUtils.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.plugin.task.api.utils; import org.apache.dolphinscheduler.plugin.task.api.ILogicTaskChannel; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.task.ConditionsLogicTaskChannelFactory; +import org.apache.dolphinscheduler.plugin.task.api.task.DependentLogicTaskChannelFactory; import org.apache.dolphinscheduler.plugin.task.api.task.DynamicLogicTaskChannelFactory; import org.apache.dolphinscheduler.plugin.task.api.task.SubWorkflowLogicTaskChannelFactory; import org.apache.dolphinscheduler.plugin.task.api.task.SwitchLogicTaskChannelFactory; @@ -41,7 +42,7 @@ public class TaskTypeUtils { } public boolean isDependentTask(String taskType) { - return SubWorkflowLogicTaskChannelFactory.NAME.equals(taskType); + return DependentLogicTaskChannelFactory.NAME.equals(taskType); } public boolean isDynamicTask(String taskType) { diff --git a/dolphinscheduler-tools/src/main/bin/migrate-lineage.sh b/dolphinscheduler-tools/src/main/bin/migrate-lineage.sh new file mode 100644 index 0000000000..c34ae0e117 --- /dev/null +++ b/dolphinscheduler-tools/src/main/bin/migrate-lineage.sh @@ -0,0 +1,31 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +BIN_DIR=$(dirname $0) +DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/../..; pwd)} + +if [ "$DOCKER" != "true" ]; then + source "$DOLPHINSCHEDULER_HOME/bin/env/dolphinscheduler_env.sh" +fi + +JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms4g -Xmx4g -Xmn512m -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"} + +$JAVA_HOME/bin/java $JAVA_OPTS \ + -cp "$DOLPHINSCHEDULER_HOME/tools/conf":"$DOLPHINSCHEDULER_HOME/tools/libs/*":"$DOLPHINSCHEDULER_HOME/tools/sql" \ + -Dspring.profiles.active=lineage,${DATABASE} \ + org.apache.dolphinscheduler.tools.lineage.MigrateLineage diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/lineage/MigrateLineage.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/lineage/MigrateLineage.java new file mode 100644 index 0000000000..0fab0abaed --- /dev/null +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/lineage/MigrateLineage.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.tools.lineage; + +import org.apache.dolphinscheduler.tools.resource.MigrateResource; + +import java.sql.SQLException; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Component; + +@SpringBootApplication +@ComponentScan("org.apache.dolphinscheduler") +@Slf4j +public class MigrateLineage { + + public static void main(String[] args) { + SpringApplication.run(MigrateResource.class, args); + } + + @Component + @Profile("lineage") + static class MigrateLineageRunner implements CommandLineRunner { + + @Autowired + private MigrateLineageService migrateLineageService; + + @Override + public void run(String... args) throws SQLException { + log.info("Starting Migrate lineage data..."); + migrateLineageService.migrateLineageOnce(); + } + } +} diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/lineage/MigrateLineageService.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/lineage/MigrateLineageService.java new file mode 100644 index 0000000000..8fdad845f4 --- /dev/null +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/lineage/MigrateLineageService.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.tools.lineage; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskLineage; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessTaskLineageDao; +import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; +import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; +import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; +import org.apache.dolphinscheduler.plugin.task.api.task.DependentLogicTaskChannelFactory; + +import org.apache.commons.collections.CollectionUtils; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +public class MigrateLineageService { + + @Autowired + private TaskDefinitionMapper taskDefinitionMapper; + + @Autowired + private ProcessTaskLineageDao processTaskLineageDao; + + @Autowired + private ProcessTaskRelationMapper processTaskRelationMapper; + + @Autowired + private ProcessDefinitionMapper processDefinitionMapper; + + public void migrateLineageOnce() { + try { + List processTaskLineageList = getAllProcessLineages(); + int insertResult = processTaskLineageDao.batchInsert(processTaskLineageList); + if (insertResult > 0) { + log.info("Migrate lineage successfully, insert count: {}", insertResult); + } else { + log.info("No lineage to migrate."); + } + } catch (Exception e) { + log.error("Failed to migrate lineage:", e); + } + } + + private List getAllProcessLineages() throws SQLException { + List taskDefinitionList = + taskDefinitionMapper.queryDefinitionsByTaskType(DependentLogicTaskChannelFactory.NAME); + List processTaskRelationList = + processTaskRelationMapper.queryByTaskCodes(taskDefinitionList.stream() + .map(TaskDefinition::getCode).toArray(Long[]::new)); + List processTaskLineageList = new ArrayList<>(); + + for (TaskDefinition taskDefinition : taskDefinitionList) { + parseDependentTaskParams(taskDefinition, processTaskLineageList); + + for (ProcessTaskLineage processTaskLineage : processTaskLineageList) { + processTaskLineage.setProcessDefinitionCode(processTaskRelationList.stream() + .filter(processTaskRelation -> processTaskRelation.getPreTaskCode() == taskDefinition.getCode() + || processTaskRelation.getPostTaskCode() == taskDefinition.getCode()) + .findFirst().get().getProcessDefinitionCode()); + processTaskLineage.setProcessDefinitionVersion(processTaskRelationList.stream() + .filter(processTaskRelation -> processTaskRelation.getPreTaskCode() == taskDefinition.getCode() + || processTaskRelation.getPostTaskCode() == taskDefinition.getCode()) + .findFirst().get().getProcessDefinitionVersion()); + } + } + return processTaskLineageList; + } + + private void parseDependentTaskParams(TaskDefinition taskDefinition, List taskLineageList) { + DependentParameters dependentParameters = + JSONUtils.parseObject(taskDefinition.getTaskParams(), DependentParameters.class); + if (dependentParameters != null) { + List dependTaskList = + dependentParameters.getDependence().getDependTaskList(); + if (!CollectionUtils.isEmpty(dependTaskList)) { + for (DependentTaskModel taskModel : dependTaskList) { + List dependItemList = taskModel.getDependItemList(); + for (DependentItem dependentItem : dependItemList) { + ProcessTaskLineage processTaskLineage = new ProcessTaskLineage(); + processTaskLineage.setDeptProjectCode(dependentItem.getProjectCode()); + processTaskLineage.setDeptProcessDefinitionCode(dependentItem.getDefinitionCode()); + processTaskLineage.setDeptTaskDefinitionCode(dependentItem.getDepTaskCode()); + taskLineageList.add(processTaskLineage); + } + } + } + } + } + +} diff --git a/dolphinscheduler-ui/src/service/modules/lineages/types.ts b/dolphinscheduler-ui/src/service/modules/lineages/types.ts index 8434491053..13ebb90fcb 100644 --- a/dolphinscheduler-ui/src/service/modules/lineages/types.ts +++ b/dolphinscheduler-ui/src/service/modules/lineages/types.ts @@ -27,7 +27,7 @@ interface WorkFlowNameReq { workFlowName: string } -interface WorkFlowListRes extends WorkflowCodeReq { +interface WorkFlowRelationDetailListRes extends WorkflowCodeReq { workFlowName: string workFlowPublishStatus: string scheduleStartTime?: any @@ -43,7 +43,7 @@ interface WorkFlowRelationList { } interface WorkflowRes { - workFlowList: WorkFlowListRes[] + workFlowRelationDetailList: WorkFlowRelationDetailListRes[] workFlowRelationList: WorkFlowRelationList[] } @@ -57,5 +57,5 @@ export { WorkFlowNameReq, DependentTaskReq, WorkflowRes, - WorkFlowListRes + WorkFlowRelationDetailListRes } diff --git a/dolphinscheduler-ui/src/views/projects/components/dependencies/use-dependencies.ts b/dolphinscheduler-ui/src/views/projects/components/dependencies/use-dependencies.ts index 3093aad669..d2be5c347d 100644 --- a/dolphinscheduler-ui/src/views/projects/components/dependencies/use-dependencies.ts +++ b/dolphinscheduler-ui/src/views/projects/components/dependencies/use-dependencies.ts @@ -17,7 +17,6 @@ import { DependentTaskReq } from '@/service/modules/lineages/types' import { queryDependentTasks } from '@/service/modules/lineages' -import { TASK_TYPES_MAP } from '@/store/project' export function useDependencies() { const getDependentTasksBySingleTask = async ( @@ -33,13 +32,11 @@ export function useDependencies() { } as DependentTaskReq const res = await queryDependentTasks(projectCode, dependentTaskReq) res - .filter( - (item: any) => - item.processDefinitionCode !== workflowCode && - item.taskType === TASK_TYPES_MAP.DEPENDENT.alias - ) + .filter((item: any) => item.processDefinitionCode !== workflowCode) .forEach((item: any) => { - tasks.push(item.processDefinitionName + '->' + item.taskName) + tasks.push( + item.processDefinitionName + '->' + item.taskDefinitionName + ) }) } return tasks @@ -56,13 +53,11 @@ export function useDependencies() { } as DependentTaskReq const res = await queryDependentTasks(projectCode, dependentTaskReq) res - .filter( - (item: any) => - item.processDefinitionCode !== workflowCode && - item.taskType === TASK_TYPES_MAP.DEPENDENT.alias - ) + .filter((item: any) => item.processDefinitionCode !== workflowCode) .forEach((item: any) => { - tasks.push(item.processDefinitionName + '->' + item.taskName) + tasks.push( + item.processDefinitionName + '->' + item.taskDefinitionName + ) }) } return tasks @@ -116,14 +111,11 @@ export function useDependencies() { await queryDependentTasks(projectCode, dependentTaskReq).then( (res: any) => { res - .filter( - (item: any) => - item.processDefinitionCode !== workflowCode && - item.taskType === TASK_TYPES_MAP.DEPENDENT.alias - ) + .filter((item: any) => item.processDefinitionCode !== workflowCode) .forEach((item: any) => { dependentTaskLinks.push({ - text: item.processDefinitionName + '->' + item.taskName, + text: + item.processDefinitionName + '->' + item.taskDefinitionName, show: true, action: () => { const url = `/projects/${item.projectCode}/workflow/definitions/${item.processDefinitionCode}` @@ -151,14 +143,11 @@ export function useDependencies() { await queryDependentTasks(projectCode, dependentTaskReq).then( (res: any) => { res - .filter( - (item: any) => - item.processDefinitionCode !== workflowCode && - item.taskType === TASK_TYPES_MAP.DEPENDENT.alias - ) + .filter((item: any) => item.processDefinitionCode !== workflowCode) .forEach((item: any) => { dependentTaskLinks.push({ - text: item.processDefinitionName + '->' + item.taskName, + text: + item.processDefinitionName + '->' + item.taskDefinitionName, show: true, action: () => { const url = `/projects/${item.projectCode}/workflow/definitions/${item.processDefinitionCode}` diff --git a/dolphinscheduler-ui/src/views/projects/workflow/relation/use-relation.ts b/dolphinscheduler-ui/src/views/projects/workflow/relation/use-relation.ts index 7b49e910f7..4fa826d49b 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/relation/use-relation.ts +++ b/dolphinscheduler-ui/src/views/projects/workflow/relation/use-relation.ts @@ -24,7 +24,7 @@ import { } from '@/service/modules/lineages' import type { WorkflowRes, - WorkFlowListRes + WorkFlowRelationDetailListRes } from '@/service/modules/lineages/types' export function useRelation() { @@ -40,7 +40,7 @@ export function useRelation() { variables.seriesData = [] variables.links = [] - variables.seriesData = obj.workFlowList.map((item) => { + variables.seriesData = obj.workFlowRelationDetailList.map((item) => { return { name: item.workFlowName, id: item.workFlowCode, @@ -59,7 +59,7 @@ export function useRelation() { const getWorkflowName = (projectCode: number) => { const { state } = useAsyncState( queryLineageByWorkFlowName({ projectCode }).then( - (res: Array) => { + (res: Array) => { variables.workflowOptions = res.map((item) => { return { label: item.workFlowName,