diff --git a/docs/docs/en/guide/project/task-definition.md b/docs/docs/en/guide/project/task-definition.md deleted file mode 100644 index a48cc1a848..0000000000 --- a/docs/docs/en/guide/project/task-definition.md +++ /dev/null @@ -1,25 +0,0 @@ -# Task Definition - -## Batch Task Definition - -Task definition allows to modify or operate tasks at the task level rather than modifying them in the workflow definition. -We already have workflow level task editor in [workflow definition](workflow-definition.md) which you can click the specific -workflow and then edit its task definition. It is depressing when you want to edit the task definition but do not remember -which workflow it belongs to. So we decide to add `Task Definition` view under `Task` menu. - -![task-definition](../../../../img/new_ui/dev/project/batch-task-definition.png) - -In this view, you can create, query, update, delete task definition by click the related button in `operation` column. The -most exciting thing is you could query task by task name in the wildcard, and it is useful when you only remember the task -name but forget which workflow it belongs to. It is also supported query by the task name alone with `Task Type` or -`Workflow Name` - -## Stream Task Definition - -Stream task definitions are created in the workflow definition, and can be modified and executed. - -![task-definition](../../../../img/new_ui/dev/project/stream-task-definition.png) - -Click the execute button, check the execution parameters and click Confirm to submit the stream task. - -![task-definition](../../../../img/new_ui/dev/project/stream-task-execute.png) diff --git a/docs/docs/zh/guide/project/task-definition.md b/docs/docs/zh/guide/project/task-definition.md deleted file mode 100644 index c6247d489c..0000000000 --- a/docs/docs/zh/guide/project/task-definition.md +++ /dev/null @@ -1,22 +0,0 @@ -# 任务定义 - -## 批量任务定义 - -批量任务定义允许您在基于任务级别而不是在工作流中操作修改任务。再此之前,我们已经有了工作流级别的任务编辑器,你可以在[工作流定义](workflow-definition.md) -单击特定的工作流,然后编辑任务的定义。当您想编辑特定的任务定义但不记得它属于哪个工作流时,这是令人沮丧的。所以我们决定在 `任务` 菜单下添加 `任务定义` 视图。 - -![task-definition](../../../../img/new_ui/dev/project/batch-task-definition.png) - -在该视图中,您可以通过单击 `操作` 列中的相关按钮来进行创建、查询、更新、删除任务定义。最令人兴奋的是您可以通过通配符进行全部任务查询,当您只 -记得任务名称但忘记它属于哪个工作流时是非常有用的。也支持通过任务名称结合使用 `任务类型` 或 `工作流程名称` 进行查询。 - -## 实时任务定义 - -实时任务定义在工作流定义中创建,在任务定义页面可以进行修改和执行。 - -![task-definition](../../../../img/new_ui/dev/project/stream-task-definition.png) - -点击实时任务执行,检查执行参数后点击确认,即可提交实时任务。 - -![task-definition](../../../../img/new_ui/dev/project/stream-task-execute.png) - diff --git a/docs/img/new_ui/dev/project/batch-task-definition.png b/docs/img/new_ui/dev/project/batch-task-definition.png deleted file mode 100644 index ca1608477e..0000000000 Binary files a/docs/img/new_ui/dev/project/batch-task-definition.png and /dev/null differ diff --git a/docs/img/new_ui/dev/project/stream-task-definition.png b/docs/img/new_ui/dev/project/stream-task-definition.png deleted file mode 100644 index b6c0629a45..0000000000 Binary files a/docs/img/new_ui/dev/project/stream-task-definition.png and /dev/null differ diff --git a/docs/img/new_ui/dev/project/stream-task-execute.png b/docs/img/new_ui/dev/project/stream-task-execute.png deleted file mode 100644 index 2b49cd8df7..0000000000 Binary files a/docs/img/new_ui/dev/project/stream-task-execute.png and /dev/null differ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java index fc819c284d..66f1271db4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java @@ -17,12 +17,9 @@ package org.apache.dolphinscheduler.api.controller; -import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_DEFINITION_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_DEFINE_BY_CODE_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_DEFINITION_VERSION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.LOGIN_USER_QUERY_PROJECT_LIST_PAGING_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DETAIL_OF_TASK_DEFINITION_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_DEFINITION_LIST_PAGING_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_DEFINITION_VERSIONS_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.RELEASE_TASK_DEFINITION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.SWITCH_TASK_DEFINITION_VERSION_ERROR; @@ -30,17 +27,12 @@ import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_TASK_DEFINITIO import org.apache.dolphinscheduler.api.audit.OperatorLog; import org.apache.dolphinscheduler.api.audit.enums.AuditType; -import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.TaskDefinitionService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.ReleaseState; -import org.apache.dolphinscheduler.common.enums.TaskExecuteType; import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; - -import org.apache.commons.lang3.StringUtils; import java.util.Map; @@ -74,90 +66,6 @@ public class TaskDefinitionController extends BaseController { @Autowired private TaskDefinitionService taskDefinitionService; - /** - * create task definition - * - * @param loginUser login user - * @param projectCode project code - * @param taskDefinitionJson task definition json - * @return create result code - */ - @Operation(summary = "save", description = "CREATE_TASK_DEFINITION_NOTES") - @Parameters({ - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true, schema = @Schema(implementation = long.class)), - @Parameter(name = "taskDefinitionJson", description = "TASK_DEFINITION_JSON", required = true, schema = @Schema(implementation = String.class)) - }) - @PostMapping() - @ResponseStatus(HttpStatus.CREATED) - @ApiException(CREATE_TASK_DEFINITION_ERROR) - @OperatorLog(auditType = AuditType.TASK_CREATE) - public Result createTaskDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam(value = "taskDefinitionJson", required = true) String taskDefinitionJson) { - Map result = - taskDefinitionService.createTaskDefinition(loginUser, projectCode, taskDefinitionJson); - return returnDataList(result); - } - - /** - * create single task definition that binds the workflow - * - * @param loginUser login user - * @param projectCode project code - * @param processDefinitionCode process definition code - * @param taskDefinitionJsonObj task definition json object - * @param upstreamCodes upstream task codes, sep comma - * @return create result code - */ - @Operation(summary = "saveSingle", description = "CREATE_SINGLE_TASK_DEFINITION_NOTES") - @Parameters({ - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true, schema = @Schema(implementation = long.class)), - @Parameter(name = "processDefinitionCode", description = "PROCESS_DEFINITION_CODE", required = true, schema = @Schema(implementation = long.class)), - @Parameter(name = "taskDefinitionJsonObj", description = "TASK_DEFINITION_JSON", required = true, schema = @Schema(implementation = String.class)), - @Parameter(name = "upstreamCodes", description = "UPSTREAM_CODES", required = false, schema = @Schema(implementation = String.class)) - }) - @PostMapping("/save-single") - @ResponseStatus(HttpStatus.CREATED) - @ApiException(CREATE_TASK_DEFINITION_ERROR) - @OperatorLog(auditType = AuditType.TASK_CREATE) - public Result createTaskBindsWorkFlow(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam(value = "processDefinitionCode", required = true) long processDefinitionCode, - @RequestParam(value = "taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj, - @RequestParam(value = "upstreamCodes", required = false) String upstreamCodes) { - Map result = taskDefinitionService.createTaskBindsWorkFlow(loginUser, projectCode, - processDefinitionCode, taskDefinitionJsonObj, StringUtils.defaultString(upstreamCodes)); - return returnDataList(result); - } - - /** - * update task definition - * - * @param loginUser login user - * @param projectCode project code - * @param code task definition code - * @param taskDefinitionJsonObj task definition json object - * @return update result code - */ - @Operation(summary = "update", description = "UPDATE_TASK_DEFINITION_NOTES") - @Parameters({ - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true, schema = @Schema(implementation = long.class)), - @Parameter(name = "code", description = "TASK_DEFINITION_CODE", required = true, schema = @Schema(implementation = long.class, example = "1")), - @Parameter(name = "taskDefinitionJsonObj", description = "TASK_DEFINITION_JSON", required = true, schema = @Schema(implementation = String.class)) - }) - @PutMapping(value = "/{code}") - @ResponseStatus(HttpStatus.OK) - @ApiException(UPDATE_TASK_DEFINITION_ERROR) - @OperatorLog(auditType = AuditType.TASK_UPDATE) - public Result updateTaskDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @PathVariable(value = "code") long code, - @RequestParam(value = "taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj) { - Map result = - taskDefinitionService.updateTaskDefinition(loginUser, projectCode, code, taskDefinitionJsonObj); - return returnDataList(result); - } - /** * update task definition * @@ -271,29 +179,6 @@ public class TaskDefinitionController extends BaseController { return returnDataList(result); } - /** - * delete task definition by code - * - * @param loginUser login user - * @param projectCode project code - * @param code the task definition code - * @return delete result code - */ - @Operation(summary = "deleteTaskDefinition", description = "DELETE_TASK_DEFINITION_BY_CODE_NOTES") - @Parameters({ - @Parameter(name = "code", description = "TASK_DEFINITION_CODE", required = true, schema = @Schema(implementation = long.class, example = "1")) - }) - @DeleteMapping(value = "/{code}") - @ResponseStatus(HttpStatus.OK) - @ApiException(DELETE_TASK_DEFINE_BY_CODE_ERROR) - @OperatorLog(auditType = AuditType.TASK_DELETE) - public Result deleteTaskDefinitionByCode(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @PathVariable(value = "code") long code) { - taskDefinitionService.deleteTaskDefinitionByCode(loginUser, code); - return new Result(Status.SUCCESS); - } - /** * query detail of task definition by code * @@ -316,45 +201,6 @@ public class TaskDefinitionController extends BaseController { return returnDataList(result); } - /** - * query task definition list paging - * - * @param loginUser login user - * @param projectCode project code - * @param searchWorkflowName searchWorkflowName - * @param searchTaskName searchTaskName - * @param taskType taskType - * @param taskExecuteType taskExecuteType - * @param pageNo page number - * @param pageSize page size - * @return task definition page - */ - @Operation(summary = "queryTaskDefinitionListPaging", description = "QUERY_TASK_DEFINITION_LIST_PAGING_NOTES") - @Parameters({ - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = false, schema = @Schema(implementation = long.class)), - @Parameter(name = "searchWorkflowName", description = "SEARCH_WORKFLOW_NAME", required = false, schema = @Schema(implementation = String.class)), - @Parameter(name = "searchTaskName", description = "SEARCH_TASK_NAME", required = false, schema = @Schema(implementation = String.class)), - @Parameter(name = "taskType", description = "TASK_TYPE", required = false, schema = @Schema(implementation = String.class, example = "SHELL")), - @Parameter(name = "taskExecuteType", description = "TASK_EXECUTE_TYPE", required = false, schema = @Schema(implementation = TaskExecuteType.class, example = "STREAM")), - @Parameter(name = "pageNo", description = "PAGE_NO", required = true, schema = @Schema(implementation = int.class, example = "1")), - @Parameter(name = "pageSize", description = "PAGE_SIZE", required = true, schema = @Schema(implementation = int.class, example = "10")) - }) - @GetMapping() - @ResponseStatus(HttpStatus.OK) - @ApiException(QUERY_TASK_DEFINITION_LIST_PAGING_ERROR) - public Result queryTaskDefinitionListPaging(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam(value = "searchTaskName", required = false) String searchTaskName, - @RequestParam(value = "taskType", required = false) String taskType, - @RequestParam(value = "taskExecuteType", required = false, defaultValue = "BATCH") TaskExecuteType taskExecuteType, - @RequestParam("pageNo") Integer pageNo, - @RequestParam("pageSize") Integer pageSize) { - checkPageParams(pageNo, pageSize); - searchTaskName = ParameterUtils.handleEscapes(searchTaskName); - return taskDefinitionService.queryTaskDefinitionListPaging(loginUser, projectCode, - searchTaskName, taskType, taskExecuteType, pageNo, pageSize); - } - /** * gen task code list * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/v2/TaskDefinitionV2Controller.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/v2/TaskDefinitionV2Controller.java index 66fff8cbe9..f96368067a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/v2/TaskDefinitionV2Controller.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/v2/TaskDefinitionV2Controller.java @@ -17,16 +17,11 @@ package org.apache.dolphinscheduler.api.controller.v2; -import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_DEFINITION_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.DELETE_SCHEDULE_BY_ID_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DETAIL_OF_TASK_DEFINITION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_DEFINITION_LIST; -import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_TASK_DEFINITION_ERROR; import org.apache.dolphinscheduler.api.controller.BaseController; -import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest; import org.apache.dolphinscheduler.api.dto.task.TaskFilterRequest; -import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest; import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.TaskDefinitionService; import org.apache.dolphinscheduler.api.utils.PageInfo; @@ -37,11 +32,9 @@ import org.apache.dolphinscheduler.dao.entity.User; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; -import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestAttribute; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; @@ -65,65 +58,6 @@ public class TaskDefinitionV2Controller extends BaseController { @Autowired private TaskDefinitionService taskDefinitionService; - /** - * Create resource task definition - * - * @param loginUser login user - * @param taskCreateRequest task definition json - * @return Result object created - */ - @Operation(summary = "create", description = "CREATE_TASK_DEFINITION_NOTES") - @PostMapping(consumes = {"application/json"}) - @ResponseStatus(HttpStatus.CREATED) - @ApiException(CREATE_TASK_DEFINITION_ERROR) - public Result createTaskDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @RequestBody TaskCreateRequest taskCreateRequest) { - TaskDefinition taskDefinition = taskDefinitionService.createTaskDefinitionV2(loginUser, taskCreateRequest); - return Result.success(taskDefinition); - } - - /** - * Delete resource task definition by code - * - * @param loginUser login user - * @param code task definition code - */ - @Operation(summary = "delete", description = "DELETE_TASK_DEFINITION_VERSION_NOTES") - @Parameters({ - @Parameter(name = "code", description = "TASK_DEFINITION_CODE", schema = @Schema(implementation = long.class, example = "123456", required = true)) - }) - @DeleteMapping(value = "/{code}") - @ResponseStatus(HttpStatus.OK) - @ApiException(DELETE_SCHEDULE_BY_ID_ERROR) - public Result deleteTaskDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @PathVariable("code") Long code) { - taskDefinitionService.deleteTaskDefinitionByCode(loginUser, code); - return Result.success(); - } - - /** - * Update resource task definition by code - * - * @param loginUser login user - * @param code task code of resource you want to update - * @param taskUpdateRequest workflowUpdateRequest - * @return ResourceResponse object updated - */ - @Operation(summary = "update", description = "UPDATE_TASK_DEFINITION_NOTES") - @Parameters({ - @Parameter(name = "code", description = "TASK_DEFINITION_CODE", schema = @Schema(implementation = long.class, example = "123456", required = true)) - }) - @PutMapping(value = "/{code}") - @ResponseStatus(HttpStatus.OK) - @ApiException(UPDATE_TASK_DEFINITION_ERROR) - public Result updateTaskDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @PathVariable("code") Long code, - @RequestBody TaskUpdateRequest taskUpdateRequest) { - TaskDefinition taskDefinition = - taskDefinitionService.updateTaskDefinitionV2(loginUser, code, taskUpdateRequest); - return Result.success(taskDefinition); - } - /** * Get resource task definition by code * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java index 00ebbc64e5..a0eccf7e8e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java @@ -17,13 +17,10 @@ package org.apache.dolphinscheduler.api.service; -import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest; import org.apache.dolphinscheduler.api.dto.task.TaskFilterRequest; -import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.enums.ReleaseState; -import org.apache.dolphinscheduler.common.enums.TaskExecuteType; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.User; @@ -34,43 +31,6 @@ import java.util.Map; */ public interface TaskDefinitionService { - /** - * create task definition - * - * @param loginUser login user - * @param projectCode project code - * @param taskDefinitionJson task definition json - */ - Map createTaskDefinition(User loginUser, - long projectCode, - String taskDefinitionJson); - - /** - * Create resource task definition - * - * @param loginUser login user - * @param taskCreateRequest task definition json - * @Return new TaskDefinition have created - */ - TaskDefinition createTaskDefinitionV2(User loginUser, - TaskCreateRequest taskCreateRequest); - - /** - * create single task definition that binds the workflow - * - * @param loginUser login user - * @param projectCode project code - * @param processDefinitionCode process definition code - * @param taskDefinitionJsonObj task definition json object - * @param upstreamCodes upstream task codes, sep comma - * @return create result code - */ - Map createTaskBindsWorkFlow(User loginUser, - long projectCode, - long processDefinitionCode, - String taskDefinitionJsonObj, - String upstreamCodes); - /** * query task definition * @@ -84,43 +44,6 @@ public interface TaskDefinitionService { long processCode, String taskName); - /** - * Delete resource task definition by code - * - * Only task release state offline and no downstream tasks can be deleted, will also remove the exists - * task relation [upstreamTaskCode, taskCode] - * - * @param loginUser login user - * @param taskCode task code - */ - void deleteTaskDefinitionByCode(User loginUser, - long taskCode); - - /** - * update task definition - * - * @param loginUser login user - * @param projectCode project code - * @param taskCode task code - * @param taskDefinitionJsonObj task definition json object - */ - Map updateTaskDefinition(User loginUser, - long projectCode, - long taskCode, - String taskDefinitionJsonObj); - - /** - * Update resource task definition by code - * - * @param loginUser login user - * @param taskCode task code - * @param taskUpdateRequest task definition json object - * @return new TaskDefinition have updated - */ - TaskDefinition updateTaskDefinitionV2(User loginUser, - long taskCode, - TaskUpdateRequest taskUpdateRequest); - /** * Get resource task definition by code * @@ -212,26 +135,6 @@ public interface TaskDefinitionService { long projectCode, long taskCode); - /** - * query task definition list paging - * - * @param loginUser login user - * @param projectCode project code - * @param searchTaskName searchTaskName - * @param taskType taskType - * @param taskExecuteType taskExecuteType - * @param pageNo page number - * @param pageSize page size - * @return task definition page - */ - Result queryTaskDefinitionListPaging(User loginUser, - long projectCode, - String searchTaskName, - String taskType, - TaskExecuteType taskExecuteType, - Integer pageNo, - Integer pageSize); - /** * gen task code list * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index ae3410cb77..c080c405b2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -18,17 +18,13 @@ package org.apache.dolphinscheduler.api.service.impl; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_CREATE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_VERSION_VIEW; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION; import static org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager.checkTaskParameters; -import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest; import org.apache.dolphinscheduler.api.dto.task.TaskFilterRequest; -import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest; import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationUpdateUpstreamRequest; import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest; import org.apache.dolphinscheduler.api.enums.Status; @@ -139,54 +135,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Autowired private ProcessDefinitionLogMapper processDefinitionLogMapper; - /** - * create task definition - * - * @param loginUser login user - * @param projectCode project code - * @param taskDefinitionJson task definition json - */ - @Transactional - @Override - public Map createTaskDefinition(User loginUser, - long projectCode, - String taskDefinitionJson) { - Project project = projectMapper.queryByCode(projectCode); - // check if user have write perm for project - Map result = new HashMap<>(); - boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result); - if (!hasProjectAndWritePerm) { - return result; - } - - List taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); - if (CollectionUtils.isEmpty(taskDefinitionLogs)) { - log.warn("Parameter taskDefinitionJson is invalid."); - putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson); - return result; - } - for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { - if (!checkTaskParameters(taskDefinitionLog.getTaskType(), taskDefinitionLog.getTaskParams())) { - log.warn("Task definition {} parameters are invalid.", taskDefinitionLog.getName()); - putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName()); - return result; - } - } - int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs, Boolean.TRUE); - if (saveTaskResult == Constants.DEFINITION_FAILURE) { - log.error("Create task definition error, projectCode:{}.", projectCode); - putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); - throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); - } - Map resData = new HashMap<>(); - resData.put("total", taskDefinitionLogs.size()); - resData.put("code", StringUtils - .join(taskDefinitionLogs.stream().map(TaskDefinition::getCode).collect(Collectors.toList()), ",")); - putMsg(result, Status.SUCCESS); - result.put(Constants.DATA_LIST, resData); - return result; - } - private TaskDefinitionLog persist2TaskDefinitionLog(User user, TaskDefinition taskDefinition) { TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition); taskDefinitionLog.setOperator(user.getId()); @@ -226,171 +174,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe workflowUpdateRequest); } - /** - * Create resource task definition - * - * @param loginUser login user - * @param taskCreateRequest task definition json - * @return new TaskDefinition have created - */ - @Override - @Transactional - public TaskDefinition createTaskDefinitionV2(User loginUser, - TaskCreateRequest taskCreateRequest) { - TaskDefinition taskDefinition = taskCreateRequest.convert2TaskDefinition(); - ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(taskCreateRequest.getWorkflowCode()); - if (processDefinition == null) { - throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, taskCreateRequest.getWorkflowCode()); - } - // Add project code from process definition if not exists - if (taskDefinition.getProjectCode() == 0L) { - taskDefinition.setProjectCode(processDefinition.getProjectCode()); - } - this.checkTaskDefinitionValid(loginUser, taskDefinition, TASK_DEFINITION_CREATE); - - long taskDefinitionCode; - try { - taskDefinitionCode = CodeGenerateUtils.genCode(); - } catch (CodeGenerateException e) { - throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS); - } - taskDefinition.setCode(taskDefinitionCode); - - int create = taskDefinitionMapper.insert(taskDefinition); - if (create <= 0) { - throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); - } - this.persist2TaskDefinitionLog(loginUser, taskDefinition); - - // update related objects: task relationship, workflow's location(need to set to null and front-end will auto - // format it) - this.updateTaskUpstreams(loginUser, taskCreateRequest.getWorkflowCode(), taskDefinition.getCode(), - taskCreateRequest.getUpstreamTasksCodes()); - this.updateWorkflowLocation(loginUser, processDefinition); - return taskDefinition; - } - - /** - * create single task definition that binds the workflow - * - * @param loginUser login user - * @param projectCode project code - * @param processDefinitionCode process definition code - * @param taskDefinitionJsonObj task definition json object - * @param upstreamCodes upstream task codes, sep comma - * @return create result code - */ - @Transactional - @Override - public Map createTaskBindsWorkFlow(User loginUser, - long projectCode, - long processDefinitionCode, - String taskDefinitionJsonObj, - String upstreamCodes) { - Project project = projectMapper.queryByCode(projectCode); - // check if user have write perm for project - Map result = new HashMap<>(); - boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result); - if (!hasProjectAndWritePerm) { - return result; - } - ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); - if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { - log.error("Process definition does not exist, processDefinitionCode:{}.", processDefinitionCode); - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode)); - return result; - } - if (processDefinition.getReleaseState() == ReleaseState.ONLINE) { - log.warn("Task definition can not be created due to process definition is {}, processDefinitionCode:{}.", - ReleaseState.ONLINE.getDescp(), processDefinition.getCode()); - putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, String.valueOf(processDefinitionCode)); - return result; - } - TaskDefinitionLog taskDefinition = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class); - if (taskDefinition == null) { - log.warn("Parameter taskDefinitionJsonObj is invalid json."); - putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj); - return result; - } - if (!checkTaskParameters(taskDefinition.getTaskType(), taskDefinition.getTaskParams())) { - putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinition.getName()); - return result; - } - long taskCode = taskDefinition.getCode(); - if (taskCode == 0) { - taskDefinition.setCode(CodeGenerateUtils.genCode()); - } - List processTaskRelationLogList = - processTaskRelationMapper.queryByProcessCode(processDefinitionCode) - .stream() - .map(ProcessTaskRelationLog::new) - .collect(Collectors.toList()); - - if (StringUtils.isNotBlank(upstreamCodes)) { - Set upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong) - .collect(Collectors.toSet()); - List upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes); - Set queryUpStreamTaskCodes = - upstreamTaskDefinitionList.stream().map(TaskDefinition::getCode).collect(Collectors.toSet()); - // upstreamTaskCodes - queryUpStreamTaskCodes - Set diffCode = upstreamTaskCodes.stream().filter(code -> !queryUpStreamTaskCodes.contains(code)) - .collect(Collectors.toSet()); - if (CollectionUtils.isNotEmpty(diffCode)) { - String taskCodes = StringUtils.join(diffCode, Constants.COMMA); - log.error("Some task definitions with parameter upstreamCodes do not exist, taskDefinitionCodes:{}.", - taskCodes); - putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCodes); - return result; - } - for (TaskDefinition upstreamTask : upstreamTaskDefinitionList) { - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); - processTaskRelationLog.setPreTaskCode(upstreamTask.getCode()); - processTaskRelationLog.setPreTaskVersion(upstreamTask.getVersion()); - processTaskRelationLog.setPostTaskCode(taskCode); - processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST); - processTaskRelationLog.setConditionType(ConditionType.NONE); - processTaskRelationLog.setConditionParams("{}"); - processTaskRelationLogList.add(processTaskRelationLog); - } - } else { - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); - processTaskRelationLog.setPreTaskCode(0); - processTaskRelationLog.setPreTaskVersion(0); - processTaskRelationLog.setPostTaskCode(taskCode); - processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST); - processTaskRelationLog.setConditionType(ConditionType.NONE); - processTaskRelationLog.setConditionParams("{}"); - processTaskRelationLogList.add(processTaskRelationLog); - } - int insertResult = processService.saveTaskRelation(loginUser, projectCode, processDefinition.getCode(), - processDefinition.getVersion(), - processTaskRelationLogList, Lists.newArrayList(), Boolean.TRUE); - if (insertResult != Constants.EXIT_CODE_SUCCESS) { - log.error( - "Save new version process task relations error, processDefinitionCode:{}, processDefinitionVersion:{}.", - processDefinition.getCode(), processDefinition.getVersion()); - putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); - throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); - } else - log.info( - "Save new version process task relations complete, processDefinitionCode:{}, processDefinitionVersion:{}.", - processDefinition.getCode(), processDefinition.getVersion()); - - int saveTaskResult = - processService.saveTaskDefine(loginUser, projectCode, Lists.newArrayList(taskDefinition), Boolean.TRUE); - if (saveTaskResult == Constants.DEFINITION_FAILURE) { - log.error("Save task definition error, projectCode:{}, taskDefinitionCode:{}.", projectCode, - taskDefinition.getCode()); - putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); - throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); - } else - log.info("Save task definition complete, projectCode:{}, taskDefinitionCode:{}.", projectCode, - taskDefinition.getCode()); - putMsg(result, Status.SUCCESS); - result.put(Constants.DATA_LIST, taskDefinition); - return result; - } - /** * query task definition * @@ -453,45 +236,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } } - /** - * Delete resource task definition by code - *

- * Only task release state offline and no downstream tasks can be deleted, will also remove the exists - * task relation [upstreamTaskCode, taskCode] - * - * @param loginUser login user - * @param taskCode task code - */ - @Transactional - @Override - public void deleteTaskDefinitionByCode(User loginUser, long taskCode) { - TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); - if (taskDefinition == null) { - throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskCode); - } - - this.taskCanDeleteValid(loginUser, taskDefinition, loginUser); - int delete = taskDefinitionMapper.deleteByCode(taskCode); - if (delete <= 0) { - throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_MSG_ERROR, taskDefinition.getCode()); - } - - // Delete task upstream tasks if exists - List taskRelationList = - processTaskRelationMapper.queryUpstreamByCode(taskDefinition.getProjectCode(), taskCode); - if (CollectionUtils.isNotEmpty(taskRelationList)) { - log.debug( - "Task definition has upstream tasks, start handle them after delete task, taskDefinitionCode:{}.", - taskCode); - long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); - List processTaskRelations = - processTaskRelationMapper.queryByProcessCode(processDefinitionCode); - List relationList = processTaskRelations.stream() - .filter(r -> r.getPostTaskCode() != taskCode).collect(Collectors.toList()); - updateDag(loginUser, processDefinitionCode, relationList, Lists.newArrayList()); - } - } - public void updateDag(User loginUser, long processDefinitionCode, List processTaskRelationList, List taskDefinitionLogs) { @@ -525,42 +269,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } } - /** - * update task definition - * - * @param loginUser login user - * @param projectCode project code - * @param taskCode task code - * @param taskDefinitionJsonObj task definition json object - */ - @Transactional - @Override - public Map updateTaskDefinition(User loginUser, long projectCode, long taskCode, - String taskDefinitionJsonObj) { - Map result = new HashMap<>(); - TaskDefinitionLog taskDefinitionToUpdate = - updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result); - if (taskDefinitionToUpdate == null) { - return result; - } - List taskRelationList = - processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); - if (CollectionUtils.isNotEmpty(taskRelationList)) { - log.info( - "Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.", - taskCode); - long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); - List processTaskRelations = - processTaskRelationMapper.queryByProcessCode(processDefinitionCode); - updateDag(loginUser, processDefinitionCode, processTaskRelations, - Lists.newArrayList(taskDefinitionToUpdate)); - } - log.info("Update task definition complete, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskCode); - result.put(Constants.DATA_LIST, taskCode); - putMsg(result, Status.SUCCESS); - return result; - } - private void TaskDefinitionUpdateValid(TaskDefinition taskDefinitionOriginal, TaskDefinition taskDefinitionUpdate) { // Task already online if (processService.isTaskOnline(taskDefinitionOriginal.getCode()) @@ -583,52 +291,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } } - /** - * update task definition - * - * @param loginUser login user - * @param taskCode task code - * @param taskUpdateRequest task definition json object - * @return new TaskDefinition have updated - */ - @Transactional - @Override - public TaskDefinition updateTaskDefinitionV2(User loginUser, - long taskCode, - TaskUpdateRequest taskUpdateRequest) { - TaskDefinition taskDefinitionOriginal = taskDefinitionMapper.queryByCode(taskCode); - if (taskDefinitionOriginal == null) { - throw new ServiceException(Status.TASK_DEFINITION_NOT_EXISTS, taskCode); - } - - TaskDefinition taskDefinitionUpdate = taskUpdateRequest.mergeIntoTaskDefinition(taskDefinitionOriginal); - - this.checkTaskDefinitionValid(loginUser, taskDefinitionUpdate, TASK_DEFINITION_UPDATE); - this.TaskDefinitionUpdateValid(taskDefinitionOriginal, taskDefinitionUpdate); - - int update = taskDefinitionMapper.updateById(taskDefinitionUpdate); - if (update <= 0) { - throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); - } - TaskDefinitionLog taskDefinitionLog = this.persist2TaskDefinitionLog(loginUser, taskDefinitionUpdate); - - List taskRelationList = - processTaskRelationMapper.queryUpstreamByCode(taskDefinitionUpdate.getProjectCode(), taskCode); - - if (CollectionUtils.isNotEmpty(taskRelationList)) { - log.info( - "Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.", - taskCode); - long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); - List processTaskRelations = - processTaskRelationMapper.queryByProcessCode(processDefinitionCode); - updateDag(loginUser, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionLog)); - } - this.updateTaskUpstreams(loginUser, taskUpdateRequest.getWorkflowCode(), taskDefinitionUpdate.getCode(), - taskUpdateRequest.getUpstreamTasksCodes()); - return taskDefinitionUpdate; - } - /** * Get resource task definition by code * @@ -1144,39 +806,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } - @Override - public Result queryTaskDefinitionListPaging(User loginUser, - long projectCode, - String searchTaskName, - String taskType, - TaskExecuteType taskExecuteType, - Integer pageNo, - Integer pageSize) { - Result result = new Result(); - Project project = projectMapper.queryByCode(projectCode); - // check user access for project - Map checkResult = - projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION); - Status resultStatus = (Status) checkResult.get(Constants.STATUS); - if (resultStatus != Status.SUCCESS) { - putMsg(result, resultStatus); - return result; - } - taskType = taskType == null ? StringUtils.EMPTY : taskType; - Page page = new Page<>(pageNo, pageSize); - // first, query task code by page size - IPage taskMainInfoIPage = taskDefinitionMapper.queryDefineListPaging(page, projectCode, - searchTaskName, taskType, taskExecuteType); - // then, query task relevant info by task code - fillRecords(projectCode, taskMainInfoIPage); - PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); - pageInfo.setTotal((int) taskMainInfoIPage.getTotal()); - pageInfo.setTotalList(taskMainInfoIPage.getRecords()); - result.setData(pageInfo); - putMsg(result, Status.SUCCESS); - return result; - } - private void fillRecords(long projectCode, IPage taskMainInfoIPage) { List records = Collections.emptyList(); if (CollectionUtils.isNotEmpty(taskMainInfoIPage.getRecords())) { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java index 8460c59924..bb235cb952 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java @@ -17,12 +17,7 @@ package org.apache.dolphinscheduler.api.service; -import static org.apache.dolphinscheduler.api.AssertionsHelper.assertDoesNotThrow; -import static org.apache.dolphinscheduler.api.AssertionsHelper.assertThrowsServiceException; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_CREATE; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.eq; @@ -31,18 +26,12 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.when; -import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest; -import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest; -import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationUpdateUpstreamRequest; -import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.api.service.impl.TaskDefinitionServiceImpl; -import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.Flag; -import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -53,7 +42,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; -import org.apache.dolphinscheduler.dao.entity.TaskMainInfo; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; @@ -64,14 +52,12 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; -import org.apache.dolphinscheduler.plugin.task.shell.ShellTaskChannelFactory; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessServiceImpl; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -88,9 +74,6 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; - @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) public class TaskDefinitionServiceImplTest { @@ -161,80 +144,6 @@ public class TaskDefinitionServiceImplTest { user = loginUser; } - @Test - public void createTaskDefinition() { - try ( - MockedStatic taskPluginManagerMockedStatic = - Mockito.mockStatic(TaskPluginManager.class)) { - taskPluginManagerMockedStatic - .when(() -> TaskPluginManager.checkTaskParameters(Mockito.any(), Mockito.any())) - .thenReturn(true); - Project project = getProject(); - when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); - - Map result = new HashMap<>(); - when(projectService.hasProjectAndWritePerm(user, project, result)) - .thenReturn(true); - - String createTaskDefinitionJson = - "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":" - + "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\"," - + "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":" - + "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"]," - + "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0," - + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0," - + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]"; - Map relation = taskDefinitionService - .createTaskDefinition(user, PROJECT_CODE, createTaskDefinitionJson); - assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); - - } - } - - @Test - public void updateTaskDefinition() { - try ( - MockedStatic taskPluginManagerMockedStatic = - Mockito.mockStatic(TaskPluginManager.class)) { - taskPluginManagerMockedStatic - .when(() -> TaskPluginManager.checkTaskParameters(Mockito.any(), Mockito.any())) - .thenReturn(true); - String taskDefinitionJson = getTaskDefinitionJson(); - - Project project = getProject(); - when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); - - Map result = new HashMap<>(); - putMsg(result, Status.SUCCESS, PROJECT_CODE); - when(projectService.hasProjectAndWritePerm(user, project, new HashMap<>())).thenReturn(true); - - when(processService.isTaskOnline(TASK_CODE)).thenReturn(Boolean.FALSE); - when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(new TaskDefinition()); - when(taskDefinitionMapper.updateById(Mockito.any(TaskDefinitionLog.class))).thenReturn(1); - when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1); - when(processTaskRelationLogDao.insert(Mockito.any(ProcessTaskRelationLog.class))).thenReturn(1); - when(processDefinitionMapper.queryByCode(2L)).thenReturn(new ProcessDefinition()); - when(processDefinitionMapper.updateById(Mockito.any(ProcessDefinition.class))).thenReturn(1); - when(processDefinitionLogMapper.insert(Mockito.any(ProcessDefinitionLog.class))).thenReturn(1); - when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1); - when(processTaskRelationMapper.queryProcessTaskRelationByTaskCodeAndTaskVersion(TASK_CODE, 0)) - .thenReturn(getProcessTaskRelationList2()); - when(processTaskRelationMapper - .updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(1); - result = taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson); - assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - // failure - when(processTaskRelationMapper - .updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(2); - exception = Assertions.assertThrows(ServiceException.class, - () -> taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, - taskDefinitionJson)); - assertEquals(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR.getCode(), - ((ServiceException) exception).getCode()); - } - - } - @Test public void queryTaskDefinitionByName() { String taskName = "task"; @@ -255,33 +164,6 @@ public class TaskDefinitionServiceImplTest { assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); } - @Test - public void deleteTaskDefinitionByCode() { - Project project = getProject(); - when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); - - // error task definition not find - exception = Assertions.assertThrows(ServiceException.class, - () -> taskDefinitionService.deleteTaskDefinitionByCode(user, TASK_CODE)); - assertEquals(Status.TASK_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode()); - - // error delete single task definition object - when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(getTaskDefinition()); - when(taskDefinitionMapper.deleteByCode(TASK_CODE)).thenReturn(0); - when(projectService.hasProjectAndWritePerm(user, project, new HashMap<>())).thenReturn(true); - exception = Assertions.assertThrows(ServiceException.class, - () -> taskDefinitionService.deleteTaskDefinitionByCode(user, TASK_CODE)); - assertEquals(Status.DELETE_TASK_DEFINE_BY_CODE_MSG_ERROR.getCode(), - ((ServiceException) exception).getCode()); - - // success - doNothing().when(projectService).checkProjectAndAuthThrowException(user, project, - TASK_DEFINITION_DELETE); - when(processTaskRelationMapper.queryDownstreamByTaskCode(TASK_CODE)).thenReturn(new ArrayList<>()); - when(taskDefinitionMapper.deleteByCode(TASK_CODE)).thenReturn(1); - Assertions.assertDoesNotThrow(() -> taskDefinitionService.deleteTaskDefinitionByCode(user, TASK_CODE)); - } - @Test public void switchVersion() { Project project = getProject(); @@ -356,33 +238,6 @@ public class TaskDefinitionServiceImplTest { assertEquals(Status.SUCCESS, genTaskCodeList.get(Constants.STATUS)); } - @Test - public void testQueryTaskDefinitionListPaging() { - Project project = getProject(); - Map checkResult = new HashMap<>(); - checkResult.put(Constants.STATUS, Status.SUCCESS); - Integer pageNo = 1; - Integer pageSize = 10; - IPage taskMainInfoIPage = new Page<>(); - TaskMainInfo taskMainInfo = new TaskMainInfo(); - taskMainInfo.setTaskCode(TASK_CODE); - taskMainInfo.setUpstreamTaskCode(4L); - taskMainInfo.setUpstreamTaskName("4"); - taskMainInfoIPage.setRecords(Collections.singletonList(taskMainInfo)); - taskMainInfoIPage.setTotal(10L); - when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); - when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, TASK_DEFINITION)) - .thenReturn(checkResult); - when(taskDefinitionMapper.queryDefineListPaging(Mockito.any(Page.class), Mockito.anyLong(), - Mockito.isNull(), Mockito.anyString(), Mockito.isNull())) - .thenReturn(taskMainInfoIPage); - when(taskDefinitionMapper.queryDefineListByCodeList(PROJECT_CODE, Collections.singletonList(3L))) - .thenReturn(Collections.singletonList(taskMainInfo)); - Result result = taskDefinitionService.queryTaskDefinitionListPaging(user, PROJECT_CODE, - null, null, null, pageNo, pageSize); - assertEquals(Status.SUCCESS.getMsg(), result.getMsg()); - } - @Test public void testReleaseTaskDefinition() { when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject()); @@ -425,140 +280,6 @@ public class TaskDefinitionServiceImplTest { assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failResult.get(Constants.STATUS)); } - @Test - public void testCreateTaskDefinitionV2() { - TaskCreateRequest taskCreateRequest = new TaskCreateRequest(); - taskCreateRequest.setProjectCode(PROJECT_CODE); - taskCreateRequest.setWorkflowCode(PROCESS_DEFINITION_CODE); - - // error process definition not find - assertThrowsServiceException(Status.PROCESS_DEFINE_NOT_EXIST, - () -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); - - // error project not find - when(processDefinitionMapper.queryByCode(PROCESS_DEFINITION_CODE)).thenReturn(getProcessDefinition()); - when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject()); - doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService) - .checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION_CREATE); - assertThrowsServiceException(Status.PROJECT_NOT_EXIST, - () -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); - - // error task definition - taskCreateRequest.setTaskType(ShellTaskChannelFactory.NAME); - taskCreateRequest.setTaskParams(JSONUtils.toJsonString(new HashMap<>())); - doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION_CREATE); - assertThrowsServiceException(Status.PROCESS_NODE_S_PARAMETER_INVALID, - () -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); - - try ( - MockedStatic taskPluginManagerMockedStatic = - Mockito.mockStatic(TaskPluginManager.class)) { - taskPluginManagerMockedStatic - .when(() -> TaskPluginManager.checkTaskParameters(Mockito.any(), Mockito.any())) - .thenReturn(true); - - // error create task definition object - when(taskDefinitionMapper.insert(isA(TaskDefinition.class))).thenReturn(0); - assertThrowsServiceException(Status.CREATE_TASK_DEFINITION_ERROR, - () -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); - - // error sync to task definition log - when(taskDefinitionMapper.insert(isA(TaskDefinition.class))).thenReturn(1); - when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(0); - assertThrowsServiceException(Status.CREATE_TASK_DEFINITION_LOG_ERROR, - () -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); - - // success - when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1); - // we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService - when( - processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class), - isA(Boolean.class), - isA(TaskRelationUpdateUpstreamRequest.class))) - .thenReturn(getProcessTaskRelationList()); - when(processDefinitionService.updateSingleProcessDefinition(isA(User.class), isA(Long.class), - isA(WorkflowUpdateRequest.class))).thenReturn(getProcessDefinition()); - assertDoesNotThrow(() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); - } - - } - - @Test - public void testUpdateTaskDefinitionV2() { - TaskUpdateRequest taskUpdateRequest = new TaskUpdateRequest(); - TaskDefinition taskDefinition = getTaskDefinition(); - Project project = getProject(); - - // error task definition not exists - assertThrowsServiceException(Status.TASK_DEFINITION_NOT_EXISTS, - () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); - - // error project not find - when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition); - when(projectMapper.queryByCode(isA(Long.class))).thenReturn(project); - doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService) - .checkProjectAndAuthThrowException(user, project, TASK_DEFINITION_UPDATE); - assertThrowsServiceException(Status.PROJECT_NOT_EXIST, - () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); - - // error task definition - doNothing().when(projectService).checkProjectAndAuthThrowException(user, project, TASK_DEFINITION_UPDATE); - - try ( - MockedStatic taskPluginManagerMockedStatic = - Mockito.mockStatic(TaskPluginManager.class)) { - taskPluginManagerMockedStatic - .when(() -> TaskPluginManager.checkTaskParameters(Mockito.any(), Mockito.any())) - .thenReturn(false); - assertThrowsServiceException(Status.PROCESS_NODE_S_PARAMETER_INVALID, - () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); - } - - try ( - MockedStatic taskPluginManagerMockedStatic = - Mockito.mockStatic(TaskPluginManager.class)) { - taskPluginManagerMockedStatic - .when(() -> TaskPluginManager.checkTaskParameters(Mockito.any(), Mockito.any())) - .thenReturn(true); - // error task definition nothing update - when(processService.isTaskOnline(TASK_CODE)).thenReturn(false); - assertThrowsServiceException(Status.TASK_DEFINITION_NOT_CHANGE, - () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); - - // error task definition version invalid - taskUpdateRequest.setTaskPriority(String.valueOf(Priority.HIGH)); - assertThrowsServiceException(Status.DATA_IS_NOT_VALID, - () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); - - // error task definition update effect number - when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(VERSION); - when(taskDefinitionMapper.updateById(isA(TaskDefinition.class))).thenReturn(0); - assertThrowsServiceException(Status.UPDATE_TASK_DEFINITION_ERROR, - () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); - - // error task definition log insert - when(taskDefinitionMapper.updateById(isA(TaskDefinition.class))).thenReturn(1); - when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(0); - assertThrowsServiceException(Status.CREATE_TASK_DEFINITION_LOG_ERROR, - () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); - - // success - when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1); - // we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService - when( - processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class), - isA(Boolean.class), - isA(TaskRelationUpdateUpstreamRequest.class))) - .thenReturn(getProcessTaskRelationList()); - Assertions.assertDoesNotThrow( - () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); - - taskDefinition = - taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest); - assertEquals(getTaskDefinition().getVersion() + 1, taskDefinition.getVersion()); - } - } - @Test public void testUpdateDag() { User loginUser = getLoginUser(); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java index cfbe6e94e0..3f05b11f6d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.dao.mapper; -import org.apache.dolphinscheduler.common.enums.TaskExecuteType; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskMainInfo; @@ -88,22 +87,6 @@ public interface TaskDefinitionMapper extends BaseMapper { */ int batchInsert(@Param("taskDefinitions") List taskDefinitions); - /** - * task main info page - * - * @param page page - * @param projectCode projectCode - * @param searchTaskName searchTaskName - * @param taskType taskType - * @param taskExecuteType taskExecuteType - * @return task main info IPage - */ - IPage queryDefineListPaging(IPage page, - @Param("projectCode") long projectCode, - @Param("searchTaskName") String searchTaskName, - @Param("taskType") String taskType, - @Param("taskExecuteType") TaskExecuteType taskExecuteType); - /** * task main info * @param projectCode project code diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml index ba06d5a9bd..71b1302f7c 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml @@ -98,21 +98,6 @@ #{taskDefinition.taskGroupId}, #{taskDefinition.taskExecuteType}) -